Merged
52 changes: 52 additions & 0 deletions .github/workflows/workflow.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
name: Build and Publish

on:
  push:
    branches: [ main ]
  pull_request:
    branches: [ main ]

jobs:
  build-test:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install uv
        run: pip install uv

      - name: Install dependencies
        run: uv sync --all-groups

      - name: Run tests
        run: uv run pytest tests -vvv -Werror

  publish:
    if: github.ref == 'refs/heads/main'
    needs: build-test
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install uv
        run: pip install uv

      - name: Build package
        run: uv build

      - name: Publish to PyPI
        env:
          PYPI_TOKEN: ${{ secrets.PYPI_TOKEN }}
        run: uvx twine upload dist/* -u __token__ -p $PYPI_TOKEN
8 changes: 5 additions & 3 deletions .gitignore
@@ -32,7 +32,7 @@ MANIFEST
*.manifest
*.spec

# Installer logs
# Installer .logs
pip-log.txt
pip-delete-this-directory.txt

@@ -182,9 +182,9 @@ cython_debug/
.abstra/

# Visual Studio Code
# Visual Studio Code specific template is maintained in a separate VisualStudioCode.gitignore
# that can be found at https://github.com/github/gitignore/blob/main/Global/VisualStudioCode.gitignore
# and can be added to the global gitignore or merged into this file. However, if you prefer,
# you could uncomment the following to ignore the entire vscode folder
# .vscode/

@@ -205,3 +205,5 @@ cython_debug/
marimo/_static/
marimo/_lsp/
__marimo__/
.logs/*
.idea/*
75 changes: 75 additions & 0 deletions .pre-commit-config.yaml
@@ -0,0 +1,75 @@
---
minimum_pre_commit_version: 1.21.0
fail_fast: false
default_stages: [ pre-commit, pre-push ]
repos:
  # meta

  - repo: meta
    hooks:
      - id: check-useless-excludes

  # formatters

  - repo: https://github.com/asottile/pyupgrade
    rev: v3.15.1
    hooks:
      - id: pyupgrade
        stages: [ pre-push ]

  # linters

  - repo: https://github.com/astral-sh/ruff-pre-commit
    # Ruff version.
    rev: v0.2.2
    hooks:
      - id: ruff
        args: [ --fix, --exit-non-zero-on-fix ]
      - id: ruff-format

  - repo: https://github.com/PyCQA/bandit
    rev: 1.7.7
    hooks:
      - id: bandit
        args: [ "-x", "tests" ]
        stages: [ pre-push ]

  - repo: local
    hooks:
      - id: uv
        name: uv check
        description: Checks that installed packages have compatible dependencies
        entry: uv pip check
        language: system
        pass_filenames: false
        files: ^pyproject.toml$
        stages: [ pre-push ]

  - repo: https://github.com/pre-commit/mirrors-mypy
    rev: v1.8.0
    hooks:
      - id: mypy
        args: ["--install-types", "--non-interactive", "--ignore-missing-imports"]

  - repo: https://github.com/pre-commit/pre-commit-hooks
    rev: v4.5.0
    hooks:
      - id: check-added-large-files
      - id: check-docstring-first
      - id: debug-statements
      - id: end-of-file-fixer
      - id: trailing-whitespace
      - id: check-ast
      - id: check-builtin-literals
      - id: detect-private-key
      - id: no-commit-to-branch

  - repo: https://github.com/pre-commit/pygrep-hooks
    rev: v1.10.0
    hooks:
      # - id: rst-backticks
      - id: python-use-type-annotations
      - id: python-no-log-warn
      - id: python-no-eval
      - id: python-check-mock-methods
      - id: python-check-blanket-noqa
179 changes: 177 additions & 2 deletions README.md
@@ -1,2 +1,177 @@
# tinystream
A lightweight streaming engine.
# TinyStream

TinyStream is a lightweight event streaming engine written in Python. It is designed to demonstrate the internal mechanics of modern event streaming: append-only logs, partitioned storage, replication, and stream processing, all in a small, readable codebase.

## Features

* Append-only partitioned log storage
* Durable, segment-based storage on disk
* Producer / Consumer APIs (Kafka-style)
* Controller node for broker and topic metadata
* Broker cluster simulation (multi-node)
* Replication and leader election (WIP)
* Retention policies for segment cleanup
* Local cluster harness for distributed testing

## Architecture Overview

TinyStream is split into five layers:

```
┌────────────────────────────────────────────┐
│ Producers / Consumers │
│ • Send and fetch records from topics │
│ • Commit offsets │
└────────────────────────────────────────────┘
┌────────────────────────────────────────────┐
│ Broker │
│ • Accepts produce/fetch requests │
│ • Manages topic partitions │
│ • Serves leader/follower replicas │
└────────────────────────────────────────────┘
┌────────────────────────────────────────────┐
│ Partition │
│ • Manages append-only log segments │
│ • Handles retention and compaction │
│ • Stores offset and time indexes │
└────────────────────────────────────────────┘
┌────────────────────────────────────────────┐
│ Segment │
│ • File-based, append-only structure │
│ • Supports batch reads via index lookups │
└────────────────────────────────────────────┘
┌────────────────────────────────────────────┐
│ Storage │
│ • Local disk or tiered storage backend │
│ • Retention + compaction policy engine │
└────────────────────────────────────────────┘
```

Each topic is split into partitions, and each partition is an append-only log.
Brokers host one or more partitions; producers and consumers talk to brokers via lightweight RPC.
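
The partition layer described above can be sketched as a minimal in-memory model (the class and method names here are illustrative, not TinyStream's actual API):

```python
class PartitionLog:
    """Minimal append-only log: each record gets the next sequential offset."""

    def __init__(self):
        self._records = []  # in-memory stand-in for on-disk segment files

    def append(self, value: bytes) -> int:
        """Append a record and return the offset it was assigned."""
        offset = len(self._records)
        self._records.append(value)
        return offset

    def read(self, offset: int, max_records: int = 10) -> list[bytes]:
        """Read up to max_records starting at the given offset."""
        return self._records[offset : offset + max_records]


log = PartitionLog()
for i in range(3):
    log.append(f"msg-{i}".encode())
print(log.read(0))  # prints [b'msg-0', b'msg-1', b'msg-2']
```

Real storage appends to the active segment file rather than a list, but the offset contract is the same: offsets are dense, monotonically increasing, and assigned at append time.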

## Partition Storage Layout

Each partition is stored on disk as a directory containing segment and index files:

```
data/
└── topics/
    └── user_clicks/
        └── partition-0/
            ├── 00000000000000000000.log
            ├── 00000000000000000000.index
            ├── 00000000000001000000.log
            ├── 00000000000001000000.index
            ├── partition.metadata
            └── lock
```
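
The 20-digit file names encode each segment's base offset, i.e. the offset of the first record in that segment. Assuming that naming convention, routing a read at some offset to the right segment file is a sorted-list lookup; a minimal sketch:

```python
import bisect

# Base offsets parsed from the segment file names above, kept sorted.
base_offsets = [0, 1_000_000]

def segment_for(offset: int) -> str:
    """Return the .log file whose base offset is the largest one <= offset."""
    i = bisect.bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise ValueError(f"offset {offset} precedes the earliest retained segment")
    return f"{base_offsets[i]:020d}.log"

print(segment_for(999_999))    # prints 00000000000000000000.log
print(segment_for(1_000_000))  # prints 00000000000001000000.log
```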

Messages are never deleted after consumption — instead, TinyStream enforces a retention policy (by time or size) to delete or compact old segments.
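
A retention pass over closed segments might look like the following sketch (the names and exact policy rules are assumptions for illustration, not TinyStream's real policy engine). The newest segment is still being written, so it is never eligible:

```python
import time
from dataclasses import dataclass

@dataclass
class SegmentInfo:
    base_offset: int
    size_bytes: int
    last_modified: float  # unix timestamp of the segment's newest record

def segments_to_delete(segments, retention_secs=None, retention_bytes=None, now=None):
    """Return segments eligible for deletion, oldest first.

    `segments` is ordered oldest to newest; the active (last) segment is
    never deleted. Time retention drops segments older than retention_secs;
    size retention drops the oldest segments until the partition fits in
    retention_bytes.
    """
    now = time.time() if now is None else now
    closed = segments[:-1]
    doomed = []
    total = sum(s.size_bytes for s in segments)
    for seg in closed:
        too_old = retention_secs is not None and now - seg.last_modified > retention_secs
        too_big = retention_bytes is not None and total > retention_bytes
        if too_old or too_big:
            doomed.append(seg)
            total -= seg.size_bytes
    return doomed
```

Deleting whole segments (instead of individual records) keeps retention cheap: it is a file unlink, not a log rewrite.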

## Local Cluster Simulation

You can run a multi-broker cluster locally to test replication and leader election:

```
localhost
└── TinyStream Cluster
    ├── broker-1 (port 5001)
    ├── broker-2 (port 5002)
    ├── broker-3 (port 5003)
    └── controller (port 6000)
```

Each broker stores its own data in a separate directory (e.g., `/tmp/tinystream/b1`).

## Example Usage

### Start a local controller and brokers

```bash
python -m tinystream.controller --port 6000
python -m tinystream.broker --id 1 --port 5001 --controller localhost:6000
python -m tinystream.broker --id 2 --port 5002 --controller localhost:6000
```

### Produce messages

```python
from tinystream.client import Producer

producer = Producer(controller="localhost:6000")
for i in range(10):
    producer.send("events", f"msg-{i}".encode())
```

### Consume messages

```python
from tinystream.client import Consumer

consumer = Consumer(controller="localhost:6000")
for msg in consumer.read("events"):
    print(msg.value)
```

## Local Cluster Testing

TinyStream includes a built-in harness for testing multi-broker setups:

```python
from tinystream.testing import LocalCluster

def test_cluster_replication():
    cluster = LocalCluster(brokers=3)
    cluster.start()

    p = cluster.new_producer()
    c = cluster.new_consumer()

    for i in range(100):
        p.send("demo", f"event-{i}".encode())

    msgs = c.read("demo", from_beginning=True)
    assert len(msgs) == 100

    cluster.stop()
```

## What to Test

| Category | Example |
| --------------- | ---------------------------------------------------- |
| Replication | Write to leader → restart follower → verify catch-up |
| Leader Election | Kill leader → ensure controller reassigns |
| Retention | Configure short TTL → check old segment deletion |
| Consistency | Compare offsets after recovery |
| Consumer Groups | Add/remove consumers → verify rebalancing |
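
For the leader-election row, the controller's decision can be modeled in isolation. One common rule, used here purely as an illustration since TinyStream's election is still WIP, is to promote the live replica with the highest log end offset:

```python
def elect_leader(replicas: dict[str, int], failed: set[str]) -> str:
    """Pick a new leader: the live replica with the most complete log.

    `replicas` maps broker id -> log end offset; brokers in `failed` are
    excluded. Ties break on broker id so the choice is deterministic.
    """
    candidates = {b: off for b, off in replicas.items() if b not in failed}
    if not candidates:
        raise RuntimeError("no live replica available for election")
    return max(candidates, key=lambda b: (candidates[b], b))

# broker-1 (the old leader) dies; broker-3 has replicated furthest
print(elect_leader({"broker-1": 120, "broker-2": 100, "broker-3": 118},
                   failed={"broker-1"}))  # prints broker-3
```

A test along these lines pairs naturally with the `LocalCluster` harness above: kill the leader, then assert the controller's new assignment matches the surviving replica with the highest offset.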

## Roadmap

* [ ] Controller-based leader election
* [ ] Replication between brokers
* [ ] Topic compaction policies
* [ ] Async I/O for log reads
* [ ] REST/gRPC API layer
* [ ] Stream processing DSL (TinyFlink)

## Design Philosophy

TinyStream is not a production system. It exists primarily to reproduce core concepts from event streaming systems in a minimal codebase, illustrating:

* How Kafka’s storage engine works internally
* How distributed log replication operates
* How producers and consumers interact via offsets
* How checkpointing, retention, and recovery behave in real streaming systems

It aims to provide readable, hackable code that models the essence of stream storage.
55 changes: 55 additions & 0 deletions consume.py
@@ -0,0 +1,55 @@
import asyncio

from tinystream.client import Consumer

TOPIC = "clicks"
PARTITION = 0
START_OFFSET = 0


async def main():
    print("Initializing consumer...")
    consumer = Consumer(host="localhost", port=9092)

    try:
        await consumer.connect()
        print("Consumer connected.")

        consumer.assign(TOPIC, partition=PARTITION, start_offset=START_OFFSET)
        print(
            f"Assigned to {TOPIC}-{PARTITION} at offset {START_OFFSET}. Polling... (Press Ctrl+C to stop)"
        )

        while True:
            batch = await consumer.poll(max_messages=10)

            if batch:
                print("--- Received batch ---")
                for msg in batch:
                    print(f"Received: {msg}")
                print("----------------------")
            else:
                await asyncio.sleep(1)

    except ConnectionRefusedError:
        print("\n[ERROR] Could not connect to broker.")
        print("Please ensure the broker is running in another terminal:")
        print("    python -m tinystream.broker")

    except KeyboardInterrupt:
        # --- Handle Ctrl+C gracefully ---
        print("\n\nStopping consumer... (Ctrl+C pressed)")

    except Exception as e:
        print(f"\nAn error occurred: {e}")

    finally:
        # --- Ensure the connection is always closed ---
        if consumer._connection.is_connected:
            await consumer.close()
            print("Consumer connection closed.")
        else:
            print("Consumer was not connected.")


if __name__ == "__main__":
    asyncio.run(main())