-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathutils.py
More file actions
59 lines (46 loc) · 1.43 KB
/
utils.py
File metadata and controls
59 lines (46 loc) · 1.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import asyncio
import logging
import socket
import time
from jsonStream import FetchError, fetch
from status import ClientStatus
# Module-level logger; records from this file are emitted under the name "utils".
log = logging.getLogger("utils")
class PingError(Exception):
    """Signal that a ping reply was malformed or was not the expected pong."""
async def ping(client):
    """Ping *client* and record on it whether it answered.

    Sends ``{"request": "ping"}`` through ``fetch`` (``timeout=0.1``,
    ``retries=1``) and expects a reply whose ``"response"`` entry equals
    ``"pong"``.

    Args:
        client: Object exposing ``ip`` and ``port`` (used for logging and
            passed on to ``fetch``) and a writable ``status`` attribute.

    Returns:
        True if the client answered with a pong, in which case
        ``client.status`` is set to ``ClientStatus.READY``; False otherwise,
        in which case ``client.status`` is set to ``ClientStatus.LOST``.

    Raises:
        Nothing for ``OSError``, ``PingError`` or ``FetchError``: those are
        logged and reported through the False return value. Any other
        exception propagates to the caller.
    """
    try:
        log.info(f"Ping {client.ip}:{client.port}")
        response, _ = await fetch(client, {"request": "ping"}, timeout=0.1, retries=1)
        try:
            reply = response["response"]
        except KeyError as e:
            # str(KeyError) is the repr of the key (already quoted), which
            # used to yield "Key ''response'' Missing". Name the key directly
            # and keep the original error as the cause.
            raise PingError("Key 'response' Missing") from e
        if reply != "pong":
            raise PingError("Not a Pong")
        client.status = ClientStatus.READY
        return True
    except (OSError, PingError, FetchError) as e:
        log.error(f"Ping Failed {client.ip}:{client.port} ({type(e).__name__}: {e})")
        client.status = ClientStatus.LOST
        return False
def clock(fps=60, period=None):
    """Build a frame limiter that paces a loop to a fixed period.

    Args:
        fps: Target frames per second; only used when *period* is None.
        period: Frame duration in seconds. Takes precedence over *fps*
            when given.

    Returns:
        A coroutine function ``tic()``. Awaiting it sleeps for whatever is
        left of the current frame (never a negative amount) and then starts
        timing the next frame. The first frame is timed from the moment
        ``clock`` was called.

    Raises:
        ZeroDivisionError: If *period* is None and *fps* is 0.
    """
    if period is None:
        period = 1.0 / fps

    # Intervals are measured with the monotonic clock: wall-clock time can be
    # adjusted (NTP, manual changes), which would make the elapsed time
    # negative or huge and distort the sleep.
    frame_start = time.monotonic()

    async def tic():
        nonlocal frame_start
        elapsed = time.monotonic() - frame_start
        await asyncio.sleep(max(period - elapsed, 0))
        # Restart the frame timer only after the sleep has finished.
        frame_start = time.monotonic()

    return tic
def get_ip():
    """Return the local IPv4 address used for outbound traffic.

    Connects a UDP socket to an arbitrary address and reads back the local
    address the socket was bound to. Falls back to ``"127.0.0.1"`` if that
    fails for any reason. The socket is always closed before returning.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
        probe.settimeout(0)
        try:
            # doesn't even have to be reachable
            probe.connect(("10.254.254.254", 1))
            return probe.getsockname()[0]
        except Exception:
            return "127.0.0.1"