Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 52 additions & 0 deletions src/manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
# app/manager.py
import asyncio
from starlette.websockets import WebSocket
from typing import Set
import logging

logger = logging.getLogger("ticket_scans.manager")

class TicketScanManager:
def __init__(self):
# set of active WebSocket connections
self.active_connections: Set[WebSocket] = set()
self._lock = asyncio.Lock()

async def connect(self, websocket: WebSocket):
await websocket.accept()
async with self._lock:
self.active_connections.add(websocket)
logger.info("WebSocket connected. total connections=%d", len(self.active_connections))

async def disconnect(self, websocket: WebSocket):
async with self._lock:
if websocket in self.active_connections:
self.active_connections.remove(websocket)
logger.info("WebSocket disconnected. total connections=%d", len(self.active_connections))

async def broadcast_scan(self, scan_payload: dict):
"""
Broadcasts a scan payload (dict) to all connected clients.
This awaits send_json on each WebSocket. It removes any dead websockets.
"""
async with self._lock:
connections = list(self.active_connections)

if not connections:
logger.info("Broadcast called but no active connections.")
return

logger.info("Broadcasting scan to %d connection(s).", len(connections))
to_remove = []
for ws in connections:
try:
await ws.send_json(scan_payload)
except Exception as e:
logger.exception("Error sending to websocket; scheduling removal. Error: %s", e)
to_remove.append(ws)

if to_remove:
async with self._lock:
for ws in to_remove:
self.active_connections.discard(ws)
logger.info("Removed %d dead connection(s) after broadcast.", len(to_remove))
11 changes: 11 additions & 0 deletions src/schemas.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# app/schemas.py
from pydantic import BaseModel
from typing import Optional
from datetime import datetime

class TicketScan(BaseModel):
ticket_id: str
event_id: str
scanner_id: Optional[str] = None
timestamp: datetime
meta: Optional[dict] = None
62 changes: 62 additions & 0 deletions src/websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
# app/main.py
import logging
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, APIRouter
from fastapi.responses import JSONResponse
from app.manager import TicketScanManager, logger as manager_logger
from app.schemas import TicketScan
from datetime import datetime

# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s [%(name)s] %(message)s"
)
logger = logging.getLogger("ticket_scans.app")
# Keep manager logger consistent
manager_logger.setLevel(logging.INFO)

app = FastAPI(title="Ticket Scans WebSocket Service")
router = APIRouter()
manager = TicketScanManager()

@router.websocket("/ws/ticket-scans")
async def websocket_ticket_scans(ws: WebSocket):
"""
WebSocket endpoint that keeps connection open and sends scans when they are broadcast.
"""
await manager.connect(ws)
try:
# Keep the connection alive; optionally handle incoming messages if needed.
while True:
# Wait for any message from client; if you don't expect messages you can await ws.receive_text()
# but we will use receive to detect disconnects from client side.
try:
data = await ws.receive_text()
# For now, we simply ignore messages from clients but log them
logger.info("Received message from client (ignored): %s", data)
except Exception:
# The client may close the connection — break to disconnect and cleanup
break
except WebSocketDisconnect:
logger.info("Client disconnected via WebSocketDisconnect.")
except Exception as e:
logger.exception("Unexpected error in websocket loop: %s", e)
finally:
await manager.disconnect(ws)

@router.post("/scans", response_class=JSONResponse)
async def post_scan(scan: TicketScan):
"""
POST endpoint to accept a scan and broadcast it to clients.
In production, scanning devices/services would typically call this API when a ticket is scanned,
or you would call manager.broadcast_scan from inside your event pipeline.
"""
payload = scan.dict()
# Optionally add server-received timestamp
payload.setdefault("server_received_at", datetime.utcnow().isoformat())
# Broadcast but don't block the response when there are many clients (we await because manager.broadcast_scan is async)
await manager.broadcast_scan(payload)
logger.info("Received scan for ticket_id=%s event_id=%s", scan.ticket_id, scan.event_id)
return {"ok": True}

app.include_router(router)
68 changes: 68 additions & 0 deletions tests/test_websocket.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# tests/test_websocket.py
import pytest
from fastapi.testclient import TestClient
from app.main import app, manager
from datetime import datetime
import time
import threading
import json
import logging

@pytest.fixture
def client():
return TestClient(app)

def _trigger_scan_via_post(client, scan):
client.post("/scans", json=scan)

def test_websocket_receives_broadcast(client):
scan = {
"ticket_id": "TICKET-123",
"event_id": "EVENT-1",
"scanner_id": "GATE-A",
"timestamp": datetime.utcnow().isoformat(),
"meta": {"seat": "A1"}
}

with client.websocket_connect("/ws/ticket-scans") as websocket:
# Start a thread that calls POST /scans after a short delay,
# to simulate an external scan arriving while WS is open.
t = threading.Timer(0.1, _trigger_scan_via_post, args=(client, scan))
t.start()

# Receive JSON message from websocket; timeout will raise if not received
data = websocket.receive_json(timeout=5)
# Verify payload contains expected fields
assert data["ticket_id"] == scan["ticket_id"]
assert data["event_id"] == scan["event_id"]
assert data["meta"]["seat"] == "A1"
t.cancel()

def test_connection_logging_is_emitted(client, caplog):
caplog.set_level(logging.INFO)
scan = {
"ticket_id": "TICKET-888",
"event_id": "EVENT-LOG",
"timestamp": datetime.utcnow().isoformat(),
}

with client.websocket_connect("/ws/ticket-scans"):
# When connection established, manager logs a connect message
# Give small time for logger to emit
time.sleep(0.05)
# Ensure connect logged
found_connect = any("WebSocket connected" in rec.message for rec in caplog.records)
assert found_connect, "connect log not found; logs: %s" % [r.message for r in caplog.records]

# After context manager exits, disconnect log should exist
found_disconnect = any("WebSocket disconnected" in rec.message for rec in caplog.records)
assert found_disconnect, "disconnect log not found; logs: %s" % [r.message for r in caplog.records]

# Now connect again and trigger a scan; check broadcast log exists
with client.websocket_connect("/ws/ticket-scans"):
# trigger a scan
client.post("/scans", json=scan)
time.sleep(0.05)

found_broadcast = any("Broadcasting scan" in rec.message or "Received scan for ticket_id" in rec.message for rec in caplog.records)
assert found_broadcast, "broadcast not logged; logs: %s" % [r.message for r in caplog.records]
Loading