-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlog_server.py
More file actions
105 lines (99 loc) · 4.54 KB
/
log_server.py
File metadata and controls
105 lines (99 loc) · 4.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import threading
import json
from http.server import ThreadingHTTPServer, SimpleHTTPRequestHandler
from urllib.parse import urlparse
from io import BytesIO
import os
import psutil
from log_utils import read_logs, append_entry
from state_utils import read_state, write_state
class APIHandler(SimpleHTTPRequestHandler):
def do_GET(self):
parsed = urlparse(self.path)
if parsed.path == '/api/log':
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
msgs = read_logs()
self.wfile.write(json.dumps({'messages': msgs}, ensure_ascii=False).encode('utf-8'))
return
if parsed.path == '/api/state':
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
state = read_state()
self.wfile.write(json.dumps(state, ensure_ascii=False).encode('utf-8'))
return
return super().do_GET()
def do_POST(self):
parsed = urlparse(self.path)
if parsed.path == '/api/scan':
# Re-scan USB devices and partitions on demand
discos = psutil.disk_partitions()
usbs = [d.device.rstrip('\\') for d in discos if 'removable' in d.opts]
partitions = [
{
'device': d.device.rstrip('\\'),
'mountpoint': d.mountpoint,
'fstype': d.fstype,
'opts': d.opts,
}
for d in discos if 'fixed' in d.opts
]
state = write_state({'usb_devices': usbs, 'partitions': partitions})
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps({'status':'ok','state': state}, ensure_ascii=False).encode('utf-8'))
return
if parsed.path == '/api/log':
length = int(self.headers.get('Content-Length', '0'))
body = self.rfile.read(length) if length else b''
try:
data = json.loads(body.decode('utf-8')) if body else {}
# If a full entry provided, append; otherwise expect {message,type}
if isinstance(data, dict) and 'message' in data:
entry = {
'id': int(__import__('time').time()),
'type': data.get('type', 'info'),
'message': data.get('message'),
'ts': data.get('ts') or __import__('datetime').datetime.utcnow().isoformat() + 'Z'
}
append_entry(entry)
self.send_response(201)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps(entry, ensure_ascii=False).encode('utf-8'))
return
except Exception as e:
self.send_response(400)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps({'error': str(e)}).encode('utf-8'))
return
if parsed.path == '/api/state':
length = int(self.headers.get('Content-Length', '0'))
body = self.rfile.read(length) if length else b''
try:
data = json.loads(body.decode('utf-8')) if body else {}
if isinstance(data, dict):
write_state(data)
self.send_response(200)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps({'status':'ok','state': data}, ensure_ascii=False).encode('utf-8'))
return
except Exception as e:
self.send_response(400)
self.send_header('Content-Type', 'application/json; charset=utf-8')
self.end_headers()
self.wfile.write(json.dumps({'error': str(e)}).encode('utf-8'))
return
return super().do_POST() # type: ignore
def start_server(port=8000, host='127.0.0.1'):
cwd = os.path.dirname(__file__)
os.chdir(cwd)
server = ThreadingHTTPServer((host, port), APIHandler)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
return server