Merged
14 changes: 7 additions & 7 deletions ARCHITECTURE.md
@@ -45,7 +45,7 @@ eventkit provides these primitives as a composable, type-safe library.
┌─────────────────────────────────────────────────────────────────┐
│ Phase 3: Routing & Sequencing │
│ │
│ TypedEvent → Sequencer → Partition → Buffer
│ TypedEvent → Sequencer → Partition → EventLoader
│ │
│ • Sequencer: Hash-based routing (FNV-1a) │
│ • Consistent partitioning (userId → partition N) │
@@ -56,9 +56,9 @@ eventkit provides these primitives as a composable, type-safe library.
┌─────────────────────────────────────────────────────────────────┐
│ Phase 4: Batching & Storage │
│ │
Buffer → EventStore / ErrorStore → Firestore
EventLoader → EventStore / ErrorStore → Firestore │
│ │
│ • Buffer: Time & size-based flushing
│ • EventLoader: Time & size-based flushing │
│ • EventStore: Subcollections per stream │
│ • ErrorStore: Separate DLQ collection │
│ • Batch writes (500 events max per Firestore batch) │
@@ -224,8 +224,8 @@ class Sequencer:

---

#### Buffer
**File:** `src/eventkit/processing/buffer.py`
#### EventLoader
**File:** `src/eventkit/processing/event_loader.py`

Per-partition buffering with size and time-based flushing.

@@ -240,8 +240,8 @@ Per-partition buffering with size and time-based flushing.
- **Shutdown** - Flush all buffers on graceful shutdown

```python
class Buffer:
def __init__(self, event_store: EventStore, size: int = 100, timeout: float = 5.0):
class EventLoader:
def __init__(self, event_store: EventStore, batch_size: int = 100, flush_interval: float = 5.0):
self.buffers: dict[int, list[TypedEvent]] = defaultdict(list)

async def add(self, partition_id: int, event: TypedEvent) -> None:
```
4 changes: 2 additions & 2 deletions CLAUDE.md
@@ -79,7 +79,7 @@ logger.info(f"Received {event_type} event from {stream}") # Don't do this!
**Always Log (INFO level)**:
- API requests: `method`, `path`, `status_code`, `duration_ms`
- Events received: `stream`, `event_type` (NOT full payload)
- Buffer flushes: `partition`, `event_count`, `duration_ms`
- EventLoader flushes: `partition`, `event_count`, `duration_ms`
- Store writes: `event_count`, `duration_ms`

**Sometimes Log (DEBUG level)**:
@@ -152,7 +152,7 @@ The `Processor` doesn't know about queues - it only has `process_event()`:
```python
class Processor:
async def process_event(self, raw_event: RawEvent) -> None:
# Adapt → Sequence → Buffer
# Adapt → Sequence → EventLoader
...
```
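To make that contract concrete, here is a hedged sketch — `ProcessorStub` and the two toy queues are hypothetical stand-ins, not eventkit's real classes — of an inline frontend and a queued frontend awaiting the same `process_event()`:

```python
import asyncio
from typing import Any

class ProcessorStub:
    """Stand-in: the real Processor would adapt, sequence, and batch the event."""
    def __init__(self) -> None:
        self.processed: list[Any] = []

    async def process_event(self, raw_event: Any) -> None:
        self.processed.append(raw_event)

class DirectQueueSketch:
    """Inline delivery: the request path awaits the processor directly."""
    def __init__(self, processor: ProcessorStub) -> None:
        self.processor = processor

    async def publish(self, raw_event: Any) -> None:
        await self.processor.process_event(raw_event)

class AsyncQueueSketch:
    """Decoupled delivery: a worker drains a queue, calling the same method."""
    def __init__(self, processor: ProcessorStub) -> None:
        self.processor = processor
        self._pending: asyncio.Queue = asyncio.Queue()

    async def publish(self, raw_event: Any) -> None:
        await self._pending.put(raw_event)

    async def drain(self) -> None:
        while not self._pending.empty():
            await self.processor.process_event(self._pending.get_nowait())
```

Swapping the queue never touches the processor, which is the point of the design.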

2 changes: 1 addition & 1 deletion CONTRIBUTING.md
@@ -354,7 +354,7 @@ Closes #12
```
fix(event_loader): prevent race condition in partition flush

Buffer._flush_partition was not thread-safe when called from
EventLoader._flush_partition was not thread-safe when called from
both size-based and time-based triggers simultaneously.

Added asyncio.Lock per partition to serialize flushes.
```
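The locking this example commit describes might look roughly like the following — a simplified, self-contained sketch with illustrative names, not the repository's actual `_flush_partition`:

```python
import asyncio
from collections import defaultdict

class LockedFlusherSketch:
    """One asyncio.Lock per partition so concurrent triggers can't double-flush."""

    def __init__(self) -> None:
        self.buffers: dict[int, list] = defaultdict(list)
        self.locks: dict[int, asyncio.Lock] = defaultdict(asyncio.Lock)
        self.flushed: list[list] = []

    async def _flush_partition(self, partition_id: int) -> None:
        # Size-based and time-based triggers both funnel through this lock,
        # so a batch is written exactly once and the buffer cleared exactly once.
        async with self.locks[partition_id]:
            events, self.buffers[partition_id] = self.buffers[partition_id], []
            if events:
                self.flushed.append(events)
```

A second flush arriving while the first holds the lock simply finds an empty buffer and returns.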
4 changes: 2 additions & 2 deletions LOCAL_DEV.md
@@ -138,7 +138,7 @@ See `src/eventkit/config.py` for all available settings.
| `EVENTKIT_RING_BUFFER_PUBLISHER_BATCH_SIZE` | `100` | Events per publisher batch |
| `EVENTKIT_RING_BUFFER_PUBLISHER_POLL_INTERVAL` | `0.1` | Seconds between ring buffer polls |
| `EVENTKIT_RING_BUFFER_CLEANUP_INTERVAL` | `3600.0` | Seconds between cleanup runs (1 hour) |
| **Buffer** |||
| **EventLoader** |||
| `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush |
| `EVENTKIT_BUFFER_MAX_SIZE` | `1000` | Hard limit per partition |
| `EVENTKIT_BUFFER_TIMEOUT` | `5.0` | Max seconds before flush |
@@ -220,7 +220,7 @@ Example output (single-line JSON per log):
**Always Logged:**
- API requests: method, path, status_code, duration_ms
- Events: stream, event_type (NOT full payload for PII safety)
- Buffer flushes: partition, event_count, duration_ms
- EventLoader flushes: partition, event_count, duration_ms
- Store writes: event_count, duration_ms

**Never Logged:**
6 changes: 3 additions & 3 deletions README.md
@@ -85,7 +85,7 @@ curl -X POST http://localhost:8000/api/v1/identify \
│ • Adapters - Validate & normalize to typed events │
│ • Validators - Composable field checks │
│ • Sequencer - Hash-based routing for consistency │
│ • Buffer - Batch events for write efficiency
│ • EventLoader - Batch events for write efficiency │
└─────────────────────────────────────────────────────────┘
┌─────────────────────────────────────────────────────────┐
@@ -104,7 +104,7 @@ curl -X POST http://localhost:8000/api/v1/identify \
| **RawEvent** | Flexible container for any JSON | Schema-agnostic at ingestion |
| **Adapters** | Validate and normalize to typed events | Protocol-based, pluggable |
| **Sequencer** | Hash-based event routing | Consistent ordering per identity |
| **Buffer** | Batch events before storage | Reduce write amplification |
| **EventLoader** | Batch events before storage | Reduce write amplification |
| **Event Store** | Persist events to storage | Interface for multiple backends |
| **Error Store** | Dead letter queue for failures | Never lose data, debug later |

@@ -277,7 +277,7 @@ See [LOCAL_DEV.md](LOCAL_DEV.md) for detailed local development instructions.
**Quick Start:**
```bash
# Start Firestore emulator
docker compose up -d
docker-compose up -d

# Install dependencies
uv sync
```
2 changes: 1 addition & 1 deletion TESTING.md
@@ -215,7 +215,7 @@ async def processor():
)
adapter = SegmentAdapter()
sequencer = Sequencer(num_partitions=4) # Small for tests
buffer = Buffer(event_store, size=10, timeout=1.0) # Small/fast for tests
event_loader = EventLoader(event_store, batch_size=10, flush_interval=1.0) # Small/fast for tests

processor = Processor(adapter, sequencer, event_loader, error_store)

38 changes: 19 additions & 19 deletions specs/core-pipeline/architecture.md
@@ -46,7 +46,7 @@
│ partition_id
┌───────────────┐
Buffer
EventLoader
│ (Batching) │
└───────┬───────┘
│ Batch
@@ -221,8 +221,8 @@ class Processor:
# Sequence: Route to partition
partition_id = self.sequencer.get_partition_id(result.event)

# Buffer: Batch events
await self.buffer.enqueue(result.event, partition_id)
# EventLoader: Batch events
await self.event_loader.enqueue(result.event, partition_id)
```

**Design Decision**: Queue-agnostic. Processor doesn't know if it's called from AsyncQueue or PubSubQueue workers.
@@ -298,17 +298,17 @@ partition_id = sequencer.get_partition_id(event)

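A minimal version of that routing, using the standard 64-bit FNV-1a constants — a sketch consistent with the description above; the real Sequencer lives in `src/eventkit/processing/sequencer.py` and may differ in detail:

```python
FNV64_OFFSET_BASIS = 0xCBF29CE484222325  # standard 64-bit FNV-1a parameters
FNV64_PRIME = 0x100000001B3

def fnv1a_64(data: bytes) -> int:
    """64-bit FNV-1a: XOR each byte in, then multiply by the prime."""
    h = FNV64_OFFSET_BASIS
    for byte in data:
        h ^= byte
        h = (h * FNV64_PRIME) & 0xFFFFFFFFFFFFFFFF  # keep it in 64 bits
    return h

def partition_for(user_id: str, num_partitions: int = 16) -> int:
    # The same userId always lands on the same partition, which is what
    # preserves per-identity ordering downstream.
    return fnv1a_64(user_id.encode("utf-8")) % num_partitions
```

Because the hash is deterministic and stateless, any worker computes the same partition for the same identity.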
---

## 4. Buffer (Batching)
## 4. EventLoader (Batching)

**Responsibility**: Batch events before writing to storage

```python
class Buffer:
class EventLoader:
def __init__(
self,
event_store: EventStore,
storage: BufferStorage | None = None,
size: int = 100,
storage: EventLoaderStorage | None = None,
batch_size: int = 100,
max_batch_size: int = 1000,
flush_interval: float = 5.0
):
```

@@ -386,7 +386,7 @@ class Processor:
self,
adapter: EventAdapter,
sequencer: Sequencer,
buffer: Buffer,
event_loader: EventLoader,
error_store: ErrorStore
):
...
@@ -402,8 +402,8 @@ class Processor:
# Sequence
partition_id = self.sequencer.get_partition_id(result.event)

# Buffer
await self.buffer.enqueue(result.event, partition_id)
# EventLoader
await self.event_loader.enqueue(result.event, partition_id)
```

**Error Handling**: Never raise exceptions. Invalid events → error store.
@@ -454,7 +454,7 @@ def get_processor() -> Processor:

error_store = FirestoreErrorStore(...)

buffer = Buffer(
event_loader = EventLoader(
event_store=event_store,
batch_size=int(os.getenv("BUFFER_SIZE", "100")),
flush_interval=float(os.getenv("BUFFER_TIMEOUT", "5.0"))
@@ -473,12 +473,12 @@ def get_processor() -> Processor:

## Pluggable Design

### BufferStorage Protocol
### EventLoaderStorage Protocol

The buffer's internal storage is pluggable:
The event loader's internal storage is pluggable:

```python
class BufferStorage(Protocol):
class EventLoaderStorage(Protocol):
async def append(self, partition_id: int, event: TypedEvent) -> None: ...
async def get_all(self, partition_id: int) -> list[TypedEvent]: ...
async def clear(self, partition_id: int) -> None: ...
```

**Implementations**:
- `InMemoryBufferStorage` (default) - Fast, volatile
- `DiskBufferStorage` (future) - Persistent, larger capacity
- `RedisBufferStorage` (future) - Distributed, shared state
- `InMemoryEventLoaderStorage` (default) - Fast, volatile
- `DiskEventLoaderStorage` (future) - Persistent, larger capacity
- `RedisEventLoaderStorage` (future) - Distributed, shared state

---

@@ -519,12 +519,12 @@ Configure via `EVENTKIT_QUEUE_MODE` environment variable:

**v0.1.0 (Current) - DirectQueue**:
```
API → Processor → Buffer → Storage
API → Processor → EventLoader → Storage
```

**v0.2.0+ - PubSubQueue**:
```
API → Pub/Sub → Worker(s) → Buffer → Storage
API → Pub/Sub → Worker(s) → EventLoader → Storage
Horizontal scaling
```
14 changes: 7 additions & 7 deletions specs/core-pipeline/plan.md
@@ -286,8 +286,8 @@ class SegmentAdapter:
| File | Purpose | User Story |
|------|---------|------------|
| `src/eventkit/processing/sequencer.py` | Hash-based routing for consistent ordering | Story 4 |
| `src/eventkit/processing/buffer.py` | Batching for efficient writes | Story 5 |
| `src/eventkit/processing/buffer_storage.py` | BufferStorage Protocol (pluggable) | Story 5 |
| `src/eventkit/processing/event_loader.py` | Batching for efficient writes | Story 5 |
| `src/eventkit/processing/event_loader_storage.py` | EventLoaderStorage Protocol (pluggable) | Story 5 |
| `src/eventkit/processing/processor.py` | Main orchestrator (queue-agnostic) | All |
| `src/eventkit/queues/base.py` | EventQueue Protocol | All |
| `src/eventkit/queues/async_queue.py` | AsyncQueue implementation | All |
@@ -330,7 +330,7 @@ class Sequencer:

```python
# EventLoader - Efficient batch processing (Story 5)
class Buffer:
class EventLoader:
def __init__(
self,
event_store: EventStore,
```

@@ -651,7 +651,7 @@ async def lifespan(app: FastAPI):
# Create processor components
adapter = SegmentSchemaAdapter()
sequencer = HashSequencer(num_partitions=settings.NUM_PARTITIONS)
buffer = Buffer(
event_loader = EventLoader(
event_store=event_store,
batch_size=settings.BUFFER_SIZE,
flush_interval=settings.BUFFER_TIMEOUT
Expand Down Expand Up @@ -1312,8 +1312,8 @@ eventkit/
│ ├── processing/
│ │ ├── __init__.py
│ │ ├── sequencer.py # Sequencer (Phase 4)
│ │ ├── buffer.py # Buffer (Phase 4)
│ │ ├── buffer_storage.py # BufferStorage Protocol (Phase 4)
│ │ ├── event_loader.py # EventLoader (Phase 4)
│ │ ├── event_loader_storage.py # EventLoaderStorage Protocol (Phase 4)
│ │ └── processor.py # Processor (Phase 4)
│ ├── queues/
│ │ ├── __init__.py
@@ -1408,7 +1408,7 @@ async def test_valid_event_reaches_event_store():
error_store = FirestoreErrorStore(project_id="test")
adapter = SegmentAdapter()
sequencer = Sequencer(num_partitions=16)
buffer = Buffer(event_store, size=100)
event_loader = EventLoader(event_store, batch_size=100)
processor = Processor(adapter, sequencer, event_loader, error_store)

# Execute
14 changes: 7 additions & 7 deletions specs/core-pipeline/tasks.md
@@ -477,7 +477,7 @@ class Sequencer:

---

### Task 8: Implement Buffer (Batching)
### Task 8: Implement EventLoader (Batching)
**Estimated effort**: 3 hours
**Dependencies**: Task 4
**Phase**: Processing Pipeline
@@ -495,8 +495,8 @@ Implement per-partition buffering with size and time-based flushing.

#### Checklist
```python
# 1. Implement Buffer (src/eventkit/processing/buffer.py)
class Buffer:
# 1. Implement EventLoader (src/eventkit/processing/event_loader.py)
class EventLoader:
def __init__(self, event_store: EventStore, batch_size: int = 100, flush_interval: float = 5.0):
self.event_store = event_store
self.batch_size = batch_size
```

@@ -562,7 +562,7 @@ class Processor:
self,
adapter: EventAdapter,
sequencer: Sequencer,
buffer: Buffer,
event_loader: EventLoader,
error_store: ErrorStore
):
self.adapter = adapter
@@ -774,7 +774,7 @@ Configure structlog for production observability with dual formatters (JSON for
- [x] Context propagation via contextvars (request_id, stream, etc.)
- [x] Logging in API (requests, responses, timing)
- [x] Logging in Processor (event received, adaptation, sequencing)
- [x] Logging in Buffer (flush operations, counts, timing)
- [x] Logging in EventLoader (flush operations, counts, timing)
- [x] Logging in EventStore (writes, errors, counts)
- [x] Logging in ErrorStore (DLQ writes)
- [x] No sensitive data in logs (no payloads)
@@ -866,7 +866,7 @@ async def process_event(self, raw_event: RawEvent):
logger.debug("event_sequenced", partition=partition)
```

**Phase 4: Add Buffer logging with timing**
**Phase 4: Add EventLoader logging with timing**
```python
# src/eventkit/processing/event_loader.py
import time
```

@@ -925,7 +925,7 @@ async def write(self, raw_event: RawEvent, error: str):
**Always Log (INFO)**:
- API requests: method, path, status_code, duration_ms
- Events received: stream, event_type (NOT payload)
- Buffer flushes: partition, event_count, duration_ms
- EventLoader flushes: partition, event_count, duration_ms
- Store writes: event_count, duration_ms

**Sometimes Log (DEBUG)**:
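As a sketch of the INFO-level convention listed above — using stdlib `logging` rather than the repo's structlog, with hypothetical field names mirroring the list:

```python
import json
import logging
import time

logger = logging.getLogger("eventkit.sketch")

def log_flush(partition: int, event_count: int, started: float) -> dict:
    """Emit exactly the approved fields — counts and timing, never payloads."""
    record = {
        "event": "event_loader_flush",
        "partition": partition,
        "event_count": event_count,
        "duration_ms": round((time.monotonic() - started) * 1000, 2),
    }
    logger.info(json.dumps(record))  # one single-line JSON record per flush
    return record
```

Keeping the payload out of the record is what makes these logs safe to ship to a shared aggregator.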
14 changes: 7 additions & 7 deletions src/eventkit/api/dependencies.py
@@ -4,7 +4,7 @@

from eventkit.adapters.segment import SegmentSchemaAdapter
from eventkit.config import Settings
from eventkit.processing.buffer import Buffer
from eventkit.processing.event_loader import EventLoader
from eventkit.processing.processor import Processor
from eventkit.processing.sequencer import HashSequencer
from eventkit.queues import EventQueue, create_queue
@@ -52,7 +52,7 @@ def get_queue() -> EventQueue:
- ErrorStore (Firestore)
- Adapter (SegmentSchemaAdapter)
- Sequencer (HashSequencer)
- Buffer (with EventStore)
- EventLoader (batches events to storage)
- Processor (orchestrator)
- RingBuffer (Write-Ahead Log for durability - created internally by queue)
- EventQueue (factory-created based on QUEUE_MODE)
@@ -86,18 +86,18 @@ async def collect(queue: EventQueue = Depends(get_queue)):

sequencer = HashSequencer(num_partitions=settings.EVENTKIT_NUM_PARTITIONS)

buffer = Buffer(
event_loader = EventLoader(
event_store=event_store,
size=settings.EVENTKIT_BUFFER_SIZE,
max_size=settings.EVENTKIT_BUFFER_MAX_SIZE,
timeout=settings.EVENTKIT_BUFFER_TIMEOUT,
batch_size=settings.EVENTKIT_BUFFER_SIZE,
max_batch_size=settings.EVENTKIT_BUFFER_MAX_SIZE,
flush_interval=settings.EVENTKIT_BUFFER_TIMEOUT,
)

# Create processor
processor = Processor(
adapter=adapter,
sequencer=sequencer,
buffer=buffer,
event_loader=event_loader,
error_store=error_store,
)
