iamsuvhro/NanoKafka

NanoKafka – Lightweight Single-Process Log-Based Message Broker

NanoKafka is a lightweight, single-process, log-based message broker inspired by Apache Kafka.

It is designed for:

  • Internal event-driven systems
  • Single-node ETL pipelines
  • Backtesting engines
  • Trading systems
  • Learning Kafka internals
  • Embedded messaging without external infrastructure

It is NOT a Kafka replacement.


🚀 Features

  • Topics
  • Multiple partitions per topic
  • Append-only log
  • Consumer groups
  • Manual offset commit
  • Size-based retention
  • Backpressure control (block / reject)
  • Lag metrics
  • File persistence
  • Crash recovery
  • Thread-safe design

🏗 Architecture Overview

NanoKafka (Broker)
 └── Topic
       ├── Partition
       └── ConsumerGroup

Each partition maintains:

  • Append-only log
  • Offset tracking per consumer group
  • Retention policy
  • Disk persistence
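
As a rough illustration of the per-partition state listed above, here is a minimal in-memory sketch. It is not NanoKafka's actual implementation: the class name `SketchPartition`, its methods, and the choice to measure retention as a number of messages are all assumptions made for this example, and disk persistence is left out.

```python
from collections import deque


class SketchPartition:
    """Hypothetical in-memory partition: append-only log, per-group offsets, size-based retention."""

    def __init__(self, retention_size=None):
        self.log = deque()            # (offset, message) pairs, oldest first
        self.next_offset = 0          # offset assigned to the next appended message
        self.committed = {}           # group name -> next offset to read
        self.retention_size = retention_size

    def append(self, message):
        offset = self.next_offset
        self.log.append((offset, message))
        self.next_offset += 1
        # Size-based retention: drop the oldest entries beyond the limit.
        if self.retention_size is not None:
            while len(self.log) > self.retention_size:
                self.log.popleft()
        return offset

    def read(self, group):
        """Return the first retained record at or after the group's position, or None."""
        position = self.committed.get(group, 0)
        for offset, message in self.log:
            if offset >= position:
                return offset, message
        return None

    def commit(self, group, offset):
        # Committing offset N means "N is processed; resume from N + 1".
        self.committed[group] = offset + 1


partition = SketchPartition(retention_size=2)
for value in ("a", "b", "c"):
    partition.append(value)

first = partition.read("group1")      # offset 0 was already trimmed by retention
partition.commit("group1", first[0])
second = partition.read("group1")
```

Offsets keep increasing even after old records are trimmed, so a group's committed position stays meaningful when retention removes the head of the log.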

🧠 Delivery Semantics

NanoKafka provides:

  • At-least-once delivery
  • Ordered delivery per partition
  • Manual commit semantics

It does NOT provide:

  • Exactly-once semantics
  • Replication
  • Distributed clustering
  • Cross-partition ordering
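
Because delivery is at-least-once, a message that was polled but not committed can be delivered again, so consumers should tolerate duplicates. One common approach is to make processing idempotent by remembering which (partition, offset) pairs were already handled. The sketch below does not use NanoKafka itself; `IdempotentHandler` and its in-memory `seen` set are hypothetical names for illustration.

```python
class IdempotentHandler:
    """Hypothetical wrapper that skips records it has already processed."""

    def __init__(self, process):
        self.process = process   # callable applied once per unique record
        self.seen = set()        # (partition_index, offset) pairs already handled

    def handle(self, partition_index, offset, message):
        record_id = (partition_index, offset)
        if record_id in self.seen:
            return False         # duplicate delivery: skip side effects
        self.process(message)
        self.seen.add(record_id)
        return True


processed = []
handler = IdempotentHandler(processed.append)

# Simulate a redelivery: the same record arrives twice before a commit.
deliveries = [(0, 0, {"id": 1}), (0, 0, {"id": 1}), (0, 1, {"id": 2})]
results = [handler.handle(p, o, m) for p, o, m in deliveries]
```

In a real system the `seen` set would have to survive restarts as well, otherwise a crash between processing and commit still produces a duplicate side effect.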

📦 Installation

NanoKafka is currently available only as a local module. Clone the repository:

git clone <repo>
cd nanokafka

🧪 Basic Usage

from nanokafka import NanoKafka

broker = NanoKafka()

broker.create_topic(
    "orders",
    num_partitions=2,
    max_size=1000,
    retention_size=5000,
    backpressure="block"
)

# Producer
broker.produce("orders", {"id": 1}, key="user1")

# Consumer
broker.join_group("orders", "group1", "consumerA")

partition_index, (offset, message) = broker.poll("orders", "group1", "consumerA")

print(message)

broker.commit("orders", "group1", partition_index, offset)
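
In Kafka-style brokers, the `key` passed to the producer usually decides which partition a message lands in, so that all messages with the same key stay in order. How NanoKafka maps keys to partitions is not shown in this README; the sketch below is one plausible scheme, using a stable CRC32 hash instead of Python's built-in `hash()`, which is randomized per process for strings. The function name `choose_partition` is hypothetical.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a key to a partition index in a way that is stable across restarts."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    digest = zlib.crc32(str(key).encode("utf-8"))
    return digest % num_partitions


# The same key always lands on the same partition.
index_a = choose_partition("user1", 2)
index_b = choose_partition("user1", 2)
```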

📊 Lag Monitoring

print(broker.lag("orders", "group1"))
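
Lag is conventionally the number of messages a group has not yet committed in each partition: the log end offset minus the group's committed position. The test cases later in this README index the result by partition (`broker.lag(...)[0]`), so the sketch below returns a dict keyed by partition index. The function `compute_lag` and its inputs are assumptions, not NanoKafka's internals.

```python
def compute_lag(end_offsets, committed_offsets):
    """Return {partition_index: uncommitted message count}.

    end_offsets:       partition index -> offset the next produced message will get
    committed_offsets: partition index -> next offset the group will read
    """
    return {
        partition: max(end - committed_offsets.get(partition, 0), 0)
        for partition, end in end_offsets.items()
    }


# Partition 0 has 3 uncommitted messages; partition 1 has no commit yet, so all 4 count.
lag = compute_lag({0: 10, 1: 4}, {0: 7})
```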

🔄 Crash Recovery

Logs and offsets are persisted to:

data/<topic>_<partition>.log
data/<topic>_<partition>.offsets

On restart, NanoKafka reloads all state automatically.
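
The on-disk format is not documented here. As an illustration of how an append-only log file can be replayed on restart, the sketch below writes one JSON record per line and rebuilds the in-memory log from the file. The JSON Lines format and the helper names `append_record` and `load_log` are assumptions.

```python
import json
import os
import tempfile


def append_record(path, offset, message):
    """Append one record as a JSON line and force it to disk."""
    with open(path, "a", encoding="utf-8") as f:
        f.write(json.dumps({"offset": offset, "message": message}) + "\n")
        f.flush()
        os.fsync(f.fileno())


def load_log(path):
    """Rebuild the in-memory log from disk; a missing file means an empty log."""
    if not os.path.exists(path):
        return []
    records = []
    with open(path, encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue
            try:
                entry = json.loads(line)
            except json.JSONDecodeError:
                break  # stop at a torn final write left by a crash
            records.append((entry["offset"], entry["message"]))
    return records


with tempfile.TemporaryDirectory() as data_dir:
    log_path = os.path.join(data_dir, "orders_0.log")
    append_record(log_path, 0, {"id": 1})
    append_record(log_path, 1, {"id": 2})
    recovered = load_log(log_path)   # simulates reloading after a restart
```

Calling `os.fsync` on every append trades throughput for durability; batching several appends per sync is a common compromise.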


⚙️ Backpressure Modes

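| Mode   | Behavior                                        |
|--------|-------------------------------------------------|
| block  | Producer waits while the partition is full      |
| reject | Producer raises an exception if the partition is full |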
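
The two modes can be sketched with a bounded buffer guarded by a `threading.Condition`. This is an illustration of the general technique, not NanoKafka's code: `BoundedBuffer`, `BufferFull`, and the `timeout` parameter are hypothetical names introduced for this example.

```python
import threading
from collections import deque


class BufferFull(Exception):
    """Raised in 'reject' mode when the buffer has no free slot."""


class BoundedBuffer:
    """Hypothetical bounded buffer with 'block' and 'reject' backpressure."""

    def __init__(self, max_size, backpressure="block"):
        if backpressure not in ("block", "reject"):
            raise ValueError("backpressure must be 'block' or 'reject'")
        self.max_size = max_size
        self.backpressure = backpressure
        self.items = deque()
        self.not_full = threading.Condition()

    def put(self, item, timeout=None):
        with self.not_full:
            if self.backpressure == "reject" and len(self.items) >= self.max_size:
                raise BufferFull("buffer is full")
            # 'block' mode: wait until a consumer frees a slot (or the timeout expires).
            has_room = self.not_full.wait_for(
                lambda: len(self.items) < self.max_size, timeout=timeout
            )
            if not has_room:
                raise TimeoutError("timed out waiting for free space")
            self.items.append(item)

    def get(self):
        with self.not_full:
            item = self.items.popleft()
            self.not_full.notify()   # wake one blocked producer
            return item


rejecting = BoundedBuffer(max_size=1, backpressure="reject")
rejecting.put("a")
try:
    rejecting.put("b")
    rejected = False
except BufferFull:
    rejected = True

blocking = BoundedBuffer(max_size=1, backpressure="block")
blocking.put("a")
# Free a slot from another thread shortly after the producer starts waiting.
threading.Timer(0.05, blocking.get).start()
blocking.put("b", timeout=2)
```

Blocking suits pipelines where every message must eventually be accepted, while rejecting lets the producer decide whether to retry, drop, or divert the message.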

🛑 Limitations

  • Single process only
  • No replication
  • No network protocol
  • Not suitable for distributed production systems

🎯 Project Goals

This project exists to:

  • Demonstrate log-based messaging internals
  • Understand Kafka's core architecture
  • Provide an embedded event broker for small systems
  • Serve as a learning and portfolio project

📦 2️⃣ Proper Module Structure

Structure your repo like this:

nanokafka/
│
├── nanokafka/
│   ├── __init__.py
│   ├── broker.py
│   ├── topic.py
│   ├── partition.py
│   ├── consumer_group.py
│
├── tests/
│   ├── test_basic.py
│   ├── test_commit.py
│   ├── test_retention.py
│
├── examples/
│   ├── basic_usage.py
│
├── README.md
├── setup.py

__init__.py

from .broker import NanoKafka

__all__ = ["NanoKafka"]

🧪 3️⃣ Sample Test Cases (pytest)

Install pytest:

pip install pytest

test_basic.py

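from nanokafka import NanoKafka

def test_basic_produce_consume(tmp_path, monkeypatch):
    # Run inside a temporary directory so persisted files do not leak between tests
    # (assumes the broker writes to a relative data/ directory).
    monkeypatch.chdir(tmp_path)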

    broker = NanoKafka()
    broker.create_topic("test", num_partitions=1)

    broker.produce("test", "hello")

    broker.join_group("test", "group1", "consumer1")

    p, (offset, msg) = broker.poll("test", "group1", "consumer1")

    assert msg == "hello"

    broker.commit("test", "group1", p, offset)

    assert broker.lag("test", "group1")[0] == 0

test_commit.py

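from nanokafka import NanoKafka

def test_no_commit_reprocess(tmp_path, monkeypatch):
    # Run inside a temporary directory so persisted files do not leak between tests
    # (assumes the broker writes to a relative data/ directory).
    monkeypatch.chdir(tmp_path)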
    broker = NanoKafka()
    broker.create_topic("test")

    broker.produce("test", "data")

    broker.join_group("test", "g1", "c1")

    p, (offset, msg) = broker.poll("test", "g1", "c1")

    # Do NOT commit

    p2, (offset2, msg2) = broker.poll("test", "g1", "c1")

    assert offset == offset2

test_retention.py

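from nanokafka import NanoKafka

def test_retention(tmp_path, monkeypatch):
    # Run inside a temporary directory so persisted files do not leak between tests
    # (assumes the broker writes to a relative data/ directory).
    monkeypatch.chdir(tmp_path)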
    broker = NanoKafka()
    broker.create_topic("test", retention_size=2)

    broker.produce("test", 1)
    broker.produce("test", 2)
    broker.produce("test", 3)

    broker.join_group("test", "g1", "c1")

    p, (offset, msg) = broker.poll("test", "g1", "c1")

    broker.commit("test", "g1", p, offset)

    lag = broker.lag("test", "g1")

    assert isinstance(lag, dict)

🚀 4️⃣ Example Usage (examples/basic_usage.py)

import threading
from nanokafka import NanoKafka

broker = NanoKafka()
broker.create_topic("events", num_partitions=2)

def producer():
    for i in range(10):
        broker.produce("events", f"msg-{i}", key=i)

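def consumer():
    broker.join_group("events", "group1", "consumer1")
    # Stop after the expected number of messages so the program can exit
    # (assumes poll() blocks until a message is available).
    for _ in range(10):
        p, (offset, msg) = broker.poll("events", "group1", "consumer1")
        print(msg)
        broker.commit("events", "group1", p, offset)

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)

t1.start()
t2.start()

# Wait for both threads to finish before exiting.
t1.join()
t2.join()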

📦 5️⃣ setup.py (Make It Installable)

from setuptools import setup, find_packages

setup(
    name="nanokafka",
    version="0.1.0",
    packages=find_packages(),
    description="Lightweight single-process log-based message broker",
    author="Your Name",
    python_requires=">=3.8",
)

Install locally in editable (development) mode:

pip install -e .

Or perform a regular install from the repository root:

pip install .
