From 587d15c0726bc71f88d8c39e931e00fdf98c7897 Mon Sep 17 00:00:00 2001 From: prosdev Date: Tue, 13 Jan 2026 07:45:33 -0800 Subject: [PATCH 1/2] refactor: rename Buffer to EventLoader for clarity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Buffer → EventLoader refactor to better describe what the component does: batching and loading events to storage (backend-agnostic). **Changes**: 1. **Renamed files**: - buffer.py → event_loader.py - buffer_storage.py → event_loader_storage.py - test_buffer.py → test_event_loader.py 2. **Renamed classes**: - Buffer → EventLoader - BufferStorage → EventLoaderStorage - InMemoryBufferStorage → InMemoryEventLoaderStorage 3. **Renamed parameters** (more descriptive): - size → batch_size - max_size → max_batch_size - timeout → flush_interval 4. **Updated all references**: - Processor now uses event_loader instead of buffer - Dependencies updated (FastAPI DI) - All unit and integration tests updated - Docstrings updated to reflect new terminology **Why this change**: - EventLoader better describes the component's purpose - Aligns with industry terminology (ETL/ELT loaders) - Reflects the EventPlexer pattern from production CDPs - Makes the architecture clearer for users **Testing**: - All 212 unit tests passing - 94% code coverage maintained --- src/eventkit/api/dependencies.py | 14 +- src/eventkit/processing/__init__.py | 13 +- .../processing/{buffer.py => event_loader.py} | 100 +++++++------- ...fer_storage.py => event_loader_storage.py} | 26 ++-- src/eventkit/processing/processor.py | 35 +++-- .../test_ring_buffer_integration.py | 20 ++- .../{test_buffer.py => test_event_loader.py} | 122 +++++++++--------- tests/unit/processing/test_processor.py | 42 +++--- 8 files changed, 195 insertions(+), 177 deletions(-) rename src/eventkit/processing/{buffer.py => event_loader.py} (53%) rename src/eventkit/processing/{buffer_storage.py => event_loader_storage.py} (77%) rename 
tests/unit/processing/{test_buffer.py => test_event_loader.py} (56%) diff --git a/src/eventkit/api/dependencies.py b/src/eventkit/api/dependencies.py index 182b7d5..71cb6c4 100644 --- a/src/eventkit/api/dependencies.py +++ b/src/eventkit/api/dependencies.py @@ -4,7 +4,7 @@ from eventkit.adapters.segment import SegmentSchemaAdapter from eventkit.config import Settings -from eventkit.processing.buffer import Buffer +from eventkit.processing.event_loader import EventLoader from eventkit.processing.processor import Processor from eventkit.processing.sequencer import HashSequencer from eventkit.queues import EventQueue, create_queue @@ -52,7 +52,7 @@ def get_queue() -> EventQueue: - ErrorStore (Firestore) - Adapter (SegmentSchemaAdapter) - Sequencer (HashSequencer) - - Buffer (with EventStore) + - EventLoader (batches events to storage) - Processor (orchestrator) - RingBuffer (Write-Ahead Log for durability - created internally by queue) - EventQueue (factory-created based on QUEUE_MODE) @@ -86,18 +86,18 @@ async def collect(queue: EventQueue = Depends(get_queue)): sequencer = HashSequencer(num_partitions=settings.EVENTKIT_NUM_PARTITIONS) - buffer = Buffer( + event_loader = EventLoader( event_store=event_store, - size=settings.EVENTKIT_BUFFER_SIZE, - max_size=settings.EVENTKIT_BUFFER_MAX_SIZE, - timeout=settings.EVENTKIT_BUFFER_TIMEOUT, + batch_size=settings.EVENTKIT_BUFFER_SIZE, + max_batch_size=settings.EVENTKIT_BUFFER_MAX_SIZE, + flush_interval=settings.EVENTKIT_BUFFER_TIMEOUT, ) # Create processor processor = Processor( adapter=adapter, sequencer=sequencer, - buffer=buffer, + event_loader=event_loader, error_store=error_store, ) diff --git a/src/eventkit/processing/__init__.py b/src/eventkit/processing/__init__.py index af63135..685565f 100644 --- a/src/eventkit/processing/__init__.py +++ b/src/eventkit/processing/__init__.py @@ -1,14 +1,17 @@ """Event processing primitives.""" -from eventkit.processing.buffer import Buffer -from eventkit.processing.buffer_storage 
import BufferStorage, InMemoryBufferStorage +from eventkit.processing.event_loader import EventLoader +from eventkit.processing.event_loader_storage import ( + EventLoaderStorage, + InMemoryEventLoaderStorage, +) from eventkit.processing.processor import Processor from eventkit.processing.sequencer import HashSequencer, Sequencer __all__ = [ - "Buffer", - "BufferStorage", - "InMemoryBufferStorage", + "EventLoader", + "EventLoaderStorage", + "InMemoryEventLoaderStorage", "Processor", "Sequencer", "HashSequencer", diff --git a/src/eventkit/processing/buffer.py b/src/eventkit/processing/event_loader.py similarity index 53% rename from src/eventkit/processing/buffer.py rename to src/eventkit/processing/event_loader.py index b560cf4..6099c1f 100644 --- a/src/eventkit/processing/buffer.py +++ b/src/eventkit/processing/event_loader.py @@ -1,4 +1,4 @@ -"""Per-partition event buffer with size and time-based flushing.""" +"""EventLoader - Batches and loads events to storage (backend-agnostic).""" import asyncio import time @@ -7,65 +7,73 @@ import structlog from eventkit.errors import BufferFullError -from eventkit.processing.buffer_storage import BufferStorage, InMemoryBufferStorage +from eventkit.processing.event_loader_storage import EventLoaderStorage, InMemoryEventLoaderStorage from eventkit.schema.events import TypedEvent from eventkit.stores.event_store import EventStore logger = structlog.get_logger(__name__) -class Buffer: +class EventLoader: """ - Per-partition event buffer with size and time-based flushing. + Batches and loads events to storage (backend-agnostic). - Events are buffered in memory per partition and flushed to storage when: - 1. Buffer size reaches threshold (default: 100 events) - 2. Time threshold reached (default: 5 seconds) + Handles batching, timing, and loading events to any storage backend + (Firestore, BigQuery, GCS). Ensures efficient writes through batching + and configurable flush intervals. 
+ + Events are batched in memory per partition and flushed to storage when: + 1. Batch size reached (default: 100 events) + 2. Flush interval reached (default: 5 seconds) 3. Graceful shutdown (flush all) - Buffer storage is pluggable - use in-memory (default), disk, Redis, or custom. + Internal storage is pluggable - use in-memory (default), disk, Redis, or custom. Example: - # Default in-memory storage + # Default configuration (Firestore) store = FirestoreEventStore(...) - buffer = Buffer(event_store=store, size=100, timeout=5.0) + loader = EventLoader(event_store=store, batch_size=100, flush_interval=5.0) + + # BigQuery configuration (larger batches) + bq_store = BigQueryEventStore(...) + loader = EventLoader(event_store=bq_store, batch_size=1000, flush_interval=300.0) - # Custom disk-backed storage - disk_storage = DiskBufferStorage("/tmp/eventkit") - buffer = Buffer(event_store=store, storage=disk_storage) + # Custom storage backend + disk_storage = DiskEventLoaderStorage("/tmp/eventkit") + loader = EventLoader(event_store=store, storage=disk_storage) - await buffer.start_flusher() + await loader.start_flusher() # Enqueue events - await buffer.enqueue(event, partition_id=7) + await loader.enqueue(event, partition_id=7) # Graceful shutdown - await buffer.stop_flusher() + await loader.stop_flusher() """ def __init__( self, event_store: EventStore, - storage: BufferStorage | None = None, - size: int = 100, - max_size: int = 1000, - timeout: float = 5.0, + storage: EventLoaderStorage | None = None, + batch_size: int = 100, + max_batch_size: int = 1000, + flush_interval: float = 5.0, ) -> None: """ - Initialize buffer. + Initialize event loader. 
Args: - event_store: EventStore for batch writes - storage: BufferStorage backend (default: InMemoryBufferStorage) - size: Max events per partition before flush (default: 100) - max_size: Hard limit per partition (default: 1000, 10x size) - timeout: Max seconds between flushes (default: 5.0) + event_store: EventStore for batch writes (Firestore, BigQuery, GCS) + storage: EventLoaderStorage backend (default: InMemoryEventLoaderStorage) + batch_size: Max events per partition before flush (default: 100) + max_batch_size: Hard limit per partition (default: 1000, 10x batch_size) + flush_interval: Max seconds between flushes (default: 5.0) """ self.event_store = event_store - self.storage = storage or InMemoryBufferStorage() - self.size = size - self.max_size = max_size - self.timeout = timeout + self.storage = storage or InMemoryEventLoaderStorage() + self.batch_size = batch_size + self.max_batch_size = max_batch_size + self.flush_interval = flush_interval # Per-partition flush tracking self.last_flush: dict[int, datetime] = {} @@ -77,42 +85,42 @@ def __init__( async def enqueue(self, event: TypedEvent, partition_id: int) -> None: """ - Enqueue event to partition buffer. + Enqueue event to partition for batched loading. - Flushes immediately if buffer size threshold reached. - If buffer is at max_size, attempts flush before raising BufferFullError. + Flushes immediately if batch_size threshold reached. + If buffer is at max_batch_size, attempts flush before raising BufferFullError. 
Args: - event: TypedEvent to buffer + event: TypedEvent to enqueue partition_id: Partition ID (from sequencer) Raises: - BufferFullError: If partition buffer exceeds max_size and flush fails + BufferFullError: If partition buffer exceeds max_batch_size and flush fails """ async with self.lock: - # Defensive: If at max_size, try flushing first - if self.storage.len(partition_id) >= self.max_size: + # Defensive: If at max_batch_size, try flushing first + if self.storage.len(partition_id) >= self.max_batch_size: try: await self._flush_partition(partition_id) except Exception: # Flush failed, buffer truly stuck raise BufferFullError( f"Partition {partition_id} buffer full " - f"({self.max_size} events). Flush is failing or too slow." + f"({self.max_batch_size} events). Flush is failing or too slow." ) await self.storage.append(partition_id, event) - buffer_size = self.storage.len(partition_id) + current_size = self.storage.len(partition_id) - logger.debug("buffer_add", partition=partition_id, buffer_size=buffer_size) + logger.debug("event_loader_add", partition=partition_id, current_size=current_size) # Size-based flush - if buffer_size >= self.size: + if current_size >= self.batch_size: await self._flush_partition(partition_id) async def _flush_partition(self, partition_id: int) -> None: """ - Flush single partition to storage. + Flush single partition to storage (load batch). 
Args: partition_id: Partition to flush @@ -124,7 +132,7 @@ async def _flush_partition(self, partition_id: int) -> None: start_time = time.perf_counter() event_count = len(events) - logger.info("buffer_flush_start", partition=partition_id, event_count=event_count) + logger.info("event_loader_flush_start", partition=partition_id, event_count=event_count) # Batch write to storage await self.event_store.store_batch(events) @@ -135,7 +143,7 @@ async def _flush_partition(self, partition_id: int) -> None: duration_ms = (time.perf_counter() - start_time) * 1000 logger.info( - "buffer_flush_complete", + "event_loader_flush_complete", partition=partition_id, event_count=event_count, duration_ms=round(duration_ms, 2), @@ -158,10 +166,10 @@ async def _run_flusher(self) -> None: """ Background task for time-based flushing. - Flushes all partitions every timeout seconds. + Flushes all partitions every flush_interval seconds. """ while not self.shutdown: - await asyncio.sleep(self.timeout) + await asyncio.sleep(self.flush_interval) await self.flush_all() async def flush_all(self) -> None: diff --git a/src/eventkit/processing/buffer_storage.py b/src/eventkit/processing/event_loader_storage.py similarity index 77% rename from src/eventkit/processing/buffer_storage.py rename to src/eventkit/processing/event_loader_storage.py index 5d1ad30..cda8d3a 100644 --- a/src/eventkit/processing/buffer_storage.py +++ b/src/eventkit/processing/event_loader_storage.py @@ -1,4 +1,4 @@ -"""Buffer storage backends for pluggable buffer implementations.""" +"""EventLoader storage backends for pluggable internal storage implementations.""" from collections import defaultdict from typing import Protocol @@ -6,26 +6,26 @@ from eventkit.schema.events import TypedEvent -class BufferStorage(Protocol): +class EventLoaderStorage(Protocol): """ - Protocol for buffer storage backends. + Protocol for EventLoader internal storage backends. Implementations can be in-memory, disk-backed, Redis, or custom. 
Implementations: - - InMemoryBufferStorage (default): Fast, volatile, limited by RAM - - DiskBufferStorage (future): Persistent, larger capacity - - RedisBufferStorage (future): Distributed, shared across processes + - InMemoryEventLoaderStorage (default): Fast, volatile, limited by RAM + - DiskEventLoaderStorage (future): Persistent, larger capacity + - RedisEventLoaderStorage (future): Distributed, shared across processes Example: # Default in-memory - storage = InMemoryBufferStorage() + storage = InMemoryEventLoaderStorage() # Custom disk-backed (user implementation) - storage = DiskBufferStorage("/tmp/eventkit_buffers") + storage = DiskEventLoaderStorage("/tmp/eventkit_buffers") - # Use with Buffer - buffer = Buffer(event_store=store, storage=storage) + # Use with EventLoader + loader = EventLoader(event_store=store, storage=storage) """ async def append(self, partition_id: int, event: TypedEvent) -> None: @@ -81,15 +81,15 @@ def partitions(self) -> list[int]: ... -class InMemoryBufferStorage: +class InMemoryEventLoaderStorage: """ - In-memory buffer storage (default). + In-memory EventLoader storage (default). Fast but volatile - data lost on crash. Limited by available RAM. Example: - storage = InMemoryBufferStorage() + storage = InMemoryEventLoaderStorage() await storage.append(partition_id=0, event=event) events = await storage.get_all(partition_id=0) await storage.clear(partition_id=0) diff --git a/src/eventkit/processing/processor.py b/src/eventkit/processing/processor.py index f25a0a2..54c6e6d 100644 --- a/src/eventkit/processing/processor.py +++ b/src/eventkit/processing/processor.py @@ -4,7 +4,7 @@ The Processor is queue-agnostic and handles the core business logic: 1. Adapt: RawEvent → TypedEvent (validation & normalization) 2. Sequence: Assign partition based on identity (consistent routing) -3. Buffer: Enqueue to partition buffer (batch writes) +3. Load: Enqueue to EventLoader for batched storage writes 4. 
Error Handling: Invalid events → error store (never throw exceptions) Design Philosophy: @@ -19,7 +19,7 @@ import structlog from eventkit.adapters.base import EventAdapter -from eventkit.processing.buffer import Buffer +from eventkit.processing.event_loader import EventLoader from eventkit.processing.sequencer import Sequencer from eventkit.schema.raw import RawEvent from eventkit.stores.error_store import ErrorStore @@ -34,12 +34,12 @@ class Processor: Wires together all processing components: - Adapter: Validates and normalizes events - Sequencer: Routes events to partitions - - Buffer: Batches events for efficient writes + - EventLoader: Batches events for efficient storage writes - ErrorStore: Handles invalid events The processor is queue-agnostic - it doesn't know whether it's being - called by DirectQueue (inline), AsyncQueue (workers), or PubSubQueue - (distributed). This separation enables easy scaling. + called by AsyncQueue (workers) or PubSubQueue (distributed). + This separation enables easy scaling. Example: >>> from eventkit.processing.processor import Processor @@ -47,7 +47,7 @@ class Processor: >>> processor = Processor( ... adapter=SegmentSchemaAdapter(), ... sequencer=HashSequencer(num_partitions=16), - ... buffer=Buffer(event_store, size=100, timeout=5.0), + ... event_loader=EventLoader(event_store, batch_size=100, flush_interval=5.0), ... error_store=FirestoreErrorStore(...) ... 
) >>> @@ -65,7 +65,7 @@ def __init__( self, adapter: EventAdapter, sequencer: Sequencer, - buffer: Buffer, + event_loader: EventLoader, error_store: ErrorStore, ) -> None: """ @@ -74,12 +74,12 @@ def __init__( Args: adapter: EventAdapter for validation & normalization sequencer: Sequencer for consistent partition routing - buffer: Buffer for batching events before writes + event_loader: EventLoader for batching events before storage writes error_store: ErrorStore for invalid events """ self.adapter = adapter self.sequencer = sequencer - self.buffer = buffer + self.event_loader = event_loader self.error_store = error_store async def process_event(self, raw_event: RawEvent) -> None: @@ -87,13 +87,12 @@ async def process_event(self, raw_event: RawEvent) -> None: Process a single event through the pipeline. This is the core processing logic that queues call. The processor - doesn't know which queue is calling it (DirectQueue, AsyncQueue, - or PubSubQueue). + doesn't know which queue is calling it (AsyncQueue or PubSubQueue). Pipeline: 1. Adapt: RawEvent → TypedEvent (validation) 2. Sequence: Assign partition based on identity - 3. Buffer: Enqueue to partition buffer + 3. Load: Enqueue to EventLoader for batched storage writes Error Handling: - Invalid events → error_store (not exceptions) @@ -137,21 +136,21 @@ async def process_event(self, raw_event: RawEvent) -> None: logger.debug("event_sequenced", partition=partition_id) - # Step 3: Buffer (batch writes) - await self.buffer.enqueue(result.event, partition_id) + # Step 3: Load (batch to storage) + await self.event_loader.enqueue(result.event, partition_id) async def start(self) -> None: """ Start processor background tasks. - Currently starts the buffer's time-based flusher task. + Starts the EventLoader's time-based flusher task. """ - await self.buffer.start_flusher() + await self.event_loader.start_flusher() async def stop(self) -> None: """ Stop processor gracefully. 
- Ensures all buffered events are flushed before shutdown. + Ensures all batched events are flushed to storage before shutdown. """ - await self.buffer.stop_flusher() + await self.event_loader.stop_flusher() diff --git a/tests/integration/test_ring_buffer_integration.py b/tests/integration/test_ring_buffer_integration.py index dd10521..c366fd5 100644 --- a/tests/integration/test_ring_buffer_integration.py +++ b/tests/integration/test_ring_buffer_integration.py @@ -10,7 +10,7 @@ import pytest_asyncio from eventkit.adapters.segment import SegmentSchemaAdapter -from eventkit.processing.buffer import Buffer +from eventkit.processing.event_loader import EventLoader from eventkit.processing.processor import Processor from eventkit.processing.sequencer import HashSequencer from eventkit.queues.async_queue import AsyncQueue @@ -65,12 +65,14 @@ async def processor(event_store, error_store): """Fixture for fully configured Processor.""" adapter = SegmentSchemaAdapter() sequencer = HashSequencer(num_partitions=4) - buffer = Buffer(event_store=event_store, size=10, max_size=100, timeout=1.0) + event_loader = EventLoader( + event_store=event_store, batch_size=10, max_batch_size=100, flush_interval=1.0 + ) processor = Processor( adapter=adapter, sequencer=sequencer, - buffer=buffer, + event_loader=event_loader, error_store=error_store, ) @@ -386,11 +388,13 @@ async def test_ring_buffer_durability_on_crash(temp_db_path, event_store, error_ # Create first processor for first queue adapter_1 = SegmentSchemaAdapter() sequencer_1 = HashSequencer(num_partitions=4) - buffer_1 = Buffer(event_store=event_store, size=10, max_size=100, timeout=1.0) + event_loader_1 = EventLoader( + event_store=event_store, batch_size=10, max_batch_size=100, flush_interval=1.0 + ) processor_1 = Processor( adapter=adapter_1, sequencer=sequencer_1, - buffer=buffer_1, + event_loader=event_loader_1, error_store=error_store, ) @@ -438,11 +442,13 @@ async def test_ring_buffer_durability_on_crash(temp_db_path, 
event_store, error_ # Create second processor for second queue adapter_2 = SegmentSchemaAdapter() sequencer_2 = HashSequencer(num_partitions=4) - buffer_2 = Buffer(event_store=event_store, size=10, max_size=100, timeout=1.0) + event_loader_2 = EventLoader( + event_store=event_store, batch_size=10, max_batch_size=100, flush_interval=1.0 + ) processor_2 = Processor( adapter=adapter_2, sequencer=sequencer_2, - buffer=buffer_2, + event_loader=event_loader_2, error_store=error_store, ) diff --git a/tests/unit/processing/test_buffer.py b/tests/unit/processing/test_event_loader.py similarity index 56% rename from tests/unit/processing/test_buffer.py rename to tests/unit/processing/test_event_loader.py index 5421d1c..84574dd 100644 --- a/tests/unit/processing/test_buffer.py +++ b/tests/unit/processing/test_event_loader.py @@ -1,4 +1,4 @@ -"""Tests for Buffer.""" +"""Tests for EventLoader.""" import asyncio from unittest.mock import AsyncMock @@ -6,7 +6,7 @@ import pytest from eventkit.errors import BufferFullError -from eventkit.processing.buffer import Buffer +from eventkit.processing.event_loader import EventLoader from eventkit.schema.events import IdentifyEvent @@ -18,77 +18,79 @@ def mock_event_store(): return store -class TestBufferInit: - """Tests for Buffer initialization.""" +class TestEventLoaderInit: + """Tests for EventLoader initialization.""" def test_default_params(self, mock_event_store): - """Test default buffer parameters.""" - buffer = Buffer(event_store=mock_event_store) - assert buffer.size == 100 - assert buffer.max_size == 1000 - assert buffer.timeout == 5.0 + """Test default event loader parameters.""" + loader = EventLoader(event_store=mock_event_store) + assert loader.batch_size == 100 + assert loader.max_batch_size == 1000 + assert loader.flush_interval == 5.0 def test_custom_params(self, mock_event_store): - """Test custom buffer parameters.""" - buffer = Buffer(event_store=mock_event_store, size=50, max_size=500, timeout=10.0) - assert 
buffer.size == 50 - assert buffer.max_size == 500 - assert buffer.timeout == 10.0 + """Test custom event loader parameters.""" + loader = EventLoader( + event_store=mock_event_store, batch_size=50, max_batch_size=500, flush_interval=10.0 + ) + assert loader.batch_size == 50 + assert loader.max_batch_size == 500 + assert loader.flush_interval == 10.0 -class TestBufferEnqueue: - """Tests for Buffer.enqueue() and size-based flush.""" +class TestEventLoaderEnqueue: + """Tests for EventLoader.enqueue() and size-based flush.""" @pytest.mark.asyncio async def test_basic_enqueue(self, mock_event_store): """Test enqueueing events without flush.""" - buffer = Buffer(event_store=mock_event_store, size=100) + loader = EventLoader(event_store=mock_event_store, batch_size=100) event1 = IdentifyEvent(user_id="alice") event2 = IdentifyEvent(user_id="bob") - await buffer.enqueue(event1, partition_id=0) - await buffer.enqueue(event2, partition_id=0) + await loader.enqueue(event1, partition_id=0) + await loader.enqueue(event2, partition_id=0) # Should not have flushed yet (only 2 events) mock_event_store.store_batch.assert_not_called() - assert buffer.storage.len(0) == 2 + assert loader.storage.len(0) == 2 @pytest.mark.asyncio async def test_size_based_flush(self, mock_event_store): """Test size-based flush when buffer is full.""" - buffer = Buffer(event_store=mock_event_store, size=3) + loader = EventLoader(event_store=mock_event_store, batch_size=3) event1 = IdentifyEvent(user_id="user1") event2 = IdentifyEvent(user_id="user2") event3 = IdentifyEvent(user_id="user3") # Enqueue 3 events (reaches size threshold) - await buffer.enqueue(event1, partition_id=0) - await buffer.enqueue(event2, partition_id=0) - await buffer.enqueue(event3, partition_id=0) + await loader.enqueue(event1, partition_id=0) + await loader.enqueue(event2, partition_id=0) + await loader.enqueue(event3, partition_id=0) # Should have flushed mock_event_store.store_batch.assert_called_once() - assert 
buffer.storage.len(0) == 0 + assert loader.storage.len(0) == 0 @pytest.mark.asyncio async def test_per_partition_isolation(self, mock_event_store): """Test that flushing one partition doesn't affect others.""" - buffer = Buffer(event_store=mock_event_store, size=2) + loader = EventLoader(event_store=mock_event_store, batch_size=2) event1 = IdentifyEvent(user_id="user1") event2 = IdentifyEvent(user_id="user2") event3 = IdentifyEvent(user_id="user3") # Enqueue to different partitions - await buffer.enqueue(event1, partition_id=0) - await buffer.enqueue(event2, partition_id=1) - await buffer.enqueue(event3, partition_id=0) + await loader.enqueue(event1, partition_id=0) + await loader.enqueue(event2, partition_id=1) + await loader.enqueue(event3, partition_id=0) # Partition 0 should flush (size=2), partition 1 should not - assert buffer.storage.len(0) == 0 # Flushed - assert buffer.storage.len(1) == 1 # Not flushed + assert loader.storage.len(0) == 0 # Flushed + assert loader.storage.len(1) == 1 # Not flushed mock_event_store.store_batch.assert_called_once() @@ -101,104 +103,104 @@ async def test_buffer_full_error(self, mock_event_store): # Make flush fail mock_event_store.store_batch.side_effect = Exception("Firestore down") - buffer = Buffer(event_store=mock_event_store, size=100, max_size=5) + loader = EventLoader(event_store=mock_event_store, batch_size=100, max_batch_size=5) # Enqueue 5 events (at limit) for i in range(5): - await buffer.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=0) + await loader.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=0) # 6th event should trigger flush attempt, which fails, then raise BufferFullError with pytest.raises(BufferFullError, match="Partition 0 buffer full"): - await buffer.enqueue(IdentifyEvent(user_id="user6"), partition_id=0) + await loader.enqueue(IdentifyEvent(user_id="user6"), partition_id=0) @pytest.mark.asyncio async def test_per_partition_limits(self, mock_event_store): """Test that each partition 
has its own max_size limit.""" - buffer = Buffer(event_store=mock_event_store, size=100, max_size=5) + loader = EventLoader(event_store=mock_event_store, batch_size=100, max_batch_size=5) # Fill partition 0 for i in range(5): - await buffer.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=0) + await loader.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=0) # Partition 1 should still accept events - await buffer.enqueue(IdentifyEvent(user_id="alice"), partition_id=1) - assert buffer.storage.len(1) == 1 + await loader.enqueue(IdentifyEvent(user_id="alice"), partition_id=1) + assert loader.storage.len(1) == 1 -class TestBufferTimeBasedFlush: +class TestEventLoaderTimeBasedFlush: """Tests for time-based flushing.""" @pytest.mark.asyncio async def test_time_based_flush(self, mock_event_store): """Test that buffer flushes after timeout.""" - buffer = Buffer(event_store=mock_event_store, size=100, timeout=0.1) + loader = EventLoader(event_store=mock_event_store, batch_size=100, flush_interval=0.1) - await buffer.start_flusher() + await loader.start_flusher() # Enqueue 1 event (below size threshold) - await buffer.enqueue(IdentifyEvent(user_id="alice"), partition_id=0) + await loader.enqueue(IdentifyEvent(user_id="alice"), partition_id=0) # Wait for timeout await asyncio.sleep(0.2) # Should have flushed assert mock_event_store.store_batch.called - assert buffer.storage.len(0) == 0 + assert loader.storage.len(0) == 0 - await buffer.stop_flusher() + await loader.stop_flusher() @pytest.mark.asyncio async def test_start_flusher_twice_raises_error(self, mock_event_store): """Test that starting flusher twice raises RuntimeError.""" - buffer = Buffer(event_store=mock_event_store) + loader = EventLoader(event_store=mock_event_store) - await buffer.start_flusher() + await loader.start_flusher() with pytest.raises(RuntimeError, match="Flusher already running"): - await buffer.start_flusher() + await loader.start_flusher() - await buffer.stop_flusher() + await 
loader.stop_flusher() -class TestBufferShutdown: +class TestEventLoaderShutdown: """Tests for graceful shutdown.""" @pytest.mark.asyncio async def test_graceful_shutdown_flushes_all_buffers(self, mock_event_store): """Test that shutdown flushes all buffers.""" - buffer = Buffer(event_store=mock_event_store, size=100) + loader = EventLoader(event_store=mock_event_store, batch_size=100) - await buffer.start_flusher() + await loader.start_flusher() # Enqueue events to multiple partitions (below size threshold) - await buffer.enqueue(IdentifyEvent(user_id="alice"), partition_id=0) - await buffer.enqueue(IdentifyEvent(user_id="bob"), partition_id=1) + await loader.enqueue(IdentifyEvent(user_id="alice"), partition_id=0) + await loader.enqueue(IdentifyEvent(user_id="bob"), partition_id=1) # Shutdown - await buffer.stop_flusher() + await loader.stop_flusher() # All buffers should be empty - assert buffer.storage.len(0) == 0 - assert buffer.storage.len(1) == 0 + assert loader.storage.len(0) == 0 + assert loader.storage.len(1) == 0 # Should have called store_batch for each partition assert mock_event_store.store_batch.call_count == 2 -class TestBufferConcurrency: +class TestEventLoaderConcurrency: """Tests for concurrent access.""" @pytest.mark.asyncio async def test_concurrent_enqueue(self, mock_event_store): """Test that multiple tasks can enqueue safely.""" - buffer = Buffer(event_store=mock_event_store, size=1000) + loader = EventLoader(event_store=mock_event_store, batch_size=1000) # Enqueue 100 events concurrently tasks = [ - buffer.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=i % 16) + loader.enqueue(IdentifyEvent(user_id=f"user{i}"), partition_id=i % 16) for i in range(100) ] await asyncio.gather(*tasks) # Check all events buffered - total = sum(buffer.storage.len(i) for i in range(16)) + total = sum(loader.storage.len(i) for i in range(16)) assert total == 100 diff --git a/tests/unit/processing/test_processor.py b/tests/unit/processing/test_processor.py 
index d914316..ea6c28d 100644 --- a/tests/unit/processing/test_processor.py +++ b/tests/unit/processing/test_processor.py @@ -29,15 +29,15 @@ async def test_process_event_success(self): mock_sequencer = Mock() mock_sequencer.get_partition_id = Mock(return_value=7) - mock_buffer = Mock() - mock_buffer.enqueue = AsyncMock() + mock_event_loader = Mock() + mock_event_loader.enqueue = AsyncMock() mock_error_store = Mock() processor = Processor( adapter=mock_adapter, sequencer=mock_sequencer, - buffer=mock_buffer, + event_loader=mock_event_loader, error_store=mock_error_store, ) @@ -49,7 +49,7 @@ async def test_process_event_success(self): # Verify mock_adapter.adapt.assert_called_once_with(raw_event) mock_sequencer.get_partition_id.assert_called_once_with(typed_event) - mock_buffer.enqueue.assert_called_once_with(typed_event, 7) + mock_event_loader.enqueue.assert_called_once_with(typed_event, 7) mock_error_store.store_error.assert_not_called() async def test_process_event_invalid(self): @@ -59,7 +59,7 @@ async def test_process_event_invalid(self): mock_adapter.adapt = Mock(return_value=AdapterResult.failure("Missing userId")) mock_sequencer = Mock() - mock_buffer = Mock() + mock_event_loader = Mock() mock_error_store = Mock() mock_error_store.store_error = AsyncMock() @@ -67,7 +67,7 @@ async def test_process_event_invalid(self): processor = Processor( adapter=mock_adapter, sequencer=mock_sequencer, - buffer=mock_buffer, + event_loader=mock_event_loader, error_store=mock_error_store, ) @@ -89,7 +89,7 @@ async def test_process_event_invalid(self): # Verify sequencer and buffer NOT called mock_sequencer.get_partition_id.assert_not_called() - mock_buffer.enqueue.assert_not_called() + mock_event_loader.enqueue.assert_not_called() async def test_process_event_partitioning(self): """Processor should route events to correct partition.""" @@ -104,14 +104,14 @@ async def test_process_event_partitioning(self): ] mock_sequencer = Mock() - mock_buffer = Mock() - mock_buffer.enqueue 
= AsyncMock() + mock_event_loader = Mock() + mock_event_loader.enqueue = AsyncMock() mock_error_store = Mock() processor = Processor( adapter=mock_adapter, sequencer=mock_sequencer, - buffer=mock_buffer, + event_loader=mock_event_loader, error_store=mock_error_store, ) @@ -124,22 +124,22 @@ async def test_process_event_partitioning(self): await processor.process_event(raw_event) # Verify all events buffered to correct partitions - assert mock_buffer.enqueue.call_count == 3 + assert mock_event_loader.enqueue.call_count == 3 for i, (typed_event, partition_id) in enumerate(events_and_partitions): - call_args = mock_buffer.enqueue.call_args_list[i] + call_args = mock_event_loader.enqueue.call_args_list[i] assert call_args[0][0].user_id == typed_event.user_id assert call_args[0][1] == partition_id async def test_start_lifecycle(self): """Processor.start() should start buffer flusher.""" # Setup - mock_buffer = Mock() - mock_buffer.start_flusher = AsyncMock() + mock_event_loader = Mock() + mock_event_loader.start_flusher = AsyncMock() processor = Processor( adapter=Mock(), sequencer=Mock(), - buffer=mock_buffer, + event_loader=mock_event_loader, error_store=Mock(), ) @@ -147,18 +147,18 @@ async def test_start_lifecycle(self): await processor.start() # Verify - mock_buffer.start_flusher.assert_called_once() + mock_event_loader.start_flusher.assert_called_once() async def test_stop_lifecycle(self): """Processor.stop() should stop buffer gracefully.""" # Setup - mock_buffer = Mock() - mock_buffer.stop_flusher = AsyncMock() + mock_event_loader = Mock() + mock_event_loader.stop_flusher = AsyncMock() processor = Processor( adapter=Mock(), sequencer=Mock(), - buffer=mock_buffer, + event_loader=mock_event_loader, error_store=Mock(), ) @@ -166,7 +166,7 @@ async def test_stop_lifecycle(self): await processor.stop() # Verify - mock_buffer.stop_flusher.assert_called_once() + mock_event_loader.stop_flusher.assert_called_once() async def test_no_exceptions_on_invalid_events(self): 
"""Processor should never throw exceptions for invalid events.""" @@ -180,7 +180,7 @@ async def test_no_exceptions_on_invalid_events(self): processor = Processor( adapter=mock_adapter, sequencer=Mock(), - buffer=Mock(), + event_loader=Mock(), error_store=mock_error_store, ) From 8108d55b5281ed87759e9748a5a8743ea1b8560c Mon Sep 17 00:00:00 2001 From: prosdev Date: Tue, 13 Jan 2026 07:56:05 -0800 Subject: [PATCH 2/2] =?UTF-8?q?docs:=20update=20all=20documentation=20for?= =?UTF-8?q?=20Buffer=20=E2=86=92=20EventLoader=20rename?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Updated all markdown documentation to reflect the Buffer → EventLoader refactor: **Files Updated**: - README.md - Component table and flow diagram - CLAUDE.md - Logging and architecture examples - ARCHITECTURE.md - Phase diagrams, component sections, code examples - LOCAL_DEV.md - Configuration table, logging reference - TESTING.md - Test examples - CONTRIBUTING.md - PR examples - specs/core-pipeline/architecture.md - Full component documentation - specs/core-pipeline/plan.md - Implementation plan, file paths, examples - specs/core-pipeline/tasks.md - Task descriptions, file references **Changes**: - Buffer → EventLoader (class and concept) - BufferStorage → EventLoaderStorage (protocol) - buffer.py → event_loader.py (file paths) - buffer_storage.py → event_loader_storage.py (file paths) - size → batch_size (parameters) - timeout → flush_interval (parameters) **Note**: Ring Buffer references remain unchanged (correct - referring to WAL) --- ARCHITECTURE.md | 14 +++++------ CLAUDE.md | 4 +-- CONTRIBUTING.md | 2 +- LOCAL_DEV.md | 4 +-- README.md | 6 ++--- TESTING.md | 2 +- specs/core-pipeline/architecture.md | 38 ++++++++++++++--------------- specs/core-pipeline/plan.md | 14 +++++------ specs/core-pipeline/tasks.md | 14 +++++------ 9 files changed, 49 insertions(+), 49 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 47d5f69..4f6c9c1 100644 
--- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -45,7 +45,7 @@ eventkit provides these primitives as a composable, type-safe library. ┌─────────────────────────────────────────────────────────────────┐ │ Phase 3: Routing & Sequencing │ │ │ -│ TypedEvent → Sequencer → Partition → Buffer │ +│ TypedEvent → Sequencer → Partition → EventLoader │ │ │ │ • Sequencer: Hash-based routing (FNV-1a) │ │ • Consistent partitioning (userId → partition N) │ @@ -56,9 +56,9 @@ eventkit provides these primitives as a composable, type-safe library. ┌─────────────────────────────────────────────────────────────────┐ │ Phase 4: Batching & Storage │ │ │ -│ Buffer → EventStore / ErrorStore → Firestore │ +│ EventLoader → EventStore / ErrorStore → Firestore │ │ │ -│ • Buffer: Time & size-based flushing │ +│ • EventLoader: Time & size-based flushing │ │ • EventStore: Subcollections per stream │ │ • ErrorStore: Separate DLQ collection │ │ • Batch writes (500 events max per Firestore batch) │ @@ -224,8 +224,8 @@ class Sequencer: --- -#### Buffer -**File:** `src/eventkit/processing/buffer.py` +#### EventLoader +**File:** `src/eventkit/processing/event_loader.py` Per-partition buffering with size and time-based flushing. @@ -240,8 +240,8 @@ Per-partition buffering with size and time-based flushing. - **Shutdown** - Flush all buffers on graceful shutdown ```python -class Buffer: - def __init__(self, event_store: EventStore, size: int = 100, timeout: float = 5.0): +class EventLoader: + def __init__(self, event_store: EventStore, batch_size: int = 100, flush_interval: float = 5.0): self.buffers: dict[int, list[TypedEvent]] = defaultdict(list) async def add(self, partition_id: int, event: TypedEvent) -> None: diff --git a/CLAUDE.md b/CLAUDE.md index 3839a45..850c4da 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -79,7 +79,7 @@ logger.info(f"Received {event_type} event from {stream}") # Don't do this! 
**Always Log (INFO level)**: - API requests: `method`, `path`, `status_code`, `duration_ms` - Events received: `stream`, `event_type` (NOT full payload) -- Buffer flushes: `partition`, `event_count`, `duration_ms` +- EventLoader flushes: `partition`, `event_count`, `duration_ms` - Store writes: `event_count`, `duration_ms` **Sometimes Log (DEBUG level)**: @@ -152,7 +152,7 @@ The `Processor` doesn't know about queues - it only has `process_event()`: ```python class Processor: async def process_event(self, raw_event: RawEvent) -> None: - # Adapt → Sequence → Buffer + # Adapt → Sequence → EventLoader ... ``` diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index f291d02..db2f3f0 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -354,7 +354,7 @@ Closes #12 ``` fix(buffer): prevent race condition in partition flush -Buffer._flush_partition was not thread-safe when called from +EventLoader._flush_partition was not thread-safe when called from both size-based and time-based triggers simultaneously. Added asyncio.Lock per partition to serialize flushes. diff --git a/LOCAL_DEV.md b/LOCAL_DEV.md index d6c06d7..97c4426 100644 --- a/LOCAL_DEV.md +++ b/LOCAL_DEV.md @@ -138,7 +138,7 @@ See `src/eventkit/config.py` for all available settings. 
| `EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE` | `100` | Events per publisher batch | | `EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL` | `0.1` | Seconds between ring buffer polls | | `EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL` | `3600.0` | Seconds between cleanup runs (1 hour) | -| **Buffer** ||| +| **EventLoader** ||| | `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush | | `EVENTKIT_BUFFER_MAX_SIZE` | `1000` | Hard limit per partition | | `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush | @@ -220,7 +220,7 @@ Example output (single-line JSON per log): **Always Logged:** - API requests: method, path, status_code, duration_ms - Events: stream, event_type (NOT full payload for PII safety) -- Buffer flushes: partition, event_count, duration_ms +- EventLoader flushes: partition, event_count, duration_ms - Store writes: event_count, duration_ms **Never Logged:** diff --git a/README.md b/README.md index 7872aed..8ddd81b 100644 --- a/README.md +++ b/README.md @@ -85,7 +85,7 @@ curl -X POST http://localhost:8000/api/v1/identify \ │ • Adapters - Validate & normalize to typed events │ │ • Validators - Composable field checks │ │ • Sequencer - Hash-based routing for consistency │ -│ • Buffer - Batch events for write efficiency │ +│ • EventLoader - Batch events for write efficiency │ └─────────────────────────────────────────────────────────┘ ↓ ┌─────────────────────────────────────────────────────────┐ @@ -104,7 +104,7 @@ curl -X POST http://localhost:8000/api/v1/identify \ | **RawEvent** | Flexible container for any JSON | Schema-agnostic at ingestion | | **Adapters** | Validate and normalize to typed events | Protocol-based, pluggable | | **Sequencer** | Hash-based event routing | Consistent ordering per identity | -| **Buffer** | Batch events before storage | Reduce write amplification | +| **EventLoader** | Batch events before storage | Reduce write amplification | | **Event Store** | Persist events to storage | Interface for multiple backends | | 
**Error Store** | Dead letter queue for failures | Never lose data, debug later | @@ -277,7 +277,7 @@ See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed local development instructions. **Quick Start:** ```bash # Start Firestore emulator -docker compose up -d +docker compose up -d # Install dependencies uv sync diff --git a/TESTING.md b/TESTING.md index f1991ca..f80295f 100644 --- a/TESTING.md +++ b/TESTING.md @@ -215,7 +215,7 @@ async def processor(): ) adapter = SegmentAdapter() sequencer = Sequencer(num_partitions=4) # Small for tests - buffer = Buffer(event_store, size=10, timeout=1.0) # Small/fast for tests + event_loader = EventLoader(event_store, batch_size=10, flush_interval=1.0) # Small/fast for tests - processor = Processor(adapter, sequencer, buffer, error_store) + processor = Processor(adapter, sequencer, event_loader, error_store) diff --git a/specs/core-pipeline/architecture.md b/specs/core-pipeline/architecture.md index 89fc445..966e131 100644 --- a/specs/core-pipeline/architecture.md +++ b/specs/core-pipeline/architecture.md @@ -46,7 +46,7 @@ │ partition_id ↓ ┌───────────────┐ - │ Buffer │ + │ EventLoader │ │ (Batching) │ └───────┬───────┘ │ Batch @@ -221,8 +221,8 @@ class Processor: # Sequence: Route to partition partition_id = self.sequencer.get_partition_id(result.event) - # Buffer: Batch events - await self.buffer.enqueue(result.event, partition_id) + # EventLoader: Batch events + await self.event_loader.enqueue(result.event, partition_id) ``` **Design Decision**: Queue-agnostic. Processor doesn't know if it's called from AsyncQueue or PubSubQueue workers. @@ -298,17 +298,17 @@ partition_id = sequencer.get_partition_id(event) --- -## 4. Buffer (Batching) +## 4.
EventLoader (Batching) **Responsibility**: Batch events before writing to storage ```python -class Buffer: +class EventLoader: def __init__( self, event_store: EventStore, - storage: BufferStorage | None = None, - size: int = 100, - max_size: int = 1000, - timeout: float = 5.0 + storage: EventLoaderStorage | None = None, + batch_size: int = 100, + max_batch_size: int = 1000, + flush_interval: float = 5.0 ): @@ -386,7 +386,7 @@ class Processor: self, adapter: EventAdapter, sequencer: Sequencer, - buffer: Buffer, + event_loader: EventLoader, error_store: ErrorStore ): ... @@ -402,8 +402,8 @@ class Processor: # Sequence partition_id = self.sequencer.get_partition_id(result.event) - # Buffer - await self.buffer.enqueue(result.event, partition_id) + # EventLoader + await self.event_loader.enqueue(result.event, partition_id) ``` **Error Handling**: Never raise exceptions. Invalid events → error store. @@ -454,7 +454,7 @@ def get_processor() -> Processor: error_store = FirestoreErrorStore(...) - buffer = Buffer( + event_loader = EventLoader( event_store=event_store, - size=int(os.getenv("BUFFER_SIZE", "100")), - timeout=float(os.getenv("BUFFER_TIMEOUT", "5.0")) + batch_size=int(os.getenv("BUFFER_SIZE", "100")), + flush_interval=float(os.getenv("BUFFER_TIMEOUT", "5.0")) @@ -473,12 +473,12 @@ def get_processor() -> Processor: ## Pluggable Design -### BufferStorage Protocol +### EventLoaderStorage Protocol -The buffer's internal storage is pluggable: +The event loader's internal storage is pluggable: ```python -class BufferStorage(Protocol): +class EventLoaderStorage(Protocol): async def append(self, partition_id: int, event: TypedEvent) -> None: ... async def get_all(self, partition_id: int) -> list[TypedEvent]: ... async def clear(self, partition_id: int) -> None: ...
@@ -487,9 +487,9 @@ class BufferStorage(Protocol): ``` **Implementations**: -- `InMemoryBufferStorage` (default) - Fast, volatile -- `DiskBufferStorage` (future) - Persistent, larger capacity -- `RedisBufferStorage` (future) - Distributed, shared state +- `InMemoryEventLoaderStorage` (default) - Fast, volatile +- `DiskEventLoaderStorage` (future) - Persistent, larger capacity +- `RedisEventLoaderStorage` (future) - Distributed, shared state --- @@ -519,12 +519,12 @@ Configure via `EVENTKIT_QUEUE_MODE` environment variable: **v0.1.0 (Current) - DirectQueue**: ``` -API → Processor → Buffer → Storage +API → Processor → EventLoader → Storage ``` **v0.2.0+ - PubSubQueue**: ``` -API → Pub/Sub → Worker(s) → Buffer → Storage +API → Pub/Sub → Worker(s) → EventLoader → Storage ↓ Horizontal scaling ``` diff --git a/specs/core-pipeline/plan.md b/specs/core-pipeline/plan.md index 865c9f8..8311117 100644 --- a/specs/core-pipeline/plan.md +++ b/specs/core-pipeline/plan.md @@ -286,8 +286,8 @@ class SegmentAdapter: | File | Purpose | User Story | |------|---------|------------| | `src/eventkit/processing/sequencer.py` | Hash-based routing for consistent ordering | Story 4 | -| `src/eventkit/processing/buffer.py` | Batching for efficient writes | Story 5 | -| `src/eventkit/processing/buffer_storage.py` | BufferStorage Protocol (pluggable) | Story 5 | +| `src/eventkit/processing/event_loader.py` | Batching for efficient writes | Story 5 | +| `src/eventkit/processing/event_loader_storage.py` | EventLoaderStorage Protocol (pluggable) | Story 5 | | `src/eventkit/processing/processor.py` | Main orchestrator (queue-agnostic) | All | | `src/eventkit/queues/base.py` | EventQueue Protocol | All | | `src/eventkit/queues/async_queue.py` | AsyncQueue implementation | All | @@ -330,7 +330,7 @@ class Sequencer: ```python # Buffer - Efficient batch processing (Story 5) -class Buffer: +class EventLoader: def __init__( self, event_store: EventStore, @@ -651,7 +651,7 @@ async def lifespan(app: 
FastAPI): # Create processor components adapter = SegmentSchemaAdapter() sequencer = HashSequencer(num_partitions=settings.NUM_PARTITIONS) - buffer = Buffer( + event_loader = EventLoader( event_store=event_store, - size=settings.BUFFER_SIZE, - timeout=settings.BUFFER_TIMEOUT + batch_size=settings.BUFFER_SIZE, + flush_interval=settings.BUFFER_TIMEOUT @@ -1312,8 +1312,8 @@ eventkit/ │ ├── processing/ │ │ ├── __init__.py │ │ ├── sequencer.py # Sequencer (Phase 4) -│ │ ├── buffer.py # Buffer (Phase 4) -│ │ ├── buffer_storage.py # BufferStorage Protocol (Phase 4) +│ │ ├── event_loader.py # EventLoader (Phase 4) +│ │ ├── event_loader_storage.py # EventLoaderStorage Protocol (Phase 4) │ │ └── processor.py # Processor (Phase 4) │ ├── queues/ │ │ ├── __init__.py @@ -1408,7 +1408,7 @@ async def test_valid_event_reaches_event_store(): error_store = FirestoreErrorStore(project_id="test") adapter = SegmentAdapter() sequencer = Sequencer(num_partitions=16) - buffer = Buffer(event_store, size=100) + event_loader = EventLoader(event_store, batch_size=100) - processor = Processor(adapter, sequencer, buffer, error_store) + processor = Processor(adapter, sequencer, event_loader, error_store) # Execute diff --git a/specs/core-pipeline/tasks.md b/specs/core-pipeline/tasks.md index c82991e..ff2aca0 100644 --- a/specs/core-pipeline/tasks.md +++ b/specs/core-pipeline/tasks.md @@ -477,7 +477,7 @@ class Sequencer: --- -### Task 8: Implement Buffer (Batching) +### Task 8: Implement EventLoader (Batching) **Estimated effort**: 3 hours **Dependencies**: Task 4 **Phase**: Processing Pipeline @@ -495,8 +495,8 @@ Implement per-partition buffering with size and time-based flushing. #### Checklist ```python -# 1. Implement Buffer (src/eventkit/processing/buffer.py) -class Buffer: +# 1.
Implement EventLoader (src/eventkit/processing/event_loader.py) +class EventLoader: - def __init__(self, event_store: EventStore, size: int = 100, timeout: float = 5.0): + def __init__(self, event_store: EventStore, batch_size: int = 100, flush_interval: float = 5.0): self.event_store = event_store - self.size = size + self.batch_size = batch_size @@ -562,7 +562,7 @@ class Processor: self, adapter: EventAdapter, sequencer: Sequencer, - buffer: Buffer, + event_loader: EventLoader, error_store: ErrorStore ): self.adapter = adapter @@ -774,7 +774,7 @@ Configure structlog for production observability with dual formatters (JSON for - [x] Context propagation via contextvars (request_id, stream, etc.) - [x] Logging in API (requests, responses, timing) - [x] Logging in Processor (event received, adaptation, sequencing) -- [x] Logging in Buffer (flush operations, counts, timing) +- [x] Logging in EventLoader (flush operations, counts, timing) - [x] Logging in EventStore (writes, errors, counts) - [x] Logging in ErrorStore (DLQ writes) - [x] No sensitive data in logs (no payloads) @@ -866,7 +866,7 @@ async def process_event(self, raw_event: RawEvent): logger.debug("event_sequenced", partition=partition) ``` -**Phase 4: Add Buffer logging with timing** +**Phase 4: Add EventLoader logging with timing** ```python -# src/eventkit/processing/buffer.py +# src/eventkit/processing/event_loader.py import time @@ -925,7 +925,7 @@ async def write(self, raw_event: RawEvent, error: str): **Always Log (INFO)**: - API requests: method, path, status_code, duration_ms - Events received: stream, event_type (NOT payload) -- Buffer flushes: partition, event_count, duration_ms +- EventLoader flushes: partition, event_count, duration_ms - Store writes: event_count, duration_ms **Sometimes Log (DEBUG)**: