5 changes: 3 additions & 2 deletions .github/workflows/test.yml
@@ -22,7 +22,7 @@ jobs:
with:
python-version: "3.12"

- name: Start Firestore Emulator
- name: Start Firestore and Pub/Sub Emulators
run: docker compose up -d --wait

- name: Install uv
@@ -49,6 +49,7 @@ jobs:
- name: Run tests
env:
FIRESTORE_EMULATOR_HOST: localhost:8080
PUBSUB_EMULATOR_HOST: localhost:8085
GCP_PROJECT_ID: test-project
run: |
uv run pytest --cov=src/eventkit --cov-report=term-missing --cov-report=xml
@@ -59,6 +60,6 @@ jobs:
file: ./coverage.xml
fail_ci_if_error: false

- name: Stop Firestore Emulator
- name: Stop Emulators
if: always()
run: docker compose down
62 changes: 58 additions & 4 deletions CLAUDE.md
@@ -12,11 +12,13 @@ uv sync --all-extras # Install all dependencies with lockfile
pre-commit install # Set up git hooks (one-time)

# Local Development
docker compose up -d --wait # Start Firestore emulator (with healthcheck)
docker compose down # Stop emulator
docker compose up -d --wait # Start emulators (Firestore + Pub/Sub)
docker compose down # Stop emulators

export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_QUEUE_MODE="direct" # or "async" or "pubsub"
uv run uvicorn eventkit.api.app:app --reload --port 8000

# Testing
@@ -138,6 +140,56 @@ class EventQueue(Protocol):

Not abstract base classes. Enables duck typing and easier testing.

### Pub/Sub Async Bridge Pattern

**Challenge**: Pub/Sub's Python client uses sync callbacks in a thread pool, but our `Processor` is fully async.

**Solution**: Bridge the sync callbacks to the async event loop with `asyncio.run_coroutine_threadsafe`:

```python
import asyncio
import json

from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.message import Message

# Processor, Settings, and RawEvent are eventkit types (imports not shown here).


class PubSubQueue:
    def __init__(self, processor: Processor, settings: Settings):
        self.processor = processor
        self.settings = settings
        self.subscriber = pubsub_v1.SubscriberClient()
        self.internal_queue: asyncio.Queue[tuple[RawEvent, Message]] = asyncio.Queue()
        self.loop: asyncio.AbstractEventLoop | None = None
        self.shutdown_event = asyncio.Event()
        self.workers: list[asyncio.Task[None]] = []

    async def start(self) -> None:
        self.loop = asyncio.get_running_loop()  # Store event loop reference
        # Start async workers (keep references so the tasks are not garbage-collected)
        for i in range(self.settings.EVENTKIT_PUBSUB_WORKERS):
            self.workers.append(asyncio.create_task(self._worker(i)))
        # Start Pub/Sub subscriber (sync callback); subscription_path is defined elsewhere
        self.subscriber.subscribe(subscription_path, callback=self._pubsub_callback)

    def _pubsub_callback(self, message: Message) -> None:
        """Sync callback (runs in Pub/Sub's thread pool)"""
        raw_event = RawEvent(**json.loads(message.data.decode("utf-8")))
        # Bridge to async world
        asyncio.run_coroutine_threadsafe(
            self.internal_queue.put((raw_event, message)),
            self.loop,
        )

    async def _worker(self, worker_id: int) -> None:
        """Async worker (pulls from internal queue)"""
        while not self.shutdown_event.is_set():
            raw_event, pubsub_msg = await self.internal_queue.get()
            try:
                await self.processor.process_event(raw_event)
                pubsub_msg.ack()  # Sync ack is fine
            except Exception:
                pubsub_msg.nack()  # Redeliver message
            finally:
                self.internal_queue.task_done()
```

**Key Points**:
- Pub/Sub callback stays fast (just enqueue)
- Async workers handle all the heavy lifting
- Acking only after successful processing, and nacking on failure, gives at-least-once delivery
- Graceful shutdown drains internal queue
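
The bridge can be tried in isolation, without Pub/Sub. The following is a minimal sketch, assuming a plain `threading.Thread` stands in for Pub/Sub's callback thread pool; `run_bridge` and `sync_callback` are hypothetical names for illustration and are not part of eventkit.

```python
import asyncio
import json
import threading


async def run_bridge(payloads: list[bytes], workers: int = 2) -> list[dict]:
    """Feed payloads from a plain thread into async workers; return processed events."""
    loop = asyncio.get_running_loop()
    queue: asyncio.Queue[dict] = asyncio.Queue()
    processed: list[dict] = []

    def sync_callback(data: bytes) -> None:
        # Runs outside the event loop, like a Pub/Sub subscriber callback.
        event = json.loads(data.decode("utf-8"))
        future = asyncio.run_coroutine_threadsafe(queue.put(event), loop)
        future.result(timeout=5)  # wait until the event is actually enqueued

    async def worker() -> None:
        while True:
            event = await queue.get()
            try:
                await asyncio.sleep(0)  # stand-in for async processing
                processed.append(event)
            finally:
                queue.task_done()

    tasks = [asyncio.create_task(worker()) for _ in range(workers)]
    producer = threading.Thread(target=lambda: [sync_callback(p) for p in payloads])
    producer.start()
    await asyncio.to_thread(producer.join)  # join without blocking the event loop
    await queue.join()  # drain: every enqueued event has been processed
    for task in tasks:
        task.cancel()
    await asyncio.gather(*tasks, return_exceptions=True)
    return processed


if __name__ == "__main__":
    print(asyncio.run(run_bridge([b'{"id": 1}', b'{"id": 2}'])))
```

Waiting on `queue.join()` before cancelling the workers is one way to drain the internal queue on shutdown; the real `PubSubQueue` may handle shutdown differently.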

### Stream Routing & Sequencing

Events route to named streams and are then sequenced by identity hash:
@@ -163,6 +215,7 @@ Events route to named streams, then sequenced by identity hash:
- **Prefix settings**: Use `EVENTKIT_*` for all environment variables
- **Testing**: Every commit should include tests (unit + integration where applicable)
- **Docker Compose**: Use same setup locally and in CI (`docker compose up -d --wait`)
- **Pub/Sub message format**: Use JSON (not Protobuf) for simplicity and debugging

## Health Checks

@@ -174,6 +227,7 @@ Used by Kubernetes/load balancers to determine traffic routing.
## Documentation

See `LOCAL_DEV.md` for detailed local development instructions including:
- Setting up Firestore emulator with Docker Compose
- Running the FastAPI server
- Setting up emulators (Firestore + Pub/Sub) with Docker Compose
- Running the FastAPI server in different queue modes
- Manual testing with curl
- Running tests with emulators
69 changes: 62 additions & 7 deletions LOCAL_DEV.md
@@ -8,13 +8,15 @@
- Python 3.12+
- [uv](https://docs.astral.sh/uv/) (recommended) or pip

### 1. Start Firestore Emulator
### 1. Start Emulators

```bash
docker compose up -d
```

This starts the Firestore emulator on `localhost:8080`.
This starts:
- **Firestore emulator** on `localhost:8080` (for event/error storage)
- **Pub/Sub emulator** on `localhost:8085` (for distributed queue mode)

### 2. Install Dependencies

@@ -24,9 +26,32 @@ uv sync

### 3. Run the API Server

**Option A: Direct Queue Mode (default, inline processing)**
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_QUEUE_MODE="direct"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

**Option B: Async Queue Mode (in-process workers)**
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_QUEUE_MODE="async"
export EVENTKIT_ASYNC_WORKERS="4"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```

**Option C: Pub/Sub Queue Mode (distributed workers)**
```bash
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
export EVENTKIT_QUEUE_MODE="pubsub"
export EVENTKIT_PUBSUB_WORKERS="4"

uv run uvicorn eventkit.api.app:app --reload --port 8000
```
@@ -54,18 +79,45 @@ curl -X POST http://localhost:8000/collect \
## Running Tests

```bash
# Start emulator
# Start emulators
docker compose up -d

# Run tests
# Run all tests (unit + integration)
export FIRESTORE_EMULATOR_HOST="localhost:8080"
export PUBSUB_EMULATOR_HOST="localhost:8085"
export GCP_PROJECT_ID="test-project"
uv run pytest --cov=src/eventkit

# Stop emulator
# Run only unit tests (fast, no emulator needed)
uv run pytest tests/unit/

# Run only integration tests
uv run pytest tests/integration/

# Stop emulators
docker compose down
```

### Test Isolation

**Docker emulators keep data in memory between test runs** (while containers are running). This is useful for fast iteration, but means:

- **First test run**: Clean slate ✅
- **Second test run** (without restarting): Data from previous run persists ⚠️

**To get a clean slate for integration tests:**

```bash
# Option 1: Restart emulators (fast, keeps containers)
docker compose restart firestore-emulator pubsub-emulator

# Option 2: Full restart (slower, recreates containers)
docker compose down
docker compose up -d --wait
```

**In CI**: Each workflow run starts fresh containers, so tests are always isolated.
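
Restarting containers is not the only way to reset state. As a hedged sketch: the Firebase emulator documentation describes a REST endpoint that deletes all documents in the Firestore emulator, and the code below assumes the emulator image used here exposes the same endpoint. `firestore_reset_url` and `reset_firestore_emulator` are hypothetical helper names, and this does not reset the Pub/Sub emulator.

```python
import os
import urllib.request


def firestore_reset_url(host: str, project_id: str) -> str:
    """Build the emulator-only URL that clears all documents for a project."""
    return f"http://{host}/emulator/v1/projects/{project_id}/databases/(default)/documents"


def reset_firestore_emulator(timeout: float = 5.0) -> int:
    """Send DELETE to the emulator and return the HTTP status code."""
    host = os.environ.get("FIRESTORE_EMULATOR_HOST", "localhost:8080")
    project = os.environ.get("GCP_PROJECT_ID", "test-project")
    request = urllib.request.Request(firestore_reset_url(host, project), method="DELETE")
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return response.status
```

Calling `reset_firestore_emulator()` from a pytest fixture before each integration test would give a clean Firestore state without restarting containers, provided the endpoint assumption holds.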

---

## Configuration
@@ -77,8 +129,11 @@ See `src/eventkit/config.py` for all available settings.
| Variable | Default | Description |
|----------|---------|-------------|
| `GCP_PROJECT_ID` | *required* | GCP project ID |
| `FIRESTORE_EMULATOR_HOST` | - | Firestore emulator address |
| `FIRESTORE_EMULATOR_HOST` | - | Firestore emulator address (e.g., `localhost:8080`) |
| `PUBSUB_EMULATOR_HOST` | - | Pub/Sub emulator address (e.g., `localhost:8085`) |
| `EVENTKIT_QUEUE_MODE` | `"direct"` | Queue mode: `direct`, `async`, `pubsub` |
| `EVENTKIT_ASYNC_WORKERS` | `4` | Number of async workers (async mode) |
| `EVENTKIT_PUBSUB_WORKERS` | `4` | Number of Pub/Sub workers (pubsub mode) |
| `EVENTKIT_BUFFER_SIZE` | `100` | Events per partition before flush |

---
@@ -88,6 +143,6 @@ See `src/eventkit/config.py` for all available settings.
```bash
# Stop API server: Ctrl+C

# Stop Firestore emulator
# Stop emulators
docker compose down
```
16 changes: 15 additions & 1 deletion docker-compose.yml
@@ -1,5 +1,5 @@
# Docker Compose for local development and testing
# Runs Firestore emulator for integration tests
# Runs Firestore and Pub/Sub emulators for integration tests

services:
firestore-emulator:
@@ -15,3 +15,17 @@ services:
timeout: 5s
retries: 10
start_period: 10s

  pubsub-emulator:
    image: gcr.io/google.com/cloudsdktool/google-cloud-cli:emulators
    command: gcloud beta emulators pubsub start --host-port=0.0.0.0:8085
    ports:
      - "8085:8085"
    environment:
      - PUBSUB_PROJECT_ID=test-project
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8085"]
      interval: 5s
      timeout: 5s
      retries: 10
      start_period: 10s
1 change: 1 addition & 0 deletions pyproject.toml
@@ -29,6 +29,7 @@ dependencies = [
"pydantic-settings>=2.0.0",
"uvicorn[standard]>=0.24.0",
"google-cloud-firestore>=2.13.0",
"google-cloud-pubsub>=2.18.0",
"structlog>=23.2.0",
"tenacity>=8.2.0",
"python-dateutil>=2.9.0.post0",