1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
|
# Imports: one per line, grouped stdlib / third-party (PEP 8).
import io
import json
import os
import sys
import time

import requests

# ntfy target URL and bearer token; both are required — a missing variable
# raises KeyError at startup, which is the desired fail-fast behaviour.
NTFY_ENDPOINT = os.environ["NTFY_ENDPOINT"]
NTFY_TOKEN = os.environ["NTFY_TOKEN"]
# Prefix for the notification title.
TITLE_PREFIX = os.getenv("NTFY_TITLE_PREFIX", "iperf3")
# Tests shorter than this many seconds are skipped; 0 disables the filter.
MIN_DURATION = int(os.getenv("MIN_DURATION_SECONDS", "0"))
# Log file written by the iperf3 server (concatenated JSON result objects).
LOGFILE = "/logs/iperf3.jsonl"
def iter_json_objects(fobj: io.TextIOBase):
    """Yield complete top-level JSON object strings from a growing file.

    Uses brace counting — aware of string literals and backslash escapes —
    to detect where one JSON object ends, so pretty-printed (multi-line)
    output is handled, not just one-object-per-line. When no new data is
    available it sleeps briefly and retries (tail -f behaviour); this
    generator never terminates on its own.
    """
    # buf: characters of the object being assembled
    # depth: count of currently unbalanced '{'
    # in_string: inside a JSON string literal
    # escape: previous character was an unescaped backslash
    buf, depth, in_string, escape = [], 0, False, False
    while True:
        chunk = fobj.read()
        if not chunk:
            # Reached current EOF of the growing file: wait for the writer.
            time.sleep(0.3)
            continue
        for ch in chunk:
            buf.append(ch)
            # An unescaped quote toggles string mode.
            if ch == '"' and not escape:
                in_string = not in_string
            # Track escapes; a backslash that is itself escaped ("\\") does
            # not start a new escape, so two backslashes cancel out.
            if ch == '\\' and not escape:
                escape = True
            else:
                escape = False
            # Braces only count as structure outside string literals.
            if not in_string:
                if ch == '{':
                    depth += 1
                elif ch == '}':
                    depth -= 1
                if depth == 0:
                    # Braces balanced: emit the accumulated object (if any
                    # non-whitespace was collected) and start fresh. Stray
                    # text between objects is yielded too and is expected to
                    # fail json.loads downstream.
                    s = ''.join(buf).strip()
                    buf.clear()
                    if s:
                        yield s
def human_mbps(bits_per_second: float | None) -> str:
return "n/a" if bits_per_second is None else f"{bits_per_second/1e6:.2f} Mbit/s"
def extract_client_ip(d: dict) -> str:
    """Best-effort extraction of the peer address from an iperf3 result dict.

    Looks up start.connected[0].remote_host; on any structural mismatch it
    falls back to a top-level "remote_host" key, then to "unknown".
    """
    try:
        return d["start"]["connected"][0]["remote_host"]
    except (KeyError, IndexError, TypeError):
        # Narrowed from a bare `except Exception`: these are the only
        # exceptions the chained lookup can raise on a malformed/partial
        # result; anything else would be a real bug worth surfacing.
        return d.get("remote_host", "unknown")
def build_message(d: dict) -> tuple[str, str]:
    """Turn one parsed iperf3 result dict into an ntfy (title, body) pair.

    Raises ValueError when the test ran shorter than MIN_DURATION so the
    caller can treat it as an intentional skip.
    """
    client_ip = extract_client_ip(d)
    test_info = d.get("start", {}).get("test_start", {})
    proto = str(test_info.get("protocol", "TCP")).upper()
    duration = test_info.get("duration")
    streams = test_info.get("num_streams")

    # Length filter; MIN_DURATION == 0 (or a missing duration) disables it.
    if MIN_DURATION and duration and duration < MIN_DURATION:
        raise ValueError(f"Ignored: duration {duration}s < {MIN_DURATION}s")

    end = d.get("end", {})
    lines = [f"Gegenstelle: {client_ip}", f"Protokoll: {proto}"]

    if proto == "UDP":
        # UDP results carry jitter/loss in "sum" (fallback "sum_received").
        summary = end.get("sum") or end.get("sum_received") or {}
        lines.append(f"Speed: {human_mbps(summary.get('bits_per_second'))}")
        jitter = summary.get("jitter_ms")
        if jitter is not None:
            lines.append(f"Jitter: {jitter} ms")
        loss = summary.get("lost_percent")
        if loss is not None:
            lines.append(f"Loss: {loss}%")
    else:
        # TCP: prefer the receiver-side rate; fall back to the sender's.
        recv = end.get("sum_received", {})
        sent = end.get("sum_sent", {})
        rate = recv.get("bits_per_second") or sent.get("bits_per_second")
        lines.append(f"Throughput: {human_mbps(rate)}")
        retrans = (sent or {}).get("retransmits")
        if retrans is not None:
            lines.append(f"Retransmits: {retrans}")

    if duration:
        lines.append(f"Dauer: {duration}s")
    if streams:
        lines.append(f"Streams: {streams}")

    title = f"{TITLE_PREFIX} durch {client_ip} erfolgt"
    return title, "\n".join(lines)
def publish_to_ntfy(title: str, body: str):
    """POST the notification body to NTFY_ENDPOINT with bearer-token auth.

    Raises requests.HTTPError on a non-2xx response.
    """
    response = requests.post(
        NTFY_ENDPOINT,
        data=body.encode("utf-8"),
        headers={
            "Authorization": f"Bearer {NTFY_TOKEN}",
            "Title": title,
            "Priority": "default",
        },
        timeout=10,
    )
    response.raise_for_status()
def tail_file(path: str):
    """Follow the iperf3 JSON log at *path* forever and notify per result.

    Waits for the file to appear, seeks to its end so pre-existing results
    are not re-announced, then parses each completed JSON object and
    publishes it via ntfy. Per-record errors are logged and the loop
    continues; this function never returns.
    """
    # Block until the writer (iperf3) has created the log file.
    while not os.path.exists(path):
        time.sleep(0.5)
    with open(path, "r", encoding="utf-8", errors="ignore") as f:
        # Skip historical content: only announce results written from now on.
        f.seek(0, io.SEEK_END)
        for raw in iter_json_objects(f):
            try:
                data = json.loads(raw)
                title, body = build_message(data)
                publish_to_ntfy(title, body)
                print(f"[ntfy] Sent: {title}")
            except ValueError as skip:
                # Intentional skips (MIN_DURATION filter). NOTE(review):
                # json.JSONDecodeError is a ValueError subclass, so malformed
                # objects also land here rather than in the catch-all below.
                print(f"[parser] Skip: {skip}")
            except requests.HTTPError as he:
                # ntfy rejected the publish (non-2xx response).
                print(f"[ntfy] HTTPError: {he}", file=sys.stderr)
            except Exception as e:
                # Catch-all so one bad record cannot kill the tailer.
                print(f"[parser] Fehler: {e}", file=sys.stderr)
if __name__ == "__main__":
    # Entry point: tail the iperf3 log forever (blocks indefinitely).
    tail_file(LOGFILE)
|