This commit is contained in:
2026-05-02 13:07:20 +01:00
commit b9d560f214
+208
View File
@@ -0,0 +1,208 @@
import streamlit as st
import subprocess
import pandas as pd
import re
import socket
# --- Configuration ---
PTP_SERVICE = "ptp4l"
PTP_CONF_PATH = "/etc/linuxptp/ptp4l.conf" # Default path on Debian/Raspberry Pi OS
HOSTNAME = socket.gethostname() # Get the system hostname
# --- Helper Functions ---
def run_command(cmd):
"""Executes a shell command and returns the output."""
try:
result = subprocess.run(cmd, shell=True, check=True, capture_output=True, text=True)
return True, result.stdout
except subprocess.CalledProcessError as e:
return False, e.stderr
def get_service_status():
"""Checks if the ptp4l systemd service is active."""
success, output = run_command(f"systemctl is-active {PTP_SERVICE}")
return "active" in output.strip()
def get_configured_role():
"""Reads the ptp4l.conf file to determine the forced role."""
success, output = run_command(f"cat {PTP_CONF_PATH}")
if not success:
return "Unknown"
server_only = "0"
client_only = "0"
for line in output.splitlines():
if line.startswith("serverOnly"):
server_only = line.split()[1]
elif line.startswith("clientOnly") or line.startswith("slaveOnly"):
client_only = line.split()[1]
if server_only == "1":
return "Forced Master"
elif client_only == "1":
return "Forced Slave"
else:
return "Auto (BMCA)"
def get_actual_state():
"""Uses pmc to get the current actual state of the physical PTP port."""
success, output = run_command("sudo pmc -u -b 0 'GET PORT_DATA_SET'")
if not success:
return "Offline/Unavailable"
states = re.findall(r"portState\s+(\w+)", output)
if not states:
return "Initializing..."
if "MASTER" in states:
return "MASTER"
elif "SLAVE" in states:
return "SLAVE"
elif "UNCALIBRATED" in states:
return "UNCALIBRATED"
elif "PRE_MASTER" in states:
return "PRE_MASTER"
elif "FAULTY" in states:
return "FAULTY"
else:
return "LISTENING"
def get_timestamping_mode():
"""Checks the logs (or config file) to see if hardware or software timestamping is active."""
cmd = f"sudo journalctl -u {PTP_SERVICE} --no-pager | grep 'selected .* as PTP clock' | tail -n 1"
success, output = run_command(cmd)
if success and output.strip():
if "/dev/ptp" in output:
return "Hardware (PHC)"
elif "CLOCK_REALTIME" in output:
return "Software"
success, conf_output = run_command(f"grep 'time_stamping' {PTP_CONF_PATH}")
if success and conf_output.strip():
if "software" in conf_output:
return "Software"
elif "hardware" in conf_output:
return "Hardware (PHC)"
return "Hardware (Default)"
def modify_ptp_role(role):
"""Modifies the ptp4l.conf file to force a specific role."""
server_only = "1" if role == "Master" else "0"
client_only = "1" if role == "Slave" else "0"
cmd1 = f"sudo sed -i 's/^serverOnly.*/serverOnly {server_only}/g' {PTP_CONF_PATH}"
cmd2 = f"sudo sed -i 's/^clientOnly.*/clientOnly {client_only}/g' {PTP_CONF_PATH}"
s1, _ = run_command(cmd1)
s2, _ = run_command(cmd2)
return s1 and s2
def get_offset_data(lines=200):
"""Parses journalctl logs for ptp4l offset data."""
success, log_output = run_command(f"sudo journalctl -u {PTP_SERVICE} -n {lines} --no-pager")
if not success:
return []
offset_pattern = re.compile(r"master offset\s+(-?\d+)\s+s2")
offsets = []
for line in log_output.splitlines():
match = offset_pattern.search(line)
if match:
offset_ns = int(match.group(1))
offsets.append(offset_ns / 1000.0)
return offsets
# --- Streamlit UI ---
st.set_page_config(page_title=f"PTP4L - {HOSTNAME}", layout="wide")
st.title(f"⏱️ Pi PTP Manager: {HOSTNAME}")
is_running = get_service_status()
# Sidebar: Status Dashboard
st.sidebar.header(f"Status: {HOSTNAME}")
if is_running:
st.sidebar.success(f"{PTP_SERVICE} is RUNNING")
config_role = get_configured_role()
actual_state = get_actual_state()
ts_mode = get_timestamping_mode()
st.sidebar.metric(label="Timestamping Mode", value=ts_mode)
st.sidebar.metric(label="Configured Mode", value=config_role)
st.sidebar.metric(label="Actual Behavior", value=actual_state)
if actual_state == "LISTENING":
st.sidebar.caption("Node is currently listening to the network to determine the best master clock...")
else:
st.sidebar.error(f"{PTP_SERVICE} is STOPPED")
st.sidebar.metric(label="Timestamping Mode", value=get_timestamping_mode())
st.sidebar.metric(label="Configured Mode", value=get_configured_role())
st.sidebar.metric(label="Actual Behavior", value="Offline")
st.sidebar.divider()
# Sidebar Controls
st.sidebar.header("Service Controls")
if is_running:
if st.sidebar.button("Stop Service"):
run_command(f"sudo systemctl stop {PTP_SERVICE}")
st.rerun()
else:
if st.sidebar.button("Start Service"):
run_command(f"sudo systemctl start {PTP_SERVICE}")
st.rerun()
if st.sidebar.button("Restart Service"):
run_command(f"sudo systemctl restart {PTP_SERVICE}")
st.rerun()
st.sidebar.divider()
# Role Configuration
st.sidebar.header("Node Role Configuration")
st.sidebar.write("Force this node's behavior (overrides BMCA):")
col1, col2, col3 = st.sidebar.columns(3)
if col1.button("Master"):
if modify_ptp_role("Master"):
run_command(f"sudo systemctl restart {PTP_SERVICE}")
st.sidebar.success("Set to Master. Service restarted.")
st.rerun()
if col2.button("Slave"):
if modify_ptp_role("Slave"):
run_command(f"sudo systemctl restart {PTP_SERVICE}")
st.sidebar.success("Set to Slave. Service restarted.")
st.rerun()
if col3.button("Auto"):
modify_ptp_role("Auto")
run_command(f"sudo systemctl restart {PTP_SERVICE}")
st.sidebar.success("Set to Auto. Service restarted.")
st.rerun()
# Main Area: Plotting
st.subheader(f"Live Offset from Master ({HOSTNAME})")
@st.fragment(run_every=2)
def update_graph():
offsets = get_offset_data(lines=300)
if offsets:
df = pd.DataFrame(offsets, columns=["Offset (us)"])
st.line_chart(df, color="#ff4b4b")
st.caption(f"Showing last {len(offsets)} sync events. Note: If this node is acting as the Grandmaster, offsets will not appear.")
else:
st.info("No offset data found in recent logs. This node might be the Master, or it hasn't synced yet.")
if is_running:
update_graph()
else:
st.warning("Start the ptp4l service to see offset data.")