diff --git a/README.md b/README.md index f9b5040..b969f47 100644 --- a/README.md +++ b/README.md @@ -1,18 +1,126 @@ # TinyStream -TinyStream is a lightweight in Python. It’s designed to demonstrate the internal mechanics of modern event streaming: -append-only logs, partitioned storage, replication, and streaming processing — all in a small, readable codebase. +TinyStream is a lightweight streaming system in Python, inspired by Apache Kafka. +It’s designed to demonstrate the internal mechanics of a modern event streaming platform: append-only logs, partitioned storage, replication, and service discovery — all in a small, readable codebase. + +## Design Philosophy + +**This is not a production system.** It is a "glass box" designed to reproduce the core concepts from systems like Kafka in a minimal, hackable codebase. + +It exists to help answer questions like: +* How does Kafka’s storage engine work internally? +* How does distributed log replication operate? +* How do producers and consumers interact via offsets? +* How do components discover each other and handle leader election? + +It aims to provide readable code that models the essence of distributed stream storage. ## Features * Append-only partitioned log storage * Durable, segment-based storage on disk -* Producer / Consumer APIs (Kafka-style) -* Controller node for broker and topic metadata -* Broker cluster simulation (multi-node) -* Replication and leader election (WIP) -* Retention policies for segment cleanup +* Producer / Consumer client APIs +* Controller node for cluster metadata and leader election +* Broker cluster simulation +* Local cluster harness for distributed testing +* **[WIP]** Replication and leader election +* **[WIP]** Retention policies for segment cleanup + +## Installation + +The project uses `uv` for dependency management and execution. + +1. 
Clone the repository: + ```bash + git clone https://github.com/Olamyy/tinystream + cd tinystream + ``` +2. Install the required dependencies: + ```bash + uv sync + ``` + +## Quick Start: Running Locally + +Here is how to run a minimal "cluster" (one controller, one broker) on your machine. + +### Step 1: Start the Controller + +The Controller manages cluster metadata (topics, brokers, partition leaders). + +```bash +uv run python -m tinystream.controller --port 6000 +``` + +### Step 2: Start a Broker + +The Broker stores partition data and registers itself with the Controller. In cluster mode, each broker needs a unique `--broker-id` (to start N brokers in a single process for testing, pass `--broker-number=N` instead): +```bash +uv run python -m tinystream.broker --mode cluster --broker-id 0 +``` + +### Step 3: Create a Topic + +Topics must be created before you can produce to them. Use the admin client to ask the Controller to create a topic. + +```bash +uv run python -m tinystream.admin create-topic \ + --topic "events" \ + --partitions 3 \ + --replication-factor 1 \ + --controller "localhost:6000" +``` + +### Step 4: Produce Messages + +```python +from tinystream.client.producer import Producer +from tinystream.config.parser import TinyStreamConfig + +config = TinyStreamConfig.from_default_config_file() +config.mode = "cluster" +producer = Producer(config=config) + +print("Sending 10 messages...") + +for i in range(10): + msg = f"hello-tinystream-{i}".encode("utf-8") + producer.send("events", msg) + print(f"Sent: {msg.decode()}") + +print("Done.") +``` + +### Step 5: Consume Messages + +```python +from tinystream.client.consumer import Consumer +from tinystream.config.parser import TinyStreamConfig + +config = TinyStreamConfig.from_default_config_file() +config.mode = "cluster" +consumer = Consumer(config=config, group_id="test-group") + +consumer.connect() +consumer.assign(topic="events", partition=0, start_offset=0) + +for _message in consumer.poll(): + print(f"Received: {_message.value.decode()}") +``` + +### Running Components in Isolation + +For quick
testing, each core component can be run in isolation directly as a module: + +- Controller: `uv run python -m tinystream.controller` +- Broker: `uv run python -m tinystream.broker` +- Producer: `uv run python -m tinystream.client.producer` +- Consumer: `uv run python -m tinystream.client.consumer` ## Architecture Overview @@ -20,7 +128,7 @@ TinyStream is split into five layers: ``` ┌────────────────────────────────────────────┐ -│ Producers / Consumers │ +│ Producers / Consumers │ │ • Send and fetch records from topics │ │ • Commit offsets │ └────────────────────────────────────────────┘ @@ -36,7 +144,7 @@ TinyStream is split into five layers: ▼ ┌────────────────────────────────────────────┐ │ Partition │ -│ • Manages append-only log segments │ +│ • Manages append-only log segments │ │ • Handles retention and compaction │ │ • Stores offset and time indexes │ └────────────────────────────────────────────┘ @@ -78,73 +186,6 @@ data/ Messages are never deleted after consumption — instead, TinyStream enforces a retention policy (by time or size) to delete or compact old segments. -## Local Cluster Simulation - -You can run a multi-broker cluster locally to test replication and leader election: - -``` -localhost -└── TinyStream Cluster - ├── broker-1 (port 5001) - ├── broker-2 (port 5002) - ├── broker-3 (port 5003) - └── controller (port 6000) -``` - -Each broker stores its own data in a separate directory (e.g., `/tmp/tinystream/b1`). 
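The time-based half of that retention rule can be sketched in a few lines. This is an illustrative sketch, not TinyStream's actual API: `Segment`, `base_offset`, and `last_modified` are hypothetical names standing in for whatever the storage layer tracks per segment file.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Segment:
    # Illustrative stand-in for one on-disk log segment file.
    base_offset: int       # offset of the first record in this segment
    last_modified: float   # unix mtime of the segment file


def expired_segments(segments: List[Segment], ttl_secs: float, now: float) -> List[Segment]:
    """Time-based retention: select closed segments older than the TTL.

    The newest (active) segment is always kept, so a partition never
    loses its write head even if the whole log is stale.
    """
    closed = sorted(segments, key=lambda s: s.base_offset)[:-1]
    return [s for s in closed if now - s.last_modified > ttl_secs]


# Example: three segments, 1h TTL -> only the oldest closed segment expires.
now = 10_000.0
segs = [
    Segment(base_offset=0, last_modified=now - 7200),    # 2h old -> expired
    Segment(base_offset=500, last_modified=now - 1800),  # 30min old -> kept
    Segment(base_offset=900, last_modified=now - 9000),  # active -> always kept
]
doomed = expired_segments(segs, ttl_secs=3600, now=now)
print([s.base_offset for s in doomed])  # -> [0]
```

A size-based policy works the same way: sum the byte sizes of closed segments and drop the oldest ones until the log fits its budget.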
- -## Example Usage - -### Start a local controller and brokers - -```bash -python -m tinystream.controller --port 6000 -python -m tinystream.broker --id 1 --port 5001 --controller localhost:6000 -python -m tinystream.broker --id 2 --port 5002 --controller localhost:6000 -``` - -### Produce messages - -```python -from tinystream.client import Producer - -producer = Producer(controller="localhost:6000") -for i in range(10): - producer.send("events", f"msg-{i}".encode()) -``` - -### Consume messages - -```python -from tinystream.client import Consumer - -consumer = Consumer(controller="localhost:6000") -for msg in consumer.read("events"): - print(msg.value) -``` - -## Local Cluster Testing - -TinyStream includes a built-in harness for testing multi-broker setups: - -```python -from tinystream.testing import LocalCluster - -def test_cluster_replication(): - cluster = LocalCluster(brokers=3) - cluster.start() - - p = cluster.new_producer() - c = cluster.new_consumer() - - for i in range(100): - p.send("demo", f"event-{i}".encode()) - - msgs = c.read("demo", from_beginning=True) - assert len(msgs) == 100 - - cluster.stop() -``` ## What to Test @@ -154,24 +195,4 @@ def test_cluster_replication(): | Leader Election | Kill leader → ensure controller reassigns | | Retention | Configure short TTL → check old segment deletion | | Consistency | Compare offsets after recovery | -| Consumer Groups | Add/remove consumers → verify rebalancing | - -## Roadmap - -* [ ] Controller-based leader election -* [ ] Replication between brokers -* [ ] Topic compaction policies -* [ ] Async I/O for log reads -* [ ] REST/gRPC API layer -* [ ] Stream processing DSL (TinyFlink) - -## Design Philosophy - -TinyStream is not a production system. It's primarily to reproduce core concepts from event streaming systems in a minimal codebase. 
- -* How Kafka’s storage engine works internally -* How distributed log replication operates -* How producers and consumers interact via offsets -* How checkpointing, retention, and recovery behave in real streaming systems - -It aims to provide readable, hackable code that models the essence of stream storage. +| Consumer Groups | Add/remove consumers → verify rebalancing diff --git a/consume.py b/consume.py deleted file mode 100644 index 4a11dd4..0000000 --- a/consume.py +++ /dev/null @@ -1,55 +0,0 @@ -import asyncio -from tinystream.client import Consumer - -TOPIC = "clicks" -PARTITION = 0 -START_OFFSET = 0 - - -async def main(): - print("Initializing consumer...") - consumer = Consumer(host="localhost", port=9092) - - try: - await consumer.connect() - print("Consumer connected.") - - consumer.assign(TOPIC, partition=PARTITION, start_offset=START_OFFSET) - print( - f"Assigned to {TOPIC}-{PARTITION} at offset {START_OFFSET}. Polling... (Press Ctrl+C to stop)" - ) - - while True: - batch = await consumer.poll(max_messages=10) - - if batch: - print("--- Received batch ---") - for msg in batch: - print(f"Received: {msg}") - print("----------------------") - else: - await asyncio.sleep(1) - - except ConnectionRefusedError: - print("\n[ERROR] Could not connect to broker.") - print("Please ensure the broker is running in another terminal:") - print(" python -m tinystream.broker") - - except KeyboardInterrupt: - # --- Handle Ctrl+C gracefully --- - print("\n\nStopping consumer... 
(Ctrl+C pressed)") - - except Exception as e: - print(f"\nAn error occurred: {e}") - - finally: - # --- Ensure connection is always closed --- - if consumer._connection.is_connected: - await consumer.close() - print("Consumer connection closed.") - else: - print("Consumer was not connected.") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/load.py b/load.py deleted file mode 100644 index ef7aa32..0000000 --- a/load.py +++ /dev/null @@ -1,85 +0,0 @@ -import asyncio -import time -import uuid -from typing import List - -from tinystream.client.producer import Producer - -NUM_PRODUCERS = 50 -MESSAGES_PER_PRODUCER = 5000000 -TOPIC = "load_test_topic" -TOTAL_MESSAGES = NUM_PRODUCERS * MESSAGES_PER_PRODUCER - - -async def producer_worker(worker_id: int): - """ - A single producer task that sends a batch of messages. - """ - print(f"[Worker {worker_id}] Starting...") - producer = Producer() - await producer.connect() - - messages_sent = 0 - for i in range(MESSAGES_PER_PRODUCER): - msg = { - "worker_id": worker_id, - "message_id": i, - "payload": str(uuid.uuid4()), # A semi-realistic payload - } - try: - await producer.send(TOPIC, msg, key=str(worker_id)) - messages_sent += 1 - except Exception as e: - print(f"[Worker {worker_id}] Error sending message: {e}") - break # Stop this worker on error - - await producer.close() - print(f"[Worker {worker_id}] Finished. 
Sent {messages_sent} messages.") - return messages_sent - - -async def main(): - print("--- Starting TinyStream Load Test ---") - print("Configuration:") - print(f" Concurrent Producers: {NUM_PRODUCERS}") - print(f" Messages per Producer: {MESSAGES_PER_PRODUCER}") - print(f" Total Messages: {TOTAL_MESSAGES}") - print("---------------------------------------") - - try: - p = Producer() - await p.connect() - await p.close() - print("Broker connection test successful.") - except ConnectionRefusedError: - print("\n[FATAL ERROR] Could not connect to broker.") - print("Please ensure the broker is running in another terminal.") - return - except Exception as e: - print(f"Broker connection test failed: {e}") - return - - start_time = time.monotonic() - - tasks = [producer_worker(i) for i in range(NUM_PRODUCERS)] - - results: List[int] = await asyncio.gather(*tasks) - - end_time = time.monotonic() - - total_time = end_time - start_time - total_sent = sum(results) - messages_per_second = total_sent / total_time - - print("\n--- Load Test Results ---") - print(f"Total messages sent: {total_sent} / {TOTAL_MESSAGES}") - print(f"Total time taken: {total_time:.2f} seconds") - print(f"Ingestion Throughput: {messages_per_second:,.2f} messages/sec") - print("-------------------------") - - -if __name__ == "__main__": - print( - "NOTE: Clear your log directory (e.g., ./data/tinystream_logs) for a clean run." 
- ) - asyncio.run(main()) diff --git a/main.py b/main.py deleted file mode 100644 index f8811cd..0000000 --- a/main.py +++ /dev/null @@ -1,16 +0,0 @@ -import asyncio - -from tinystream.broker import Broker - - -def main(): - broker = Broker() - - try: - asyncio.run(broker.start()) - except KeyboardInterrupt: - print("\nBroker shutting down.") - - -if __name__ == "__main__": - main() diff --git a/produce.py b/produce.py deleted file mode 100644 index 7866058..0000000 --- a/produce.py +++ /dev/null @@ -1,63 +0,0 @@ -import asyncio -import random -import uuid - -from tinystream.client.producer import Producer - - -async def main(): - print("Initializing producer...") - producer = Producer(broker_host="localhost", broker_port=9092) - - # --- Data for generating random messages --- - users = ["alice", "bob", "carlos", "denise"] - actions = ["click", "view", "purchase", "scroll"] - items = ["item_A", "item_B", "page_X", "button_Y"] - # --- - - try: - await producer.connect() - print("Producer connected. Sending messages... (Press Ctrl+C to stop)") - - message_count = 0 - # --- Infinite loop --- - while True: - user = random.choice(users) - msg = { - "user": user, - "action": random.choice(actions), - "item": random.choice(items), - "message_id": str(uuid.uuid4()), - "count": message_count, - } - - # 2. Send the message - print(f"Sending: {msg}") - response = await producer.send(topic="clicks", data=msg, key=user) - print(f"Broker response: {response}") - - message_count += 1 - - await asyncio.sleep(1) - - except ConnectionRefusedError: - print("\n[ERROR] Could not connect to broker.") - print("Please ensure the broker is running in another terminal:") - print(" python -m tinystream.broker") - - except KeyboardInterrupt: - print("\n\nStopping producer... 
(Ctrl+C pressed)") - - except Exception as e: - print(f"\nAn error occurred: {e}") - - finally: - if producer._connection.is_connected: - await producer.close() - print("Producer connection closed.") - else: - print("Producer was not connected.") - - -if __name__ == "__main__": - asyncio.run(main()) diff --git a/pyproject.toml b/pyproject.toml index 2551860..90693af 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -5,7 +5,8 @@ description = "Add your description here" requires-python = ">=3.12" dependencies = [ "aiofiles>=25.1.0", - "msgpack>=1.1.2" + "aiosqlite>=0.21.0", + "msgpack>=1.1.2", ] diff --git a/tinystream/__init__.py b/tinystream/__init__.py index e69de29..0f70ecb 100644 --- a/tinystream/__init__.py +++ b/tinystream/__init__.py @@ -0,0 +1,5 @@ +import os + +DEFAULT_CONFIG_PATH = os.environ.get( + "TINYSTREAM_CONFIG_FILE", "tinystream/config/conf.ini" +) diff --git a/tinystream/broker.py b/tinystream/broker.py index d70656e..c074d0d 100644 --- a/tinystream/broker.py +++ b/tinystream/broker.py @@ -1,21 +1,26 @@ -import asyncio -import os from collections import defaultdict from pathlib import Path from typing import Dict, Any, Optional, Type, Literal +import asyncio +import sys +import argparse +import copy +import pathlib +from aiosqlite import Connection as DBConnection + +from tinystream import DEFAULT_CONFIG_PATH +from tinystream.client.connection import TinyStreamAPI +from tinystream.client.base import BaseAsyncClient +from tinystream.config.parser import TinyStreamConfig from tinystream.partitions.base import BasePartition from tinystream.partitions.partition import SingleLogPartition from tinystream.serializer.base import AbstractSerializer from tinystream.storage.base import AbstractLogStorage -from tinystream.config.parser import load_config - -DEFAULT_CONFIG_PATH = os.environ.get( - "TINYSTREAM_CONFIG_FILE", "tinystream/config/conf.ini" -) +from tinystream.utils.serlializer import init_serializer -class Broker: +class Broker(BaseAsyncClient): 
""" The main TinyStream server. @@ -23,19 +28,44 @@ class Broker: correct partition. """ - def __init__(self, config_path: Optional[str] = None): - self.config = load_config(file_path=config_path or DEFAULT_CONFIG_PATH) - self.broker_config = self.config["broker"] + def __init__(self, config: TinyStreamConfig, broker_id: Optional[int]) -> None: + self.config = config + self.mode = config.mode + self.broker_id = broker_id + self.broker_config = self.config.get_broker_config() self.host = self.broker_config["host"] self.port = int(self.broker_config["port"]) self.base_log_dir = Path(self.broker_config["partition_log_path"]) self.prefix_size = int(self.broker_config.get("prefix_size", "8")) - self.byte_order: Literal["little", "big"] = self.broker_config.get( + self.byte_order: Literal["little", "big"] = self.broker_config.get( # type: ignore "byte_order", "little" - ) # type: ignore + ) + + metastore_config = self.config.get_metastore_config() + + self.metastore_db_path = Path(metastore_config.get("db_path")) + self.db_conn: Optional[DBConnection] = None + + self.serializer_config = self.config.get_serialization_config() + self.serializer: AbstractSerializer = init_serializer( + self.serializer_config.get("type", "messagepack") + ) - self.serializer: AbstractSerializer = self.init_serializer( - self.broker_config.get("serializer_type", "messagepack") + if self.mode == "cluster": + controller_config = config.get_controller_config() + self.controller_client = TinyStreamAPI( + host=controller_config.get("host"), # type: ignore + port=int(controller_config.get("port")), # type: ignore + serializer=self.serializer, + ) + self.heartbeat_task: Optional[asyncio.Task] = None + + super().__init__( + prefix_size=self.prefix_size, + byte_order=self.byte_order, + serializer=self.serializer, + host=self.host, + port=self.port, ) self.partition_class: Type[BasePartition] = self.init_partition_class( self.broker_config.get("partition_type", "singlelogpartition") @@ -45,15 +75,7 @@ 
def __init__(self, config_path: Optional[str] = None): # { topic_name -> { partition_id -> BasePartition } } self.topics: Dict[str, Dict[int, BasePartition]] = defaultdict(dict) self._lock = asyncio.Lock() - - @staticmethod - def init_serializer(serializer_name: str) -> AbstractSerializer: - if serializer_name == "messagepack": - from tinystream.serializer.msg_pack import MSGPackSerializer - - return MSGPackSerializer() - else: - raise ValueError(f"Unknown serializer: {serializer_name}") + self._server: Optional[asyncio.Server] = None @staticmethod def init_partition_class(partition_name: str) -> Type[BasePartition]: @@ -80,7 +102,7 @@ async def _create_new_partition( """ A single, centralized method for instantiating a new partition. This ensures all config (serializer, storage_class) is - passed correctly. + passed correctly and the partition is registered in the metastore. """ print(f"Creating new partition: {topic_name}-{partition_id}") @@ -102,6 +124,23 @@ async def _create_new_partition( await partition.load() self.topics[topic_name][partition_id] = partition + + if self.db_conn: + try: + await self.db_conn.execute( + """ + INSERT + OR IGNORE INTO partitions (topic_name, partition_id) + VALUES (?, ?) + """, + (topic_name, partition_id), + ) + await self.db_conn.commit() + except Exception as exception: + print( + f"Warning: Failed to register {topic_name}-{partition_id} in metastore: {exception}" + ) + return partition async def load_partitions(self) -> None: @@ -115,7 +154,6 @@ async def load_partitions(self) -> None: return for topic_dir in self.base_log_dir.iterdir(): - print("topic_dir", topic_dir) if not topic_dir.is_dir(): continue @@ -137,7 +175,6 @@ async def get_or_create_partition( Retrieves a partition, creating it if it doesn't exist. This is a central part of the broker's logic. 
""" - # Check if it exists first without a lock for speed if partition := self.topics.get(topic_name, {}).get(partition_id): return partition @@ -149,57 +186,53 @@ async def get_or_create_partition( async def start(self) -> None: """Starts the main broker server.""" + await self.init_metastore(db_path=self.metastore_db_path) + + if self.mode == "cluster": + await self.controller_client.ensure_connected() + print(f"[Broker {self.broker_id}] Registering with controller...") + await self.controller_client.send_request( + { + "command": "register_broker", + "broker_id": self.broker_id, + "host": self.host, + "port": self.port, + } + ) + self.heartbeat_task = asyncio.create_task(self._heartbeat_loop()) await self.load_partitions() + await self.start_server() - server = await asyncio.start_server( - self.handle_client_connection, self.host, self.port - ) - - addr = server.sockets[0].getsockname() - print(f"Broker listening on {addr[0]}:{addr[1]}...") - - async with server: - await server.serve_forever() + addr = self._server.sockets[0].getsockname() # type: ignore + print(f"[Broker] listening on {addr[0]}:{addr[1]}...") - async def handle_client_connection( - self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter - ): - """ - Callback for each new client connection. 
- """ - peer = writer.get_extra_info("peername") - print(f"New connection from {peer}") - try: - while True: - len_prefix_bytes = await reader.readexactly(self.prefix_size) - if not len_prefix_bytes: - break + async with self._server: # type: ignore + await self._server.serve_forever() # type: ignore - payload_len = int.from_bytes(len_prefix_bytes, self.byte_order) + async def close(self): + """Shuts down the broker and closes database connections.""" + print("\n[Broker] shutting down...") - payload_bytes = await reader.readexactly(payload_len) + if self.heartbeat_task: + self.heartbeat_task.cancel() - response = await self.dispatch_request(payload_bytes) + if self.controller_client and self.controller_client.is_connected: + print(f"[Broker {self.broker_id}] Deregistering...") + await self.controller_client.send_request( + {"command": "deregister_broker", "broker_id": self.broker_id} + ) + await self.controller_client.close() - response_bytes = self.serializer.serialize(response) - response_len_prefix = len(response_bytes).to_bytes( - self.prefix_size, self.byte_order - ) + if self._server: + self._server.close() + await self._server.wait_closed() + print("Server socket closed.") - writer.write(response_len_prefix) - writer.write(response_bytes) - await writer.drain() + if self.db_conn: + await self.db_conn.close() + print("Metastore connection closed.") - except asyncio.IncompleteReadError: - print(f"Client {peer} disconnected unexpectedly.") - except Exception as e: - print(f"Error handling client {peer}: {e}") - finally: - print(f"Closing connection from {peer}") - writer.close() - await writer.wait_closed() - - async def dispatch_request(self, payload_bytes: bytes) -> Dict[str, Any]: + async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: """Deserializes the request and calls the correct handler.""" try: request = self.serializer.deserialize(payload_bytes) @@ -211,11 +244,18 @@ async def dispatch_request(self, payload_bytes: bytes) -> Dict[str, 
Any]: return await self._handle_read(request) elif command == "get_hwm": return await self._handle_get_hwm(request) + + elif command == "commit_offset": + return await self._handle_commit_offset(request) + else: return {"status": "error", "message": "Unknown command"} - except Exception as e: - return {"status": "error", "message": f"Failed to process request: {e}"} + except Exception as exception: + return { + "status": "error", + "message": f"Failed to process request: {exception}", + } async def _handle_append(self, request: Dict[str, Any]) -> Dict[str, Any]: topic = request["topic"] @@ -246,6 +286,21 @@ async def _handle_read(self, request: Dict[str, Any]) -> Dict[str, Any]: except KeyError: return {"status": "error", "message": "Topic or partition not found"} + async def _heartbeat_loop(self): + """Runs in the background, sending heartbeats to the controller.""" + while True: + try: + await self.controller_client.send_request( + {"command": "heartbeat", "broker_id": self.broker_id} + ) + print(f"[Broker {self.broker_id}] Heartbeat sent.") + except Exception as exception: + print( + f"[Broker {self.broker_id}] Failed to send heartbeat: {exception}" + ) + + await asyncio.sleep(3) + async def _handle_get_hwm(self, request: Dict[str, Any]) -> Dict[str, Any]: """Gets the High Watermark (next write offset) for a partition.""" topic = request["topic"] @@ -258,11 +313,195 @@ async def _handle_get_hwm(self, request: Dict[str, Any]) -> Dict[str, Any]: except KeyError: return {"status": "error", "message": "Topic or partition not found"} + async def _handle_commit_offset(self, request: Dict[str, Any]) -> Dict[str, Any]: + """ + Handles a consumer's request to commit its offset for a partition. + Writes the offset to the 'consumer_group_offsets' table. 
+ """ + try: + group_id = request["group_id"] + topic = request["topic"] + partition_id = request["partition"] + offset = request["offset"] + + if not self.db_conn: + return {"status": "error", "message": "Metastore is not enabled."} + + await self.db_conn.execute( + """ + INSERT OR REPLACE INTO consumer_group_offsets + (group_id, topic_name, partition_id, committed_offset) + VALUES (?, ?, ?, ?) + """, + (group_id, topic, partition_id, offset), + ) + + await self.db_conn.commit() + + return {"status": "ok", "message": "Offset committed"} + + except KeyError as exception: + return { + "status": "error", + "message": f"Missing required field: {exception}", + } + except Exception as exception: + return { + "status": "error", + "message": f"Failed to commit offset: {exception}", + } + + +async def main( + _broker_id: Optional[int] = 0, + mode: Optional[str] = "single", + config: Optional[str] = DEFAULT_CONFIG_PATH, + broker_number: Optional[int] = None, +): + if broker_number and mode == "cluster": + print(f"[Launcher] Starting {broker_number} brokers in test mode...") + broker_instances = [] + + base_config_path = config or DEFAULT_CONFIG_PATH + try: + base_config = TinyStreamConfig.from_ini(base_config_path) + base_port = int(base_config.broker_config.get("port", "909")) + except Exception as exception: + print( + f"FATAL: Could not load base config from {base_config_path}: {exception}" + ) + return + + for i in range(broker_number): + broker_config = copy.deepcopy(base_config) + broker_config.mode = "cluster" + broker_config.broker_config["port"] = f"{base_port + i}" + + print( + f"[Launcher] Preparing Broker {i} on port {broker_config.broker_config['port']}..." 
+ ) + broker_instances.append( + Broker( + config=broker_config, + broker_id=i, + ) + ) + + start_tasks = [b.start() for b in broker_instances] + try: + await asyncio.gather(*start_tasks) + except KeyboardInterrupt: + print("\n[Launcher] Caught interrupt, shutting down all brokers...") + finally: + print("[Launcher] Closing all brokers...") + close_tasks = [b.close() for b in broker_instances] + await asyncio.gather(*close_tasks) + return + + else: + broker_id_to_use = _broker_id + + try: + config_obj = TinyStreamConfig.from_ini(config or DEFAULT_CONFIG_PATH) + except Exception as exception: + print(f"FATAL: Could not load config from {config}: {exception}") + return + + config_obj.mode = mode # type: ignore + + base_port = int(config_obj.broker_config.get("port", "909")) + broker_port = base_port + broker_id_to_use # type: ignore + config_obj.broker_config["port"] = f"{broker_port}" + + broker = Broker( + config=config_obj, + broker_id=broker_id_to_use, + ) + + try: + if mode == "cluster": + print( + f"\n[Broker {broker_id_to_use}] Starting in CLUSTER mode on port {broker_port}..." + ) + else: + print(f"\n[Broker 0] Starting in SINGLE mode on port {broker_port}...") + + await broker.start() + + except KeyboardInterrupt: + if mode == "cluster": + print( + f"\n[Broker {broker_id_to_use}] Caught interrupt, shutting down..." + ) + else: + print("\n[Broker 0] Caught interrupt, shutting down...") + finally: + await broker.close() + if __name__ == "__main__": - broker = Broker() + + def print_usage(parser_instance, message): + """Prints a validation error and the parser's help message.""" + print(f"Error: {message}\n") + parser_instance.print_help() + sys.exit(1) + + parser = argparse.ArgumentParser(description="Start TinyStream broker(s).") + parser.add_argument( + "--mode", + choices=["single", "cluster"], + default="single", + help="Broker mode. 
'single' for standalone, 'cluster' to connect to a controller.", + ) + parser.add_argument( + "--broker-id", + type=int, + help="Broker ID (required in 'cluster' mode when starting a single broker).", + ) + parser.add_argument( + "--config", + type=str, + default=DEFAULT_CONFIG_PATH, + help=f"Config file path (default: {DEFAULT_CONFIG_PATH})", + ) + parser.add_argument( + "--broker-number", + type=int, + help="[TESTING] Start N brokers in a single process. Overrides other settings.", + ) + args = parser.parse_args() + + _broker_id_to_pass = 0 + + if args.broker_number: + if args.broker_number <= 0: + print_usage(parser, "--broker-number must be greater than 0.") + + args.mode = "cluster" + _broker_id_to_pass = 0 + + else: + if args.mode == "cluster" and args.broker_id is None: + print_usage(parser, "--broker-id is required in 'cluster' mode.") + + _broker_id_to_pass = args.broker_id if args.broker_id is not None else 0 + + config_path = pathlib.Path(args.config) + if not config_path.is_file(): + print(f"Error: Config file not found at {config_path}") + sys.exit(1) try: - asyncio.run(broker.start()) - except KeyboardInterrupt: - print("\nBroker shutting down.") + asyncio.run( + main( + _broker_id=_broker_id_to_pass, + mode=args.mode, + config=str(config_path), + broker_number=args.broker_number, + ) + ) + except Exception as e: + print(f"FATAL: Broker main loop crashed: {e}") + # (Consider adding `import traceback; traceback.print_exc()` for debug) + sys.exit(1) diff --git a/tinystream/client/__init__.py b/tinystream/client/__init__.py index ab8d3a4..e69de29 100644 --- a/tinystream/client/__init__.py +++ b/tinystream/client/__init__.py @@ -1,6 +0,0 @@ -from connection import TinyStreamAPI -from consumer import Consumer -from producer import Producer - - -__all__ = ["TinyStreamAPI", "Consumer", "Producer"] diff --git a/tinystream/client/base.py b/tinystream/client/base.py new file mode 100644 index 0000000..2b4a61a --- /dev/null +++ b/tinystream/client/base.py @@ 
-0,0 +1,106 @@ +import asyncio +import pathlib +from typing import Literal, Optional, Dict, Any +from aiosqlite import Connection as DBConnection + +import aiosqlite + +from tinystream.utils.db import create_db_schemas + + +class BaseAsyncClient: + def __init__( + self, + prefix_size=8, + byte_order: Literal["little", "big"] = "little", + serializer=None, + host: str = "localhost", + port: int = 9093, + ): + self.prefix_size = prefix_size + self.byte_order = byte_order + self.serializer = serializer + self._server: Optional[asyncio.Server] = None + self.host = host + self.port = port + self.db_connection: Optional[DBConnection] = None + + async def start_server(self): + self._server = await asyncio.start_server( + self.handle_client_connection, self.host, self.port + ) + + async def close_server(self): + if self._server: + self._server.close() + await self._server.wait_closed() + + async def ping(self): + return self._server.is_serving() + + async def send_request(self, request_data: bytes) -> Dict[str, Any]: + raise NotImplementedError() + + async def handle_client_connection( + self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter + ): + peer = writer.get_extra_info("peername") + print(f"[Server] New connection from {peer}") + try: + while True: + len_prefix_bytes = await reader.readexactly(self.prefix_size) + if not len_prefix_bytes: + break + + payload_len = int.from_bytes(len_prefix_bytes, self.byte_order) + payload_bytes = await reader.readexactly(payload_len) + + response = await self.send_request(payload_bytes) + + response_bytes = self.serializer.serialize(response) + + response_len_prefix = len(response_bytes).to_bytes( + self.prefix_size, self.byte_order + ) + writer.write(response_len_prefix) + writer.write(response_bytes) + await writer.drain() + + except asyncio.IncompleteReadError: + print(f"[Server] Client {peer} disconnected unexpectedly.") + + except Exception as e: + print(f"[Server] FATAL error handling client {peer}: {e}") + try: + 
error_response = { + "status": "error", + "message": f"Fatal server error: {e}", + } + response_bytes = self.serializer.serialize(error_response) + response_len_prefix = len(response_bytes).to_bytes( + self.prefix_size, self.byte_order + ) + writer.write(response_len_prefix) + writer.write(response_bytes) + await writer.drain() + except Exception as e2: + print(f"[Server] Could not send error response to {peer}: {e2}") + + finally: + print(f"[Server] Closing connection from {peer}") + writer.close() + await writer.wait_closed() + + async def init_metastore(self, db_path: pathlib.Path): + print(f"Initializing metastore at {db_path}...") + try: + db_path = pathlib.Path(db_path).resolve() + db_path.parent.mkdir(parents=True, exist_ok=True) + self.db_connection = await aiosqlite.connect(db_path) + self.db_conn = self.db_connection  # alias: Broker accesses the metastore via self.db_conn + await self.db_connection.execute("PRAGMA journal_mode=WAL;") + await create_db_schemas(connection=self.db_connection) + await self.db_connection.commit() + print("[Metastore] Tables initialized.") + except Exception as e: + print(f"[Metastore] FATAL: Could not initialize metastore: {e}") + raise diff --git a/tinystream/client/connection.py b/tinystream/client/connection.py index ec06735..b9c5bf7 100644 --- a/tinystream/client/connection.py +++ b/tinystream/client/connection.py @@ -36,14 +36,14 @@ def __init__( self.is_connected = False async def _connect(self) -> None: - """Establishes a connection to the broker.""" + """Establishes a connection to the server (broker or controller).""" # Use a lock to prevent multiple coroutines from trying to # connect at the same time. 
async with self._connect_lock: if self.is_connected: return - print(f"Connecting to broker at {self.host}:{self.port}...") + print(f"Connecting to controller at {self.host}:{self.port}...") try: self._reader, self._writer = await asyncio.open_connection( self.host, self.port @@ -51,7 +51,7 @@ async def _connect(self) -> None: self.is_connected = True print("Connection successful.") except (OSError, ConnectionRefusedError) as e: - print(f"Failed to connect to broker: {e}") + print(f"Failed to connect to controller: {e}") self.is_connected = False raise @@ -100,10 +100,12 @@ async def send_request(self, request: Dict[str, Any]) -> Dict[str, Any]: return response except (asyncio.IncompleteReadError, ConnectionResetError) as e: - print(f"Connection lost: {e}. Attempting to reconnect on next call.") + print( + f"Connection lost: {e}. Attempting to reconnect on next call. Request data: {request}" + ) await self.close() raise ConnectionError("Connection lost while processing request.") except Exception as e: print(f"An error occurred: {e}") - await self.close() # Close connection on unknown error + await self.close() raise diff --git a/tinystream/client/consumer.py b/tinystream/client/consumer.py index 21d5001..81b69bd 100644 --- a/tinystream/client/consumer.py +++ b/tinystream/client/consumer.py @@ -1,78 +1,166 @@ -from typing import Any, Dict, List, Tuple +import asyncio +import random +import uuid +from typing import Any, Dict, List, Tuple, Optional +from tinystream import DEFAULT_CONFIG_PATH from tinystream.client.connection import TinyStreamAPI -from tinystream.serializer.msg_pack import MSGPackSerializer +from tinystream.cluster_manager import ClusterManager +from tinystream.config.parser import TinyStreamConfig +from tinystream.serializer.base import AbstractSerializer +from tinystream.utils.serlializer import init_serializer class Consumer: """ A stateful consumer that tracks its own offsets for assigned partitions. 
+ Supports both "single" broker and "cluster" (controller-aware) modes. """ def __init__( - self, host: str = "localhost", port: int = 9092, serializer: Any = None + self, + group_id: str, + config: TinyStreamConfig, ) -> None: - self.host = host - self.port = port - self._connection = TinyStreamAPI( - host=self.host, - port=self.port, - serializer=serializer or MSGPackSerializer(), - prefix_size=8, - byte_order="little", + self.group_id = group_id + self.config = config + self.mode = self.config.mode + + serializer_config = config.get_serialization_config() + self.serializer: AbstractSerializer = init_serializer( + serializer_config.get("type", "messagepack") ) - # Key improvement: - # Stores the *next offset* to read for each partition. - # Format: { (topic, partition_id): next_offset } + if self.mode == "cluster": + controller_config = config.get_controller_config() + self.controller_host = controller_config.get("host") + self.controller_port = controller_config.get("port") + self._controller_connection: Optional[TinyStreamAPI] = TinyStreamAPI( + self.controller_host, # type: ignore + self.controller_port, # type: ignore + serializer=self.serializer, # type: ignore + ) + + self._topic_metadata_cache: Dict[str, Dict[int, Any]] = {} + self._broker_info_cache: Dict[int, Any] = {} + self._broker_connections: Dict[Tuple[str, int], TinyStreamAPI] = {} + self._metadata_lock = asyncio.Lock() + self.cluster_manager = ClusterManager( + self.mode, + self._topic_metadata_cache, + self._broker_info_cache, + self._broker_connections, + self.serializer, + ) + + else: + self.mode = "single" + broker_config = config.get_broker_config() + broker_host = broker_config.get("host") + broker_port = broker_config.get("port") + self._single_broker_connection = TinyStreamAPI( + broker_host, # type: ignore + broker_port, # type: ignore + serializer=self.serializer, # type: ignore + ) + + # { (topic, partition_id): next_offset } self._assignments: Dict[Tuple[str, int], int] = {} # Caches 
high-watermarks to avoid polling empty partitions self._hwms: Dict[Tuple[str, int], int] = {} async def connect(self) -> None: - await self._connection.ensure_connected() + """Explicitly connects to the controller or the single broker.""" + if self.mode == "cluster": + print("[Consumer] (Cluster Mode): Connecting to controller...") + await self._controller_connection.ensure_connected() # type: ignore + await self._refresh_cluster_metadata() + + elif self.mode == "single": + print("[Consumer] (Single Mode): Connecting to broker...") + await self._single_broker_connection.ensure_connected() async def close(self) -> None: - await self._connection.close() + """Closes all active connections.""" + print("Closing consumer connections...") + if self.mode == "cluster": + if self._controller_connection: + await self._controller_connection.close() + for conn in self._broker_connections.values(): + await conn.close() + + elif self.mode == "single": + if self._single_broker_connection: + await self._single_broker_connection.close() + + async def is_connected(self) -> bool: + """Checks if the consumer is connected.""" + if self.mode == "cluster": + if not self._controller_connection.is_connected: # type: ignore + return False + for conn in self._broker_connections.values(): + if not conn.is_connected: + return False + return True + + elif self.mode == "single": + return self._single_broker_connection.is_connected + + return False def assign(self, topic: str, partition: int = 0, start_offset: int = 0) -> None: """ Assigns this consumer to a specific partition, starting from a given offset. Call this *before* polling. - - Args: - topic: The topic to read from. - partition: The partition to read from. - start_offset: The logical offset to start reading from (e.g., 0). 
""" key = (topic, partition) self._assignments[key] = start_offset self._hwms[key] = 0 - print(f"Consumer assigned to {topic}-{partition} at offset {start_offset}") + print(f"[Consumer] assigned to {topic}-{partition} at offset {start_offset}") + + async def _get_connection_for_partition( + self, topic: str, partition: int + ) -> TinyStreamAPI: + """ + Gets the correct broker connection for a partition based on mode. + """ + if self.mode == "single": + await self._single_broker_connection.ensure_connected() # type: ignore + return self._single_broker_connection + else: + return await self.cluster_manager.get_leader_connection(topic, partition) async def _update_high_watermarks(self) -> None: """ - Asks the broker for the latest high-watermark (HWM) for all - assigned partitions. This tells us what the last available message offset is. + Asks the correct broker for the latest HWM for all assigned partitions. """ for topic, part in self._assignments.keys(): try: - resp = await self._connection.send_request( + conn = await self._get_connection_for_partition(topic, part) + + resp = await conn.send_request( {"command": "get_hwm", "topic": topic, "partition": part} ) if resp.get("status") == "ok": self._hwms[(topic, part)] = resp["high_watermark"] + else: + print( + f"Failed to get HWM for {topic}-{part}: {resp.get('message')}" + ) + if self.mode == "cluster": + await self.cluster_manager.invalidate_caches(conn) + except Exception as e: print(f"Failed to get HWM for {topic}-{part}: {e}") + if self.mode == "cluster": + await self.cluster_manager.invalidate_caches() async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] - # 1. First, find out what messages are available await self._update_high_watermarks() - # 2. 
Keep fetching round-robin until batch is full or we run out while len(results) < max_messages: messages_polled_this_round = 0 @@ -82,11 +170,11 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: hwm = self._hwms.get((topic, part), next_offset) - # 3. If our offset is less than HWM, data is available if next_offset < hwm: try: - # 4. Send the read request for the *specific* offset - resp = await self._connection.send_request( + conn = await self._get_connection_for_partition(topic, part) + + resp = await conn.send_request( { "command": "read", "topic": topic, @@ -97,22 +185,187 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: if resp.get("status") == "ok": results.append(resp["data"]) - # 5. CRITICAL: Update our internal state self._assignments[(topic, part)] = next_offset + 1 messages_polled_this_round += 1 else: - # Error (e.g., offset deleted?), stop polling this partition print( f"Read error on {topic}-{part} at {next_offset}: {resp.get('message')}" ) - self._hwms[(topic, part)] = 0 # Invalidate HWM cache + self._hwms[(topic, part)] = 0 + if self.mode == "cluster": + await self.cluster_manager.invalidate_caches(conn) except Exception as e: print(f"Failed to read from {topic}-{part}: {e}") - self._hwms[(topic, part)] = 0 # Invalidate HWM + self._hwms[(topic, part)] = 0 + if self.mode == "cluster": + await self.cluster_manager.invalidate_caches() - # If we went through all partitions and got nothing, we're caught up. if messages_polled_this_round == 0: break return results + + async def commit(self) -> List[Dict[str, Any]]: + """ + Commits the current tracked offsets for all assigned partitions + to the correct broker (leader or single). 
+ """ + print(f"Committing offsets for group '{self.group_id}'...") + commit_tasks = [] + + for (topic, part), offset in self._assignments.items(): + request = { + "command": "commit_offset", + "group_id": self.group_id, + "topic": topic, + "partition": part, + "offset": offset, + } + + commit_tasks.append(self._send_commit_request(topic, part, request)) + + responses = await asyncio.gather(*commit_tasks) + print(f"Commit finished. Responses: {responses}") + return responses + + async def _send_commit_request( + self, topic: str, partition: int, request: Dict[str, Any] + ) -> Dict[str, Any]: + """Helper to send a commit request to the correct broker.""" + try: + conn = await self._get_connection_for_partition(topic, partition) + return await conn.send_request(request) + except Exception as e: + print(f"Failed to commit for {topic}-{partition}: {e}") + if self.mode == "cluster": + await self.cluster_manager.invalidate_caches() + return {"status": "error", "message": str(e)} + + async def _refresh_cluster_metadata(self) -> None: + """Fetches the latest cluster state from the controller.""" + if self.mode != "cluster": + return + + async with self._metadata_lock: + print("[Consumer]: Refreshing cluster metadata from controller...") + try: + await self._controller_connection.ensure_connected() # type: ignore + response = await self._controller_connection.send_request( # type: ignore + {"command": "get_cluster_metadata"} + ) + + if response.get("status") == "ok": + metadata = response["metadata"] + self._broker_info_cache = metadata.get("brokers", {}) + self._topic_metadata_cache = metadata.get("partitions", {}) + print("[Consumer]: Metadata refreshed.") + else: + print( + f"[Consumer]: Failed to refresh metadata: {response.get('message')}" + ) + except Exception as e: + print(f"[Consumer]: Error refreshing metadata: {e}") + + +async def main( + config: Optional[str] = DEFAULT_CONFIG_PATH, + topic: Optional[str] = None, + mode: str = "single", + group_id: Optional[str] 
= None,
+) -> None:
+    config = TinyStreamConfig.from_ini(config or DEFAULT_CONFIG_PATH)  # type: ignore
+    config.mode = mode  # type: ignore
+
+    if not group_id:
+        print("[Consumer] No group_id provided, generating a random one.")
+        group_id = f"consumer-{uuid.uuid4()}"
+
+    if not topic:
+        print(
+            "[Consumer] No topic provided. Will randomly pick from the available test topics."
+        )
+        # The demo producer publishes to "<action>s" topics, so pick plurals.
+        topic = random.choice(["clicks", "views", "purchases", "scrolls"])
+
+    consumer = Consumer(
+        config=config,  # type: ignore
+        group_id=group_id,
+    )
+
+    start_offset = 0
+    partition = 0
+
+    try:
+        await consumer.connect()
+        print("Consumer connected.")
+
+        consumer.assign(topic=topic, partition=partition, start_offset=start_offset)
+        print(
+            f"Assigned to {topic}-{partition} at offset {start_offset}. Polling... (Press Ctrl+C to stop)"
+        )
+
+        while True:
+            batch = await consumer.poll(max_messages=10)
+
+            if batch:
+                print("--- Received batch ---")
+                for msg in batch:
+                    print(f"Received: {msg}")
+                print("----------------------")
+                await consumer.commit()
+            else:
+                await asyncio.sleep(1)
+
+    except ConnectionRefusedError:
+        print("\n[ERROR] Could not connect to broker.")
+        print("Please ensure the broker is running in another terminal:")
+        print("    python -m tinystream.broker")
+
+    except KeyboardInterrupt:
+        print("\n\nStopping consumer... 
(Ctrl+C pressed)")
+
+    except Exception as e:
+        print(f"\nAn error occurred: {e}")
+
+    finally:
+        if await consumer.is_connected():
+            await consumer.close()
+            print("Consumer connection closed.")
+        else:
+            print("Consumer was not connected.")
+
+
+if __name__ == "__main__":
+    import sys
+    import argparse
+
+    def print_usage():
+        print(
+            "Usage: python consumer.py --topic TOPIC [--config CONFIG_PATH] [--mode single|cluster] [--group_id GROUP_ID]"
+        )
+        sys.exit(1)
+
+    parser = argparse.ArgumentParser(description="TinyStream Consumer")
+    parser.add_argument(
+        "--config",
+        type=str,
+        default=DEFAULT_CONFIG_PATH,
+        help="Path to TinyStream configuration file",
+    )
+    parser.add_argument(
+        "--mode", type=str, default="single", choices=["single", "cluster"]
+    )
+    parser.add_argument(
+        "--topic", required=True, type=str, help="Topic to consume from"
+    )
+    parser.add_argument("--group_id", type=str, help="Consumer group ID")
+    args = parser.parse_args()
+    config_path = args.config
+    asyncio.run(
+        main(
+            config=config_path,
+            mode=args.mode,
+            group_id=args.group_id,
+            topic=args.topic,
+        )
+    )
diff --git a/tinystream/client/producer.py b/tinystream/client/producer.py
index 199cdf4..a677cc3 100644
--- a/tinystream/client/producer.py
+++ b/tinystream/client/producer.py
@@ -1,71 +1,145 @@
+import asyncio
 import hashlib
-from typing import Any, Optional, Dict
+from typing import Any, Optional, Dict, Tuple
+from tinystream import DEFAULT_CONFIG_PATH
 from tinystream.client.connection import TinyStreamAPI
-from tinystream.serializer.msg_pack import MSGPackSerializer
+from tinystream.cluster_manager import ClusterManager
+from tinystream.config.parser import TinyStreamConfig
+from tinystream.serializer.base import AbstractSerializer
+from tinystream.utils.serlializer import init_serializer


 class Producer:
     """
-    It manages a connection to the broker and provides a simple
-    `send` method.
+    Manages connections and provides a `send` method.
+    Supports two modes:
+    1. 
"single": Connects directly to a single broker. + 2. "cluster": Connects to a controller to discover leader brokers. """ - def __init__( - self, broker_host: str = "localhost", broker_port: int = 9092, serializer=None - ) -> None: - self.host = broker_host - self.port = broker_port - self._connection = TinyStreamAPI( - self.host, self.port, serializer=serializer or MSGPackSerializer() + def __init__(self, config: TinyStreamConfig) -> None: + self.config = config + self.mode = config.mode + + self.serializer_config = config.get_serialization_config() + + self.serializer: AbstractSerializer = init_serializer( + self.serializer_config.get("type", "messagepack") ) - # TODO: This should be dynamic from the broker - self._partition_count = 1 + if self.mode == "cluster": + controller_config = config.get_controller_config() + self.controller_host = controller_config.get("host") + self.controller_port = controller_config.get("port") + self._controller_connection: Optional[TinyStreamAPI] = TinyStreamAPI( + self.controller_host, # type: ignore + self.controller_port, # type: ignore + serializer=self.serializer, # type: ignore + ) + + self._topic_metadata_cache: Dict[str, Dict[int, Any]] = {} + self._broker_info_cache: Dict[int, Any] = {} + self._broker_connections: Dict[Tuple[str, int], TinyStreamAPI] = {} + self._metadata_lock = asyncio.Lock() + self.cluster_manager = ClusterManager( + self.mode, + self._topic_metadata_cache, + self._broker_info_cache, + self._broker_connections, + self.serializer, + ) + + else: + self.mode = "single" + broker_config = config.get_broker_config() + broker_host = broker_config.get("host") + broker_port = broker_config.get("port") + self._single_broker_connection = TinyStreamAPI( + broker_host, # type: ignore + broker_port, # type: ignore + serializer=self.serializer, # type: ignore + ) + self._default_partition_count = 1 + + async def is_connected(self) -> bool: + if self.mode == "cluster": + if ( + not self._controller_connection + or not 
self._controller_connection.is_connected + ): + return False + for conn in self._broker_connections.values(): + if not conn.is_connected: + return False + return True + else: + return self._single_broker_connection.is_connected async def connect(self) -> None: - """Explicitly connects to the broker.""" - await self._connection.ensure_connected() + """Explicitly connects to the controller or the single broker.""" + if self.mode == "cluster": + print("[Producer] (Cluster Mode): Connecting to controller...") + await self._controller_connection.ensure_connected() # type: ignore + await self._refresh_cluster_metadata() + + elif self.mode == "single": + print("[Producer] (Single Mode): Connecting to broker...") + await self._single_broker_connection.ensure_connected() async def close(self) -> None: - """Closes the connection to the broker.""" - await self._connection.close() + """Closes all active connections.""" + print("Closing producer connections...") + if self.mode == "cluster": + if self._controller_connection: + await self._controller_connection.close() + for conn in self._broker_connections.values(): + await conn.close() + self._broker_connections.clear() + self._topic_metadata_cache.clear() + self._broker_info_cache.clear() + + elif self.mode == "single": + if self._single_broker_connection: + await self._single_broker_connection.close() - def _get_partition(self, key: Optional[bytes]) -> int: + @staticmethod + def _get_partition_id(key: Optional[bytes], partition_count: int) -> int: """ Determines the partition for a given key. - If no key, defaults to partition 0. 
""" if key is None: - # For now, just send to partition 0 if no key return 0 - # Simple hash-based partitioning hash_bytes = hashlib.md5(key).digest() hash_int = int.from_bytes(hash_bytes, "little") - # In v0.1, we only have one partition (0) - # In a future version, we'd do: - # return hash_int % self._partition_count - return hash_int % self._partition_count + return hash_int % partition_count + + async def _get_partition_count(self, topic: str) -> int: + """Gets partition count for a topic based on mode.""" + if self.mode == "cluster": + if topic not in self._topic_metadata_cache: + await self._refresh_cluster_metadata() + + topic_partitions = self._topic_metadata_cache.get(topic) + if not topic_partitions: + raise ValueError(f"Topic '{topic}' not found in cluster metadata.") + return len(topic_partitions) + else: + return self._default_partition_count async def send( - self, topic: str, data: Any, key: Optional[str] = None + self, topic: str, data: Any, key: Optional[str] = None, retries: int = 3 ) -> Dict[str, Any]: """ Sends a message to a topic. - - Args: - topic: The name of the topic. - data: The message payload (any msgpack-serializable object). - key: An optional key (str). If provided, ensures all messages - with the same key go to the same partition. - - Returns: - The response from the broker (e.g., {"status": "ok", "offset": 0}). + Behavior depends on the producer's mode (single vs. cluster). 
""" key_bytes = key.encode("utf-8") if key else None - partition_id = self._get_partition(key_bytes) + + partition_count = await self._get_partition_count(topic) + partition_id = self._get_partition_id(key_bytes, partition_count) request = { "command": "append", @@ -74,9 +148,154 @@ async def send( "data": data, } + if self.mode == "cluster": + return await self._send_cluster(request, retries) + else: + return await self._send_single(request) + + async def _send_single(self, request: Dict[str, Any]) -> Dict[str, Any]: + """Handles sending in single-broker mode.""" try: - response = await self._connection.send_request(request) + await self._single_broker_connection.ensure_connected() + response = await self._single_broker_connection.send_request(request) return response - except ConnectionError as e: - print(f"Failed to send message: {e}") - return {"status": "error", "message": str(e)} + except (ConnectionError, asyncio.TimeoutError) as e: + print(f"[Producer] (Single Mode): Connection error: {e}") + raise e + except Exception as e: + print(f"[Producer] (Single Mode): Unexpected error: {e}") + raise e + + async def _send_cluster( + self, request: Dict[str, Any], retries: int + ) -> Dict[str, Any]: + """Handles sending in cluster mode with leader discovery and retries.""" + topic = request["topic"] + partition_id = request["partition"] + + for attempt in range(retries): + connection = None + try: + connection = await self.cluster_manager.get_leader_connection( + topic, partition_id + ) + + response = await connection.send_request(request) + + if response.get("status") == "ok": + return response + + print( + f"[Producer]: Broker error: {response.get('message')}. Retrying..." + ) + await self.cluster_manager.invalidate_caches(connection) + + except (ConnectionError, asyncio.TimeoutError) as e: + print(f"[Producer]: Connection error: {e}. 
Retrying...") + await self.cluster_manager.invalidate_caches(connection) + + except Exception as e: + print(f"[Producer]: Unexpected error: {e}") + raise e + + await asyncio.sleep(0.5 * (attempt + 1)) + + raise ConnectionError( + f"Failed to send message to topic '{topic}' after {retries} retries." + ) + + async def _refresh_cluster_metadata(self) -> None: + """Fetches the latest cluster state from the controller.""" + if self.mode != "cluster": + return + + async with self._metadata_lock: + try: + await self._controller_connection.ensure_connected() # type: ignore + response = await self._controller_connection.send_request( # type: ignore + {"command": "get_cluster_metadata"} + ) + + if response.get("status") == "ok": + metadata = response["metadata"] + self._broker_info_cache = metadata.get("brokers", {}) + self._topic_metadata_cache = metadata.get("partitions", {}) + print("[Producer]: Metadata refreshed.") + else: + print( + f"[Producer]: Failed to refresh metadata: {response.get('message')}" + ) + except Exception as e: + print(f"[Producer]: Error refreshing metadata: {e}") + + +async def main( + config: Optional[str] = DEFAULT_CONFIG_PATH, mode: str = "single" +) -> None: + config = TinyStreamConfig.from_ini(config or DEFAULT_CONFIG_PATH) # type: ignore + config.mode = mode # type: ignore + producer = Producer(config=config) # type: ignore + + dummy_events = [ + {"user": "alice", "action": "click", "item": "item_A"}, + {"user": "bob", "action": "view", "item": "page_X"}, + {"user": "carlos", "action": "purchase", "item": "item_B"}, + {"user": "denise", "action": "scroll", "item": "button_Y"}, + ] + + try: + await producer.connect() + print("Producer connected. Sending messages... 
(Press Ctrl+C to stop)")
+
+        message_count = 0
+        while True:
+            event = dummy_events[message_count % len(dummy_events)]
+            event["message_id"] = str(message_count)
+
+            print(f"Sending: {event}")
+            response = await producer.send(
+                topic=f"{event['action']}s", data=event, key=event["user"]
+            )
+            print(f"Broker response: {response}")
+
+            message_count += 1
+            await asyncio.sleep(1)
+    except ConnectionRefusedError:
+        print("\n[ERROR] Could not connect to broker.")
+        print("Please ensure the broker is running in another terminal:")
+        print("    python -m tinystream.broker")
+    except KeyboardInterrupt:
+        print("\n\nStopping producer... (Ctrl+C pressed)")
+    except Exception as e:
+        print(f"\nAn error occurred: {e}")
+    finally:
+        if await producer.is_connected():
+            await producer.close()
+            print("Producer connection closed.")
+        else:
+            print("Producer was not connected.")
+
+
+if __name__ == "__main__":
+    import sys
+    import argparse
+
+    def print_usage():
+        print(
+            "Usage: python -m tinystream.client.producer --config <path> --mode <single|cluster>"
+        )
+        sys.exit(1)
+
+    parser = argparse.ArgumentParser(description="TinyStream Producer")
+    parser.add_argument(
+        "--config",
+        type=str,
+        default=DEFAULT_CONFIG_PATH,
+        help="Path to TinyStream configuration file",
+    )
+    parser.add_argument(
+        "--mode", type=str, default="single", choices=["single", "cluster"]
+    )
+    args = parser.parse_args()
+    config_path = args.config
+    asyncio.run(main(config=config_path, mode=args.mode))
diff --git a/tinystream/cluster_manager.py b/tinystream/cluster_manager.py
new file mode 100644
index 0000000..1beb67e
--- /dev/null
+++ b/tinystream/cluster_manager.py
@@ -0,0 +1,84 @@
+from typing import Dict, Any, Tuple, Optional
+from tinystream.client.connection import TinyStreamAPI
+from tinystream.serializer.base import AbstractSerializer
+
+
+class ClusterManager:
+    def __init__(
+        self,
+        mode: str,
+        topic_metadata_cache: Dict[str, Dict[int, Any]],
+        broker_info_cache: Dict[int, Any],
+        broker_connections: 
Dict[Tuple[str, int], TinyStreamAPI],
+        serializer: AbstractSerializer,
+    ) -> None:
+        self.mode = mode
+        self._topic_metadata_cache = topic_metadata_cache
+        self._broker_info_cache = broker_info_cache
+        self._broker_connections = broker_connections
+        self.serializer = serializer
+
+    async def _refresh_cluster_metadata(self):
+        raise NotImplementedError(
+            "Must be overridden or injected by the owning client (Producer/Consumer)."
+        )
+
+    async def get_leader_connection(
+        self, topic: str, partition_id: int
+    ) -> TinyStreamAPI:
+        """
+        Gets an active connection to the leader broker for a given partition.
+        Handles cache misses by querying the controller.
+        """
+        if self.mode != "cluster":
+            raise RuntimeError("Cannot get leader connection in single-broker mode.")
+
+        topic_partitions = self._topic_metadata_cache.get(topic)
+        if not topic_partitions:
+            await self._refresh_cluster_metadata()
+            topic_partitions = self._topic_metadata_cache.get(topic)
+            if not topic_partitions:
+                raise ValueError(f"Topic '{topic}' not found after refresh.")
+
+        partition_info = topic_partitions.get(partition_id)
+        if not partition_info:
+            raise ValueError(f"Partition {partition_id} for topic '{topic}' not found.")
+
+        leader_id = partition_info.get("leader")
+        if leader_id is None:
+            raise ConnectionError(f"No leader available for {topic}-{partition_id}.")
+
+        broker_info = self._broker_info_cache.get(leader_id)
+        if not broker_info:
+            await self._refresh_cluster_metadata()
+            broker_info = self._broker_info_cache.get(leader_id)
+            if not broker_info:
+                raise ValueError(f"Broker {leader_id} not found after refresh.")
+
+        host, port = broker_info["host"], broker_info["port"]
+
+        conn = self._broker_connections.get((host, port))
+        if not conn or not conn.is_connected:
+            print(
+                f"[ClusterManager] Creating new connection to leader {leader_id} at {host}:{port}"
+            )
+            conn = TinyStreamAPI(host, port, serializer=self.serializer)
+            self._broker_connections[(host, port)] = conn
+
+        await conn.ensure_connected()
+        return conn
+
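The lookup in `get_leader_connection` above walks two caches: topic metadata → leader broker id → broker host/port. A standalone sketch of that walk, with a hypothetical `resolve_leader` helper and dict shapes mirroring the caches (no real sockets are opened):

```python
def resolve_leader(topic, partition_id, topic_metadata, broker_info):
    """Return (host, port) of the partition leader, mirroring the cache walk above."""
    partitions = topic_metadata.get(topic)
    if not partitions:
        raise ValueError(f"Topic '{topic}' not found.")
    partition = partitions.get(partition_id)
    if not partition:
        raise ValueError(f"Partition {partition_id} for topic '{topic}' not found.")
    leader_id = partition.get("leader")
    if leader_id is None:
        raise ConnectionError(f"No leader available for {topic}-{partition_id}.")
    broker = broker_info.get(leader_id)
    if not broker:
        raise ValueError(f"Broker {leader_id} not found.")
    return broker["host"], broker["port"]

# Example cluster state: topic "events", partition 0, led by broker 1.
topic_metadata = {"events": {0: {"leader": 1, "replicas": [1, 2]}}}
broker_info = {
    1: {"host": "localhost", "port": 9094},
    2: {"host": "localhost", "port": 9095},
}
assert resolve_leader("events", 0, topic_metadata, broker_info) == ("localhost", 9094)
```

In the real code a cache miss at either step triggers a metadata refresh from the controller before the lookup is retried.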
+    async def invalidate_caches(self, connection: Optional[TinyStreamAPI] = None):
+        """Invalidates all metadata and optionally closes a bad connection."""
+        if self.mode != "cluster":
+            return
+
+        print("[ClusterManager] Invalidating caches due to error.")
+
+        self._topic_metadata_cache.clear()
+        self._broker_info_cache.clear()
+
+        if connection:
+            conn_key = (connection.host, connection.port)
+            if conn_key in self._broker_connections:
+                await self._broker_connections.pop(conn_key).close()
+
+        await self._refresh_cluster_metadata()
diff --git a/tinystream/config/__init__.py b/tinystream/config/__init__.py
index e69de29..9872b7d 100644
--- a/tinystream/config/__init__.py
+++ b/tinystream/config/__init__.py
@@ -0,0 +1,3 @@
+from tinystream.config.parser import TinyStreamConfig
+
+__all__ = ["TinyStreamConfig"]
diff --git a/tinystream/config/conf.ini b/tinystream/config/conf.ini
index f5235a7..11c91f0 100644
--- a/tinystream/config/conf.ini
+++ b/tinystream/config/conf.ini
@@ -1,9 +1,24 @@
+[default]
+mode = broker
+
+[serialization]
+type = messagepack
+
 [broker]
 partition_log_path = ./data/tinystream/log_partitions
 partition_type = singlelogpartition
 log_storage_type = filelogstorage
-serializer_type = messagepack
 host = localhost
-port = 9092
+port = 9094
 prefix_size = 8
 byte_order = little
+
+[controller]
+host = localhost
+port = 9093
+heartbeat_timeout = 10
+prefix_size = 8
+byte_order = little
+
+[metastore]
+db_path = ./data/metastore/tinystream.meta.db
diff --git a/tinystream/config/parser.py b/tinystream/config/parser.py
index 3e869d0..8213263 100644
--- a/tinystream/config/parser.py
+++ b/tinystream/config/parser.py
@@ -1,18 +1,126 @@
 import configparser
 import os
+from typing import Optional, Dict, Any, Literal


 class EnvInterpolation(configparser.BasicInterpolation):
+    """
+    Interpolation class that expands environment variables in config values.
+    Example: `host = $HOST` will be replaced by the value of the 'HOST' env var.
+    """
+
+    def before_get(self, parser, section, option, value, defaults):
-        return os.path.expandvars(value)
+        # os.path.expandvars resolves $VAR / ${VAR} references; values that
+        # reference unset variables are returned unchanged rather than having
+        # their '$' characters stripped and re-looked-up.
+        return os.path.expandvars(value)


 class EnvConfigParser(configparser.ConfigParser):
+    """
+    A ConfigParser that automatically uses the EnvInterpolation class.
+    """
+
     def __init__(self, *args, **kwargs):
         super().__init__(*args, interpolation=EnvInterpolation(), **kwargs)


-def load_config(file_path: str) -> EnvConfigParser:
-    config = EnvConfigParser()
-    config.read(file_path)
-    return config
+class TinyStreamConfig:
+    """
+    A unified config object for all TinyStream components.
+
+    It can be initialized from an INI file (with env var support)
+    or a dictionary, and determines the operation mode
+    ("single" vs "cluster") from the sections present.
+    """
+
+    controller_config: Dict[str, Any]
+    broker_config: Dict[str, Any]
+    serialization: Dict[str, Any]
+    metastore: Dict[str, Any]
+
+    mode: Literal["single", "cluster"]
+
+    def __init__(
+        self,
+        controller_config: Optional[Dict[str, Any]] = None,
+        broker_config: Optional[Dict[str, Any]] = None,
+        serialization: Optional[Dict[str, Any]] = None,
+        metastore: Optional[Dict[str, Any]] = None,
+    ):
+        """
+        Initializes the config object.
+        Prefer the .from_ini() or .from_dict() factory methods.
+        """
+        self.controller_config = controller_config or {}
+        self.broker_config = broker_config or {}
+        self.serialization = serialization or {}
+        self.metastore = metastore or {}
+
+        if self.controller_config:
+            self.mode = "cluster"
+        elif self.broker_config:
+            self.mode = "single"
+        else:
+            raise ValueError(
+                "Config is empty. Must contain a [controller] or [broker] section."
+            )
+
+    @classmethod
+    def from_ini(cls, file_path: str) -> "TinyStreamConfig":
+        """
+        Creates a TinyStreamConfig object from an INI file.
+
+        Uses EnvConfigParser to support
+        environment variable interpolation.
+ """ + parser = EnvConfigParser() + + if not parser.read(file_path): + raise FileNotFoundError(f"Config file not found or empty: {file_path}") + + config_dict = {section: dict(parser[section]) for section in parser.sections()} + return cls.from_dict(config_dict) + + @classmethod + def from_dict(cls, config: Dict[str, Any]): + """Creates a TinyStreamConfig object from a dictionary.""" + controller_conf = config.get("controller") + broker_conf = config.get("broker") + serialization = config.get("serialization") + metastore = config.get("metastore") + + return cls( + controller_config=controller_conf, + broker_config=broker_conf, + serialization=serialization, + metastore=metastore, + ) + + @classmethod + def from_default_config_file(cls) -> "TinyStreamConfig": + """Creates a TinyStreamConfig object from the default config file path.""" + default_path = os.getenv("TINYSTREAM_CONFIG_PATH", "tinystream.ini") + return cls.from_ini(default_path) + + def get_controller_config(self) -> Dict[str, Any]: + """Returns the [controller] section. Raises error if in single mode.""" + if self.mode == "single": + raise ValueError("Cannot get controller config in 'single' mode.") + return self.controller_config + + def get_serialization_config(self) -> Dict[str, Any]: + """Returns the [serialization] section, or empty dict if not present.""" + return self.serialization + + def get_broker_config(self) -> Dict[str, Any]: + """ + Returns the [broker] section. + In cluster mode, this might be empty, which is fine. + In single mode, this is the primary config. 
+ """ + return self.broker_config + + def get_metastore_config(self): + """Returns the [metastore] section, or empty dict if not present.""" + return self.metastore diff --git a/tinystream/controller.py b/tinystream/controller.py new file mode 100644 index 0000000..188e319 --- /dev/null +++ b/tinystream/controller.py @@ -0,0 +1,424 @@ +import asyncio + +from dataclasses import dataclass, asdict +import time +from typing import Dict, List, Optional, Any, Literal +from pathlib import Path + +from tinystream import DEFAULT_CONFIG_PATH +from tinystream.client.base import BaseAsyncClient +from tinystream.config.parser import TinyStreamConfig +from tinystream.serializer.base import AbstractSerializer + + +@dataclass +class BrokerInfo: + broker_id: int + host: str + port: int + last_heartbeat: float = time.time() + is_alive: bool = True + status: Literal["ALIVE", "TIMED_OUT", "SHUTDOWN"] = "ALIVE" + + +@dataclass +class PartitionMetadata: + partition_id: int + leader: Optional[int] + replicas: List[int] + + +@dataclass +class TopicMetadata: + name: str + partitions: Dict[int, PartitionMetadata] + + +class Controller(BaseAsyncClient): + """ + TinyStream Controller — manages cluster metadata, brokers, and leader elections. 
+ (Now a fully asynchronous, persistent server) + """ + + def __init__(self, config: TinyStreamConfig): + self.config = config + + controller_config = config.get_controller_config() + self.host = controller_config["host"] + self.port = int(controller_config["port"]) + self.heartbeat_timeout = float(controller_config["heartbeat_timeout"]) + + self.prefix_size = int(controller_config.get("prefix_size", "8")) + self.byte_order: Literal["little", "big"] = controller_config.get( # type: ignore + "byte_order", "little" + ) + self.serializer: AbstractSerializer = self.init_serializer( + controller_config.get("serializer_type", "messagepack") + ) + + super().__init__( + prefix_size=self.prefix_size, + byte_order=self.byte_order, + serializer=self.serializer, + host=self.host, + port=self.port, + ) + + metastore_config = self.config.metastore + + self.metastore_db_path = Path(metastore_config["db_path"]) + + self.brokers: Dict[int, BrokerInfo] = {} + self.topics: Dict[str, TopicMetadata] = {} + + self._lock = asyncio.Lock() + self._monitor_task: Optional[asyncio.Task] = None + + @staticmethod + def init_serializer(serializer_name: str) -> AbstractSerializer: + if serializer_name == "messagepack": + from tinystream.serializer.msg_pack import MSGPackSerializer + + return MSGPackSerializer() + else: + raise ValueError(f"Unknown serializer: {serializer_name}") + + async def _load_metadata(self): + """Loads all cluster metadata from the DB into the in-memory cache.""" + if not self.db_connection: + return + + print("[Controller] Loading metadata from database...") + async with self._lock: + async with self.db_connection.execute("SELECT * FROM brokers") as cursor: + async for row in cursor: + broker_id, host, port = row + self.brokers[broker_id] = BrokerInfo( + broker_id=broker_id, host=host, port=port + ) + + async with self.db_connection.execute("SELECT * FROM topics") as t_cursor: + async for row in t_cursor: + name, p_count, r_factor = row + self.topics[name] = 
TopicMetadata(name=name, partitions={}) + + async with self.db_connection.execute( + "SELECT * FROM partitions" + ) as p_cursor: + async for row in p_cursor: + topic, p_id, leader, replicas_json = row + replicas = [int(r) for r in replicas_json.split(",")] + if topic in self.topics: + self.topics[topic].partitions[p_id] = PartitionMetadata( + partition_id=p_id, leader=leader, replicas=replicas + ) + + print( + f"[Controller] Loaded {len(self.brokers)} brokers and {len(self.topics)} topics." + ) + + async def start(self): + """Starts the controller server and background tasks.""" + await self.init_metastore(db_path=self.metastore_db_path) + await self._load_metadata() + self.start_background_tasks() + + await self.start_server() + addr = self._server.sockets[0].getsockname() + print(f"[Controller] Listening on {addr[0]}:{addr[1]}...") + + async def run_forever(self) -> None: + """Runs the main server loop. This call blocks forever.""" + if not self._server: + raise RuntimeError( + "Controller server has not been started. Call start() first." 
+            )
+
+        async with self._server:
+            await self._server.serve_forever()
+
+    async def close(self):
+        print("\n[Controller] Shutting down...")
+        await self.stop_background_tasks()
+        if self._server:
+            self._server.close()
+            await self._server.wait_closed()
+            print("[Controller] Server socket closed.")
+        if self.db_connection:
+            await self.db_connection.close()
+            print("[Controller] Metastore connection closed.")
+
+    async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]:
+        """Deserializes request and calls the correct controller method."""
+        try:
+            request = self.serializer.deserialize(payload_bytes)
+            print(f"[Controller] Received request: {request}")
+            command = request.get("command")
+
+            if command == "register_broker":
+                await self.register_broker(
+                    request["broker_id"], request["host"], request["port"]
+                )
+                return {"status": "ok"}
+
+            elif command == "deregister_broker":
+                return await self._handle_deregister(request)
+
+            elif command == "heartbeat":
+                await self.update_broker_heartbeat(request["broker_id"])
+                return {"status": "ok"}
+
+            elif command == "create_topic":
+                await self.create_topic(
+                    request["name"],
+                    request["partitions"],
+                    request["replication_factor"],
+                )
+                return {"status": "ok"}
+
+            elif command == "get_cluster_metadata":
+                metadata = await self.get_cluster_metadata()
+                return {"status": "ok", "metadata": metadata}
+
+            else:
+                return {"status": "error", "message": "Unknown command"}
+
+        except Exception as e:
+            return {"status": "error", "message": f"Failed to process request: {e}"}
+
+    async def register_broker(self, broker_id: int, host: str, port: int):
+        async with self._lock:
+            if not self.db_connection:
+                raise Exception("Database not connected")
+
+            await self.db_connection.execute(
+                "INSERT OR REPLACE INTO brokers (broker_id, host, port) VALUES (?, ?, ?)",
+                (broker_id, host, port),
+            )
+            await self.db_connection.commit()
+
+            self.brokers[broker_id] = BrokerInfo(
+                broker_id=broker_id, host=host, port=port
+            )
+            print(f"[Controller] Registered broker {broker_id} at {host}:{port}")
+        return self
+
+    async def _handle_deregister(self, request: Dict[str, Any]) -> Dict[str, Any]:
+        broker_id = request["broker_id"]
+        async with self._lock:
+            if broker_id in self.brokers:
+                print(f"[Controller] Broker {broker_id} reported graceful shutdown.")
+                await self._handle_broker_failure(broker_id, new_status="SHUTDOWN")
+        return {"status": "ok", "message": "Deregistered"}
+
+    async def update_broker_heartbeat(self, broker_id: int):
+        async with self._lock:
+            if broker_id not in self.brokers:
+                raise ValueError(f"Broker ID {broker_id} not registered.")
+
+            self.brokers[broker_id].last_heartbeat = time.time()
+            if not self.brokers[broker_id].is_alive:
+                print(f"[Controller] Broker {broker_id} is alive again.")
+                self.brokers[broker_id].is_alive = True
+                # Reset status as well, so the heartbeat monitor (which only
+                # watches brokers with status "ALIVE") resumes tracking it.
+                self.brokers[broker_id].status = "ALIVE"
+                # TODO: Trigger rebalancing
+
+    async def create_topic(self, name: str, partitions: int, replication_factor: int):
+        async with self._lock:
+            if not self.db_connection:
+                raise Exception("Database not connected")
+            if name in self.topics:
+                raise ValueError(f"Topic {name} already exists.")
+
+            await self.db_connection.execute(
+                "INSERT INTO topics (topic_name, partition_count, replication_factor) VALUES (?, ?, ?)",
+                (name, partitions, replication_factor),
+            )
+
+            topic_metadata = TopicMetadata(name=name, partitions={})
+            self.topics[name] = topic_metadata
+
+            await self._assign_partitions(name, partitions, replication_factor)
+
+            await self.db_connection.commit()
+
+    async def _assign_partitions(
+        self, topic: str, partitions: int, replication_factor: int
+    ):
+        if not self.db_connection:
+            raise Exception("Database not connected")
+
+        brokers_alive = [b.broker_id for b in self.brokers.values() if b.is_alive]
+        if len(brokers_alive) < replication_factor:
+            raise ValueError("Not enough brokers alive to satisfy replication factor.")
+
+        topic_metadata = self.topics[topic]
+
+        partition_data_to_insert = []
+        for partition_id in range(partitions):
+            replicas = []
+            for i in range(replication_factor):
+                broker_index = (partition_id + i) % len(brokers_alive)
+                replicas.append(brokers_alive[broker_index])
+
+            leader = replicas[0]
+            replicas_json = ",".join(map(str, replicas))
+
+            partition_metadata = PartitionMetadata(
+                partition_id=partition_id, leader=leader, replicas=replicas
+            )
+            topic_metadata.partitions[partition_id] = partition_metadata
+
+            partition_data_to_insert.append(
+                (topic, partition_id, leader, replicas_json)
+            )
+
+        await self.db_connection.executemany(
+            "INSERT INTO partitions (topic_name, partition_id, leader, replicas) VALUES (?, ?, ?, ?)",
+            partition_data_to_insert,
+        )
+        print(f"[Controller] Assigned and persisted partitions for topic {topic}")
+
+    async def _elect_leader(self, topic: str, partition_id: int) -> Optional[int]:
+        if not self.db_connection:
+            raise Exception("Database not connected")
+
+        partition_metadata = self.topics[topic].partitions.get(partition_id)
+        if not partition_metadata:
+            raise ValueError(f"Partition {partition_id} does not exist.")
+
+        new_leader = None
+        for broker_id in partition_metadata.replicas:
+            if self.brokers[broker_id].is_alive:
+                new_leader = broker_id
+                break
+
+        await self.db_connection.execute(
+            "UPDATE partitions SET leader = ? WHERE topic_name = ? AND partition_id = ?",
+            (new_leader, topic, partition_id),
+        )
+        await self.db_connection.commit()
+
+        partition_metadata.leader = new_leader
+
+        if new_leader:
+            print(
+                f"[Controller] Elected new leader for {topic}-{partition_id}: Broker {new_leader}"
+            )
+        else:
+            print(f"[Controller] WARNING: No live leader for {topic}-{partition_id}.")
+
+        return new_leader
+
+    async def remove_dead_brokers(self):
+        async with self._lock:
+            current_time = time.time()
+            dead_broker_ids = []
+            for broker in self.brokers.values():
+                if broker.status == "ALIVE" and (
+                    current_time - broker.last_heartbeat > self.heartbeat_timeout
+                ):
+                    print(f"[Controller] Broker {broker.broker_id} timed out.")
+                    dead_broker_ids.append(broker.broker_id)
+
+            for broker_id in dead_broker_ids:
+                await self._handle_broker_failure(broker_id, new_status="TIMED_OUT")
+
+    async def _handle_broker_failure(
+        self, broker_id: int, new_status: Literal["TIMED_OUT", "SHUTDOWN"]
+    ):
+        """
+        Triggers leader re-election for a failed/shutdown broker.
+        Assumes lock is already held.
+        """
+        if broker_id in self.brokers:
+            self.brokers[broker_id].status = new_status
+            # Also mark it dead so _elect_leader and _assign_partitions
+            # stop considering it a candidate.
+            self.brokers[broker_id].is_alive = False
+
+        print(
+            f"[Controller] Handling failure for broker {broker_id}, reason: {new_status}..."
+        )
+
+        for topic_metadata in self.topics.values():
+            for partition_metadata in topic_metadata.partitions.values():
+                if partition_metadata.leader == broker_id:
+                    await self._elect_leader(
+                        topic_metadata.name, partition_metadata.partition_id
+                    )
+
+    async def get_cluster_metadata(self):
+        """Returns current state of brokers, topics, and partition leaders."""
+        async with self._lock:
+            partitions_dict = {}
+            for topic_name, topic_metadata in self.topics.items():
+                partitions_dict[topic_name] = {
+                    part_id: {
+                        "leader": part_metadata.leader,
+                        "replicas": part_metadata.replicas,
+                    }
+                    for part_id, part_metadata in topic_metadata.partitions.items()
+                }
+
+            return {
+                "brokers": {
+                    broker_id: asdict(broker_info)
+                    for broker_id, broker_info in self.brokers.items()
+                },
+                "partitions": partitions_dict,
+            }
+
+    def start_background_tasks(self):
+        """Starts periodic tasks for heartbeat checking."""
+        print("[Controller] Starting background heartbeat monitor...")
+        self._monitor_task = asyncio.create_task(self._heartbeat_monitor_loop())
+
+    async def stop_background_tasks(self):
+        """Stops periodic tasks."""
+        if self._monitor_task:
+            self._monitor_task.cancel()
+            print("[Controller] Stopped background heartbeat monitor.")
+
+    async def _heartbeat_monitor_loop(self):
+        """Runs periodically to check for dead brokers."""
+        while True:
+            await asyncio.sleep(self.heartbeat_timeout / 2)
+            await self.remove_dead_brokers()
+
+    async def get_leader(self, topic: str, partition_id: int) -> Optional[int]:
+        """Returns the current leader for a given topic-partition."""
+        async with self._lock:
+            # Guard against unknown topics instead of dereferencing
+            # self.topics.get(topic) directly (which raises AttributeError).
+            topic_metadata = self.topics.get(topic)
+            if topic_metadata is None:
+                return None
+            partition_metadata = topic_metadata.partitions.get(partition_id)
+            if partition_metadata:
+                return partition_metadata.leader
+            return None
+
+
+async def main():
+    """
+    Custom startup script to initialize, register brokers, and run the server.
+ """ + config = TinyStreamConfig.from_ini(DEFAULT_CONFIG_PATH) + controller = Controller(config=config) + try: + await controller.start() + + default_broker_counter = 2 + for _broker_id in range(default_broker_counter): + print(f"[Controller] Pre-registering broker {_broker_id}...") + await controller.register_broker( + broker_id=_broker_id, host="localhost", port=int(f"909{_broker_id}") + ) + + print("[Controller] Startup complete. Running server forever...") + await controller.run_forever() + + except KeyboardInterrupt: + print("\n[Controller] Caught interrupt, shutting down...") + finally: + await controller.close() + print("[Controller] Shutdown complete.") + + +if __name__ == "__main__": + try: + asyncio.run(main()) + except KeyboardInterrupt: + print("\n[Controller] Shutdown forced.") diff --git a/tinystream/serializer/msg_pack.py b/tinystream/serializer/msg_pack.py index bd4653b..4b0e788 100644 --- a/tinystream/serializer/msg_pack.py +++ b/tinystream/serializer/msg_pack.py @@ -11,4 +11,4 @@ def serialize(data: Any) -> bytes: @staticmethod def deserialize(data: bytes) -> Any: - return msgpack.unpackb(data, raw=False) + return msgpack.unpackb(data, raw=False, strict_map_key=False) diff --git a/tinystream/utils/db.py b/tinystream/utils/db.py new file mode 100644 index 0000000..c96c85b --- /dev/null +++ b/tinystream/utils/db.py @@ -0,0 +1,54 @@ +async def create_db_schemas(connection): + """ + Creates all tables for both Controller and Broker. + Uses 'IF NOT EXISTS' to be idempotent. 
+ """ + + await connection.execute( + """ + CREATE TABLE IF NOT EXISTS brokers + ( + broker_id INTEGER PRIMARY KEY, + host TEXT NOT NULL, + port INTEGER NOT NULL + ) + """ + ) + + await connection.execute( + """ + CREATE TABLE IF NOT EXISTS topics + ( + topic_name TEXT PRIMARY KEY, + partition_count INTEGER NOT NULL, + replication_factor INTEGER NOT NULL + ) + """ + ) + + await connection.execute( + """ + CREATE TABLE IF NOT EXISTS partitions + ( + topic_name TEXT NOT NULL, + partition_id INTEGER NOT NULL, + leader INTEGER, + replicas TEXT NOT NULL, + FOREIGN KEY (topic_name) REFERENCES topics (topic_name), + PRIMARY KEY (topic_name, partition_id) + ) + """ + ) + + await connection.execute( + """ + CREATE TABLE IF NOT EXISTS consumer_group_offsets + ( + group_id TEXT NOT NULL, + topic_name TEXT NOT NULL, + partition_id INTEGER NOT NULL, + committed_offset INTEGER NOT NULL, + PRIMARY KEY (group_id, topic_name, partition_id) + ) + """ + ) diff --git a/tinystream/utils/serlializer.py b/tinystream/utils/serlializer.py new file mode 100644 index 0000000..f95cfcc --- /dev/null +++ b/tinystream/utils/serlializer.py @@ -0,0 +1,10 @@ +from tinystream.serializer.base import AbstractSerializer + + +def init_serializer(serializer_name: str) -> AbstractSerializer: + if serializer_name == "messagepack": + from tinystream.serializer.msg_pack import MSGPackSerializer + + return MSGPackSerializer() + else: + raise ValueError(f"Unknown serializer: {serializer_name}") diff --git a/uv.lock b/uv.lock index 2363453..f4eb947 100644 --- a/uv.lock +++ b/uv.lock @@ -11,6 +11,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/bc/8a/340a1555ae33d7354dbca4faa54948d76d89a27ceef032c8c3bc661d003e/aiofiles-25.1.0-py3-none-any.whl", hash = "sha256:abe311e527c862958650f9438e859c1fa7568a141b22abcd015e120e86a85695", size = 14668, upload-time = "2025-10-09T20:51:03.174Z" }, ] +[[package]] +name = "aiosqlite" +version = "0.21.0" +source = { registry = "https://pypi.org/simple" } 
+dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/13/7d/8bca2bf9a247c2c5dfeec1d7a5f40db6518f88d314b8bca9da29670d2671/aiosqlite-0.21.0.tar.gz", hash = "sha256:131bb8056daa3bc875608c631c678cda73922a2d4ba8aec373b19f18c17e7aa3", size = 13454, upload-time = "2025-02-03T07:30:16.235Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/f5/10/6c25ed6de94c49f88a91fa5018cb4c0f3625f31d5be9f771ebe5cc7cd506/aiosqlite-0.21.0-py3-none-any.whl", hash = "sha256:2549cf4057f95f53dcba16f2b64e8e2791d7e1adedb13197dd8ed77bb226d7d0", size = 15792, upload-time = "2025-02-03T07:30:13.6Z" }, +] + [[package]] name = "bidict" version = "0.23.1" @@ -966,6 +978,7 @@ version = "0.1.0" source = { editable = "." } dependencies = [ { name = "aiofiles" }, + { name = "aiosqlite" }, { name = "msgpack" }, ] @@ -981,6 +994,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "aiofiles", specifier = ">=25.1.0" }, + { name = "aiosqlite", specifier = ">=0.21.0" }, { name = "msgpack", specifier = ">=1.1.2" }, ]
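Reviewer note: the placement logic in `Controller._assign_partitions` is the part of this diff most worth understanding in isolation. The sketch below is a minimal, dependency-free reconstruction of that round-robin scheme (the function name `assign_partitions` and the standalone shape are illustrative, not part of the diff): replica `i` of partition `p` lands on alive broker `(p + i) % N`, so leaders (`replicas[0]`) rotate evenly across brokers.

```python
from typing import Dict, List


def assign_partitions(
    alive_brokers: List[int], partitions: int, replication_factor: int
) -> Dict[int, List[int]]:
    """Round-robin replica placement, mirroring Controller._assign_partitions.

    Returns {partition_id: [replica broker ids]}; index 0 is the leader.
    """
    if len(alive_brokers) < replication_factor:
        raise ValueError("Not enough brokers alive to satisfy replication factor.")

    assignment: Dict[int, List[int]] = {}
    for partition_id in range(partitions):
        # Replica i of partition p goes to broker (p + i) % N.
        assignment[partition_id] = [
            alive_brokers[(partition_id + i) % len(alive_brokers)]
            for i in range(replication_factor)
        ]
    return assignment


if __name__ == "__main__":
    # 3 partitions, replication factor 2, over brokers [0, 1]:
    print(assign_partitions([0, 1], partitions=3, replication_factor=2))
    # {0: [0, 1], 1: [1, 0], 2: [0, 1]}
```

Note that, as in the diff, placement only consults brokers alive at topic-creation time; partitions are not rebalanced when brokers join later.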