-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathping_collect.py
More file actions
187 lines (161 loc) · 5.84 KB
/
ping_collect.py
File metadata and controls
187 lines (161 loc) · 5.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python3
import argparse
import csv
import json
import os
import re
import subprocess
import sys
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple
# Matches the round-trip-time token in ping output, e.g. "time=12.4 ms"
# (Unix) or "time<1ms" (Windows); group 1 captures the numeric value.
TIME_RE = re.compile(r"time[=<]([0-9.]+)\s*ms")
@dataclass
class Location:
    """A single ping target: a human-readable name plus the host to probe."""
    # Label written alongside each sample in the JSON/CSV output.
    name: str
    # Hostname or IP address passed to the ping command.
    address: str
def build_ping_command(address: str, timeout_s: float) -> List[str]:
    """Build a single-probe ping command line for the current platform.

    Args:
        address: Hostname or IP address to ping.
        timeout_s: Per-probe reply timeout in seconds.

    Returns:
        The argv list for subprocess (exactly one echo request).

    Platform notes: Linux iputils takes ``-W`` in whole seconds, macOS
    takes ``-W`` in milliseconds, and anything else is assumed to be
    Windows (``-n`` count, ``-w`` milliseconds).  Timeout values are
    clamped to at least one unit so a sub-second timeout is never
    truncated to 0 (Linux ping treats ``-W 0`` as "no timeout").
    """
    if sys.platform.startswith("linux"):
        # iputils: -W is whole seconds; clamp so e.g. 0.5s doesn't become 0.
        return ["ping", "-c", "1", "-W", str(max(1, int(timeout_s))), address]
    if sys.platform.startswith("darwin"):
        # BSD/macOS: -W is milliseconds.
        return ["ping", "-c", "1", "-W", str(max(1, int(timeout_s * 1000))), address]
    # Assume Windows: -n count, -w timeout in milliseconds.
    return ["ping", "-n", "1", "-w", str(max(1, int(timeout_s * 1000))), address]
def parse_latency_ms(output: str) -> Optional[float]:
    """Extract the round-trip time in milliseconds from ping output.

    Returns ``None`` when no ``time=...ms`` / ``time<...ms`` token is
    present (e.g. the probe timed out or the host was unreachable).
    """
    found = re.search(r"time[=<]([0-9.]+)\s*ms", output)
    return float(found.group(1)) if found else None
def ping_once(address: str, timeout_s: float) -> Optional[float]:
    """Send one echo request and return its latency in milliseconds.

    Args:
        address: Hostname or IP address to probe.
        timeout_s: Per-probe reply timeout in seconds.

    Returns:
        The parsed round-trip time, or ``None`` if the probe was lost,
        the host was unreachable, or the ping process hung.

    Raises:
        RuntimeError: if no ping executable is available on PATH.
    """
    cmd = build_ping_command(address, timeout_s)
    try:
        result = subprocess.run(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            check=False,
            text=True,
            # Hard cap: ping can block past its own timeout flag
            # (e.g. on slow name resolution), so enforce one here too.
            timeout=timeout_s + 5.0,
        )
    except FileNotFoundError as exc:
        raise RuntimeError("ping command not found on PATH") from exc
    except subprocess.TimeoutExpired:
        # Treat a hung probe the same as a lost packet.
        return None
    return parse_latency_ms(result.stdout)
def gather_latencies(
    locations: List[Location],
    duration_s: float,
    timeout_s: float,
) -> Dict[str, List[Tuple[float, float]]]:
    """Repeatedly probe every location until ``duration_s`` elapses.

    Cycles through the locations, pinging each once per round with a
    one-second pause between rounds.  Each successful probe is printed
    to stdout as it arrives.

    Returns:
        Mapping of address -> list of ``(elapsed_s, latency_ms)``
        samples; probes that fail or time out produce no sample.
    """
    samples: Dict[str, List[Tuple[float, float]]] = {
        location.address: [] for location in locations
    }
    started = time.monotonic()
    deadline = started + duration_s
    while time.monotonic() < deadline:
        for location in locations:
            if time.monotonic() >= deadline:
                break
            rtt = ping_once(location.address, timeout_s)
            if rtt is None:
                continue
            at = time.monotonic() - started
            samples[location.address].append((at, rtt))
            print(f"{location.address} t={at:.2f}s latency={rtt:.2f}ms")
        if time.monotonic() < deadline:
            time.sleep(1.0)
    return samples
def build_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the ping collector."""
    parser = argparse.ArgumentParser(
        description="Ping a set of locations and save latency samples."
    )
    parser.add_argument(
        "--addresses-file",
        dest="addresses_file",
        default="addresses.json",
        help="Path to a JSON file containing a list of locations",
    )
    parser.add_argument("--duration", type=float, default=60.0,
                        help="Duration to ping in seconds")
    parser.add_argument("--timeout", type=float, default=1.0,
                        help="Ping timeout in seconds")
    parser.add_argument("--output-dir", default="ping_output",
                        help="Output directory for data files")
    return parser
def parse_args() -> argparse.Namespace:
    """Parse the process's command-line arguments with the shared parser."""
    parser = build_parser()
    return parser.parse_args()
def load_locations(addresses_file: str) -> List[Location]:
    """Load and validate the list of ping targets from a JSON file.

    The file must hold a non-empty JSON array of objects, each with
    string fields ``name`` and ``address``.

    Raises:
        RuntimeError: on invalid JSON or any schema violation.
    """
    with open(addresses_file, "r", encoding="utf-8") as handle:
        raw = handle.read()
    try:
        parsed = json.loads(raw)
    except json.JSONDecodeError as exc:
        raise RuntimeError(f"Invalid JSON in {addresses_file}") from exc
    if not isinstance(parsed, list):
        raise RuntimeError("addresses.json must contain a JSON array of objects")
    if not parsed:
        raise RuntimeError("No addresses provided")
    result: List[Location] = []
    for item in parsed:
        if not isinstance(item, dict):
            raise RuntimeError("Each address entry must be a JSON object")
        name = item.get("name")
        address = item.get("address")
        if not isinstance(name, str) or not isinstance(address, str):
            raise RuntimeError("Each entry must include string fields: name and address")
        result.append(Location(name=name, address=address))
    return result
def save_json(
    output_path: str,
    locations: List[Location],
    latencies: Dict[str, List[Tuple[float, float]]],
    duration_s: float,
) -> None:
    """Serialize the run (duration, targets, and all samples) to JSON.

    Samples are flattened in location order, each carrying the target's
    name/address plus the elapsed time and latency rounded to 6 places.
    """
    samples = [
        {
            "name": loc.name,
            "address": loc.address,
            "elapsed_s": round(elapsed, 6),
            "latency_ms": round(latency, 6),
        }
        for loc in locations
        for elapsed, latency in latencies[loc.address]
    ]
    payload = {
        "duration_s": duration_s,
        "locations": [
            {"name": loc.name, "address": loc.address} for loc in locations
        ],
        "samples": samples,
    }
    with open(output_path, "w", encoding="utf-8") as handle:
        json.dump(payload, handle, indent=2)
def save_csv(
    output_path: str,
    locations: List[Location],
    latencies: Dict[str, List[Tuple[float, float]]],
) -> None:
    """Write all latency samples to a CSV file with a header row.

    Rows are emitted in location order; elapsed/latency values are
    formatted with six decimal places.
    """
    with open(output_path, "w", encoding="utf-8", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["name", "address", "elapsed_s", "latency_ms"])
        writer.writerows(
            [loc.name, loc.address, f"{elapsed:.6f}", f"{latency:.6f}"]
            for loc in locations
            for elapsed, latency in latencies[loc.address]
        )
def main() -> int:
    """Entry point: collect latency samples and write JSON + CSV outputs.

    Prints help and returns 2 when invoked with no arguments; otherwise
    runs the collection and returns 0.
    """
    if len(sys.argv) == 1:
        build_parser().print_help()
        return 2
    args = parse_args()
    targets = load_locations(args.addresses_file)
    collected = gather_latencies(targets, args.duration, args.timeout)
    os.makedirs(args.output_dir, exist_ok=True)
    json_path = os.path.join(args.output_dir, "ping_samples.json")
    csv_path = os.path.join(args.output_dir, "ping_samples.csv")
    save_json(json_path, targets, collected, args.duration)
    save_csv(csv_path, targets, collected)
    print(f"Saved data to {json_path} and {csv_path}")
    return 0
if __name__ == "__main__":
    # SystemExit propagates main()'s return value as the process exit code.
    raise SystemExit(main())