Skip to content

Conversation

@prosdev
Copy link
Contributor

@prosdev prosdev commented Jan 11, 2026

Summary

Implements to enable horizontal scaling of event processing using Google Cloud Pub/Sub.

Changes

Core Implementation (4 commits):

  1. PubSubQueue Implementation: Full async queue with Pub/Sub integration

    • Sync-to-async bridge using asyncio.run_coroutine_threadsafe
    • Internal queue for decoupling Pub/Sub callbacks from async processing
    • Background workers for event processing
    • Automatic topic/subscription/DLQ creation
    • Graceful shutdown with queue draining
  2. Factory Integration: Updated create_queue() to support EVENTKIT_QUEUE_MODE=pubsub

  3. CI & Integration Tests:

    • Added Pub/Sub emulator to Docker Compose
    • 5 integration tests covering end-to-end flows
    • Updated CI workflow to start both emulators
  4. Documentation:

    • Updated LOCAL_DEV.md with Pub/Sub setup, queue modes, test isolation
    • Updated CLAUDE.md with async bridge pattern
    • Marked Task 16 complete in tasks.md

Test Coverage

  • Unit Tests: 10 tests for PubSubQueue (96% coverage)
  • Integration Tests: 5 tests with Pub/Sub emulator
  • Overall Coverage: 95% (165 unit tests passing)

Configuration

New environment variables:

  • EVENTKIT_QUEUE_MODE=pubsub (enable Pub/Sub queue)
  • EVENTKIT_PUBSUB_TOPIC (default: eventkit-events)
  • EVENTKIT_PUBSUB_SUBSCRIPTION (default: eventkit-worker)
  • EVENTKIT_PUBSUB_DLQ_TOPIC (default: eventkit-events-dlq)
  • EVENTKIT_PUBSUB_WORKERS (default: 4)
  • EVENTKIT_PUBSUB_MAX_DELIVERY_ATTEMPTS (default: 5)

Closes #18

Added detailed phase for PubSubQueue implementation:
- Async bridge pattern (sync Pub/Sub callback → asyncio.Queue → async workers)
- JSON serialization for simplicity and consistency
- Dual DLQ (Pub/Sub DLQ + ErrorStore)
- Resource management for dev/test environments
- Complete code patterns and design decisions

Estimated: 7-9 hours
Dependencies: Phase 4 (Processor, EventQueue protocol)
- Add PubSubQueue implementing EventQueue protocol
- Async bridge: Pub/Sub sync callbacks → asyncio processing pipeline
- Configurable workers, topic/subscription management
- DLQ support with max delivery attempts
- Graceful shutdown with queue draining
- 96% code coverage with comprehensive unit tests
- Mocked PublisherClient/SubscriberClient for fast unit testing
- Update create_queue() to support PUBSUB mode
- Export PubSubQueue from queues package
- Update factory tests to verify PubSubQueue creation
- All 25 queue tests passing
- Add pubsub-emulator service to docker-compose.yml
- Create comprehensive integration tests for PubSubQueue:
  * End-to-end publish → subscribe → process
  * Multiple workers parallel processing
  * Graceful shutdown with queue draining
  * Message acknowledgment (no redelivery)
  * Resource creation (topics/subscriptions)
- Update CI workflow to start Pub/Sub emulator
- Add PUBSUB_EMULATOR_HOST to test environment
- All 5 integration tests passing
- Update LOCAL_DEV.md: Add Pub/Sub emulator setup, queue mode options
- Update CLAUDE.md: Add Pub/Sub async bridge pattern, command updates
- Update tasks.md: Mark Task 16 (PubSubQueue) as complete
- Document sync-to-async bridge using asyncio.run_coroutine_threadsafe
- Add queue mode configuration examples for all three modes
@prosdev prosdev merged commit c74b6b4 into main Jan 11, 2026
1 check passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement PubSubQueue for horizontal scaling (Task 16)

1 participant