"""Streamlit dashboard for monitoring and controlling a ptp4l (linuxptp) node.

The sidebar shows the service status, the timestamping mode, the role
configured in ptp4l.conf and the port state reported by pmc, and offers
buttons to start/stop/restart the service and to force a role. The main area
plots the offsets parsed from the service's journal.
"""

import re
import socket
import subprocess

import pandas as pd
import streamlit as st

# --- Configuration ---
PTP_SERVICE = "ptp4l"
PTP_CONF_PATH = "/etc/linuxptp/ptp4l.conf"  # Default path on Debian/Raspberry Pi OS
HOSTNAME = socket.gethostname()  # Get the system hostname

# Config keys that force a role. The second entry of each pair is the legacy
# spelling, accepted so that older configuration files are handled as well.
_SERVER_KEYS = ("serverOnly", "masterOnly")
_CLIENT_KEYS = ("clientOnly", "slaveOnly")

# Port states in the order they are preferred when pmc reports several ports.
_STATE_PRIORITY = (
    "MASTER",
    "SLAVE",
    "UNCALIBRATED",
    "PRE_MASTER",
    "FAULTY",
    "LISTENING",
)

# Only log lines in servo state "s2" are plotted; the captured value is in ns.
_OFFSET_PATTERN = re.compile(r"master offset\s+(-?\d+)\s+s2")

# st.session_state key holding a (level, message) pair that must survive the
# st.rerun() triggered by a button handler.
_NOTICE_KEY = "ptp_notice"


# --- Helper Functions ---
def run_command(cmd):
    """Execute a shell command and return ``(success, text)``.

    ``text`` is the command's stdout when it exits with status 0 and its
    stderr otherwise.

    NOTE: the command runs through the shell (pipelines are used by callers),
    so only trusted, program-built strings must be passed in.
    """
    try:
        result = subprocess.run(
            cmd, shell=True, check=True, capture_output=True, text=True
        )
    except subprocess.CalledProcessError as e:
        return False, e.stderr
    return True, result.stdout


def get_service_status() -> bool:
    """Return True only when systemd reports the ptp4l service as ``active``."""
    success, output = run_command(f"systemctl is-active {PTP_SERVICE}")
    # Exact match: a substring test would also accept "inactive"/"activating".
    return success and output.strip() == "active"


def get_configured_role() -> str:
    """Read ptp4l.conf and return the role it forces.

    Returns "Forced Master", "Forced Slave", "Auto (BMCA)", or "Unknown" when
    the configuration file cannot be read. When both flags are set to "1" the
    server flag wins.
    """
    success, output = run_command(f"cat {PTP_CONF_PATH}")
    if not success:
        return "Unknown"

    server_only = "0"
    client_only = "0"
    for line in output.splitlines():
        fields = line.split()
        if len(fields) < 2:
            # Blank line, section header, or an option without a value.
            continue
        key, value = fields[0], fields[1]
        if key in _SERVER_KEYS:
            server_only = value
        elif key in _CLIENT_KEYS:
            client_only = value

    if server_only == "1":
        return "Forced Master"
    if client_only == "1":
        return "Forced Slave"
    return "Auto (BMCA)"


def get_actual_state() -> str:
    """Query pmc for the current port state.

    Returns "Offline/Unavailable" when pmc fails and "Initializing..." when
    its output contains no ``portState`` entry. When several ports are
    reported, the first state found in ``_STATE_PRIORITY`` is returned; a
    state outside that list (for example PASSIVE) is returned as reported
    instead of being mislabelled as LISTENING.
    """
    success, output = run_command("sudo pmc -u -b 0 'GET PORT_DATA_SET'")
    if not success:
        return "Offline/Unavailable"

    states = re.findall(r"portState\s+(\w+)", output)
    if not states:
        return "Initializing..."

    for state in _STATE_PRIORITY:
        if state in states:
            return state
    return states[0]


def get_timestamping_mode() -> str:
    """Determine whether hardware or software timestamping is in use.

    The most recent "selected ... as PTP clock" journal line is checked first;
    if it is missing or inconclusive, the ``time_stamping`` setting in the
    configuration file is used. Falls back to "Hardware (Default)".
    """
    cmd = (
        f"sudo journalctl -u {PTP_SERVICE} --no-pager"
        " | grep 'selected .* as PTP clock' | tail -n 1"
    )
    success, output = run_command(cmd)
    if success and output.strip():
        if "/dev/ptp" in output:
            return "Hardware (PHC)"
        if "CLOCK_REALTIME" in output:
            return "Software"

    success, conf_output = run_command(f"grep 'time_stamping' {PTP_CONF_PATH}")
    if success and conf_output.strip():
        if "software" in conf_output:
            return "Software"
        if "hardware" in conf_output:
            return "Hardware (PHC)"

    return "Hardware (Default)"


def _sed_set_flag(keys, value):
    """Build a sed -E expression setting whichever of ``keys`` exists to ``value``."""
    names = "|".join(keys)
    # \1 keeps the spelling already used in the file, so a legacy key is
    # updated in place rather than skipped.
    return rf"s/^[[:space:]]*({names})([[:space:]].*)?$/\1 {value}/"


def modify_ptp_role(role) -> bool:
    """Rewrite ptp4l.conf to force ``role``.

    "Master" sets the server flag, "Slave" sets the client flag, and any other
    value (the UI passes "Auto") clears both. Only existing option lines are
    rewritten; an option absent from the file is not added. Returns True when
    the sed command exits successfully.
    """
    server_only = "1" if role == "Master" else "0"
    client_only = "1" if role == "Slave" else "0"

    server_expr = _sed_set_flag(_SERVER_KEYS, server_only)
    client_expr = _sed_set_flag(_CLIENT_KEYS, client_only)
    # A single invocation updates both flags together.
    cmd = (
        f"sudo sed -E -i -e '{server_expr}' -e '{client_expr}' {PTP_CONF_PATH}"
    )
    success, _ = run_command(cmd)
    return success


def get_offset_data(lines=200) -> list:
    """Return offsets in microseconds parsed from the last ``lines`` journal entries.

    Returns an empty list when the journal cannot be read or holds no matching
    "master offset" lines.
    """
    success, log_output = run_command(
        f"sudo journalctl -u {PTP_SERVICE} -n {int(lines)} --no-pager"
    )
    if not success:
        return []

    matches = (_OFFSET_PATTERN.search(line) for line in log_output.splitlines())
    # The log reports nanoseconds; the chart is labelled in microseconds.
    return [int(match.group(1)) / 1000.0 for match in matches if match]


def _queue_notice(level, message):
    """Store a sidebar message to be displayed after the next rerun."""
    st.session_state[_NOTICE_KEY] = (level, message)


def _show_notice():
    """Display and clear the message queued by a previous button handler."""
    notice = st.session_state.pop(_NOTICE_KEY, None)
    if notice is None:
        return
    level, message = notice
    if level == "success":
        st.sidebar.success(message)
    else:
        st.sidebar.error(message)


def _service_action(action):
    """Run ``systemctl <action>`` on the service, report a failure, and rerun."""
    success, output = run_command(f"sudo systemctl {action} {PTP_SERVICE}")
    if not success:
        _queue_notice(
            "error", f"Failed to {action} {PTP_SERVICE}: {output.strip()}"
        )
    st.rerun()


def _apply_role(role):
    """Force ``role`` in the config, restart the service, and rerun the page."""
    if not modify_ptp_role(role):
        _queue_notice("error", f"Could not update {PTP_CONF_PATH}. Role unchanged.")
        st.rerun()

    success, output = run_command(f"sudo systemctl restart {PTP_SERVICE}")
    if success:
        _queue_notice("success", f"Set to {role}. Service restarted.")
    else:
        _queue_notice(
            "error",
            f"Set to {role}, but restarting {PTP_SERVICE} failed: {output.strip()}",
        )
    st.rerun()


# --- Streamlit UI ---
st.set_page_config(page_title=f"PTP4L - {HOSTNAME}", layout="wide")
st.title(f"⏱️ Pi PTP Manager: {HOSTNAME}")

is_running = get_service_status()

# Sidebar: Status Dashboard
st.sidebar.header(f"Status: {HOSTNAME}")

if is_running:
    st.sidebar.success(f"{PTP_SERVICE} is RUNNING")
    actual_state = get_actual_state()
else:
    st.sidebar.error(f"{PTP_SERVICE} is STOPPED")
    actual_state = "Offline"

st.sidebar.metric(label="Timestamping Mode", value=get_timestamping_mode())
st.sidebar.metric(label="Configured Mode", value=get_configured_role())
st.sidebar.metric(label="Actual Behavior", value=actual_state)

if actual_state == "LISTENING":
    st.sidebar.caption(
        "Node is currently listening to the network to determine the best master clock..."
    )

st.sidebar.divider()

# Sidebar Controls
st.sidebar.header("Service Controls")
if is_running:
    if st.sidebar.button("Stop Service"):
        _service_action("stop")
else:
    if st.sidebar.button("Start Service"):
        _service_action("start")

if st.sidebar.button("Restart Service"):
    _service_action("restart")

st.sidebar.divider()

# Role Configuration
st.sidebar.header("Node Role Configuration")
st.sidebar.write("Force this node's behavior (overrides BMCA):")
col1, col2, col3 = st.sidebar.columns(3)

if col1.button("Master"):
    _apply_role("Master")

if col2.button("Slave"):
    _apply_role("Slave")

if col3.button("Auto"):
    _apply_role("Auto")

# Result of the button pressed on the previous run, if any. It is shown here
# because a message written just before st.rerun() would never be rendered.
_show_notice()

# Main Area: Plotting
st.subheader(f"Live Offset from Master ({HOSTNAME})")


@st.fragment(run_every=2)
def update_graph():
    """Redraw the offset chart from the latest journal entries every 2 seconds."""
    offsets = get_offset_data(lines=300)
    if offsets:
        df = pd.DataFrame(offsets, columns=["Offset (us)"])
        st.line_chart(df, color="#ff4b4b")
        st.caption(
            f"Showing last {len(offsets)} sync events. Note: If this node is acting as the Grandmaster, offsets will not appear."
        )
    else:
        st.info(
            "No offset data found in recent logs. This node might be the Master, or it hasn't synced yet."
        )


if is_running:
    update_graph()
else:
    st.warning("Start the ptp4l service to see offset data.")