From cfd8f2212baa19ca08edec2a03d6acf294114133 Mon Sep 17 00:00:00 2001 From: Yusuf Falade Date: Sat, 4 Oct 2025 04:34:08 +0100 Subject: [PATCH] websocket service real time ticket scans --- src/manager.py | 52 +++++++++++++++++++++++++++++++ src/schemas.py | 11 +++++++ src/websocket.py | 62 +++++++++++++++++++++++++++++++++++++ tests/test_websocket.py | 68 +++++++++++++++++++++++++++++++++++++++++ 4 files changed, 193 insertions(+) create mode 100644 src/manager.py create mode 100644 src/schemas.py create mode 100644 src/websocket.py create mode 100644 tests/test_websocket.py diff --git a/src/manager.py b/src/manager.py new file mode 100644 index 0000000..28e0969 --- /dev/null +++ b/src/manager.py @@ -0,0 +1,52 @@ +# app/manager.py +import asyncio +from starlette.websockets import WebSocket +from typing import Set +import logging + +logger = logging.getLogger("ticket_scans.manager") + +class TicketScanManager: + def __init__(self): + # set of active WebSocket connections + self.active_connections: Set[WebSocket] = set() + self._lock = asyncio.Lock() + + async def connect(self, websocket: WebSocket): + await websocket.accept() + async with self._lock: + self.active_connections.add(websocket) + logger.info("WebSocket connected. total connections=%d", len(self.active_connections)) + + async def disconnect(self, websocket: WebSocket): + async with self._lock: + if websocket in self.active_connections: + self.active_connections.remove(websocket) + logger.info("WebSocket disconnected. total connections=%d", len(self.active_connections)) + + async def broadcast_scan(self, scan_payload: dict): + """ + Broadcasts a scan payload (dict) to all connected clients. + This awaits send_json on each WebSocket. It removes any dead websockets. + """ + async with self._lock: + connections = list(self.active_connections) + + if not connections: + logger.info("Broadcast called but no active connections.") + return + + logger.info("Broadcasting scan to %d connection(s).", len(connections)) + to_remove = [] + for ws in connections: + try: + await ws.send_json(scan_payload) + except Exception as e: + logger.exception("Error sending to websocket; scheduling removal. Error: %s", e) + to_remove.append(ws) + + if to_remove: + async with self._lock: + for ws in to_remove: + self.active_connections.discard(ws) + logger.info("Removed %d dead connection(s) after broadcast.", len(to_remove)) diff --git a/src/schemas.py b/src/schemas.py new file mode 100644 index 0000000..44919ed --- /dev/null +++ b/src/schemas.py @@ -0,0 +1,11 @@ +# app/schemas.py +from pydantic import BaseModel +from typing import Optional +from datetime import datetime + +class TicketScan(BaseModel): + ticket_id: str + event_id: str + scanner_id: Optional[str] = None + timestamp: datetime + meta: Optional[dict] = None diff --git a/src/websocket.py b/src/websocket.py new file mode 100644 index 0000000..35d5276 --- /dev/null +++ b/src/websocket.py @@ -0,0 +1,62 @@ +# app/main.py +import logging +from fastapi import FastAPI, WebSocket, WebSocketDisconnect, APIRouter +from fastapi.responses import JSONResponse +from app.manager import TicketScanManager, logger as manager_logger +from app.schemas import TicketScan +from datetime import datetime + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s [%(name)s] %(message)s" +) +logger = logging.getLogger("ticket_scans.app") +# Keep manager logger consistent +manager_logger.setLevel(logging.INFO) + +app = FastAPI(title="Ticket Scans WebSocket Service") +router = APIRouter() +manager = TicketScanManager() + +@router.websocket("/ws/ticket-scans") +async def websocket_ticket_scans(ws: WebSocket): + """ + WebSocket endpoint that keeps connection open and sends scans when they are broadcast. + """ + await manager.connect(ws) + try: + # Keep the connection alive; optionally handle incoming messages if needed. + while True: + # Wait for any message from client; if you don't expect messages you can await ws.receive_text() + # but we will use receive to detect disconnects from client side. + try: + data = await ws.receive_text() + # For now, we simply ignore messages from clients but log them + logger.info("Received message from client (ignored): %s", data) + except Exception: + # The client may close the connection — break to disconnect and cleanup + break + except WebSocketDisconnect: + logger.info("Client disconnected via WebSocketDisconnect.") + except Exception as e: + logger.exception("Unexpected error in websocket loop: %s", e) + finally: + await manager.disconnect(ws) + +@router.post("/scans", response_class=JSONResponse) +async def post_scan(scan: TicketScan): + """ + POST endpoint to accept a scan and broadcast it to clients. + In production, scanning devices/services would typically call this API when a ticket is scanned, + or you would call manager.broadcast_scan from inside your event pipeline. + """ + payload = scan.dict() + # Optionally add server-received timestamp + payload.setdefault("server_received_at", datetime.utcnow().isoformat()) + # Broadcast but don't block the response when there are many clients (we await because manager.broadcast_scan is async) + await manager.broadcast_scan(payload) + logger.info("Received scan for ticket_id=%s event_id=%s", scan.ticket_id, scan.event_id) + return {"ok": True} + +app.include_router(router) diff --git a/tests/test_websocket.py b/tests/test_websocket.py new file mode 100644 index 0000000..2629914 --- /dev/null +++ b/tests/test_websocket.py @@ -0,0 +1,68 @@ +# tests/test_websocket.py +import pytest +from fastapi.testclient import TestClient +from app.main import app, manager +from datetime import datetime +import time +import threading +import json +import logging + +@pytest.fixture +def client(): + return TestClient(app) + +def _trigger_scan_via_post(client, scan): + client.post("/scans", json=scan) + +def test_websocket_receives_broadcast(client): + scan = { + "ticket_id": "TICKET-123", + "event_id": "EVENT-1", + "scanner_id": "GATE-A", + "timestamp": datetime.utcnow().isoformat(), + "meta": {"seat": "A1"} + } + + with client.websocket_connect("/ws/ticket-scans") as websocket: + # Start a thread that calls POST /scans after a short delay, + # to simulate an external scan arriving while WS is open. + t = threading.Timer(0.1, _trigger_scan_via_post, args=(client, scan)) + t.start() + + # Receive JSON message from websocket; timeout will raise if not received + data = websocket.receive_json(timeout=5) + # Verify payload contains expected fields + assert data["ticket_id"] == scan["ticket_id"] + assert data["event_id"] == scan["event_id"] + assert data["meta"]["seat"] == "A1" + t.cancel() + +def test_connection_logging_is_emitted(client, caplog): + caplog.set_level(logging.INFO) + scan = { + "ticket_id": "TICKET-888", + "event_id": "EVENT-LOG", + "timestamp": datetime.utcnow().isoformat(), + } + + with client.websocket_connect("/ws/ticket-scans"): + # When connection established, manager logs a connect message + # Give small time for logger to emit + time.sleep(0.05) + # Ensure connect logged + found_connect = any("WebSocket connected" in rec.message for rec in caplog.records) + assert found_connect, "connect log not found; logs: %s" % [r.message for r in caplog.records] + + # After context manager exits, disconnect log should exist + found_disconnect = any("WebSocket disconnected" in rec.message for rec in caplog.records) + assert found_disconnect, "disconnect log not found; logs: %s" % [r.message for r in caplog.records] + + # Now connect again and trigger a scan; check broadcast log exists + with client.websocket_connect("/ws/ticket-scans"): + # trigger a scan + client.post("/scans", json=scan) + time.sleep(0.05) + + found_broadcast = any("Broadcasting scan" in rec.message or "Received scan for ticket_id" in rec.message for rec in caplog.records) + assert found_broadcast, "broadcast not logged; logs: %s" % [r.message for r in caplog.records]