-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtest_websocket.py
More file actions
24 lines (22 loc) · 846 Bytes
/
test_websocket.py
File metadata and controls
24 lines (22 loc) · 846 Bytes
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import websockets
import json
async def test_websocket():
    """Connect to the local WebSocket endpoint and print up to five messages.

    Opens ``ws://localhost:8000/ws`` and waits up to three seconds for each
    of five messages. Each received message is decoded as JSON and printed;
    a payload that is not valid JSON is printed raw instead. The loop stops
    early if the connection is closed. All progress and failures are
    reported with ``print``; exceptions derived from ``Exception`` are
    caught and printed rather than propagated.
    """
    uri = 'ws://localhost:8000/ws'
    try:
        async with websockets.connect(uri) as websocket:
            print('Connected to WebSocket')
            # Wait for a few messages
            for i in range(5):
                # Keep the receive step separate from decoding so that a
                # malformed payload is not misreported as a receive error.
                try:
                    message = await asyncio.wait_for(websocket.recv(), timeout=3)
                except asyncio.TimeoutError:
                    print(f'Timeout waiting for message {i+1}')
                    continue
                except websockets.ConnectionClosed as e:
                    # Once the connection is closed every further recv()
                    # fails immediately, so retrying is pointless.
                    print(f'Error receiving message {i+1}: {e}')
                    break
                except Exception as e:
                    # Best effort: report and move on to the next attempt.
                    print(f'Error receiving message {i+1}: {e}')
                    continue

                try:
                    data = json.loads(message)
                except ValueError:
                    # json.JSONDecodeError and UnicodeDecodeError are both
                    # ValueError subclasses; show the payload as received.
                    print(f'Message {i+1} (not valid JSON): {message!r}')
                else:
                    print(f'Message {i+1}: {data}')
    except Exception as e:
        print(f'WebSocket connection error: {e}')
# Script entry point: run the WebSocket check on a fresh event loop when this
# file is executed directly (not when it is imported).
if __name__ == "__main__":
    check = test_websocket()
    asyncio.run(check)