otaviocarvalho/camus

Camus - Iceberg-Backed PostgreSQL Messaging System

A high-performance, PostgreSQL-compatible messaging system backed by Apache Iceberg, built in Rust.

Overview

Camus is a messaging system that combines low-latency streaming with durable lakehouse storage:

  • Sub-second latency for real-time message delivery via Tokio broadcast channels
  • Durable storage with ACID guarantees using Apache Iceberg
  • SQL interface through PostgreSQL wire protocol for familiar tooling
  • Powerful querying via DataFusion over Iceberg/Parquet data

Think of it as Kafka meets a data lakehouse, accessible via PostgreSQL.

Architecture

┌─────────────┐
│  Producer   │
└──────┬──────┘
       │ INSERT / Publish
       ▼
┌─────────────────────────────────────────┐
│         QueueManager                     │
│  ┌───────────┐      ┌──────────────┐   │
│  │   Topic   │      │ WriterPool   │   │
│  │ (Broadcast│◄────►│  (Batching)  │   │
│  │  Channel) │      │              │   │
│  └─────┬─────┘      └──────┬───────┘   │
└────────┼────────────────────┼───────────┘
         │                    │
         │ Real-time          │ Durable
         ▼                    ▼
┌─────────────────┐   ┌──────────────────┐
│ Live Consumers  │   │  Apache Iceberg  │
│  (Broadcast)    │   │   (S3/MinIO)     │
└─────────────────┘   └─────────┬────────┘
                                │
                                │ SELECT / Query
                                ▼
                        ┌──────────────────┐
                        │   DataFusion     │
                        │  Query Engine    │
                        └──────────────────┘
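The dual-path design above — immediate fan-out to live consumers alongside batched durable writes — can be illustrated with a short Python sketch. This is not the actual implementation (Camus is written in Rust and uses Tokio broadcast channels); the `Topic` class and its method names here are purely illustrative:

```python
from queue import Queue

class Topic:
    """Illustrative topic: fans each message out to live subscribers
    immediately, while also buffering it for a durable batch write."""

    def __init__(self):
        self.subscribers = []   # live consumers (real-time path)
        self.write_buffer = []  # pending durable writes (Iceberg path)

    def subscribe(self):
        q = Queue()
        self.subscribers.append(q)
        return q

    def publish(self, payload):
        for q in self.subscribers:         # real-time delivery
            q.put(payload)
        self.write_buffer.append(payload)  # queued for batched persistence

topic = Topic()
consumer = topic.subscribe()
topic.publish("Order #1: Laptop")
print(consumer.get_nowait())    # Order #1: Laptop (delivered immediately)
print(len(topic.write_buffer))  # 1 (awaiting durable flush)
```

The key point is that publishing never waits on storage: consumers see the message as soon as it enters the channel, while persistence happens asynchronously in batches.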

Quick Start

Prerequisites

  • Rust 1.75+
  • Docker & Docker Compose

1. Start Infrastructure

# Start MinIO (S3), PostgreSQL (catalog), and Iceberg REST catalog
docker-compose up -d

# Check services are running
docker-compose ps

2. Build and Run

# Build the project
cargo build --release

# Run the server
cargo run --release

# Or with custom log level
RUST_LOG=info cargo run --release

3. Connect and Use via PostgreSQL

Camus exposes a PostgreSQL-compatible interface on port 5433:

psql "postgresql://postgres:postgres@localhost:5433/camus"

Quick Start SQL

-- Create a topic (message queue)
CREATE TOPIC orders;

-- Publish messages using INSERT
INSERT INTO orders (payload) VALUES ('Order #1: Laptop');
INSERT INTO orders (payload) VALUES ('Order #2: Mouse');
INSERT INTO orders (payload) VALUES ('Order #3: Keyboard');

-- Query historical messages from Iceberg
SELECT message_id, timestamp, payload
FROM orders
ORDER BY timestamp DESC;

-- Show all topics
SHOW TOPICS;

SQL Examples

Complete SQL examples are available in examples/sql/:

cd examples/sql

# 1. Create topics
psql "postgresql://postgres:postgres@localhost:5433/camus" -f 01_create_topic.sql

# 2. Publish messages
psql "postgresql://postgres:postgres@localhost:5433/camus" -f 02_publish_messages.sql

# 3. Query historical messages
psql "postgresql://postgres:postgres@localhost:5433/camus" -f 03_query_messages.sql

# 4. Consumer groups
psql "postgresql://postgres:postgres@localhost:5433/camus" -f 04_consumer_groups.sql

# Or run all examples at once
./run_all_examples.sh

See examples/sql/README.md for details.

SQL Reference

Implemented

Topic Management

-- Create a new topic
CREATE TOPIC orders;

-- Show all topics
SHOW TOPICS;

Publishing Messages (INSERT)

-- Simple message
INSERT INTO orders (payload) VALUES ('Order data here');

-- With partition key
INSERT INTO orders (partition_key, payload)
VALUES ('customer-123', 'Order for customer 123');

-- With producer ID
INSERT INTO orders (producer_id, partition_key, payload)
VALUES ('app-1', 'customer-123', 'Order data');

-- Batch insert
INSERT INTO orders (payload) VALUES
    ('Order 1'),
    ('Order 2'),
    ('Order 3');

Querying Messages (SELECT)

-- All messages
SELECT * FROM orders ORDER BY timestamp DESC LIMIT 100;

-- Time-range queries
SELECT * FROM orders
WHERE timestamp > NOW() - INTERVAL '1 hour'
ORDER BY timestamp;

-- Filter by partition key
SELECT * FROM orders
WHERE partition_key = 'customer-123'
ORDER BY timestamp;

-- Aggregations
SELECT
    DATE_TRUNC('hour', timestamp) as hour,
    COUNT(*) as message_count
FROM orders
GROUP BY hour
ORDER BY hour DESC;

Planned (Not Yet Implemented)

  • DROP TOPIC - Delete a topic
  • SUBSCRIBE TO <topic> AS <group> - Real-time streaming via cursor
  • FOR SYSTEM_TIME AS OF <timestamp> - Time-travel queries over Iceberg snapshots
  • JSON field access queries (payload::json->>'field')

Configuration

Configuration is done via environment variables. See .env.example for all options.

Key variables:

| Variable | Default | Description |
|----------|---------|-------------|
| `POSTGRES_ADDR` | `0.0.0.0:5433` | PostgreSQL wire protocol listen address |
| `S3_ENDPOINT` | `http://localhost:9000` | S3-compatible storage endpoint |
| `S3_ACCESS_KEY_ID` | `minioadmin` | S3 access key |
| `S3_SECRET_ACCESS_KEY` | `minioadmin` | S3 secret key |
| `CATALOG_URI` | `http://localhost:8181` | Iceberg REST catalog URL |
| `CHANNEL_CAPACITY` | `10000` | Broadcast channel capacity per topic |
| `BATCH_SIZE` | `1000` | Messages per Iceberg write batch |
| `FLUSH_INTERVAL_MS` | `5000` | Background flush interval (ms) |
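A minimal Python sketch of how env-var configuration with these defaults can be loaded (the variable names come from the table above; `load_config` is a hypothetical helper, not part of Camus, which implements this in Rust):

```python
import os

# Defaults mirror the configuration table; values are strings as they
# would arrive from the environment.
DEFAULTS = {
    "POSTGRES_ADDR": "0.0.0.0:5433",
    "S3_ENDPOINT": "http://localhost:9000",
    "CATALOG_URI": "http://localhost:8181",
    "CHANNEL_CAPACITY": "10000",
    "BATCH_SIZE": "1000",
    "FLUSH_INTERVAL_MS": "5000",
}

def load_config(env=None):
    """Read each setting from the environment, falling back to defaults."""
    env = os.environ if env is None else env
    cfg = {key: env.get(key, default) for key, default in DEFAULTS.items()}
    # Numeric tuning knobs are parsed as integers.
    for key in ("CHANNEL_CAPACITY", "BATCH_SIZE", "FLUSH_INTERVAL_MS"):
        cfg[key] = int(cfg[key])
    return cfg

cfg = load_config({"BATCH_SIZE": "500"})  # override a single knob
print(cfg["BATCH_SIZE"])      # 500
print(cfg["POSTGRES_ADDR"])   # 0.0.0.0:5433
```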

Project Structure

camus/
├── src/                    # Binary entrypoint + E2E benchmark
│
├── iceberg-core/           # Core Iceberg & storage integration
│   ├── catalog/            # Iceberg catalog management
│   ├── schema/             # Message schema definitions
│   ├── storage/            # Object store configuration
│   ├── table_manager.rs    # Table lifecycle management
│   ├── writer/             # Arrow-based Iceberg writers
│   ├── compaction/         # Background compaction scheduler
│   └── metrics.rs          # Prometheus metrics
│
├── message-queue/          # In-memory message routing
│   ├── types/              # Message types and builders
│   ├── topic/              # Topic management with broadcast channels
│   ├── manager.rs          # Queue manager coordinator
│   ├── writer_pool.rs      # Batched persistence to Iceberg
│   ├── consumer/           # Consumer group management
│   ├── offset_store.rs     # Consumer group offset persistence
│   ├── flusher.rs          # Background flush task
│   └── wal/                # Write-ahead log (O_DSYNC, checksums, hash-chaining)
│
├── pg-server/              # PostgreSQL wire protocol
│   ├── server/             # TCP server and pgwire connection handling
│   ├── handler/            # SQL query handler and routing
│   ├── sql_parser/         # SQL extensions parser
│   ├── query_engine/       # Query engine abstraction
│   ├── datafusion_integration/ # DataFusion TableProvider for Iceberg
│   ├── auth/               # Authentication
│   └── cache/              # Query result caching
│
├── adr/                    # Architecture Decision Records
├── benches/                # Criterion benchmarks
├── config/                 # Prometheus config
├── scripts/                # Catalog DB init script
└── examples/               # SQL examples
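The hash-chaining mentioned for the `wal/` module can be sketched in a few lines of Python: each record's checksum covers the previous record's checksum plus its own payload, so corrupting any record invalidates verification of everything after it. This is an illustration of the idea only — the real WAL's on-disk format, checksum algorithm, and O_DSYNC handling will differ:

```python
import hashlib

GENESIS = b"\x00" * 32  # sentinel "previous checksum" for the first record

def append(log, payload: bytes):
    """Append a (payload, checksum) record chained to the previous one."""
    prev = log[-1][1] if log else GENESIS
    checksum = hashlib.sha256(prev + payload).digest()
    log.append((payload, checksum))

def verify(log) -> bool:
    """Walk the chain; any tampered payload breaks the checksum sequence."""
    prev = GENESIS
    for payload, checksum in log:
        if hashlib.sha256(prev + payload).digest() != checksum:
            return False
        prev = checksum
    return True

log = []
append(log, b"Order #1")
append(log, b"Order #2")
print(verify(log))                   # True
log[0] = (b"tampered", log[0][1])    # corrupt the first payload in place
print(verify(log))                   # False
```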

Infrastructure (docker-compose)

| Service | Port | Purpose |
|---------|------|---------|
| MinIO | 9000 (API), 9001 (console) | S3-compatible object storage |
| PostgreSQL | 5432 | Iceberg catalog metadata |
| Iceberg REST | 8181 | REST catalog API |
| Prometheus | 9090 | Metrics (optional, `--profile monitoring`) |
| Grafana | 3000 | Dashboards (optional, `--profile monitoring`) |
| Camus (app) | 5433 | PostgreSQL wire protocol server |

Credentials: minioadmin/minioadmin (MinIO), postgres/postgres (PostgreSQL).

Development

Running Tests

# Run all tests
cargo test --workspace

# Run with logging
RUST_LOG=debug cargo test --workspace

# Run specific test
cargo test --package iceberg-core test_name

Running Benchmarks

cargo bench --bench throughput

Code Quality

cargo fmt --all
cargo clippy --workspace --all-targets
cargo check --workspace

Architecture Decision Records

Key design decisions are documented in the adr/ directory.

Current Status

Working:

  • Iceberg table creation and schema management
  • Message publishing via INSERT (single and batch)
  • Durable persistence to Iceberg/Parquet on S3
  • SELECT queries via DataFusion over Iceberg data
  • CREATE TOPIC / SHOW TOPICS
  • Real-time broadcast channels (in-memory)
  • Write-ahead log (Phase 1: append-only with checksums and hash-chaining)
  • Prometheus metrics collection
  • E2E latency/throughput benchmarks
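The batched persistence path listed above is governed by the `BATCH_SIZE` and `FLUSH_INTERVAL_MS` settings: a write is flushed once either the batch fills up or the interval elapses. A simplified Python sketch of that policy (the real WriterPool is async Rust; class and field names here are illustrative):

```python
import time

class BatchWriter:
    """Flush buffered messages when the batch is full or the interval expires."""

    def __init__(self, batch_size=1000, flush_interval_ms=5000):
        self.batch_size = batch_size
        self.flush_interval = flush_interval_ms / 1000.0
        self.buffer = []
        self.last_flush = time.monotonic()
        self.flushed = []  # stands in for committed Iceberg batches

    def write(self, msg):
        self.buffer.append(msg)
        full = len(self.buffer) >= self.batch_size
        stale = time.monotonic() - self.last_flush >= self.flush_interval
        if full or stale:
            self.flush()

    def flush(self):
        if self.buffer:
            self.flushed.append(self.buffer)
            self.buffer = []
        self.last_flush = time.monotonic()

w = BatchWriter(batch_size=3, flush_interval_ms=60000)
for i in range(7):
    w.write(f"msg-{i}")
print([len(b) for b in w.flushed])  # [3, 3]
print(len(w.buffer))                # 1 (waiting for the next trigger)
```

Small batches favor latency to durable storage; large batches favor Parquet file sizes and write throughput — the same trade-off the two settings expose.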

In progress:

  • Consumer groups and offset tracking
  • Background compaction
  • DROP TOPIC support

Future:

  • SUBSCRIBE (real-time streaming cursors)
  • Time-travel queries (Iceberg snapshots)
  • Exactly-once semantics
  • Schema evolution (Avro/Protobuf)
  • TLS and RBAC
  • Multi-region replication
  • Admin UI

Acknowledgments

Built with Tokio, Apache Iceberg, Apache DataFusion, and pgwire.

Note: Camus is in early development. APIs may change. Not recommended for production use yet.

About

Message delivery without meaning, meaningfully delivered anyway
