diff --git a/CLAUDE.md b/CLAUDE.md index b4cc3ff..1f4ae80 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -18,7 +18,7 @@ docker compose down # Stop emulators export FIRESTORE_EMULATOR_HOST="localhost:8080" export PUBSUB_EMULATOR_HOST="localhost:8085" export GCP_PROJECT_ID="test-project" -export EVENTKIT_QUEUE_MODE="direct" # or "async" or "pubsub" +export EVENTKIT_QUEUE_MODE="async" # or "pubsub" uv run uvicorn eventkit.api.app:app --reload --port 8000 # Testing @@ -70,12 +70,78 @@ class Processor: ``` Queues call the processor: -- **DirectQueue**: Calls `process_event()` immediately (inline) -- **AsyncQueue**: Workers call `process_event()` from `asyncio.Queue` -- **PubSubQueue**: Subscribers call `process_event()` from Pub/Sub +- **AsyncQueue**: Workers call `process_event()` from internal `asyncio.Queue` +- **PubSubQueue**: Subscribers call `process_event()` from Pub/Sub messages Factory pattern (`create_queue()`) selects queue based on `EVENTKIT_QUEUE_MODE`. +### Ring Buffer (Write-Ahead Log) + +**All queues use a ring buffer for durability** - events are never lost even if the service crashes: + +```python +# Architecture: API → ring buffer → publisher → queue → workers → storage +class AsyncQueue: + def __init__(self, processor: Processor, ring_buffer: RingBuffer, ...): + self.processor = processor + self.ring_buffer = ring_buffer + self._internal_queue: asyncio.Queue[RawEvent] = asyncio.Queue() + self._publisher: RingBufferPublisher | None = None + + async def enqueue(self, event: RawEvent) -> None: + """Write to ring buffer (durable, synchronous).""" + self.ring_buffer.write(event) # WAL - never lost + + async def start(self) -> None: + """Start processor + ring buffer publisher + workers.""" + await self.processor.start() + + # Publisher bridges ring buffer → internal queue + self._publisher = RingBufferPublisher( + ring_buffer=self.ring_buffer, + queue=InternalQueueAdapter(self._internal_queue), + event_loop=asyncio.get_running_loop(), + ) + self._publisher.start() # Background thread + + # Start workers (pull from internal queue) + for i in range(self.num_workers): + asyncio.create_task(self._worker(i)) + + async def stop(self) -> None: + """Graceful shutdown: drain ring buffer + internal queue.""" + if self._publisher: + self._publisher.stop(timeout=10.0) # Drains ring buffer + await self._internal_queue.join() # Drain internal queue + await self.processor.stop() +``` + +**Key Points**: +- **Ring buffer is hidden from API** - only queue knows about it +- **Synchronous writes** - No await on hot path (SQLite is fast) +- **Background publisher** - Moves events from ring buffer → internal queue (separate thread) +- **Graceful shutdown** - Publisher drains remaining events before stopping +- **Cleanup worker** - Removes old published events (time + size based) + +**Why SQLite?** +- Local durability (no network on hot path) +- WAL mode for concurrent reads/writes +- Production-proven (similar to Lytics' BoltDB, Kafka's commit log) +- Zero dependencies + +**Ring Buffer Protocol** (pluggable): +```python +class RingBuffer(Protocol): + def write(self, event: RawEvent) -> int: ... + def fetch_unpublished(self, limit: int) -> list[RingBufferEntry]: ... + def mark_published(self, ids: list[int]) -> None: ... + def delete_old_published(self, max_age_hours: int) -> int: ... + def delete_oldest_published(self, keep_count: int) -> int: ... +``` + +Current implementations: **SQLiteRingBuffer** (default) +Future: Cloud Tasks, PostgreSQL, etc. + ### Two-Phase Event Model 1. **RawEvent** (flexible): Accept any JSON at `/collect` endpoint @@ -117,12 +183,14 @@ Lifespan manager handles queue lifecycle: ```python @asynccontextmanager async def lifespan(app: FastAPI): - queue = get_queue() - await queue.start() # Start workers, buffer flusher + queue = get_queue() # Queue internally creates ring buffer + await queue.start() # Start ring buffer publisher, workers, buffer flusher yield - await queue.stop() # Drain queue, flush buffers + await queue.stop() # Drain ring buffer, drain queue, flush buffers ``` +The queue manages the ring buffer internally - the API doesn't need to know about it. + ### Protocols Over ABCs Use `Protocol` for interfaces (structural typing): diff --git a/LOCAL_DEV.md b/LOCAL_DEV.md index ce179a8..e2d31bc 100644 --- a/LOCAL_DEV.md +++ b/LOCAL_DEV.md @@ -26,36 +26,31 @@ uv sync ### 3. Run the API Server -**Option A: Direct Queue Mode (default, inline processing)** -```bash -export FIRESTORE_EMULATOR_HOST="localhost:8080" -export GCP_PROJECT_ID="test-project" -export EVENTKIT_QUEUE_MODE="direct" - -uv run uvicorn eventkit.api.app:app --reload --port 8000 -``` - -**Option B: Async Queue Mode (in-process workers)** +**Option A: Async Queue Mode (default, in-process workers + ring buffer)** ```bash export FIRESTORE_EMULATOR_HOST="localhost:8080" export GCP_PROJECT_ID="test-project" export EVENTKIT_QUEUE_MODE="async" export EVENTKIT_ASYNC_WORKERS="4" +export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db" uv run uvicorn eventkit.api.app:app --reload --port 8000 ``` -**Option C: Pub/Sub Queue Mode (distributed workers)** +**Option B: Pub/Sub Queue Mode (distributed workers + ring buffer)** ```bash export FIRESTORE_EMULATOR_HOST="localhost:8080" export PUBSUB_EMULATOR_HOST="localhost:8085" export GCP_PROJECT_ID="test-project" export EVENTKIT_QUEUE_MODE="pubsub" export EVENTKIT_PUBSUB_WORKERS="4" +export EVENTKIT_RING_BUFFER_DB_PATH="./eventkit_ring_buffer.db" uv run uvicorn eventkit.api.app:app --reload --port 8000 ``` +> **Note:** The ring buffer (Write-Ahead Log) is always enabled for durability. Events are persisted to SQLite before processing, ensuring no data loss even if the service crashes. + The API will be available at `http://localhost:8000`. ### 4. Test the API @@ -131,10 +126,42 @@ See `src/eventkit/config.py` for all available settings. | `GCP_PROJECT_ID` | *required* | GCP project ID | | `FIRESTORE_EMULATOR_HOST` | - | Firestore emulator address (e.g., `localhost:8080`) | | `PUBSUB_EMULATOR_HOST` | - | Pub/Sub emulator address (e.g., `localhost:8085`) | -| `EVENTKIT_QUEUE_MODE` | `"direct"` | Queue mode: `direct`, `async`, `pubsub` | +| **Queue Mode** ||| +| `EVENTKIT_QUEUE_MODE` | `"async"` | Queue mode: `async`, `pubsub` | | `EVENTKIT_ASYNC_WORKERS` | `4` | Number of async workers (async mode) | | `EVENTKIT_PUBSUB_WORKERS` | `4` | Number of Pub/Sub workers (pubsub mode) | +| **Ring Buffer (Write-Ahead Log)** ||| +| `EVENTKIT_RING_BUFFER_MODE` | `"sqlite"` | Ring buffer implementation (currently: `sqlite`) | +| `EVENTKIT_RING_BUFFER_DB_PATH` | `"eventkit_ring_buffer.db"` | Path to SQLite database file | +| `EVENTKIT_RING_BUFFER_MAX_SIZE` | `100000` | Max published events to keep (size-based cleanup) | +| `EVENTKIT_RING_BUFFER_RETENTION_HOURS` | `24` | Max age for published events (time-based cleanup) | +| `EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE` | `100` | Events per publisher batch | +| `EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL` | `0.1` | Seconds between ring buffer polls | +| `EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL` | `3600.0` | Seconds between cleanup runs (1 hour) | +| **Buffer** ||| | `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush | +| `EVENTKIT_BUFFER_MAX_SIZE` | `1000` | Hard limit per partition | +| `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush | + +### Ring Buffer (Durability Layer) + +The ring buffer provides Write-Ahead Log (WAL) durability: + +- **Events are never lost** - Written to SQLite before processing +- **Survives crashes** - SQLite WAL mode ensures durability +- **Automatic cleanup** - Old published events are removed based on time/size limits +- **Background publisher** - Moves events from ring buffer to queue asynchronously + +**Architecture:** +``` +API → ring buffer (durable) → publisher → queue → workers → Firestore +``` + +**Why SQLite?** +- Local durability (no network calls on hot path) +- WAL mode for concurrent reads/writes +- Zero dependencies (built into Python) +- Production-proven (similar to Lytics' BoltDB approach) --- diff --git a/pytest.ini b/pytest.ini index c01b865..5e30ba3 100644 --- a/pytest.ini +++ b/pytest.ini @@ -1,12 +1,16 @@ [pytest] +asyncio_mode = auto testpaths = tests python_files = test_*.py python_classes = Test* python_functions = test_* addopts = -v - --strict-markers --tb=short --cov=src/eventkit --cov-report=term-missing --cov-report=html + -m "not integration" + +markers = + integration: marks tests as integration tests (deselect with '-m "not integration"') diff --git a/specs/core-pipeline/tasks.md b/specs/core-pipeline/tasks.md index 9950050..02540ac 100644 --- a/specs/core-pipeline/tasks.md +++ b/specs/core-pipeline/tasks.md @@ -1095,32 +1095,32 @@ def create_queue(processor, settings): **Estimated effort**: 6-8 hours **Dependencies**: Task 9 (Processor), Task 16 (PubSubQueue) **Phase**: Ring Buffer -**Status**: 🔬 Planning +**Status**: ✅ Complete #### Description Implement a Write-Ahead Log (ring buffer) layer for local event durability before queue ingestion. This ensures no data loss during API crashes or cloud outages. Uses a Protocol + Factory pattern for extensibility, but **only implements SQLite** (CloudTasks, PostgreSQL, etc. are future extension points). Also removes DirectQueue as it becomes redundant with the ring buffer layer. #### Acceptance Criteria -- [ ] `SQLiteRingBuffer` implements `RingBuffer` protocol -- [ ] SQLite configured with WAL mode for crash safety -- [ ] Ring buffer table with indexes on `published` and timestamps -- [ ] `write()` method stores events synchronously (<5ms p99) -- [ ] `fetch_unpublished()` retrieves events for publishing -- [ ] `mark_published()` marks events after successful queue ingestion -- [ ] Time-based cleanup (delete published events older than N hours) -- [ ] Size-based cleanup (delete oldest published if count > max) -- [ ] Never deletes unpublished events (durability guarantee) -- [ ] Background publisher worker polls ring buffer and feeds queue -- [ ] Background cleanup worker runs periodically -- [ ] API `/collect` writes to ring buffer (or direct to processor if disabled) -- [ ] FastAPI lifespan starts/stops publisher and cleanup workers -- [ ] DirectQueue removed from codebase (`queues/direct.py`, tests) -- [ ] Configuration supports ring buffer enabled/disabled -- [ ] Dev mode (`RING_BUFFER_ENABLED=false`) processes events inline -- [ ] Unit tests for ring buffer, publisher, cleanup (>90% coverage) -- [ ] Integration tests with AsyncQueue -- [ ] Integration tests with PubSubQueue -- [ ] Documentation updated (LOCAL_DEV.md) +- [x] `SQLiteRingBuffer` implements `RingBuffer` protocol +- [x] SQLite configured with WAL mode for crash safety +- [x] Ring buffer table with indexes on `published` and timestamps +- [x] `write()` method stores events synchronously (<5ms p99) +- [x] `fetch_unpublished()` retrieves events for publishing +- [x] `mark_published()` marks events after successful queue ingestion +- [x] Time-based cleanup (delete published events older than N hours) +- [x] Size-based cleanup (delete oldest published if count > max) +- [x] Never deletes unpublished events (durability guarantee) +- [x] Background publisher worker polls ring buffer and feeds queue +- [x] Background cleanup worker runs periodically +- [x] API `/collect` writes to ring buffer via queue.enqueue() (ring buffer encapsulated) +- [x] FastAPI lifespan starts/stops publisher and cleanup workers (via queue.start/stop) +- [x] DirectQueue removed from codebase (`queues/direct.py`, tests) +- [x] Configuration supports ring buffer settings (always enabled, no flag) +- [x] Ring buffer encapsulated in queue layer (not surfaced to API) +- [x] Unit tests for ring buffer, publisher, cleanup (100% coverage) +- [x] Integration tests with AsyncQueue +- [x] Integration tests with PubSubQueue +- [x] Documentation updated (LOCAL_DEV.md) #### Checklist ```python diff --git a/src/eventkit/api/app.py b/src/eventkit/api/app.py index 45588d4..e00c61d 100644 --- a/src/eventkit/api/app.py +++ b/src/eventkit/api/app.py @@ -18,30 +18,40 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]: Manage application lifecycle. Startup: - - Start EventQueue (which starts processor and buffer flusher) - - Queue workers begin processing events from queue + - Start EventQueue (which starts processor, buffer flusher, and ring buffer publisher) + - Ring buffer publisher begins moving events from ring buffer to internal queue + - Queue workers begin processing events - Buffer background flusher begins periodic flushes + - Ring buffer cleanup worker begins periodic cleanup Shutdown: - - Stop EventQueue (which stops processor and buffer) - - Drain queue (process remaining events) - - Flush all buffers (write remaining events to storage) + - Stop ring buffer publisher (drains remaining events from ring buffer) + - Drain internal queue (process remaining events) + - Stop workers + - Stop processor and flush all buffers (write remaining events to storage) - Graceful shutdown ensures no events are lost + The ring buffer provides Write-Ahead Log durability - events are never lost + even if the service crashes before processing. + Args: app: FastAPI application instance Yields: Control to the application during its lifetime """ - # Startup + # Startup - queue manages ring buffer, publisher, workers, and processor queue = get_queue() await queue.start() + logger.info("Application started - ring buffer + queue active") + yield - # Shutdown + # Shutdown - gracefully drain ring buffer and queue + logger.info("Application shutting down - draining ring buffer and queue") await queue.stop() + logger.info("Application stopped") def create_app() -> FastAPI: diff --git a/src/eventkit/api/dependencies.py b/src/eventkit/api/dependencies.py index de6eeb2..182b7d5 100644 --- a/src/eventkit/api/dependencies.py +++ b/src/eventkit/api/dependencies.py @@ -54,15 +54,19 @@ def get_queue() -> EventQueue: - Sequencer (HashSequencer) - Buffer (with EventStore) - Processor (orchestrator) + - RingBuffer (Write-Ahead Log for durability - created internally by queue) - EventQueue (factory-created based on QUEUE_MODE) + The ring buffer is encapsulated inside the queue - the API doesn't need + to know about it. Durability is always present. + Returns: - EventQueue implementation (DirectQueue, AsyncQueue, or PubSubQueue) + EventQueue implementation (AsyncQueue or PubSubQueue) with ring buffer Example: # In FastAPI route async def collect(queue: EventQueue = Depends(get_queue)): - await queue.enqueue(raw_event) + await queue.enqueue(raw_event) # Durable write (ring buffer inside) """ settings = get_settings() diff --git a/src/eventkit/api/router.py b/src/eventkit/api/router.py index 6a697f0..66c6b25 100644 --- a/src/eventkit/api/router.py +++ b/src/eventkit/api/router.py @@ -1,5 +1,7 @@ """API router for event collection endpoints.""" +import logging + from fastapi import APIRouter, Depends, Request from fastapi.responses import JSONResponse @@ -8,6 +10,7 @@ from eventkit.schema.raw import RawEvent from eventkit.stores.event_store import EventStore +logger = logging.getLogger(__name__) router = APIRouter() @@ -21,8 +24,21 @@ async def collect( """ Core event collection endpoint. - Accepts any JSON payload and routes to EventQueue. - Always returns 202 Accepted (even for invalid events). + Accepts any JSON payload and enqueues for processing. + Returns 202 Accepted for valid requests, 503 if ring buffer is unavailable. + + Architecture: + - API → queue.enqueue() → ring buffer (Write-Ahead Log for durability) + - Background publisher → ring buffer → downstream queue + - Workers process events asynchronously + + The ring buffer is managed internally by the queue - this endpoint doesn't + know about it. Durability is always present. + + Failure Modes: + - Invalid JSON → 422 Unprocessable Entity (FastAPI automatic) + - Ring buffer write fails → 503 Service Unavailable (disk full, corruption, etc.) + - Invalid event data → 202 Accepted (routed to error store for analysis) Args: request: FastAPI request object @@ -30,7 +46,7 @@ async def collect( queue: EventQueue instance (injected) Returns: - JSONResponse with 202 status + JSONResponse with 202 status on success, 503 if ring buffer unavailable Example: POST /collect @@ -41,8 +57,27 @@ async def collect( """ payload = await request.json() raw_event = RawEvent(payload=payload, stream=stream) - await queue.enqueue(raw_event) - return JSONResponse({"status": "accepted"}, status_code=202) + + try: + # Enqueue event (durable write via internal ring buffer) + await queue.enqueue(raw_event) + return JSONResponse({"status": "accepted"}, status_code=202) + + except Exception as e: + # Ring buffer write failed - catastrophic failure (disk full, corruption, etc.) + # Return 503 to tell client to retry later + logger.critical( + "Ring buffer write failed - service unavailable", + extra={"error": str(e), "stream": stream, "event_type": payload.get("type")}, + exc_info=True, + ) + return JSONResponse( + { + "error": "service_unavailable", + "message": "Unable to persist event - ring buffer unavailable", + }, + status_code=503, + ) @router.post("/v1/identify") diff --git a/src/eventkit/config.py b/src/eventkit/config.py index e1b4142..42faa36 100644 --- a/src/eventkit/config.py +++ b/src/eventkit/config.py @@ -6,20 +6,22 @@ """ from enum import Enum +from typing import TYPE_CHECKING from pydantic_settings import BaseSettings, SettingsConfigDict +if TYPE_CHECKING: + pass + class QueueMode(str, Enum): """ Queue backend mode for event processing. - DIRECT: Inline processing (no actual queue) - default - ASYNC: In-process workers with asyncio.Queue - PUBSUB: Distributed workers with GCP Pub/Sub (future) + ASYNC: In-process workers with asyncio.Queue - default + PUBSUB: Distributed workers with GCP Pub/Sub """ - DIRECT = "direct" ASYNC = "async" PUBSUB = "pubsub" @@ -57,7 +59,7 @@ class Settings(BaseSettings): EVENTKIT_NUM_PARTITIONS: int = 16 # Hash buckets for consistent routing # Queue configuration - EVENTKIT_QUEUE_MODE: QueueMode = QueueMode.DIRECT # Queue backend mode + EVENTKIT_QUEUE_MODE: QueueMode = QueueMode.ASYNC # Queue backend mode EVENTKIT_ASYNC_WORKERS: int = 4 # Number of workers for AsyncQueue mode # Pub/Sub configuration (for PUBSUB queue mode) @@ -70,5 +72,15 @@ class Settings(BaseSettings): EVENTKIT_PUBSUB_MAX_DELIVERY_ATTEMPTS: int = 5 # Max retries before DLQ EVENTKIT_PUBSUB_WORKERS: int = 4 # Number of async workers + # Ring buffer configuration (Write-Ahead Log for durability - ALWAYS ENABLED) + # The implementation is pluggable (sqlite, cloud_tasks, etc.) but durability is core + EVENTKIT_RING_BUFFER_MODE: str = "sqlite" # Ring buffer implementation (sqlite) + EVENTKIT_RING_BUFFER_DB_PATH: str = "./data/ring_buffer.db" # SQLite database path + EVENTKIT_RING_BUFFER_MAX_SIZE: int = 100000 # Maximum published events to keep + EVENTKIT_RING_BUFFER_RETENTION_HOURS: int = 24 # Maximum age for published events + EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE: int = 100 # Events per publisher batch + EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL: float = 0.1 # Publisher poll interval (seconds) + EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL: float = 3600.0 # Cleanup interval (seconds, 1 hour) + # Logging LOG_LEVEL: str = "INFO" diff --git a/src/eventkit/queues/__init__.py b/src/eventkit/queues/__init__.py index e4692e1..d345597 100644 --- a/src/eventkit/queues/__init__.py +++ b/src/eventkit/queues/__init__.py @@ -2,8 +2,7 @@ from eventkit.queues.async_queue import AsyncQueue from eventkit.queues.base import EventQueue -from eventkit.queues.direct import DirectQueue from eventkit.queues.factory import create_queue from eventkit.queues.pubsub import PubSubQueue -__all__ = ["EventQueue", "DirectQueue", "AsyncQueue", "PubSubQueue", "create_queue"] +__all__ = ["EventQueue", "AsyncQueue", "PubSubQueue", "create_queue"] diff --git a/src/eventkit/queues/async_queue.py b/src/eventkit/queues/async_queue.py index ac2a93c..af71d6d 100644 --- a/src/eventkit/queues/async_queue.py +++ b/src/eventkit/queues/async_queue.py @@ -2,8 +2,14 @@ AsyncQueue - In-process event processing with background workers. This queue implementation uses asyncio.Queue to decouple API request handling -from event processing. Events are enqueued quickly, then processed by background -workers in parallel. +from event processing. Events are written to a ring buffer for durability, +then processed by background workers in parallel. + +Architecture: +1. API calls queue.enqueue(event) +2. Event written to ring buffer (Write-Ahead Log for durability) +3. Background publisher reads from ring buffer → internal asyncio.Queue +4. Worker tasks process events from internal queue Use Case: - Single-server deployment with concurrent processing @@ -11,16 +17,15 @@ - Parallel processing with configurable workers Trade-offs: -- ✅ Fast API responses (enqueue is O(1)) +- ✅ Fast API responses (ring buffer write is O(1)) +- ✅ Durable: Events persisted before processing - ✅ Parallel processing (multiple workers) - ✅ Simple: No external dependencies -- ❌ No persistence: Events lost on crash - ❌ No horizontal scaling: Single process only Evolution Path: -- DirectQueue: Inline processing (no queue) -- AsyncQueue: In-process workers (this file) -- PubSubQueue: Distributed workers (future) +- AsyncQueue: In-process workers with ring buffer (this file) +- PubSubQueue: Distributed workers with ring buffer (future) """ import asyncio @@ -31,89 +36,162 @@ if TYPE_CHECKING: from eventkit.processing.processor import Processor + from eventkit.ring_buffer import RingBuffer, RingBufferCleanup, RingBufferPublisher logger = logging.getLogger(__name__) class AsyncQueue: """ - In-process queue with background workers. + In-process queue with background workers and ring buffer durability. + + Architecture: + - API → enqueue() → ring buffer (durable) + - Publisher thread → ring buffer → internal asyncio.Queue + - Worker tasks → asyncio.Queue → processor - Events are added to an asyncio.Queue, then processed by N worker tasks - running in the background. This decouples API request handling from - event processing. + The ring buffer provides durability (Write-Ahead Log) before processing. + Events are never lost even if the service crashes before processing. Example: >>> from eventkit.processing.processor import Processor >>> from eventkit.queues.async_queue import AsyncQueue + >>> from eventkit.ring_buffer import create_ring_buffer >>> >>> processor = Processor(adapter, sequencer, buffer, error_store) - >>> queue = AsyncQueue(processor, num_workers=4) + >>> ring_buffer = create_ring_buffer(settings) + >>> queue = AsyncQueue(processor, ring_buffer, num_workers=4) >>> >>> await queue.start() - >>> await queue.enqueue(raw_event) # Returns immediately - >>> # Workers process in background - >>> await queue.stop() # Drains queue before stopping + >>> await queue.enqueue(raw_event) # Written to ring buffer (durable) + >>> # Publisher + workers process in background + >>> await queue.stop() # Drains ring buffer and queue before stopping """ - def __init__(self, processor: "Processor", num_workers: int = 4): + def __init__( + self, + processor: "Processor", + ring_buffer: "RingBuffer", + num_workers: int = 4, + publisher_batch_size: int = 100, + publisher_poll_interval: float = 0.1, + cleanup_interval: float = 3600.0, + ): """ - Initialize AsyncQueue. + Initialize AsyncQueue with ring buffer. Args: processor: The processor that will handle events + ring_buffer: Ring buffer for durable event storage num_workers: Number of background worker tasks (default: 4) + publisher_batch_size: Events to fetch from ring buffer per batch + publisher_poll_interval: Seconds between ring buffer polls + cleanup_interval: Seconds between ring buffer cleanup runs """ self.processor = processor + self.ring_buffer = ring_buffer self.num_workers = num_workers - self.queue: asyncio.Queue[RawEvent] = asyncio.Queue() + self.publisher_batch_size = publisher_batch_size + self.publisher_poll_interval = publisher_poll_interval + self.cleanup_interval = cleanup_interval + + # Internal queue for workers (ring buffer publishes here) + self._internal_queue: asyncio.Queue[RawEvent] = asyncio.Queue() self.workers: list[asyncio.Task[None]] = [] self._stop_event = asyncio.Event() + self._publisher: RingBufferPublisher | None = None + self._cleanup_worker: RingBufferCleanup | None = None async def enqueue(self, event: RawEvent) -> None: """ - Add event to queue (non-blocking). + Write event to ring buffer for durable storage. + + Returns immediately after persisting to ring buffer. The background + publisher will move events from ring buffer to internal queue, then + workers will process them. - Returns immediately after adding to queue. Background workers - will process the event asynchronously. + Architecture: API → ring buffer → publisher → internal queue → workers Args: - event: The raw event to process + event: The raw event to persist and process """ - await self.queue.put(event) - logger.debug(f"Event enqueued: queue_size={self.queue.qsize()}") + # Write to ring buffer (synchronous, durable) + self.ring_buffer.write(event) + logger.debug("Event written to ring buffer") async def start(self) -> None: """ - Start background workers and processor. + Start ring buffer publisher, workers, and processor. - Creates N worker tasks that continuously pull from the queue - and process events. + 1. Start processor (buffer flusher) + 2. Start ring buffer publisher (ring buffer → internal queue) + 3. Start N worker tasks (internal queue → processor) """ # Start processor (starts buffer flusher) await self.processor.start() + # Start ring buffer publisher (writes to internal queue) + from eventkit.ring_buffer import RingBufferPublisher + + # Create internal queue adapter for publisher + class InternalQueueAdapter: + """Adapter so publisher can write to internal asyncio.Queue.""" + + def __init__(self, internal_queue: asyncio.Queue[RawEvent]): + self.internal_queue = internal_queue + + async def enqueue(self, event: RawEvent) -> None: + """Write event to internal queue (non-blocking).""" + await self.internal_queue.put(event) + + queue_adapter = InternalQueueAdapter(self._internal_queue) + event_loop = asyncio.get_running_loop() + + self._publisher = RingBufferPublisher( + ring_buffer=self.ring_buffer, + queue=queue_adapter, # type: ignore + event_loop=event_loop, + batch_size=self.publisher_batch_size, + poll_interval=self.publisher_poll_interval, + ) + self._publisher.start() + + # Start ring buffer cleanup worker + from eventkit.ring_buffer import RingBufferCleanup + + self._cleanup_worker = RingBufferCleanup( + ring_buffer=self.ring_buffer, + cleanup_interval=self.cleanup_interval, + ) + self._cleanup_worker.start() + # Start worker tasks self._stop_event.clear() for i in range(self.num_workers): worker = asyncio.create_task(self._worker(worker_id=i)) self.workers.append(worker) - logger.info(f"AsyncQueue started with {self.num_workers} workers") + logger.info(f"AsyncQueue started: {self.num_workers} workers, ring buffer publisher active") async def stop(self) -> None: """ - Gracefully shutdown: drain queue, stop workers, stop processor. - - 1. Wait for queue to drain (process remaining events) - 2. Signal workers to stop - 3. Wait for workers to finish - 4. Stop processor (flush buffers) + Gracefully shutdown: drain ring buffer and queue, stop all components. + + 1. Stop ring buffer publisher (drains remaining events from ring buffer) + 2. Wait for internal queue to drain (process remaining events) + 3. Signal workers to stop + 4. Wait for workers to finish + 5. Stop cleanup worker + 6. Stop processor (flush buffers) """ - logger.info("AsyncQueue stopping - draining queue") + logger.info("AsyncQueue stopping - draining ring buffer and queue") + + # Stop publisher (drains ring buffer → internal queue) + if self._publisher: + self._publisher.stop(timeout=10.0) - # Wait for queue to drain - await self.queue.join() + # Wait for internal queue to drain + await self._internal_queue.join() # Signal workers to stop self._stop_event.set() @@ -122,6 +200,10 @@ async def stop(self) -> None: await asyncio.gather(*self.workers, return_exceptions=True) self.workers.clear() + # Stop cleanup worker + if self._cleanup_worker: + self._cleanup_worker.stop(timeout=5.0) + # Stop processor (flush buffers) await self.processor.stop() @@ -129,10 +211,10 @@ async def stop(self) -> None: async def _worker(self, worker_id: int) -> None: """ - Background worker that processes events from queue. + Background worker that processes events from internal queue. - Continuously pulls events from queue and calls processor.process_event() - until stop signal is received. + Continuously pulls events from internal queue (populated by ring buffer + publisher) and calls processor.process_event() until stop signal is received. Args: worker_id: ID for logging purposes @@ -142,7 +224,7 @@ async def _worker(self, worker_id: int) -> None: while not self._stop_event.is_set(): try: # Wait for event with timeout (check stop signal periodically) - event = await asyncio.wait_for(self.queue.get(), timeout=0.1) + event = await asyncio.wait_for(self._internal_queue.get(), timeout=0.1) # Process event try: @@ -151,7 +233,7 @@ async def _worker(self, worker_id: int) -> None: logger.error(f"Worker {worker_id} error processing event: {e}") finally: # Mark task as done (for queue.join()) - self.queue.task_done() + self._internal_queue.task_done() except TimeoutError: # No event available, check stop signal and continue diff --git a/src/eventkit/queues/direct.py b/src/eventkit/queues/direct.py deleted file mode 100644 index 0aaccd7..0000000 --- a/src/eventkit/queues/direct.py +++ /dev/null @@ -1,86 +0,0 @@ -""" -DirectQueue - Inline event processing (no actual queue). - -This is the simplest queue implementation. Events are processed -immediately when enqueue() is called, with no queueing or background workers. - -Use Case: -- Single-server deployment -- Development and testing -- Getting started quickly - -Trade-offs: -- ✅ Simple: No external dependencies -- ✅ Fast: No queue overhead -- ❌ No decoupling: API and processing in same thread -- ❌ No horizontal scaling: Can't add more workers - -Evolution Path: -- DirectQueue: Inline processing (this file) -- AsyncQueue: In-process workers with background tasks -- PubSubQueue: Distributed workers via GCP Pub/Sub (future) -""" - -from typing import TYPE_CHECKING - -from eventkit.schema.raw import RawEvent - -if TYPE_CHECKING: - from eventkit.processing.processor import Processor - - -class DirectQueue: - """ - Direct (inline) queue implementation. - - Processes events immediately without actual queueing. The "queue" - is just a thin wrapper that calls processor.process_event() directly. - - Example: - >>> from eventkit.processing.processor import Processor - >>> from eventkit.queues.direct import DirectQueue - >>> - >>> processor = Processor(adapter, sequencer, buffer, error_store) - >>> queue = DirectQueue(processor) - >>> - >>> await queue.start() - >>> await queue.enqueue(raw_event) # Processes immediately - >>> await queue.stop() - """ - - def __init__(self, processor: "Processor") -> None: # noqa: F821 - """ - Initialize DirectQueue. - - Args: - processor: Processor instance that handles event processing - """ - self.processor = processor - - async def enqueue(self, event: RawEvent) -> None: - """ - Process event immediately (inline). - - No actual queueing - just calls processor.process_event() directly. - - Args: - event: RawEvent to process - """ - await self.processor.process_event(event) - - async def start(self) -> None: - """ - Start processor. - - For DirectQueue, this just starts the processor's background tasks - (e.g., buffer flusher). - """ - await self.processor.start() - - async def stop(self) -> None: - """ - Stop processor gracefully. - - Ensures buffer is flushed before shutdown. - """ - await self.processor.stop() diff --git a/src/eventkit/queues/factory.py b/src/eventkit/queues/factory.py index fb82a36..95e296c 100644 --- a/src/eventkit/queues/factory.py +++ b/src/eventkit/queues/factory.py @@ -5,16 +5,14 @@ (EVENTKIT_QUEUE_MODE) without changing application code. Supported Modes: -- direct: DirectQueue (inline processing) - default -- async: AsyncQueue (in-process workers) -- pubsub: PubSubQueue (distributed workers) - future +- async: AsyncQueue (in-process workers) - default +- pubsub: PubSubQueue (distributed workers) """ from typing import TYPE_CHECKING from eventkit.config import QueueMode, Settings from eventkit.queues.base import EventQueue -from eventkit.queues.direct import DirectQueue if TYPE_CHECKING: from eventkit.processing.processor import Processor @@ -22,17 +20,23 @@ def create_queue(processor: "Processor", settings: Settings) -> EventQueue: """ - Create queue instance based on settings.queue_mode. + Create queue instance with ring buffer based on settings.queue_mode. This factory pattern enables easy swapping of queue implementations - via configuration. The processor doesn't know which queue is being used. + via configuration. All queues use a ring buffer (Write-Ahead Log) for + durability - the implementation is pluggable (SQLite, Cloud Tasks, etc.). + + Architecture: + - Creates ring buffer based on EVENTKIT_RING_BUFFER_MODE + - Injects ring buffer into queue + - Queue manages ring buffer publisher lifecycle Args: processor: Processor instance (queue-agnostic) - settings: Settings with queue_mode configuration + settings: Settings with queue_mode and ring_buffer configuration Returns: - EventQueue implementation (DirectQueue, AsyncQueue, or PubSubQueue) + EventQueue implementation (AsyncQueue or PubSubQueue) with ring buffer Raises: ValueError: If queue_mode is unknown @@ -44,25 +48,41 @@ def create_queue(processor: "Processor", settings: Settings) -> EventQueue: >>> >>> settings = Settings() # Reads EVENTKIT_QUEUE_MODE from env >>> processor = Processor(...) - >>> queue = create_queue(processor, settings) + >>> queue = create_queue(processor, settings) # Ring buffer created internally >>> - >>> await queue.start() - >>> await queue.enqueue(raw_event) - >>> await queue.stop() + >>> await queue.start() # Starts ring buffer publisher + >>> await queue.enqueue(raw_event) # Written to ring buffer + >>> await queue.stop() # Drains ring buffer """ + # Create ring buffer (durability layer - always present) + from eventkit.ring_buffer import create_ring_buffer - if settings.EVENTKIT_QUEUE_MODE == QueueMode.DIRECT: - return DirectQueue(processor) + ring_buffer = create_ring_buffer(settings) - elif settings.EVENTKIT_QUEUE_MODE == QueueMode.ASYNC: + # Create queue with ring buffer + if settings.EVENTKIT_QUEUE_MODE == QueueMode.ASYNC: from eventkit.queues.async_queue import AsyncQueue - return AsyncQueue(processor, num_workers=settings.EVENTKIT_ASYNC_WORKERS) + return AsyncQueue( + processor=processor, + ring_buffer=ring_buffer, + num_workers=settings.EVENTKIT_ASYNC_WORKERS, + publisher_batch_size=settings.EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE, + publisher_poll_interval=settings.EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL, + cleanup_interval=settings.EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL, + ) elif settings.EVENTKIT_QUEUE_MODE == QueueMode.PUBSUB: from eventkit.queues.pubsub import PubSubQueue - return PubSubQueue(processor, settings) + return PubSubQueue( + processor=processor, + ring_buffer=ring_buffer, + settings=settings, + publisher_batch_size=settings.EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE, + publisher_poll_interval=settings.EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL, + cleanup_interval=settings.EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL, + ) else: raise ValueError( diff --git a/src/eventkit/queues/pubsub.py b/src/eventkit/queues/pubsub.py index 12e93bf..6792005 100644 --- a/src/eventkit/queues/pubsub.py +++ b/src/eventkit/queues/pubsub.py @@ -2,6 +2,14 @@ PubSub-based distributed queue for horizontal scaling. Uses Google Cloud Pub/Sub for event distribution across multiple worker instances. + +Architecture: +1. API calls queue.enqueue(event) +2. Event written to ring buffer (Write-Ahead Log for durability) +3. Background publisher reads from ring buffer → publishes to Pub/Sub topic +4. Pub/Sub subscription workers receive messages → internal queue +5. Async workers process events from internal queue + Implements an async bridge pattern to connect Pub/Sub's synchronous callback model to our async event processing pipeline. """ @@ -19,6 +27,7 @@ if TYPE_CHECKING: from eventkit.processing.processor import Processor + from eventkit.ring_buffer import RingBuffer, RingBufferCleanup, RingBufferPublisher logger = logging.getLogger(__name__) @@ -27,35 +36,62 @@ class PubSubQueue: """ Distributed queue using Google Cloud Pub/Sub for horizontal scaling. + Architecture: + - API → enqueue() → ring buffer (durable) + - Publisher thread → ring buffer → Pub/Sub topic + - Pub/Sub subscription callback → internal asyncio.Queue + - Worker tasks → asyncio.Queue → processor + + The ring buffer provides durability (Write-Ahead Log) before publishing to Pub/Sub. + Events are never lost even if the service crashes before publishing. + Implements an async bridge pattern: Pub/Sub's synchronous callback bridges to an internal asyncio.Queue, which async workers pull from to process events. Workers ack/nack Pub/Sub messages based on processing results. """ - def __init__(self, processor: "Processor", settings: Settings): + def __init__( + self, + processor: "Processor", + ring_buffer: "RingBuffer", + settings: Settings, + publisher_batch_size: int = 100, + publisher_poll_interval: float = 0.1, + cleanup_interval: float = 3600.0, + ): """ - Initialize PubSubQueue. + Initialize PubSubQueue with ring buffer. Args: processor: The processor that will handle events + ring_buffer: Ring buffer for durable event storage settings: Application settings with Pub/Sub configuration + publisher_batch_size: Events to fetch from ring buffer per batch + publisher_poll_interval: Seconds between ring buffer polls + cleanup_interval: Seconds between ring buffer cleanup runs """ self.processor = processor + self.ring_buffer = ring_buffer self.settings = settings + self.publisher_batch_size = publisher_batch_size + self.publisher_poll_interval = publisher_poll_interval + self.cleanup_interval = cleanup_interval # Use dedicated Pub/Sub project or fall back to GCP_PROJECT_ID project_id = settings.EVENTKIT_PUBSUB_PROJECT_ID or settings.GCP_PROJECT_ID # Pub/Sub clients - self.publisher = pubsub_v1.PublisherClient() + self.publisher_client = pubsub_v1.PublisherClient() self.subscriber = pubsub_v1.SubscriberClient() # Resource paths - self.topic_path = self.publisher.topic_path(project_id, settings.EVENTKIT_PUBSUB_TOPIC) + self.topic_path = self.publisher_client.topic_path( + project_id, settings.EVENTKIT_PUBSUB_TOPIC + ) self.subscription_path = self.subscriber.subscription_path( project_id, settings.EVENTKIT_PUBSUB_SUBSCRIPTION ) - self.dlq_topic_path = self.publisher.topic_path( + self.dlq_topic_path = self.publisher_client.topic_path( project_id, settings.EVENTKIT_PUBSUB_DLQ_TOPIC ) @@ -73,29 +109,29 @@ def __init__(self, processor: "Processor", settings: Settings): # Pub/Sub subscriber future self.streaming_pull_future: pubsub_v1.subscriber.futures.StreamingPullFuture | None = None + # Ring buffer workers + self._ring_buffer_publisher: RingBufferPublisher | None = None + self._cleanup_worker: RingBufferCleanup | None = None + async def enqueue(self, event: RawEvent) -> None: """ - Publish event to Pub/Sub topic. + Write event to ring buffer for durable storage. - Args: - event: Raw event to publish - """ - data = event.model_dump_json().encode("utf-8") + Returns immediately after persisting to ring buffer. The background + publisher will move events from ring buffer to Pub/Sub topic, where + subscription workers will receive and process them. - # Publish to Pub/Sub (blocking operation, run in thread) - future = self.publisher.publish( - self.topic_path, - data=data, - # Add attributes for filtering/routing if needed - event_type=event.payload.get("type", "unknown"), - stream=event.stream or "default", - ) + Architecture: API → ring buffer → publisher → Pub/Sub → workers - # Wait for publish confirmation (runs in thread to avoid blocking) - await asyncio.to_thread(future.result) + Args: + event: Raw event to persist and publish + """ + # Write to ring buffer (synchronous, durable) + self.ring_buffer.write(event) + logger.debug("Event written to ring buffer") async def start(self) -> None: - """Start processor, workers, and Pub/Sub subscriber.""" + """Start processor, ring buffer publisher, workers, and Pub/Sub subscriber.""" logger.info("PubSubQueue starting...") # Start processor (buffer flusher) @@ -107,6 +143,53 @@ async def start(self) -> None: # Ensure Pub/Sub resources exist (creates if needed) await self._ensure_resources() + # Start ring buffer publisher (publishes to Pub/Sub) + from eventkit.ring_buffer import RingBufferPublisher + + # Create Pub/Sub adapter for publisher + class PubSubAdapter: + """Adapter so publisher can publish to Pub/Sub.""" + + def __init__(self, pubsub_queue: "PubSubQueue"): + self.pubsub_queue = pubsub_queue + + async def enqueue(self, event: RawEvent) -> None: + """Publish event to Pub/Sub topic.""" + data = event.model_dump_json().encode("utf-8") + + # Publish to Pub/Sub (blocking operation, run in thread) + future = self.pubsub_queue.publisher_client.publish( + self.pubsub_queue.topic_path, + data=data, + # Add attributes for filtering/routing if needed + event_type=event.payload.get("type", "unknown"), + stream=event.stream or "default", + ) + + # Wait for publish confirmation (runs in thread to avoid blocking) + await asyncio.to_thread(future.result) + + pubsub_adapter = PubSubAdapter(self) + event_loop = asyncio.get_running_loop() + + self._ring_buffer_publisher = RingBufferPublisher( + ring_buffer=self.ring_buffer, + queue=pubsub_adapter, # type: ignore + event_loop=event_loop, + batch_size=self.publisher_batch_size, + poll_interval=self.publisher_poll_interval, + ) + self._ring_buffer_publisher.start() + + # Start ring buffer cleanup worker + from eventkit.ring_buffer import RingBufferCleanup + + self._cleanup_worker = RingBufferCleanup( + ring_buffer=self.ring_buffer, + cleanup_interval=self.cleanup_interval, + ) + self._cleanup_worker.start() + # Start internal async workers self.shutdown_event.clear() for i in range(self.settings.EVENTKIT_PUBSUB_WORKERS): @@ -125,13 +208,19 @@ async def start(self) -> None: "workers": self.settings.EVENTKIT_PUBSUB_WORKERS, "topic": self.settings.EVENTKIT_PUBSUB_TOPIC, "subscription": self.settings.EVENTKIT_PUBSUB_SUBSCRIPTION, + "ring_buffer_publisher": "active", }, ) async def stop(self) -> None: - """Graceful shutdown: drain queue, stop workers, stop processor.""" + """Graceful shutdown: drain ring buffer and queue, stop all components.""" logger.info("PubSubQueue stopping...") + # Stop ring buffer publisher (drains remaining events from ring buffer) + if self._ring_buffer_publisher: + self._ring_buffer_publisher.stop(timeout=10.0) + logger.info("PubSubQueue: ring buffer drained") + # Signal shutdown to workers self.shutdown_event.set() @@ -150,6 +239,10 @@ async def stop(self) -> None: await asyncio.gather(*self.workers, return_exceptions=True) logger.info("PubSubQueue: all workers stopped") + # Stop cleanup worker + if self._cleanup_worker: + self._cleanup_worker.stop(timeout=5.0) + # Stop processor (flush buffers) await self.processor.stop() @@ -236,10 +329,10 @@ async def _ensure_resources(self) -> None: def _create_topic_if_not_exists(self, topic_path: str) -> None: """Create Pub/Sub topic if it doesn't exist.""" try: - self.publisher.get_topic(request={"topic": topic_path}) + self.publisher_client.get_topic(request={"topic": topic_path}) logger.debug(f"Topic exists: {topic_path}") except Exception: - self.publisher.create_topic(request={"name": topic_path}) + self.publisher_client.create_topic(request={"name": topic_path}) logger.info(f"Created topic: {topic_path}") def _create_subscription_if_not_exists( diff --git a/src/eventkit/ring_buffer/__init__.py b/src/eventkit/ring_buffer/__init__.py new file mode 100644 index 0000000..afd1c09 --- /dev/null +++ b/src/eventkit/ring_buffer/__init__.py @@ -0,0 +1,17 @@ +"""Ring buffer (Write-Ahead Log) for event durability.""" + +from eventkit.ring_buffer.base import RingBuffer, RingBufferEntry +from eventkit.ring_buffer.cleanup import RingBufferCleanup +from eventkit.ring_buffer.factory import RingBufferMode, create_ring_buffer +from eventkit.ring_buffer.publisher import RingBufferPublisher +from eventkit.ring_buffer.sqlite import SQLiteRingBuffer + +__all__ = [ + "RingBuffer", + "RingBufferCleanup", + "RingBufferEntry", + "RingBufferMode", + "RingBufferPublisher", + "SQLiteRingBuffer", + "create_ring_buffer", +] diff --git a/src/eventkit/ring_buffer/base.py b/src/eventkit/ring_buffer/base.py new file mode 100644 index 0000000..0197ea2 --- /dev/null +++ b/src/eventkit/ring_buffer/base.py @@ -0,0 +1,105 @@ +"""Ring buffer protocol and data models.""" + +from dataclasses import dataclass +from typing import Protocol + +from eventkit.schema.raw import RawEvent + + +@dataclass +class RingBufferEntry: + """Entry in the ring buffer.""" + + id: int + event_data: str # JSON serialized RawEvent + created_at: str # ISO 8601 timestamp + published: bool = False + published_at: str | None = None + + +class RingBuffer(Protocol): + """ + Protocol for ring buffer implementations. + + Ring buffer provides local durability (Write-Ahead Log) before events + reach the downstream queue. Supports size and time-based cleanup. + + Implementations: SQLiteRingBuffer (default), future: CloudTasks, PostgreSQL + """ + + def write(self, event: RawEvent) -> int: + """ + Write event to ring buffer. + + Args: + event: Raw event to persist + + Returns: + Event ID (auto-increment primary key) + + Raises: + Exception: If write fails (disk full, corruption, etc.) + """ + ... + + def fetch_unpublished(self, limit: int) -> list[RingBufferEntry]: + """ + Fetch unpublished events for publishing. + + Args: + limit: Maximum number of events to fetch + + Returns: + List of unpublished events, ordered by ID (oldest first) + """ + ... + + def mark_published(self, ids: list[int]) -> None: + """ + Mark events as published after successful queue ingestion. + + Args: + ids: Event IDs to mark as published + """ + ... + + def delete_old_published(self, max_age_hours: int) -> int: + """ + Delete published events older than max_age_hours (time-based cleanup). + + Safety: Only deletes published events (WHERE published = TRUE). + + Args: + max_age_hours: Maximum age in hours for published events + + Returns: + Number of events deleted + """ + ... + + def delete_oldest_published(self, keep_count: int) -> int: + """ + Delete oldest published events if total count exceeds keep_count (size-based cleanup). + + Safety: Only deletes published events (WHERE published = TRUE). + + Args: + keep_count: Maximum number of events to keep in buffer + + Returns: + Number of events deleted + """ + ... + + def count(self) -> int: + """ + Get total number of events in buffer (published + unpublished). + + Returns: + Total event count + """ + ... + + def close(self) -> None: + """Close ring buffer connection and cleanup resources.""" + ... diff --git a/src/eventkit/ring_buffer/cleanup.py b/src/eventkit/ring_buffer/cleanup.py new file mode 100644 index 0000000..64c1340 --- /dev/null +++ b/src/eventkit/ring_buffer/cleanup.py @@ -0,0 +1,115 @@ +""" +Background cleanup worker for ring buffer. + +The cleanup worker runs periodically to remove old published events from the +ring buffer, enforcing both time-based and size-based retention limits. +""" + +import logging +import threading +import time + +from eventkit.ring_buffer.base import RingBuffer + +logger = logging.getLogger(__name__) + + +class RingBufferCleanup: + """ + Background worker that periodically cleans up published events. + + Lifecycle: + 1. Start: Launch background thread + 2. Run: Periodically delete old published events (time + size limits) + 3. Stop: Graceful shutdown + """ + + def __init__( + self, + ring_buffer: RingBuffer, + retention_hours: int = 24, + max_size: int = 100000, + cleanup_interval: float = 3600.0, # 1 hour default + ): + """ + Initialize cleanup worker. + + Args: + ring_buffer: Ring buffer to clean + retention_hours: Maximum age in hours for published events + max_size: Maximum number of published events to keep + cleanup_interval: Seconds between cleanup runs + """ + self.ring_buffer = ring_buffer + self.retention_hours = retention_hours + self.max_size = max_size + self.cleanup_interval = cleanup_interval + + self._thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._running = False + + def start(self) -> None: + """Start the cleanup worker thread.""" + if self._running: + logger.warning("Cleanup worker already running") + return + + self._stop_event.clear() + self._running = True + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + logger.info("Ring buffer cleanup worker started") + + def stop(self, timeout: float = 5.0) -> None: + """ + Stop the cleanup worker thread. + + Args: + timeout: Maximum time to wait for graceful shutdown + """ + if not self._running: + return + + logger.info("Stopping ring buffer cleanup worker...") + self._stop_event.set() + + if self._thread: + self._thread.join(timeout=timeout) + if self._thread.is_alive(): + logger.warning("Cleanup worker did not stop within timeout") + else: + logger.info("Ring buffer cleanup worker stopped") + + self._running = False + + def _run(self) -> None: + """Main cleanup loop (runs in background thread).""" + while not self._stop_event.is_set(): + try: + self._cleanup() + except Exception: + logger.exception("Error during ring buffer cleanup") + + # Sleep between cleanup runs + time.sleep(self.cleanup_interval) + + def _cleanup(self) -> None: + """Run cleanup (time + size based).""" + # Time-based cleanup: Delete events older than retention_hours + try: + deleted_time = self.ring_buffer.delete_old_published(max_age_hours=self.retention_hours) + if deleted_time > 0: + logger.info( + f"Deleted {deleted_time} events older than {self.retention_hours} hours" + ) + except Exception: + logger.exception("Error during time-based cleanup") + + # Size-based cleanup: Enforce max_size limit + try: + deleted_size = self.ring_buffer.delete_oldest_published(keep_count=self.max_size) + if deleted_size > 0: + logger.info(f"Deleted {deleted_size} events to enforce max size ({self.max_size})") + except Exception: + logger.exception("Error during size-based cleanup") diff --git a/src/eventkit/ring_buffer/factory.py b/src/eventkit/ring_buffer/factory.py new file mode 100644 index 0000000..6ca0da3 --- /dev/null +++ b/src/eventkit/ring_buffer/factory.py @@ -0,0 +1,55 @@ +"""Ring buffer factory for creating implementations based on configuration.""" + +from enum import Enum +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from eventkit.config import Settings + from eventkit.ring_buffer.base import RingBuffer + + +class RingBufferMode(str, Enum): + """Ring buffer implementation modes.""" + + SQLITE = "sqlite" + # Future: CLOUD_TASKS = "cloud_tasks" + # Future: POSTGRES = "postgres" + # Future: MEMORY = "memory" + + +def create_ring_buffer(settings: "Settings") -> "RingBuffer": + """ + Factory that creates ring buffer based on configuration. + + Extension Point: Only SQLite is currently implemented. + To add new implementations (e.g., CloudTasks, PostgreSQL), + add enum value and implementation here. + + Args: + settings: Application settings + + Returns: + Ring buffer implementation + + Raises: + ValueError: If unsupported ring buffer mode specified + """ + from eventkit.ring_buffer.sqlite import SQLiteRingBuffer + + if settings.EVENTKIT_RING_BUFFER_MODE == RingBufferMode.SQLITE: + return SQLiteRingBuffer( + db_path=settings.EVENTKIT_RING_BUFFER_DB_PATH, + max_size=settings.EVENTKIT_RING_BUFFER_MAX_SIZE, + retention_hours=settings.EVENTKIT_RING_BUFFER_RETENTION_HOURS, + ) + + # Extension point for future implementations: + # elif settings.EVENTKIT_RING_BUFFER_MODE == RingBufferMode.CLOUD_TASKS: + # from eventkit.ring_buffer.cloud_tasks import CloudTasksRingBuffer + # return CloudTasksRingBuffer(...) + + else: + raise ValueError( + f"Unsupported ring buffer mode: {settings.EVENTKIT_RING_BUFFER_MODE}. " + f"Only 'sqlite' is currently implemented." + ) diff --git a/src/eventkit/ring_buffer/publisher.py b/src/eventkit/ring_buffer/publisher.py new file mode 100644 index 0000000..dc93e99 --- /dev/null +++ b/src/eventkit/ring_buffer/publisher.py @@ -0,0 +1,160 @@ +""" +Background publisher that moves events from ring buffer to queue. + +The publisher runs in a background thread, periodically polling the ring buffer +for unpublished events and publishing them to the downstream queue. +""" + +import asyncio +import logging +import threading +import time +from typing import TYPE_CHECKING + +from eventkit.queues.base import EventQueue +from eventkit.ring_buffer.base import RingBuffer +from eventkit.schema.raw import RawEvent + +if TYPE_CHECKING: + pass + +logger = logging.getLogger(__name__) + + +class RingBufferPublisher: + """ + Background worker that publishes events from ring buffer to queue. + + Lifecycle: + 1. Start: Launch background thread + 2. Run: Poll ring buffer, publish to queue, mark as published + 3. Stop: Graceful shutdown (drain remaining events) + """ + + def __init__( + self, + ring_buffer: RingBuffer, + queue: EventQueue, + event_loop: asyncio.AbstractEventLoop, + batch_size: int = 100, + poll_interval: float = 0.1, + ): + """ + Initialize publisher. + + Args: + ring_buffer: Ring buffer to read from + queue: Queue to publish to + event_loop: Asyncio event loop for queue.enqueue() calls + batch_size: Number of events to fetch per batch + poll_interval: Seconds between polls + """ + self.ring_buffer = ring_buffer + self.queue = queue + self.event_loop = event_loop + self.batch_size = batch_size + self.poll_interval = poll_interval + + self._thread: threading.Thread | None = None + self._stop_event = threading.Event() + self._running = False + + def start(self) -> None: + """Start the publisher thread.""" + if self._running: + logger.warning("Publisher already running") + return + + self._stop_event.clear() + self._running = True + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + logger.info("Ring buffer publisher started") + + def stop(self, timeout: float = 10.0) -> None: + """ + Stop the publisher thread and drain remaining events. + + Args: + timeout: Maximum time to wait for graceful shutdown + """ + if not self._running: + return + + logger.info("Stopping ring buffer publisher...") + self._stop_event.set() + + if self._thread: + self._thread.join(timeout=timeout) + if self._thread.is_alive(): + logger.warning("Publisher did not stop within timeout") + else: + logger.info("Ring buffer publisher stopped") + + self._running = False + + def _run(self) -> None: + """Main publisher loop (runs in background thread).""" + while not self._stop_event.is_set(): + try: + self._publish_batch() + except Exception: + logger.exception("Error publishing batch from ring buffer") + + # Sleep between polls + time.sleep(self.poll_interval) + + # Final drain on shutdown + try: + logger.info("Draining ring buffer before shutdown...") + drained = 0 + while True: + published = self._publish_batch() + if published == 0: + break + drained += published + if drained > 0: + logger.info(f"Drained {drained} events from ring buffer") + except Exception: + logger.exception("Error draining ring buffer on shutdown") + + def _publish_batch(self) -> int: + """ + Fetch and publish a batch of events. + + Returns: + Number of events published + """ + # Fetch unpublished events + entries = self.ring_buffer.fetch_unpublished(limit=self.batch_size) + if not entries: + return 0 + + # Parse events and publish to queue + published_ids = [] + for entry in entries: + try: + # Parse JSON event data + import json + + event_data = json.loads(entry.event_data) + event = RawEvent(**event_data) + + # Publish to queue (bridge from sync thread to async event loop) + future = asyncio.run_coroutine_threadsafe( + self.queue.enqueue(event), self.event_loop + ) + # Wait for the coroutine to complete (blocking in this thread) + future.result() + published_ids.append(entry.id) + + except Exception: + logger.exception(f"Failed to publish event {entry.id}") + # Continue with next event (failed event stays unpublished) + continue + + # Mark successfully published events + if published_ids: + self.ring_buffer.mark_published(published_ids) + + return len(published_ids) diff --git a/src/eventkit/ring_buffer/sqlite.py b/src/eventkit/ring_buffer/sqlite.py new file mode 100644 index 0000000..80fca04 --- /dev/null +++ b/src/eventkit/ring_buffer/sqlite.py @@ -0,0 +1,221 @@ +""" +SQLite-based Ring Buffer implementation with Write-Ahead Logging. + +This implementation uses SQLite's WAL mode for durability and provides +a fixed-size ring buffer with time-based expiry. Only published events +can be safely deleted. +""" + +from datetime import UTC, datetime, timedelta + +from eventkit.ring_buffer.base import RingBuffer, RingBufferEntry +from eventkit.schema.raw import RawEvent + + +class SQLiteRingBuffer(RingBuffer): + """ + SQLite-backed ring buffer with WAL mode for durability. + + Features: + - Write-Ahead Logging for crash recovery + - Time-based cleanup (delete old published events) + - Size-based cleanup (enforce max buffer size) + - Safety: Never deletes unpublished events + """ + + def __init__(self, db_path: str, max_size: int, retention_hours: int): + """ + Initialize SQLite ring buffer. + + Args: + db_path: Path to SQLite database file (use ":memory:" for testing) + max_size: Maximum number of events to keep (enforced on published events) + retention_hours: Maximum age in hours for published events + """ + import sqlite3 + + self.db_path = db_path + self.max_size = max_size + self.retention_hours = retention_hours + self.conn = sqlite3.connect(db_path, check_same_thread=False) + self._init_db() + + def _init_db(self) -> None: + """Initialize database with WAL mode and indexes.""" + # Enable WAL mode for better concurrency and durability + self.conn.execute("PRAGMA journal_mode=WAL") + # Synchronous=NORMAL is safe with WAL and faster than FULL + self.conn.execute("PRAGMA synchronous=NORMAL") + + # Create table + self.conn.execute(""" + CREATE TABLE IF NOT EXISTS ring_buffer ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + event_data TEXT NOT NULL, + created_at TEXT NOT NULL, + published BOOLEAN DEFAULT FALSE, + published_at TEXT + ) + """) + + # Indexes for efficient queries + self.conn.execute( + "CREATE INDEX IF NOT EXISTS idx_unpublished ON ring_buffer(published, created_at)" + ) + self.conn.execute( + "CREATE INDEX IF NOT EXISTS idx_published ON ring_buffer(published, published_at)" + ) + self.conn.commit() + + def write(self, event: RawEvent) -> int: + """ + Write event to ring buffer. + + Args: + event: Raw event to store + + Returns: + ID of the inserted event + """ + event_json = event.model_dump_json() + now = datetime.now(UTC).isoformat() + + cursor = self.conn.execute( + "INSERT INTO ring_buffer (event_data, created_at) VALUES (?, ?)", + (event_json, now), + ) + self.conn.commit() + + # lastrowid should never be None for AUTOINCREMENT, but handle type safety + if cursor.lastrowid is None: + raise RuntimeError("Failed to get inserted row ID") + return cursor.lastrowid + + def fetch_unpublished(self, limit: int) -> list[RingBufferEntry]: + """ + Fetch unpublished events for publishing to queue. + + Args: + limit: Maximum number of events to fetch + + Returns: + List of unpublished events, oldest first + """ + cursor = self.conn.execute( + """ + SELECT id, event_data, created_at, published, published_at + FROM ring_buffer + WHERE published = FALSE + ORDER BY created_at ASC + LIMIT ? + """, + (limit,), + ) + + entries = [] + for row in cursor.fetchall(): + entries.append( + RingBufferEntry( + id=row[0], + event_data=row[1], + created_at=row[2], # Keep as ISO string + published=bool(row[3]), + published_at=row[4], # Keep as ISO string or None + ) + ) + return entries + + def mark_published(self, ids: list[int]) -> None: + """ + Mark events as published after successful queue ingestion. + + Args: + ids: List of event IDs to mark as published + """ + if not ids: + return + + now = datetime.now(UTC).isoformat() + placeholders = ",".join("?" * len(ids)) + + # Create params list with now timestamp followed by all IDs + params: list[str | int] = [now] + ids + + self.conn.execute( + f""" + UPDATE ring_buffer + SET published = TRUE, published_at = ? + WHERE id IN ({placeholders}) + """, + params, + ) + self.conn.commit() + + def delete_old_published(self, max_age_hours: int) -> int: + """ + Delete published events older than max_age_hours. + + Safety: Only deletes events with published=TRUE. + + Args: + max_age_hours: Maximum age in hours for published events + + Returns: + Number of events deleted + """ + cutoff = (datetime.now(UTC) - timedelta(hours=max_age_hours)).isoformat() + + cursor = self.conn.execute( + """ + DELETE FROM ring_buffer + WHERE published = TRUE + AND published_at < ? + """, + (cutoff,), + ) + self.conn.commit() + return cursor.rowcount + + def delete_oldest_published(self, keep_count: int) -> int: + """ + Delete oldest published events to enforce max buffer size. + + Safety: Only deletes events with published=TRUE. + + Args: + keep_count: Number of published events to keep + + Returns: + Number of events deleted + """ + cursor = self.conn.execute( + """ + DELETE FROM ring_buffer + WHERE id IN ( + SELECT id FROM ring_buffer + WHERE published = TRUE + ORDER BY published_at ASC + LIMIT MAX(0, (SELECT COUNT(*) FROM ring_buffer WHERE published = TRUE) - ?) + ) + """, + (keep_count,), + ) + self.conn.commit() + return cursor.rowcount + + def count(self) -> int: + """ + Get total number of events in buffer. + + Returns: + Total event count + """ + cursor = self.conn.execute("SELECT COUNT(*) FROM ring_buffer") + result = cursor.fetchone() + if result is None: + return 0 + return int(result[0]) + + def close(self) -> None: + """Close database connection.""" + self.conn.close() diff --git a/tests/integration/test_pubsub_integration.py b/tests/integration/test_pubsub_integration.py index 04b6b6f..00936aa 100644 --- a/tests/integration/test_pubsub_integration.py +++ b/tests/integration/test_pubsub_integration.py @@ -31,6 +31,20 @@ async def mock_processor(): return processor +@pytest_asyncio.fixture +async def mock_ring_buffer(): + """Use real SQLite ring buffer for integration testing.""" + from eventkit.ring_buffer.sqlite import SQLiteRingBuffer + + ring_buffer = SQLiteRingBuffer( + db_path=":memory:", + max_size=1000, + retention_hours=24, + ) + yield ring_buffer + ring_buffer.close() + + @pytest_asyncio.fixture async def settings(check_emulator): """Settings configured for Pub/Sub emulator.""" @@ -47,9 +61,9 @@ async def settings(check_emulator): @pytest_asyncio.fixture -async def pubsub_queue(mock_processor, settings): +async def pubsub_queue(mock_processor, mock_ring_buffer, settings): """PubSubQueue connected to emulator.""" - queue = PubSubQueue(mock_processor, settings) + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) yield queue # Cleanup: stop queue if still running if queue.streaming_pull_future and not queue.streaming_pull_future.cancelled(): @@ -122,7 +136,10 @@ async def test_graceful_shutdown_drains_queue(pubsub_queue, mock_processor): for event in events: await pubsub_queue.enqueue(event) - # Stop immediately (should wait for in-flight processing) + # Give publisher thread time to poll ring buffer and publish to Pub/Sub + await asyncio.sleep(0.5) + + # Stop (should wait for in-flight processing) await pubsub_queue.stop() # Verify events were processed (at least some, might not be all due to timing) @@ -134,6 +151,8 @@ async def test_graceful_shutdown_drains_queue(pubsub_queue, mock_processor): async def test_pubsub_message_acknowledgment(): """Test messages are properly acknowledged and not redelivered.""" # This test uses its own unique topic/subscription to avoid interference + from eventkit.ring_buffer.sqlite import SQLiteRingBuffer + settings = Settings( GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.PUBSUB, @@ -144,8 +163,9 @@ async def test_pubsub_message_acknowledgment(): EVENTKIT_PUBSUB_WORKERS=1, ) + ring_buffer = SQLiteRingBuffer(db_path=":memory:", max_size=1000, retention_hours=24) mock_processor = AsyncMock(spec=Processor) - queue = PubSubQueue(mock_processor, settings) + queue = PubSubQueue(mock_processor, ring_buffer, settings) # Start queue await queue.start() @@ -173,19 +193,23 @@ async def test_pubsub_message_acknowledgment(): assert mock_processor.process_event.call_count == initial_count await queue.stop() + ring_buffer.close() @pytest.mark.asyncio @pytest.mark.integration async def test_resource_creation(settings): """Test PubSubQueue creates topic and subscription if not exists.""" + from eventkit.ring_buffer.sqlite import SQLiteRingBuffer + # Use a unique topic/subscription for this test settings.EVENTKIT_PUBSUB_TOPIC = "eventkit-resource-test-topic" settings.EVENTKIT_PUBSUB_SUBSCRIPTION = "eventkit-resource-test-sub" settings.EVENTKIT_PUBSUB_DLQ_TOPIC = "eventkit-resource-test-dlq" + ring_buffer = SQLiteRingBuffer(db_path=":memory:", max_size=1000, retention_hours=24) mock_processor = AsyncMock(spec=Processor) - queue = PubSubQueue(mock_processor, settings) + queue = PubSubQueue(mock_processor, ring_buffer, settings) # Start queue (should create resources) await queue.start() @@ -198,3 +222,4 @@ async def test_resource_creation(settings): mock_processor.process_event.assert_awaited_once() await queue.stop() + ring_buffer.close() diff --git a/tests/integration/test_ring_buffer_integration.py b/tests/integration/test_ring_buffer_integration.py new file mode 100644 index 0000000..dd10521 --- /dev/null +++ b/tests/integration/test_ring_buffer_integration.py @@ -0,0 +1,522 @@ +"""Integration tests for ring buffer → queue → storage flow.""" + +import asyncio +import os +import tempfile +import time +from datetime import UTC, datetime, timedelta + +import pytest +import pytest_asyncio + +from eventkit.adapters.segment import SegmentSchemaAdapter +from eventkit.processing.buffer import Buffer +from eventkit.processing.processor import Processor +from eventkit.processing.sequencer import HashSequencer +from eventkit.queues.async_queue import AsyncQueue +from eventkit.ring_buffer.cleanup import RingBufferCleanup +from eventkit.ring_buffer.sqlite import SQLiteRingBuffer +from eventkit.schema.raw import RawEvent +from eventkit.stores.firestore import FirestoreErrorStore, FirestoreEventStore + + +@pytest.fixture +def check_emulator(): + """Ensure Firestore emulator is running.""" + emulator_host = os.getenv("FIRESTORE_EMULATOR_HOST") + if not emulator_host: + pytest.skip("FIRESTORE_EMULATOR_HOST not set. Start with: docker compose up") + + +@pytest.fixture +def temp_db_path(): + """Fixture for a temporary database file.""" + with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as tmp: + yield tmp.name + os.remove(tmp.name) + + +@pytest_asyncio.fixture +async def ring_buffer(temp_db_path): + """Fixture for SQLiteRingBuffer.""" + rb = SQLiteRingBuffer( + db_path=temp_db_path, + max_size=1000, + retention_hours=24, + ) + yield rb + rb.close() + + +@pytest_asyncio.fixture +async def event_store(check_emulator): + """Fixture for FirestoreEventStore connected to emulator.""" + return FirestoreEventStore(project_id="test-project") + + +@pytest_asyncio.fixture +async def error_store(check_emulator): + """Fixture for FirestoreErrorStore connected to emulator.""" + return FirestoreErrorStore(project_id="test-project") + + +@pytest_asyncio.fixture +async def processor(event_store, error_store): + """Fixture for fully configured Processor.""" + adapter = SegmentSchemaAdapter() + sequencer = HashSequencer(num_partitions=4) + buffer = Buffer(event_store=event_store, size=10, max_size=100, timeout=1.0) + + processor = Processor( + adapter=adapter, + sequencer=sequencer, + buffer=buffer, + error_store=error_store, + ) + + return processor + + +@pytest_asyncio.fixture +async def queue_with_ring_buffer(ring_buffer, processor): + """Fixture for AsyncQueue with ring buffer.""" + queue = AsyncQueue( + processor=processor, + ring_buffer=ring_buffer, + num_workers=2, + publisher_batch_size=10, + publisher_poll_interval=0.05, # Fast polling for tests + cleanup_interval=0.1, # Fast cleanup for tests + ) + + await queue.start() + yield queue + await queue.stop() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_end_to_end_ring_buffer_to_firestore( + queue_with_ring_buffer, ring_buffer, event_store +): + """ + Test complete flow: API → ring buffer → publisher → queue → processor → Firestore. + + This validates: + - Events are written to ring buffer + - Publisher moves events from ring buffer to internal queue + - Workers process events + - Events are stored in Firestore + """ + stream = "ring-buffer-test" + num_events = 20 + + # Create events + events = [ + RawEvent( + payload={ + "type": "track", + "event": f"test_event_{i}", + "userId": f"user_{i}", + "properties": {"index": i}, + }, + stream=stream, + ) + for i in range(num_events) + ] + + # Enqueue events (writes to ring buffer) + for event in events: + await queue_with_ring_buffer.enqueue(event) + + # Verify events are in ring buffer + assert ring_buffer.count() >= num_events + + # Wait for publisher to move events to queue and workers to process + await asyncio.sleep(2.0) + + # Verify all events were marked as published + unpublished = ring_buffer.fetch_unpublished(limit=100) + assert len(unpublished) == 0, f"Expected 0 unpublished, found {len(unpublished)}" + + # Verify events reached Firestore + # Give buffer time to flush + await asyncio.sleep(2.0) + + stored_events = await event_store.read(stream=stream, limit=num_events) + assert len(stored_events) == num_events, ( + f"Expected {num_events} events in Firestore, found {len(stored_events)}" + ) + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_graceful_shutdown_drains_ring_buffer(ring_buffer, processor, event_store): + """ + Test that stopping the queue drains all events from ring buffer. + + This validates: + - Events in ring buffer when stop() is called are processed + - No events are lost during shutdown + """ + stream = "shutdown-test" + num_events = 15 + + # Create queue + queue = AsyncQueue( + processor=processor, + ring_buffer=ring_buffer, + num_workers=1, + publisher_batch_size=5, + publisher_poll_interval=0.1, + cleanup_interval=0.1, + ) + + await queue.start() + + # Enqueue events quickly + events = [ + RawEvent( + payload={ + "type": "track", + "event": f"shutdown_event_{i}", + "userId": f"user_{i}", + }, + stream=stream, + ) + for i in range(num_events) + ] + + for event in events: + await queue.enqueue(event) + + # Verify events are in ring buffer + assert ring_buffer.count() == num_events + + # Give publisher a moment to start moving events + await asyncio.sleep(0.3) + + # Stop queue (should drain ring buffer) + await queue.stop() + + # Verify ring buffer was drained + unpublished = ring_buffer.fetch_unpublished(limit=100) + assert len(unpublished) == 0, "Ring buffer should be drained after stop()" + + # Verify all events reached Firestore + await asyncio.sleep(2.0) # Give buffer time to flush + stored_events = await event_store.read(stream=stream, limit=num_events) + assert len(stored_events) == num_events, ( + f"Expected {num_events} events after drain, found {len(stored_events)}" + ) + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_ring_buffer_cleanup_logic(temp_db_path): + """ + Test cleanup logic directly (not background worker). + + This validates: + - Time-based cleanup removes published events older than retention_hours + - Size-based cleanup removes oldest published events when max_size exceeded + - Unpublished events are never deleted + + Note: This tests the cleanup methods directly for determinism. + Background worker lifecycle is tested separately. + """ + # Create ring buffer with short retention + ring_buffer = SQLiteRingBuffer( + db_path=temp_db_path, + max_size=10, # Small max size + retention_hours=1, # 1 hour retention + ) + + try: + # Write and publish 15 events (exceeds max_size of 10) + for i in range(15): + event = RawEvent( + payload={"type": "track", "event": f"event_{i}"}, + stream="cleanup-test", + ) + event_id = ring_buffer.write(event) + ring_buffer.mark_published([event_id]) + + # Manually backdate 5 events to be older than retention + conn = ring_buffer.conn + old_timestamp = (datetime.now(UTC) - timedelta(hours=2)).isoformat() + conn.execute( + """ + UPDATE ring_buffer + SET published_at = ? + WHERE id <= 5 + """, + (old_timestamp,), + ) + conn.commit() + + assert ring_buffer.count() == 15 + + # Run cleanup directly (deterministic, no timing issues) + # Time-based cleanup: delete 5 old events + deleted_by_time = ring_buffer.delete_old_published(max_age_hours=1) + assert deleted_by_time == 5 + + # Size-based cleanup: keep only 10 newest + deleted_by_size = ring_buffer.delete_oldest_published(keep_count=10) + # Should have 10 remaining after time cleanup, no more deletes needed + assert deleted_by_size == 0 + + # Verify only 10 events remain + remaining_count = ring_buffer.count() + assert remaining_count == 10, f"Expected 10 events, found {remaining_count}" + + finally: + ring_buffer.close() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_cleanup_worker_lifecycle(temp_db_path): + """ + Test that cleanup worker starts and stops cleanly. + + This validates: + - Worker can start and enters running state + - Worker can stop gracefully + - Worker thread terminates properly + """ + ring_buffer = SQLiteRingBuffer( + db_path=temp_db_path, + max_size=1000, + retention_hours=24, + ) + + cleanup = RingBufferCleanup( + ring_buffer=ring_buffer, + cleanup_interval=0.1, # Fast for testing + ) + + try: + # Test start + cleanup.start() + assert cleanup._running + assert cleanup._thread is not None + assert cleanup._thread.is_alive() + + # Let it run for a few cycles + time.sleep(0.3) + + # Test stop + cleanup.stop(timeout=2.0) + assert not cleanup._running + assert not cleanup._thread.is_alive() + + finally: + ring_buffer.close() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_publisher_worker_lifecycle(temp_db_path, processor): + """ + Test that publisher worker starts and stops cleanly. + + This validates: + - Worker can start and enters running state + - Worker can stop gracefully + - Worker thread terminates properly + """ + ring_buffer = SQLiteRingBuffer( + db_path=temp_db_path, + max_size=1000, + retention_hours=24, + ) + + # Create queue with ring buffer and publisher + queue = AsyncQueue( + processor=processor, + ring_buffer=ring_buffer, + num_workers=1, + publisher_batch_size=10, + publisher_poll_interval=0.1, + cleanup_interval=0.1, + ) + + try: + # Test start (starts publisher and cleanup workers) + await queue.start() + assert queue._publisher is not None + assert queue._publisher._running + assert queue._publisher._thread.is_alive() + + # Let it run briefly + await asyncio.sleep(0.3) + + # Test stop (stops publisher gracefully) + await queue.stop() + assert not queue._publisher._running + assert not queue._publisher._thread.is_alive() + + finally: + ring_buffer.close() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_ring_buffer_durability_on_crash(temp_db_path, event_store, error_store): + """ + Test that events in ring buffer survive a "crash" (restart). + + This validates: + - Events written to ring buffer are durable (SQLite WAL) + - Restarting with same ring buffer recovers unpublished events + - Publisher can resume from where it left off + """ + stream = "crash-test" + num_events = 10 + + # Phase 1: Write events to ring buffer + ring_buffer_1 = SQLiteRingBuffer( + db_path=temp_db_path, + max_size=1000, + retention_hours=24, + ) + + # Create first processor for first queue + adapter_1 = SegmentSchemaAdapter() + sequencer_1 = HashSequencer(num_partitions=4) + buffer_1 = Buffer(event_store=event_store, size=10, max_size=100, timeout=1.0) + processor_1 = Processor( + adapter=adapter_1, + sequencer=sequencer_1, + buffer=buffer_1, + error_store=error_store, + ) + + queue_1 = AsyncQueue( + processor=processor_1, + ring_buffer=ring_buffer_1, + num_workers=1, + publisher_batch_size=3, # Small batch to leave some unpublished + publisher_poll_interval=1.0, # Slow polling to keep events in ring buffer + ) + + await queue_1.start() + + # Enqueue events + events = [ + RawEvent( + payload={"type": "track", "event": f"crash_event_{i}"}, + stream=stream, + ) + for i in range(num_events) + ] + + for event in events: + await queue_1.enqueue(event) + + # Wait briefly but stop before all events are published + # Publisher polls every 1s, so stopping after 0.1s ensures events remain unpublished + await asyncio.sleep(0.1) + + # Simulate crash (abrupt stop) + await queue_1.stop() + ring_buffer_1.close() + + # Phase 2: Restart with same ring buffer + ring_buffer_2 = SQLiteRingBuffer( + db_path=temp_db_path, + max_size=1000, + retention_hours=24, + ) + + # Verify some events are still unpublished + unpublished_before = ring_buffer_2.fetch_unpublished(limit=100) + assert len(unpublished_before) > 0, "Should have unpublished events after crash" + + # Create second processor for second queue + adapter_2 = SegmentSchemaAdapter() + sequencer_2 = HashSequencer(num_partitions=4) + buffer_2 = Buffer(event_store=event_store, size=10, max_size=100, timeout=1.0) + processor_2 = Processor( + adapter=adapter_2, + sequencer=sequencer_2, + buffer=buffer_2, + error_store=error_store, + ) + + # Create new queue with recovered ring buffer + queue_2 = AsyncQueue( + processor=processor_2, + ring_buffer=ring_buffer_2, + num_workers=1, + publisher_batch_size=5, + publisher_poll_interval=0.05, + cleanup_interval=0.1, + ) + + await queue_2.start() + + # Wait for publisher to process recovered events + await asyncio.sleep(2.0) + + # Verify all events were eventually published + unpublished_after = ring_buffer_2.fetch_unpublished(limit=100) + assert len(unpublished_after) == 0, "All recovered events should be published" + + # Verify events reached Firestore + await asyncio.sleep(1.0) + stored_events = await event_store.read(stream=stream, limit=num_events) + assert len(stored_events) == num_events, ( + f"Expected {num_events} events after recovery, found {len(stored_events)}" + ) + + await queue_2.stop() + ring_buffer_2.close() + + +@pytest.mark.asyncio +@pytest.mark.integration +async def test_high_throughput_ring_buffer(queue_with_ring_buffer, ring_buffer, event_store): + """ + Test ring buffer handles high throughput without blocking. + + This validates: + - Ring buffer write is fast (doesn't block API) + - Publisher keeps up with high event rate + - No events are lost under load + """ + stream = "throughput-test" + num_events = 100 + + # Measure enqueue time + start = time.time() + + events = [ + RawEvent( + payload={"type": "track", "event": f"throughput_{i}"}, + stream=stream, + ) + for i in range(num_events) + ] + + for event in events: + await queue_with_ring_buffer.enqueue(event) + + enqueue_duration = time.time() - start + + # Enqueue should be fast (< 1 second for 100 events) + assert enqueue_duration < 1.0, f"Enqueue took {enqueue_duration}s, should be < 1s" + + # Wait for processing + await asyncio.sleep(3.0) + + # Verify all events were published + unpublished = ring_buffer.fetch_unpublished(limit=200) + assert len(unpublished) == 0 + + # Verify events reached Firestore + await asyncio.sleep(1.0) + stored_events = await event_store.read(stream=stream, limit=num_events) + assert len(stored_events) == num_events diff --git a/tests/unit/api/test_router.py b/tests/unit/api/test_router.py index ad88502..1787cb3 100644 --- a/tests/unit/api/test_router.py +++ b/tests/unit/api/test_router.py @@ -107,6 +107,23 @@ def test_collect_enqueues_to_queue(self, client, mock_queue): assert raw_event.payload == payload assert raw_event.stream == "users" + def test_collect_returns_503_on_ring_buffer_failure(self, client, mock_queue): + """Test /collect returns 503 when ring buffer write fails.""" + # Simulate ring buffer failure (disk full, corruption, etc.) + mock_queue.enqueue.side_effect = Exception("SQLite error: disk full") + + response = client.post("/collect", json={"type": "track", "event": "click"}) + + # Should return 503 Service Unavailable + assert response.status_code == 503 + assert response.json() == { + "error": "service_unavailable", + "message": "Unable to persist event - ring buffer unavailable", + } + + # Verify queue.enqueue was attempted + mock_queue.enqueue.assert_awaited_once() + class TestConvenienceEndpoints: """Tests for /v1/* convenience endpoints.""" diff --git a/tests/unit/queues/test_async.py b/tests/unit/queues/test_async.py index 7009c84..5ef55fb 100644 --- a/tests/unit/queues/test_async.py +++ b/tests/unit/queues/test_async.py @@ -9,15 +9,26 @@ from eventkit.schema.raw import RawEvent +@pytest.fixture +def mock_ring_buffer(): + """Mock RingBuffer for testing.""" + ring_buffer = Mock() + ring_buffer.write = Mock(return_value=1) # Return event ID + ring_buffer.fetch_unpublished = Mock(return_value=[]) + ring_buffer.mark_published = Mock() + ring_buffer.close = Mock() + return ring_buffer + + @pytest.mark.asyncio class TestAsyncQueue: """Test AsyncQueue implementation.""" - async def test_enqueue_adds_to_queue(self): - """AsyncQueue.enqueue() should add event to internal queue.""" + async def test_enqueue_writes_to_ring_buffer(self, mock_ring_buffer): + """AsyncQueue.enqueue() should write event to ring buffer.""" # Setup mock_processor = Mock() - queue = AsyncQueue(mock_processor, num_workers=1) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=1) raw_event = RawEvent(payload={"type": "identify", "userId": "123"}) @@ -25,22 +36,20 @@ async def test_enqueue_adds_to_queue(self): await queue.enqueue(raw_event) # Verify - assert queue.queue.qsize() == 1 - queued_event = await queue.queue.get() - assert queued_event == raw_event + mock_ring_buffer.write.assert_called_once_with(raw_event) - async def test_workers_process_events(self): - """AsyncQueue workers should process events from queue.""" + async def test_workers_process_events(self, mock_ring_buffer): + """AsyncQueue workers should process events from internal queue.""" # Setup mock_processor = Mock() mock_processor.start = AsyncMock() mock_processor.stop = AsyncMock() mock_processor.process_event = AsyncMock() - queue = AsyncQueue(mock_processor, num_workers=2) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=2) await queue.start() - # Execute - enqueue events + # Add events directly to internal queue (bypassing ring buffer for unit test) events = [ RawEvent(payload={"type": "identify", "userId": "user1"}), RawEvent(payload={"type": "track", "event": "Click"}), @@ -48,7 +57,7 @@ async def test_workers_process_events(self): ] for event in events: - await queue.enqueue(event) + await queue._internal_queue.put(event) # Wait for processing (give workers time to pick up events) await asyncio.sleep(0.2) @@ -59,8 +68,8 @@ async def test_workers_process_events(self): # Cleanup await queue.stop() - async def test_graceful_shutdown_drains_queue(self): - """AsyncQueue.stop() should process all queued events before stopping.""" + async def test_graceful_shutdown_drains_queue(self, mock_ring_buffer): + """AsyncQueue.stop() should drain internal queue before stopping.""" # Setup mock_processor = Mock() mock_processor.start = AsyncMock() @@ -72,22 +81,22 @@ async def delayed_process(event): mock_processor.process_event = AsyncMock(side_effect=delayed_process) - queue = AsyncQueue(mock_processor, num_workers=1) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=1) await queue.start() - # Enqueue multiple events + # Add events directly to internal queue (bypassing ring buffer for unit test) events = [RawEvent(payload={"type": "track", "event": f"Event{i}"}) for i in range(5)] for event in events: - await queue.enqueue(event) + await queue._internal_queue.put(event) # Execute - stop should drain queue await queue.stop() # Verify all events processed assert mock_processor.process_event.call_count == 5 - assert queue.queue.qsize() == 0 + assert queue._internal_queue.qsize() == 0 - async def test_multiple_workers_parallel_processing(self): + async def test_multiple_workers_parallel_processing(self, mock_ring_buffer): """Multiple workers should process events in parallel.""" # Setup mock_processor = Mock() @@ -104,13 +113,13 @@ async def track_process(event): mock_processor.process_event = AsyncMock(side_effect=track_process) # Use 3 workers - queue = AsyncQueue(mock_processor, num_workers=3) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=3) await queue.start() - # Enqueue 6 events + # Add events directly to internal queue (bypassing ring buffer for unit test) events = [RawEvent(payload={"type": "track", "event": f"Event{i}"}) for i in range(6)] for event in events: - await queue.enqueue(event) + await queue._internal_queue.put(event) # Wait for processing await asyncio.sleep(0.3) @@ -121,14 +130,14 @@ async def track_process(event): # Cleanup await queue.stop() - async def test_start_stops_processor(self): - """AsyncQueue.start() should start processor.""" + async def test_start_stops_processor(self, mock_ring_buffer): + """AsyncQueue.start() should start processor and ring buffer publisher.""" # Setup mock_processor = Mock() mock_processor.start = AsyncMock() mock_processor.stop = AsyncMock() - queue = AsyncQueue(mock_processor, num_workers=1) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=1) # Execute await queue.start() @@ -138,7 +147,7 @@ async def test_start_stops_processor(self): mock_processor.start.assert_called_once() mock_processor.stop.assert_called_once() - async def test_worker_error_handling(self): + async def test_worker_error_handling(self, mock_ring_buffer): """Workers should continue processing even if one event fails.""" # Setup mock_processor = Mock() @@ -156,12 +165,12 @@ async def process_with_error(event): mock_processor.process_event = AsyncMock(side_effect=process_with_error) - queue = AsyncQueue(mock_processor, num_workers=1) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=1) await queue.start() - # Enqueue events (first will fail, second should still process) - await queue.enqueue(RawEvent(payload={"type": "track", "event": "Event1"})) - await queue.enqueue(RawEvent(payload={"type": "track", "event": "Event2"})) + # Add events directly to internal queue (bypassing ring buffer for unit test) + await queue._internal_queue.put(RawEvent(payload={"type": "track", "event": "Event1"})) + await queue._internal_queue.put(RawEvent(payload={"type": "track", "event": "Event2"})) # Wait for processing with longer timeout await asyncio.sleep(0.5) @@ -172,15 +181,19 @@ async def process_with_error(event): # Cleanup await queue.stop() - async def test_queue_size_tracking(self): - """Queue size should be trackable.""" + async def test_enqueue_calls_ring_buffer_write(self, mock_ring_buffer): + """Enqueue should write to ring buffer (not internal queue directly).""" # Setup mock_processor = Mock() - queue = AsyncQueue(mock_processor, num_workers=1) + queue = AsyncQueue(mock_processor, mock_ring_buffer, num_workers=1) # Enqueue events - for i in range(3): - await queue.enqueue(RawEvent(payload={"type": "track", "event": f"Event{i}"})) + events = [RawEvent(payload={"type": "track", "event": f"Event{i}"}) for i in range(3)] + for event in events: + await queue.enqueue(event) - # Verify queue size - assert queue.queue.qsize() == 3 + # Verify ring buffer writes (not internal queue) + assert mock_ring_buffer.write.call_count == 3 + assert ( + queue._internal_queue.qsize() == 0 + ) # Internal queue populated by publisher, not enqueue diff --git a/tests/unit/queues/test_direct.py b/tests/unit/queues/test_direct.py deleted file mode 100644 index 1e36e58..0000000 --- a/tests/unit/queues/test_direct.py +++ /dev/null @@ -1,76 +0,0 @@ -"""Tests for DirectQueue.""" - -from unittest.mock import AsyncMock, Mock - -import pytest - -from eventkit.queues.direct import DirectQueue -from eventkit.schema.raw import RawEvent - - -@pytest.mark.asyncio -class TestDirectQueue: - """Test DirectQueue implementation.""" - - async def test_enqueue_calls_processor(self): - """DirectQueue.enqueue() should call processor.process_event().""" - # Setup - mock_processor = Mock() - mock_processor.process_event = AsyncMock() - queue = DirectQueue(mock_processor) - - raw_event = RawEvent(payload={"type": "identify", "userId": "123"}) - - # Execute - await queue.enqueue(raw_event) - - # Verify - mock_processor.process_event.assert_called_once_with(raw_event) - - async def test_start_calls_processor_start(self): - """DirectQueue.start() should call processor.start().""" - # Setup - mock_processor = Mock() - mock_processor.start = AsyncMock() - queue = DirectQueue(mock_processor) - - # Execute - await queue.start() - - # Verify - mock_processor.start.assert_called_once() - - async def test_stop_calls_processor_stop(self): - """DirectQueue.stop() should call processor.stop().""" - # Setup - mock_processor = Mock() - mock_processor.stop = AsyncMock() - queue = DirectQueue(mock_processor) - - # Execute - await queue.stop() - - # Verify - mock_processor.stop.assert_called_once() - - async def test_multiple_enqueues(self): - """DirectQueue should process multiple events in sequence.""" - # Setup - mock_processor = Mock() - mock_processor.process_event = AsyncMock() - queue = DirectQueue(mock_processor) - - events = [ - RawEvent(payload={"type": "identify", "userId": "user1"}), - RawEvent(payload={"type": "track", "event": "Click"}), - RawEvent(payload={"type": "page", "name": "/home"}), - ] - - # Execute - for event in events: - await queue.enqueue(event) - - # Verify - assert mock_processor.process_event.call_count == 3 - for i, event in enumerate(events): - assert mock_processor.process_event.call_args_list[i][0][0] == event diff --git a/tests/unit/queues/test_factory.py b/tests/unit/queues/test_factory.py index d97b51c..6afc6c5 100644 --- a/tests/unit/queues/test_factory.py +++ b/tests/unit/queues/test_factory.py @@ -4,7 +4,6 @@ from eventkit.config import QueueMode, Settings from eventkit.queues.async_queue import AsyncQueue -from eventkit.queues.direct import DirectQueue from eventkit.queues.factory import create_queue from eventkit.queues.pubsub import PubSubQueue @@ -12,26 +11,14 @@ class TestQueueFactory: """Test queue factory.""" - def test_create_direct_queue(self): - """Factory should create DirectQueue when mode is DIRECT.""" - # Setup - settings = Settings(GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.DIRECT) - mock_processor = Mock() - - # Execute - queue = create_queue(mock_processor, settings) - - # Verify - assert isinstance(queue, DirectQueue) - assert queue.processor == mock_processor - def test_create_async_queue(self): - """Factory should create AsyncQueue when mode is ASYNC.""" + """Factory should create AsyncQueue with ring buffer when mode is ASYNC.""" # Setup settings = Settings( GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.ASYNC, EVENTKIT_ASYNC_WORKERS=8, + EVENTKIT_RING_BUFFER_DB_PATH=":memory:", # Use in-memory for tests ) mock_processor = Mock() @@ -42,11 +29,16 @@ def test_create_async_queue(self): assert isinstance(queue, AsyncQueue) assert queue.processor == mock_processor assert queue.num_workers == 8 + assert queue.ring_buffer is not None # Ring buffer created by factory def test_async_queue_default_workers(self): """Factory should use default worker count if not specified.""" # Setup - settings = Settings(GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.ASYNC) + settings = Settings( + GCP_PROJECT_ID="test-project", + EVENTKIT_QUEUE_MODE=QueueMode.ASYNC, + EVENTKIT_RING_BUFFER_DB_PATH=":memory:", # Use in-memory for tests + ) mock_processor = Mock() # Execute @@ -55,14 +47,16 @@ def test_async_queue_default_workers(self): # Verify assert isinstance(queue, AsyncQueue) assert queue.num_workers == 4 # Default from config + assert queue.ring_buffer is not None # Ring buffer created by factory def test_create_pubsub_queue(self): - """Factory should create PubSubQueue when mode is PUBSUB.""" + """Factory should create PubSubQueue with ring buffer when mode is PUBSUB.""" # Setup settings = Settings( GCP_PROJECT_ID="test-project", EVENTKIT_QUEUE_MODE=QueueMode.PUBSUB, EVENTKIT_PUBSUB_WORKERS=4, + EVENTKIT_RING_BUFFER_DB_PATH=":memory:", # Use in-memory for tests ) mock_processor = Mock() @@ -73,3 +67,4 @@ def test_create_pubsub_queue(self): assert isinstance(queue, PubSubQueue) assert queue.processor == mock_processor assert queue.settings == settings + assert queue.ring_buffer is not None # Ring buffer created by factory diff --git a/tests/unit/queues/test_pubsub.py b/tests/unit/queues/test_pubsub.py index 6c6df98..66066b2 100644 --- a/tests/unit/queues/test_pubsub.py +++ b/tests/unit/queues/test_pubsub.py @@ -1,8 +1,7 @@ """Unit tests for PubSubQueue.""" import asyncio -import json -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, Mock, patch import pytest @@ -18,6 +17,17 @@ def mock_processor(): return AsyncMock(spec=Processor) +@pytest.fixture +def mock_ring_buffer(): + """Mock RingBuffer for testing.""" + ring_buffer = Mock() + ring_buffer.write = Mock(return_value=1) # Return event ID + ring_buffer.fetch_unpublished = Mock(return_value=[]) + ring_buffer.mark_published = Mock() + ring_buffer.close = Mock() + return ring_buffer + + @pytest.fixture def settings(): """Test settings.""" @@ -56,12 +66,13 @@ class TestPubSubQueueInit: """Tests for initialization.""" def test_creates_clients_and_paths( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): """Test __init__ creates clients and paths.""" - queue = PubSubQueue(mock_processor, settings) + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) assert queue.processor == mock_processor + assert queue.ring_buffer == mock_ring_buffer assert queue.topic_path == "projects/test-project/topics/eventkit-events" assert isinstance(queue.internal_queue, asyncio.Queue) @@ -71,24 +82,16 @@ class TestPubSubQueueEnqueue: @pytest.mark.asyncio async def test_publishes_to_topic( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test enqueue() publishes to Pub/Sub.""" - queue = PubSubQueue(mock_processor, settings) + """Test enqueue() writes to ring buffer.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) event = RawEvent(payload={"type": "track"}, stream="events") - future = MagicMock() - future.result = MagicMock(return_value=None) - mock_publisher.publish.return_value = future - await queue.enqueue(event) - mock_publisher.publish.assert_called_once() - call_args = mock_publisher.publish.call_args - # Published data is the full RawEvent serialized - published_data = json.loads(call_args[1]["data"]) - assert published_data["payload"] == event.payload - assert published_data["stream"] == "events" + # Now enqueue writes to ring buffer, not pub/sub directly + mock_ring_buffer.write.assert_called_once_with(event) class TestPubSubQueueLifecycle: @@ -96,76 +99,78 @@ class TestPubSubQueueLifecycle: @pytest.mark.asyncio async def test_start_initializes( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test start() initializes everything.""" - queue = PubSubQueue(mock_processor, settings) - queue._ensure_resources = AsyncMock() + """Test start() initializes workers, processor, and resources.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) + + # Mock resource creation + with patch.object(queue, "_ensure_resources", new_callable=AsyncMock): + await queue.start() - await queue.start() + # Verify processor started + mock_processor.start.assert_called_once() - mock_processor.start.assert_awaited_once() - assert len(queue.workers) == 2 - mock_subscriber.subscribe.assert_called_once() + # Verify workers created + assert len(queue.workers) == settings.EVENTKIT_PUBSUB_WORKERS await queue.stop() @pytest.mark.asyncio async def test_stop_shuts_down_gracefully( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test stop() gracefully shuts down.""" - queue = PubSubQueue(mock_processor, settings) - queue._ensure_resources = AsyncMock() + """Test stop() drains ring buffer, internal queue, and stops workers.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) - await queue.start() - await queue.stop() + with patch.object(queue, "_ensure_resources", new_callable=AsyncMock): + await queue.start() + await queue.stop() - mock_processor.stop.assert_awaited_once() - for worker in queue.workers: - assert worker.done() + # Verify processor stopped + mock_processor.stop.assert_called_once() class TestPubSubQueueCallback: """Tests for _pubsub_callback().""" - @pytest.mark.asyncio - async def test_bridges_to_internal_queue( - self, mock_processor, settings, mock_publisher, mock_subscriber + def test_bridges_to_internal_queue( + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test callback successfully decodes message without errors.""" - queue = PubSubQueue(mock_processor, settings) - queue._ensure_resources = AsyncMock() - - await queue.start() - - mock_message = MagicMock() - # Message data should be full RawEvent JSON - raw_event = RawEvent(payload={"type": "track"}) - mock_message.data = raw_event.model_dump_json().encode("utf-8") + """Test callback bridges Pub/Sub message to internal queue.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) + queue.loop = asyncio.get_event_loop() - # Call callback - should not raise, should not call nack - queue._pubsub_callback(mock_message) + # Create mock Pub/Sub message + raw_event = RawEvent(payload={"type": "track"}, stream="events") + message = MagicMock() + message.data = raw_event.model_dump_json().encode("utf-8") - # Give a bit of time for async processing - await asyncio.sleep(0.1) + # Call callback + queue._pubsub_callback(message) - # Should not have called nack - mock_message.nack.assert_not_called() + # Give async op time to complete + queue.loop.run_until_complete(asyncio.sleep(0.01)) - await queue.stop() + # Verify message was added to internal queue + assert not queue.internal_queue.empty() - def test_nacks_on_decode_error(self, mock_processor, settings, mock_publisher, mock_subscriber): - """Test callback nacks on error.""" - queue = PubSubQueue(mock_processor, settings) + def test_nacks_on_decode_error( + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber + ): + """Test callback nacks message on decode error.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) queue.loop = asyncio.get_event_loop() - mock_message = MagicMock() - mock_message.data = b"invalid" + # Create invalid message + message = MagicMock() + message.data = b"invalid json" - queue._pubsub_callback(mock_message) + # Call callback + queue._pubsub_callback(message) - mock_message.nack.assert_called_once() + # Verify message was nacked + message.nack.assert_called_once() class TestPubSubQueueWorker: @@ -173,43 +178,57 @@ class TestPubSubQueueWorker: @pytest.mark.asyncio async def test_processes_and_acks( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test worker processes events and acks.""" - queue = PubSubQueue(mock_processor, settings) - queue._ensure_resources = AsyncMock() + """Test worker processes event and acks message.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) - await queue.start() + # Add mock message to internal queue + raw_event = RawEvent(payload={"type": "track"}, stream="events") + message = MagicMock() + await queue.internal_queue.put((raw_event, message)) - raw_event = RawEvent(payload={"type": "track"}) - mock_message = MagicMock() - await queue.internal_queue.put((raw_event, mock_message)) + # Start worker (will process one message) + queue.shutdown_event.set() # Trigger shutdown after processing + worker_task = asyncio.create_task(queue._worker(0)) + # Wait for processing await asyncio.sleep(0.1) - mock_processor.process_event.assert_awaited_with(raw_event) - mock_message.ack.assert_called_once() + # Verify processor called + mock_processor.process_event.assert_called_once_with(raw_event) - await queue.stop() + # Verify message acked + message.ack.assert_called_once() + + await worker_task @pytest.mark.asyncio - async def test_nacks_on_error(self, mock_processor, settings, mock_publisher, mock_subscriber): - """Test worker nacks on processing error.""" - queue = PubSubQueue(mock_processor, settings) - queue._ensure_resources = AsyncMock() - mock_processor.process_event.side_effect = Exception("Error") + async def test_nacks_on_error( + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber + ): + """Test worker nacks message on processing error.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) - await queue.start() + # Configure processor to raise error + mock_processor.process_event.side_effect = Exception("Processing error") - raw_event = RawEvent(payload={"type": "track"}) - mock_message = MagicMock() - await queue.internal_queue.put((raw_event, mock_message)) + # Add mock message to internal queue + raw_event = RawEvent(payload={"type": "track"}, stream="events") + message = MagicMock() + await queue.internal_queue.put((raw_event, message)) + # Start worker (will process one message) + queue.shutdown_event.set() # Trigger shutdown after processing + worker_task = asyncio.create_task(queue._worker(0)) + + # Wait for processing await asyncio.sleep(0.1) - mock_message.nack.assert_called_once() + # Verify message nacked + message.nack.assert_called_once() - await queue.stop() + await worker_task class TestPubSubQueueResources: @@ -217,30 +236,36 @@ class TestPubSubQueueResources: @pytest.mark.asyncio async def test_creates_missing_resources( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test creates topics and subscription if missing.""" - queue = PubSubQueue(mock_processor, settings) + """Test _ensure_resources creates missing topics/subscriptions.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) + # Mock get to raise exception (not found) mock_publisher.get_topic.side_effect = Exception("Not found") mock_subscriber.get_subscription.side_effect = Exception("Not found") + # Run resource creation await queue._ensure_resources() - assert mock_publisher.create_topic.call_count == 2 - mock_subscriber.create_subscription.assert_called_once() + # Verify resources were created + assert mock_publisher.create_topic.call_count >= 1 + assert mock_subscriber.create_subscription.call_count >= 1 @pytest.mark.asyncio async def test_skips_existing_resources( - self, mock_processor, settings, mock_publisher, mock_subscriber + self, mock_processor, mock_ring_buffer, settings, mock_publisher, mock_subscriber ): - """Test skips existing resources.""" - queue = PubSubQueue(mock_processor, settings) + """Test _ensure_resources skips existing topics/subscriptions.""" + queue = PubSubQueue(mock_processor, mock_ring_buffer, settings) + # Mock get to return (exists) mock_publisher.get_topic.return_value = MagicMock() mock_subscriber.get_subscription.return_value = MagicMock() + # Run resource creation await queue._ensure_resources() + # Verify resources were not created mock_publisher.create_topic.assert_not_called() mock_subscriber.create_subscription.assert_not_called() diff --git a/tests/unit/ring_buffer/__init__.py b/tests/unit/ring_buffer/__init__.py new file mode 100644 index 0000000..1017151 --- /dev/null +++ b/tests/unit/ring_buffer/__init__.py @@ -0,0 +1 @@ +"""Unit tests for ring buffer.""" diff --git a/tests/unit/ring_buffer/test_cleanup.py b/tests/unit/ring_buffer/test_cleanup.py new file mode 100644 index 0000000..5e7ddb0 --- /dev/null +++ b/tests/unit/ring_buffer/test_cleanup.py @@ -0,0 +1,223 @@ +"""Tests for RingBufferCleanup.""" + +import time +from unittest.mock import MagicMock + +import pytest + +from eventkit.ring_buffer.base import RingBuffer +from eventkit.ring_buffer.cleanup import RingBufferCleanup + + +@pytest.fixture +def mock_ring_buffer(): + """Create mock ring buffer.""" + rb = MagicMock(spec=RingBuffer) + rb.delete_old_published.return_value = 0 + rb.delete_oldest_published.return_value = 0 + return rb + + +def test_start_launches_thread(mock_ring_buffer): + """Test that start() launches background thread.""" + cleanup = RingBufferCleanup(ring_buffer=mock_ring_buffer, cleanup_interval=60.0) + + cleanup.start() + + assert cleanup._running is True + assert cleanup._thread is not None + assert cleanup._thread.is_alive() + + cleanup.stop() + + +def test_start_when_already_running_logs_warning(mock_ring_buffer, caplog): + """Test that calling start() when already running logs warning.""" + cleanup = RingBufferCleanup(ring_buffer=mock_ring_buffer, cleanup_interval=60.0) + + cleanup.start() + cleanup.start() # Should log warning + + assert "Cleanup worker already running" in caplog.text + + cleanup.stop() + + +def test_stop_when_not_running_does_nothing(mock_ring_buffer): + """Test that stop() when not running does nothing.""" + cleanup = RingBufferCleanup(ring_buffer=mock_ring_buffer, cleanup_interval=60.0) + + # Should not raise + cleanup.stop() + + assert cleanup._running is False + + +def test_stop_waits_for_thread_to_finish(mock_ring_buffer): + """Test that stop() waits for thread to finish.""" + cleanup = RingBufferCleanup(ring_buffer=mock_ring_buffer, cleanup_interval=0.1) + + cleanup.start() + time.sleep(0.05) # Let it run a bit + cleanup.stop(timeout=2.0) + + assert cleanup._running is False + assert not cleanup._thread.is_alive() + + +def test_runs_time_based_cleanup(mock_ring_buffer): + """Test that cleanup worker runs time-based cleanup.""" + mock_ring_buffer.delete_old_published.return_value = 5 # 5 events deleted + + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + retention_hours=24, + cleanup_interval=0.1, # Short interval for test + ) + + cleanup.start() + time.sleep(0.2) # Wait for at least one cleanup run + cleanup.stop() + + # Verify time-based cleanup was called + mock_ring_buffer.delete_old_published.assert_called() + call_args = mock_ring_buffer.delete_old_published.call_args + assert call_args[1]["max_age_hours"] == 24 + + +def test_runs_size_based_cleanup(mock_ring_buffer): + """Test that cleanup worker runs size-based cleanup.""" + mock_ring_buffer.delete_oldest_published.return_value = 10 # 10 events deleted + + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + max_size=100000, + cleanup_interval=0.1, + ) + + cleanup.start() + time.sleep(0.2) + cleanup.stop() + + # Verify size-based cleanup was called + mock_ring_buffer.delete_oldest_published.assert_called() + call_args = mock_ring_buffer.delete_oldest_published.call_args + assert call_args[1]["keep_count"] == 100000 + + +def test_respects_cleanup_interval(mock_ring_buffer): + """Test that cleanup worker respects cleanup_interval.""" + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + cleanup_interval=0.2, # 200ms interval + ) + + cleanup.start() + time.sleep(0.35) # Should allow ~1-2 cleanup runs + cleanup.stop() + + # Should have run cleanup 1-2 times (not more than 3) + assert 1 <= mock_ring_buffer.delete_old_published.call_count <= 3 + + +def test_handles_time_cleanup_error_gracefully(mock_ring_buffer): + """Test that errors in time-based cleanup don't crash the worker.""" + mock_ring_buffer.delete_old_published.side_effect = Exception("DB error") + + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + cleanup_interval=0.1, + ) + + cleanup.start() + time.sleep(0.2) # Should survive error + cleanup.stop() + + # Worker should still be callable and not crash + assert not cleanup._running + + +def test_handles_size_cleanup_error_gracefully(mock_ring_buffer): + """Test that errors in size-based cleanup don't crash the worker.""" + mock_ring_buffer.delete_oldest_published.side_effect = Exception("DB error") + + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + cleanup_interval=0.1, + ) + + cleanup.start() + time.sleep(0.2) + cleanup.stop() + + # Worker should still be callable + assert not cleanup._running + + +def test_logs_when_events_deleted(mock_ring_buffer, caplog): + """Test that cleanup worker logs when events are deleted.""" + import logging + + # Set log level to capture INFO logs + caplog.set_level(logging.INFO) + + mock_ring_buffer.delete_old_published.return_value = 50 + mock_ring_buffer.delete_oldest_published.return_value = 25 + + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + retention_hours=24, + max_size=1000, + cleanup_interval=0.1, + ) + + cleanup.start() + time.sleep(0.2) + cleanup.stop() + + # Verify logging messages + assert "Deleted 50 events older than 24 hours" in caplog.text + assert "Deleted 25 events to enforce max size (1000)" in caplog.text + + +def test_does_not_log_when_no_events_deleted(mock_ring_buffer, caplog): + """Test that cleanup worker doesn't log when no events are deleted.""" + mock_ring_buffer.delete_old_published.return_value = 0 + mock_ring_buffer.delete_oldest_published.return_value = 0 + + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + cleanup_interval=0.1, + ) + + cleanup.start() + time.sleep(0.2) + cleanup.stop() + + # Should not log deletion messages + assert "Deleted" not in caplog.text or "Deleted 0" not in caplog.text + + +def test_cleanup_default_params(mock_ring_buffer): + """Test that cleanup uses sensible defaults.""" + cleanup = RingBufferCleanup(ring_buffer=mock_ring_buffer) + + assert cleanup.retention_hours == 24 + assert cleanup.max_size == 100000 + assert cleanup.cleanup_interval == 3600.0 # 1 hour + + +def test_runs_both_cleanup_types_sequentially(mock_ring_buffer): + """Test that both time and size cleanups run in sequence.""" + cleanup = RingBufferCleanup( + ring_buffer=mock_ring_buffer, + cleanup_interval=0.1, + ) + + cleanup.start() + time.sleep(0.2) + cleanup.stop() + + # Both cleanup methods should have been called + assert mock_ring_buffer.delete_old_published.called + assert mock_ring_buffer.delete_oldest_published.called diff --git a/tests/unit/ring_buffer/test_factory.py b/tests/unit/ring_buffer/test_factory.py new file mode 100644 index 0000000..1e5051b --- /dev/null +++ b/tests/unit/ring_buffer/test_factory.py @@ -0,0 +1,34 @@ +"""Tests for ring buffer factory.""" + +from unittest.mock import Mock + +import pytest + +from eventkit.ring_buffer.factory import RingBufferMode, create_ring_buffer + + +def test_create_sqlite_ring_buffer(): + """Test creating SQLite ring buffer (default implementation).""" + settings = Mock() + settings.EVENTKIT_RING_BUFFER_MODE = RingBufferMode.SQLITE + settings.EVENTKIT_RING_BUFFER_DB_PATH = ":memory:" + + ring_buffer = create_ring_buffer(settings) + + assert ring_buffer is not None + # Will be SQLiteRingBuffer once implemented in Commit 2 + assert hasattr(ring_buffer, "write") + assert hasattr(ring_buffer, "fetch_unpublished") + assert hasattr(ring_buffer, "mark_published") + + +def test_unsupported_mode_raises_error(): + """Test that unsupported ring buffer mode raises ValueError with helpful message.""" + settings = Mock() + settings.EVENTKIT_RING_BUFFER_MODE = "unsupported" + + with pytest.raises(ValueError) as exc_info: + create_ring_buffer(settings) + + assert "Unsupported ring buffer mode" in str(exc_info.value) + assert "only 'sqlite' is currently implemented" in str(exc_info.value).lower() diff --git a/tests/unit/ring_buffer/test_publisher.py b/tests/unit/ring_buffer/test_publisher.py new file mode 100644 index 0000000..49c3c2b --- /dev/null +++ b/tests/unit/ring_buffer/test_publisher.py @@ -0,0 +1,332 @@ +"""Tests for RingBufferPublisher.""" + +import asyncio +import time +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, Mock + +import pytest + +from eventkit.ring_buffer.base import RingBuffer, RingBufferEntry +from eventkit.ring_buffer.publisher import RingBufferPublisher +from eventkit.schema.raw import RawEvent + + +@pytest.fixture +def mock_ring_buffer(): + """Create mock ring buffer.""" + rb = MagicMock(spec=RingBuffer) + rb.fetch_unpublished.return_value = [] + rb.mark_published.return_value = None + return rb + + +@pytest.fixture +def mock_queue(): + """Create mock queue.""" + queue = Mock() + queue.enqueue = AsyncMock() # Use AsyncMock for async methods + return queue + + +@pytest.fixture +def event_loop(): + """Create and run event loop in a separate thread for tests.""" + import threading + + loop = asyncio.new_event_loop() + + # Run the event loop in a background thread + def run_loop(): + asyncio.set_event_loop(loop) + loop.run_forever() + + thread = threading.Thread(target=run_loop, daemon=True) + thread.start() + + yield loop + + # Stop the loop and wait for thread + loop.call_soon_threadsafe(loop.stop) + thread.join(timeout=1.0) + loop.close() + + +@pytest.fixture +def sample_event(): + """Create sample raw event.""" + return RawEvent( + payload={ + "type": "track", + "event": "test_event", + "userId": "user123", + } + ) + + +def test_start_launches_thread(mock_ring_buffer, mock_queue, event_loop): + """Test that start() launches background thread.""" + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=1.0, + ) + + publisher.start() + + assert publisher._running is True + assert publisher._thread is not None + assert publisher._thread.is_alive() + + publisher.stop() + + +def test_start_when_already_running_logs_warning(mock_ring_buffer, mock_queue, event_loop, caplog): + """Test that calling start() when already running logs warning.""" + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=1.0, + ) + + publisher.start() + publisher.start() # Should log warning + + assert "Publisher already running" in caplog.text + + publisher.stop() + + +def test_stop_when_not_running_does_nothing(mock_ring_buffer, mock_queue, event_loop): + """Test that stop() when not running does nothing.""" + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=1.0, + ) + + # Should not raise + publisher.stop() + + assert publisher._running is False + + +def test_stop_waits_for_thread_to_finish(mock_ring_buffer, mock_queue, event_loop): + """Test that stop() waits for thread to finish.""" + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=0.1, + ) + + publisher.start() + time.sleep(0.2) # Let it run a bit + publisher.stop(timeout=2.0) + + assert publisher._running is False + assert not publisher._thread.is_alive() + + +def test_publishes_events_from_ring_buffer(mock_ring_buffer, mock_queue, event_loop, sample_event): + """Test that publisher fetches and publishes events.""" + # Setup ring buffer to return one event + entry = RingBufferEntry( + id=1, + event_data=sample_event.model_dump_json(), + created_at=datetime.now(UTC).isoformat(), + published=False, + published_at=None, + ) + mock_ring_buffer.fetch_unpublished.return_value = [entry] + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=0.05, + ) + + publisher.start() + time.sleep(0.15) # Wait for at least one poll + publisher.stop() + + # Verify event was enqueued + assert mock_queue.enqueue.call_count >= 1 + + # Verify event was marked as published + mock_ring_buffer.mark_published.assert_called() + call_args = mock_ring_buffer.mark_published.call_args + assert 1 in call_args[0][0] # Entry ID 1 should be marked + + +def test_marks_published_after_successful_enqueue( + mock_ring_buffer, mock_queue, event_loop, sample_event +): + """Test that events are marked published only after successful enqueue.""" + entry = RingBufferEntry( + id=42, + event_data=sample_event.model_dump_json(), + created_at=datetime.now(UTC).isoformat(), + published=False, + published_at=None, + ) + mock_ring_buffer.fetch_unpublished.return_value = [entry] + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=0.05, + ) + + publisher.start() + time.sleep(0.15) + publisher.stop() + + # Verify mark_published was called with correct ID + mock_ring_buffer.mark_published.assert_called() + call_args = mock_ring_buffer.mark_published.call_args[0][0] + assert 42 in call_args + + +def test_does_not_mark_published_on_enqueue_failure( + mock_ring_buffer, mock_queue, event_loop, sample_event +): + """Test that failed enqueues don't get marked as published.""" + entry = RingBufferEntry( + id=99, + event_data=sample_event.model_dump_json(), + created_at=datetime.now(UTC).isoformat(), + published=False, + published_at=None, + ) + mock_ring_buffer.fetch_unpublished.return_value = [entry] + + # Make enqueue raise exception + mock_queue.enqueue.side_effect = Exception("Queue full") + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=0.05, + ) + + publisher.start() + time.sleep(0.15) + publisher.stop() + + # Verify event was NOT marked as published + if mock_ring_buffer.mark_published.called: + # If it was called, verify ID 99 is NOT in the list + call_args = mock_ring_buffer.mark_published.call_args[0][0] + assert 99 not in call_args + + +def test_handles_invalid_json_gracefully(mock_ring_buffer, mock_queue, event_loop): + """Test that invalid JSON in event_data is handled gracefully.""" + entry = RingBufferEntry( + id=1, + event_data="invalid json {{{", + created_at=datetime.now(UTC).isoformat(), + published=False, + published_at=None, + ) + mock_ring_buffer.fetch_unpublished.return_value = [entry] + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=0.05, + ) + + publisher.start() + time.sleep(0.15) + publisher.stop() + + # Should not crash, but event should not be marked published + if mock_ring_buffer.mark_published.called: + call_args = mock_ring_buffer.mark_published.call_args[0][0] + assert 1 not in call_args + + +def test_publishes_in_batches(mock_ring_buffer, mock_queue, event_loop, sample_event): + """Test that publisher respects batch_size.""" + # Create 3 events + entries = [ + RingBufferEntry( + id=i, + event_data=sample_event.model_dump_json(), + created_at=datetime.now(UTC).isoformat(), + published=False, + published_at=None, + ) + for i in range(1, 4) + ] + mock_ring_buffer.fetch_unpublished.return_value = entries + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + batch_size=3, + poll_interval=0.05, + ) + + publisher.start() + time.sleep(0.15) + publisher.stop() + + # Should have enqueued 3 events + assert mock_queue.enqueue.call_count >= 3 + + +def test_drains_on_shutdown(mock_ring_buffer, mock_queue, event_loop, sample_event): + """Test that publisher drains remaining events on shutdown.""" + # First call returns events, second call returns empty + entry = RingBufferEntry( + id=1, + event_data=sample_event.model_dump_json(), + created_at=datetime.now(UTC).isoformat(), + published=False, + published_at=None, + ) + mock_ring_buffer.fetch_unpublished.side_effect = [[entry], []] + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=1.0, + ) + + publisher.start() + time.sleep(0.1) # Don't wait for poll_interval + publisher.stop() # Should trigger drain + + # Verify event was published during drain + assert mock_queue.enqueue.call_count >= 1 + + +def test_respects_poll_interval(mock_ring_buffer, mock_queue, event_loop): + """Test that publisher respects poll_interval between batches.""" + mock_ring_buffer.fetch_unpublished.return_value = [] + + publisher = RingBufferPublisher( + ring_buffer=mock_ring_buffer, + queue=mock_queue, + event_loop=event_loop, + poll_interval=0.2, + ) + + publisher.start() + time.sleep(0.35) # Should allow ~1-2 polls (0.2s interval) + publisher.stop() + + # Should have polled 1-2 times (not more than 3) + assert 1 <= mock_ring_buffer.fetch_unpublished.call_count <= 3 diff --git a/tests/unit/ring_buffer/test_sqlite.py b/tests/unit/ring_buffer/test_sqlite.py new file mode 100644 index 0000000..63d49ca --- /dev/null +++ b/tests/unit/ring_buffer/test_sqlite.py @@ -0,0 +1,333 @@ +"""Tests for SQLiteRingBuffer implementation.""" + +from datetime import UTC, datetime, timedelta + +import pytest + +from eventkit.ring_buffer.sqlite import SQLiteRingBuffer +from eventkit.schema.raw import RawEvent + + +@pytest.fixture +def ring_buffer(): + """Create in-memory SQLite ring buffer for testing.""" + rb = SQLiteRingBuffer(db_path=":memory:", max_size=100, retention_hours=24) + yield rb + rb.close() + + +@pytest.fixture +def sample_event(): + """Create sample raw event for testing.""" + return RawEvent( + payload={ + "type": "track", + "event": "test_event", + "userId": "user123", + "timestamp": datetime.now(UTC).isoformat(), + "properties": {"foo": "bar"}, + } + ) + + +def test_init_creates_table_with_wal_mode(): + """Test that initialization creates table with WAL mode.""" + import os + import tempfile + + # WAL mode requires a file-based database (not :memory:) + fd, db_path = tempfile.mkstemp(suffix=".db") + os.close(fd) + + try: + rb = SQLiteRingBuffer(db_path=db_path, max_size=100, retention_hours=24) + + # Verify WAL mode is enabled + cursor = rb.conn.execute("PRAGMA journal_mode") + journal_mode = cursor.fetchone()[0] + assert journal_mode.lower() == "wal" + + # Verify table exists + cursor = rb.conn.execute( + "SELECT name FROM sqlite_master WHERE type='table' AND name='ring_buffer'" + ) + assert cursor.fetchone() is not None + + rb.close() + finally: + # Clean up temp file + if os.path.exists(db_path): + os.unlink(db_path) + # Also clean up WAL and SHM files + for ext in ["-wal", "-shm"]: + wal_file = db_path + ext + if os.path.exists(wal_file): + os.unlink(wal_file) + + +def test_write_inserts_event_and_returns_id(ring_buffer, sample_event): + """Test that write() inserts event and returns ID.""" + event_id = ring_buffer.write(sample_event) + + assert isinstance(event_id, int) + assert event_id > 0 + + # Verify event is in database + cursor = ring_buffer.conn.execute("SELECT COUNT(*) FROM ring_buffer") + count = cursor.fetchone()[0] + assert count == 1 + + +def test_write_multiple_events_sequential_ids(ring_buffer, sample_event): + """Test that multiple writes generate sequential IDs.""" + id1 = ring_buffer.write(sample_event) + id2 = ring_buffer.write(sample_event) + id3 = ring_buffer.write(sample_event) + + assert id2 == id1 + 1 + assert id3 == id2 + 1 + + +def test_fetch_unpublished_returns_oldest_first(ring_buffer, sample_event): + """Test that fetch_unpublished() returns events in FIFO order.""" + # Write 3 events + id1 = ring_buffer.write(sample_event) + id2 = ring_buffer.write(sample_event) + id3 = ring_buffer.write(sample_event) + + # Fetch unpublished + entries = ring_buffer.fetch_unpublished(limit=10) + + assert len(entries) == 3 + assert entries[0].id == id1 + assert entries[1].id == id2 + assert entries[2].id == id3 + assert all(not entry.published for entry in entries) + + +def test_fetch_unpublished_respects_limit(ring_buffer, sample_event): + """Test that fetch_unpublished() respects limit parameter.""" + # Write 5 events + for _ in range(5): + ring_buffer.write(sample_event) + + # Fetch only 2 + entries = ring_buffer.fetch_unpublished(limit=2) + + assert len(entries) == 2 + + +def test_fetch_unpublished_excludes_published_events(ring_buffer, sample_event): + """Test that fetch_unpublished() does not return published events.""" + # Write 3 events + id1 = ring_buffer.write(sample_event) + id2 = ring_buffer.write(sample_event) + id3 = ring_buffer.write(sample_event) + + # Mark first event as published + ring_buffer.mark_published([id1]) + + # Fetch unpublished + entries = ring_buffer.fetch_unpublished(limit=10) + + assert len(entries) == 2 + assert entries[0].id == id2 + assert entries[1].id == id3 + + +def test_mark_published_sets_flag_and_timestamp(ring_buffer, sample_event): + """Test that mark_published() sets published flag and timestamp.""" + event_id = ring_buffer.write(sample_event) + + # Mark as published + before = datetime.now(UTC) + ring_buffer.mark_published([event_id]) + after = datetime.now(UTC) + + # Verify flag and timestamp + cursor = ring_buffer.conn.execute( + "SELECT published, published_at FROM ring_buffer WHERE id = ?", (event_id,) + ) + row = cursor.fetchone() + assert row[0] == 1 # published = TRUE + assert row[1] is not None + + published_at = datetime.fromisoformat(row[1]) + assert before <= published_at <= after + + +def test_mark_published_handles_multiple_ids(ring_buffer, sample_event): + """Test that mark_published() can mark multiple events at once.""" + id1 = ring_buffer.write(sample_event) + id2 = ring_buffer.write(sample_event) + id3 = ring_buffer.write(sample_event) + + ring_buffer.mark_published([id1, id2, id3]) + + # Verify all are published + cursor = ring_buffer.conn.execute("SELECT COUNT(*) FROM ring_buffer WHERE published = TRUE") + count = cursor.fetchone()[0] + assert count == 3 + + +def test_mark_published_handles_empty_list(ring_buffer): + """Test that mark_published() handles empty list gracefully.""" + ring_buffer.mark_published([]) # Should not raise + + +def test_delete_old_published_removes_expired_events(ring_buffer, sample_event): + """Test that delete_old_published() removes events older than threshold.""" + # Write and immediately publish an event + id1 = ring_buffer.write(sample_event) + ring_buffer.mark_published([id1]) + + # Manually set published_at to 48 hours ago + old_timestamp = (datetime.now(UTC) - timedelta(hours=48)).isoformat() + ring_buffer.conn.execute( + "UPDATE ring_buffer SET published_at = ? WHERE id = ?", (old_timestamp, id1) + ) + ring_buffer.conn.commit() + + # Write and publish a recent event + id2 = ring_buffer.write(sample_event) + ring_buffer.mark_published([id2]) + + # Delete events older than 24 hours + deleted = ring_buffer.delete_old_published(max_age_hours=24) + + assert deleted == 1 + assert ring_buffer.count() == 1 + + +def test_delete_old_published_never_deletes_unpublished(ring_buffer, sample_event): + """Test that delete_old_published() never deletes unpublished events.""" + # Write an unpublished event + id1 = ring_buffer.write(sample_event) + + # Manually set created_at to 48 hours ago (but not published) + old_timestamp = (datetime.now(UTC) - timedelta(hours=48)).isoformat() + ring_buffer.conn.execute( + "UPDATE ring_buffer SET created_at = ? WHERE id = ?", (old_timestamp, id1) + ) + ring_buffer.conn.commit() + + # Try to delete old events + deleted = ring_buffer.delete_old_published(max_age_hours=24) + + # Unpublished event should NOT be deleted + assert deleted == 0 + assert ring_buffer.count() == 1 + + +def test_delete_oldest_published_enforces_max_size(ring_buffer, sample_event): + """Test that delete_oldest_published() enforces max buffer size.""" + # Write and publish 10 events + for _ in range(10): + event_id = ring_buffer.write(sample_event) + ring_buffer.mark_published([event_id]) + + # Delete oldest to keep only 5 + deleted = ring_buffer.delete_oldest_published(keep_count=5) + + assert deleted == 5 + assert ring_buffer.count() == 5 + + +def test_delete_oldest_published_never_deletes_unpublished(ring_buffer, sample_event): + """Test that delete_oldest_published() never deletes unpublished events.""" + # Write and publish 5 events + for _ in range(5): + event_id = ring_buffer.write(sample_event) + ring_buffer.mark_published([event_id]) + + # Write 3 unpublished events + for _ in range(3): + ring_buffer.write(sample_event) + + # Total: 8 events (5 published, 3 unpublished) + assert ring_buffer.count() == 8 + + # Delete oldest to keep only 2 events total + deleted = ring_buffer.delete_oldest_published(keep_count=2) + + # Should only delete 3 published events (keeping 2 published + 3 unpublished = 5 total) + assert deleted == 3 + assert ring_buffer.count() == 5 + + # Verify 3 unpublished remain + unpublished = ring_buffer.fetch_unpublished(limit=10) + assert len(unpublished) == 3 + + +def test_count_returns_total_events(ring_buffer, sample_event): + """Test that count() returns total number of events.""" + assert ring_buffer.count() == 0 + + ring_buffer.write(sample_event) + assert ring_buffer.count() == 1 + + ring_buffer.write(sample_event) + assert ring_buffer.count() == 2 + + +def test_count_includes_both_published_and_unpublished(ring_buffer, sample_event): + """Test that count() includes both published and unpublished events.""" + id1 = ring_buffer.write(sample_event) + _ = ring_buffer.write(sample_event) + + ring_buffer.mark_published([id1]) + + # Count should include both + assert ring_buffer.count() == 2 + + +def test_close_closes_connection(ring_buffer): + """Test that close() closes the database connection.""" + ring_buffer.close() + + # Attempting to use connection after close should fail + with pytest.raises(Exception): # ProgrammingError: Cannot operate on a closed database + ring_buffer.conn.execute("SELECT COUNT(*) FROM ring_buffer") + + +def test_entry_contains_event_data(ring_buffer, sample_event): + """Test that RingBufferEntry contains the original event data.""" + ring_buffer.write(sample_event) + + entries = ring_buffer.fetch_unpublished(limit=1) + entry = entries[0] + + # Verify event_data is JSON string + assert isinstance(entry.event_data, str) + assert "test_event" in entry.event_data + assert "user123" in entry.event_data + + +def test_entry_timestamps_are_iso_strings(ring_buffer, sample_event): + """Test that RingBufferEntry timestamps are ISO 8601 strings.""" + event_id = ring_buffer.write(sample_event) + ring_buffer.mark_published([event_id]) + + # Fetch published event directly from DB + cursor = ring_buffer.conn.execute( + "SELECT id, event_data, created_at, published, published_at FROM ring_buffer WHERE id = ?", + (event_id,), + ) + row = cursor.fetchone() + + from eventkit.ring_buffer.base import RingBufferEntry + + entry = RingBufferEntry( + id=row[0], + event_data=row[1], + created_at=row[2], + published=bool(row[3]), + published_at=row[4], + ) + + assert isinstance(entry.created_at, str) + assert isinstance(entry.published_at, str) + + # Verify they can be parsed as ISO timestamps + datetime.fromisoformat(entry.created_at) + datetime.fromisoformat(entry.published_at) diff --git a/tests/unit/test_config.py b/tests/unit/test_config.py new file mode 100644 index 0000000..6ae483c --- /dev/null +++ b/tests/unit/test_config.py @@ -0,0 +1,133 @@ +"""Tests for configuration module.""" + +import os + +import pytest + +from eventkit.config import QueueMode, Settings + + +@pytest.fixture +def clean_env(monkeypatch): + """Clear environment variables for isolated tests.""" + # Clear all EVENTKIT_ env vars + for key in list(os.environ.keys()): + if key.startswith("EVENTKIT_") or key in ["GCP_PROJECT_ID", "LOG_LEVEL"]: + monkeypatch.delenv(key, raising=False) + + +def test_settings_default_values(clean_env, monkeypatch): + """Test that settings have sensible defaults.""" + # Set required field + monkeypatch.setenv("GCP_PROJECT_ID", "test-project") + + settings = Settings() + + # Firestore + assert settings.GCP_PROJECT_ID == "test-project" + assert settings.FIRESTORE_DATABASE == "default" + + # Buffer + assert settings.EVENTKIT_BUFFER_SIZE == 100 + assert settings.EVENTKIT_BUFFER_MAX_SIZE == 1000 + assert settings.EVENTKIT_BUFFER_TIMEOUT == 5.0 + + # Sequencer + assert settings.EVENTKIT_NUM_PARTITIONS == 16 + + # Queue + assert settings.EVENTKIT_QUEUE_MODE == QueueMode.ASYNC + assert settings.EVENTKIT_ASYNC_WORKERS == 4 + + # Pub/Sub + assert settings.EVENTKIT_PUBSUB_PROJECT_ID is None + assert settings.EVENTKIT_PUBSUB_TOPIC == "eventkit-events" + assert settings.EVENTKIT_PUBSUB_SUBSCRIPTION == "eventkit-worker" + assert settings.EVENTKIT_PUBSUB_DLQ_TOPIC == "eventkit-events-dlq" + assert settings.EVENTKIT_PUBSUB_MAX_DELIVERY_ATTEMPTS == 5 + assert settings.EVENTKIT_PUBSUB_WORKERS == 4 + + # Ring Buffer (always enabled - durability is core) + assert settings.EVENTKIT_RING_BUFFER_MODE == "sqlite" + assert settings.EVENTKIT_RING_BUFFER_DB_PATH == "./data/ring_buffer.db" + assert settings.EVENTKIT_RING_BUFFER_MAX_SIZE == 100000 + assert settings.EVENTKIT_RING_BUFFER_RETENTION_HOURS == 24 + assert settings.EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE == 100 + assert settings.EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL == 0.1 + assert settings.EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL == 3600.0 + + # Logging + assert settings.LOG_LEVEL == "INFO" + + +def test_settings_from_environment(clean_env, monkeypatch): + """Test that settings can be overridden via environment variables.""" + monkeypatch.setenv("GCP_PROJECT_ID", "prod-project") + monkeypatch.setenv("FIRESTORE_DATABASE", "production") + monkeypatch.setenv("EVENTKIT_BUFFER_SIZE", "200") + monkeypatch.setenv("EVENTKIT_BUFFER_MAX_SIZE", "2000") + monkeypatch.setenv("EVENTKIT_BUFFER_TIMEOUT", "10.0") + monkeypatch.setenv("EVENTKIT_NUM_PARTITIONS", "32") + monkeypatch.setenv("EVENTKIT_QUEUE_MODE", "async") + monkeypatch.setenv("EVENTKIT_ASYNC_WORKERS", "8") + monkeypatch.setenv("LOG_LEVEL", "DEBUG") + + settings = Settings() + + assert settings.GCP_PROJECT_ID == "prod-project" + assert settings.FIRESTORE_DATABASE == "production" + assert settings.EVENTKIT_BUFFER_SIZE == 200 + assert settings.EVENTKIT_BUFFER_MAX_SIZE == 2000 + assert settings.EVENTKIT_BUFFER_TIMEOUT == 10.0 + assert settings.EVENTKIT_NUM_PARTITIONS == 32 + assert settings.EVENTKIT_QUEUE_MODE == QueueMode.ASYNC + assert settings.EVENTKIT_ASYNC_WORKERS == 8 + assert settings.LOG_LEVEL == "DEBUG" + + +def test_ring_buffer_config_from_environment(clean_env, monkeypatch): + """Test that ring buffer settings can be overridden via environment.""" + monkeypatch.setenv("GCP_PROJECT_ID", "test-project") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_MODE", "sqlite") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_DB_PATH", "/tmp/ring.db") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_MAX_SIZE", "50000") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_RETENTION_HOURS", "48") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE", "200") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL", "0.5") + monkeypatch.setenv("EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL", "7200.0") + + settings = Settings() + + assert settings.EVENTKIT_RING_BUFFER_MODE == "sqlite" + assert settings.EVENTKIT_RING_BUFFER_DB_PATH == "/tmp/ring.db" + assert settings.EVENTKIT_RING_BUFFER_MAX_SIZE == 50000 + assert settings.EVENTKIT_RING_BUFFER_RETENTION_HOURS == 48 + assert settings.EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE == 200 + assert settings.EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL == 0.5 + assert settings.EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL == 7200.0 + + +def test_pubsub_config_from_environment(clean_env, monkeypatch): + """Test that Pub/Sub settings can be overridden via environment.""" + monkeypatch.setenv("GCP_PROJECT_ID", "test-project") + monkeypatch.setenv("EVENTKIT_PUBSUB_PROJECT_ID", "pubsub-project") + monkeypatch.setenv("EVENTKIT_PUBSUB_TOPIC", "custom-topic") + monkeypatch.setenv("EVENTKIT_PUBSUB_SUBSCRIPTION", "custom-subscription") + monkeypatch.setenv("EVENTKIT_PUBSUB_DLQ_TOPIC", "custom-dlq") + monkeypatch.setenv("EVENTKIT_PUBSUB_MAX_DELIVERY_ATTEMPTS", "10") + monkeypatch.setenv("EVENTKIT_PUBSUB_WORKERS", "8") + + settings = Settings() + + assert settings.EVENTKIT_PUBSUB_PROJECT_ID == "pubsub-project" + assert settings.EVENTKIT_PUBSUB_TOPIC == "custom-topic" + assert settings.EVENTKIT_PUBSUB_SUBSCRIPTION == "custom-subscription" + assert settings.EVENTKIT_PUBSUB_DLQ_TOPIC == "custom-dlq" + assert settings.EVENTKIT_PUBSUB_MAX_DELIVERY_ATTEMPTS == 10 + assert settings.EVENTKIT_PUBSUB_WORKERS == 8 + + +def test_queue_mode_enum(): + """Test QueueMode enum values.""" + assert QueueMode.ASYNC == "async" + assert QueueMode.PUBSUB == "pubsub"