diff --git a/README.md b/README.md index 5911775..87837a0 100644 --- a/README.md +++ b/README.md @@ -4,14 +4,30 @@ TinyStream is a `lightweight` streaming engine in Python, inspired by Apache Kaf ## Features -* Append-only partitioned log storage -* Durable, segment-based storage on disk -* Producer / Consumer client APIs -* Controller node for cluster metadata and leader election -* Broker cluster simulation -* Local cluster harness for distributed testing -* **[WIP]** Replication and leader election -* **[WIP]** Retention policies for segment cleanup +- Append-only Partitioned Log: The core storage mechanism. +- Segment-Based Storage: Logs are broken into segments with sparse .index files for fast, efficient reads. +- Pluggable Storage: Supports SingleLogStorage (one file) or SegmentedLogStorage (retention-ready). +- Controller Cluster: A controller-based architecture (no "single mode") manages cluster state. +- Metadata & Liveness: The Controller tracks broker liveness (via heartbeats) and partition assignments. +- Leader Election: The Controller automatically elects new leaders when brokers fail (see `_handle_broker_failure` in `controller.py`). +- Producer/Consumer APIs: Asynchronous clients for producing and consuming data. +- Log Retention: Supports per-topic log retention by time (retention_ms) or size (retention_bytes) (see `create_topic` in `topic_manager.py`). +- HTTP Admin Dashboard: A built-in, lightweight web UI (via FastAPI) to view cluster status. + + +## Configuration Management + +TinyStream uses a 4-layer "override" system for configuration, managed by the `ConfigManager`.
+ +The final value for any setting is chosen in this order of priority: + +- CLI Arguments: (e.g., `--controller-uri`) +- Environment Variables: (e.g., `TINYSTREAM_CONTROLLER_URI`) +- User-Provided Config File: (e.g., `--config test_confs/broker-1.ini`) +- Default Component Config File: (e.g., `tinystream/config/controller.ini`) + +This allows you to have a set of default configs and override them at runtime. For example, you can start a broker and tell it where the controller is via an environment variable, rather than modifying its config file. + ## Installation @@ -39,67 +55,138 @@ The Controller manages cluster metadata (topics, brokers, partition leaders). uv run python -m tinystream.controller ``` +The controller will start: + +- Its RPC Server on `localhost:9093` (for Brokers to connect). +- The Metastore API / Dashboard on http://localhost:3200. + #### Step 2: Start a Broker -The Broker stores data. It registers itself with the Controller. +The Broker stores data. This command will load ``tinystream/config/broker.ini`` (which knows the controller's address) and start the broker with ID 1. ```bash -uv run python -m tinystream.broker --mode cluster --broker-number=2 +uv run python -m tinystream.broker --broker-id 1 ``` +The broker will start: + +- Its RPC Server on ``localhost:9095`` (from broker.ini, for clients). +- It will then connect to the Controller at ``localhost:9093`` to register itself. + #### Step 4: Create a Topic -Topics must be created before you can produce to them. Use the admin client to ask the Controller to create a topic. +Use the admin client to tell the Controller to create a new topic. 
```bash uv run python -m tinystream.admin create-topic \ --topic "events" \ --partitions 3 \ --replication-factor 1 \ - --controller "localhost:6000" + --controller-uri "localhost:6000" ``` #### Step 5: Produce Messages ```python +import asyncio +import argparse from tinystream.client.producer import Producer -from tinystream.config.parser import TinyStreamConfig +from tinystream.config.manager import ConfigManager + +async def run(): + args = argparse.Namespace(config=None, controller_uri=None, metastore_uri=None) + config = ConfigManager(args, component_type="broker") -config = TinyStreamConfig.from_default_config_file() -config.mode = "cluster" -producer = Producer(config=config) + producer = Producer(config=config) -print("Sending 10 messages...") + try: + await producer.connect() + print("Producer connected. Sending 10 messages...") -for i in range(10): - msg = f"hello-tinystream-{i}".encode('utf-8') - producer.send("events", msg) - print(f"Sent: {msg.decode()}") + for i in range(10): + msg = f"hello-tinystream-{i}" + key = f"user-{i % 2}" + print(f"Sending: {msg} (key: {key})") -print("Done.") + response = await producer.send("events", msg.encode('utf-8'), key=key) + print(f"-> Broker response: {response}") + await asyncio.sleep(0.5) + + except Exception as e: + print(f"Error: {e}") + finally: + await producer.close() + +if __name__ == "__main__": + asyncio.run(run()) ``` #### Step 6: Consume Messages ```python +import asyncio +import argparse from tinystream.client.consumer import Consumer -from tinystream.config.parser import TinyStreamConfig +from tinystream.config.manager import ConfigManager + +async def run(): + args = argparse.Namespace(config=None, controller_uri=None, metastore_uri=None) + config = ConfigManager(args, component_type="broker") -config = TinyStreamConfig.from_default_config_file() + consumer = Consumer(config=config, group_id="my-test-group") -config.mode = "cluster" -consumer = Consumer(config=config, group_id="test-group") + try: + 
await consumer.connect() + print("Consumer connected.") -consumer.connect() + consumer.assign(topic="events", partition=0, start_offset=0) + print("Consuming from 'events-0'. Press Ctrl+C to stop.") -consumer.assign(topic="events", partition=0, start_offset=0) + while True: + messages = await consumer.poll(max_messages=5) + if messages: + for msg in messages: + print(f"Received: {msg.decode('utf-8')}") + await consumer.commit() -for _message in consumer.poll(): - print(f"Received: {_message.value.decode()}") + await asyncio.sleep(1) + except KeyboardInterrupt: + print("\nStopping consumer...") + finally: + await consumer.close() + +if __name__ == "__main__": + asyncio.run(run()) + +``` + +## Load Testing + +A load test script is included in `load.py`. It spawns worker tasks to send data in parallel. + +Before running, ensure the topic exists: + +```bash +uv run python -m tinystream.admin create-topic --topic "load_test" --partitions 3 --replication-factor 1 ``` +To run the test for 60 seconds with 50 concurrent workers: + +```bash +uv run python load.py \ + --topic "load_test" \ + --num-workers 50 \ + --message-size 1024 \ + --run-time 60 +``` + +## Load Test Results + +TODO: Add results from a benchmark run (e.g., on an M4 Mac with 36GB memory) here.
+ + #### Running Components in Isolation For quick testing, each core component can be run in isolation directly as a module: @@ -108,6 +195,7 @@ For quick testing, each core component can be run in isolation directly as a mod - Broker: `uv run python -m tinystream.broker` - Producer: `uv run python -m tinystream.client.producer` - Consumer: `uv run python -m tinystream.client.consumer` +- Admin: `uv run python -m tinystream.client.admin` ## Architecture Overview diff --git a/load.py b/load.py new file mode 100644 index 0000000..1938783 --- /dev/null +++ b/load.py @@ -0,0 +1,222 @@ +import asyncio +import time +import argparse +import random +import string +import uuid +import sys +from typing import Optional, List + +from tinystream.config.manager import ConfigManager +from tinystream.client.producer import Producer +from tinystream.utils.env import env_default + + +def _random_payload(size_bytes: int) -> str: + """Generates a random string payload of a given size.""" + return "".join(random.choices(string.ascii_letters + string.digits, k=size_bytes)) + + +async def producer_worker( + producer: Producer, + topic: str, + msg_size: int, + batch_size: int, + stats_queue: asyncio.Queue, +): + """ + A single async worker that sends batches of messages in a loop. 
+ """ + print(f"[Worker {asyncio.current_task().get_name()}] starting...") + while True: + try: + tasks = [] + for _ in range(batch_size): + key = str(uuid.uuid4()) + payload = _random_payload(msg_size) + tasks.append(producer.send(topic, payload, key=key)) + + results = await asyncio.gather(*tasks, return_exceptions=True) + + sent_count = 0 + error_count = 0 + + for res in results: + if isinstance(res, Exception): + error_count += 1 + else: + sent_count += 1 + + await stats_queue.put(("sent", sent_count)) + if error_count > 0: + await stats_queue.put(("errors", error_count)) + + except asyncio.CancelledError: + return + + +async def stats_reporter(stats_queue: asyncio.Queue): + """ + A background task that collects and prints throughput stats. + """ + total_sent = 0 + total_errors = 0 + start_time = time.monotonic() + + async def print_stats(): + while True: + await asyncio.sleep(5) # Report every 5 seconds + elapsed = time.monotonic() - start_time + if elapsed == 0: + continue + + msgs_sec = total_sent / elapsed + + print("---") + print(f" Total Sent: {total_sent}") + print(f"Total Errors: {total_errors}") + print(f" Throughput: {msgs_sec:.2f} msgs/sec") + print("---") + + reporter_print_task = asyncio.create_task(print_stats()) + + try: + while True: + event_type, count = await stats_queue.get() + if event_type == "sent": + total_sent += count + elif event_type == "errors": + total_errors += count + stats_queue.task_done() + except asyncio.CancelledError: + reporter_print_task.cancel() + await asyncio.gather(reporter_print_task, return_exceptions=True) + + +async def main(config: ConfigManager, args: argparse.Namespace): + producer = Producer(config=config) + stats_queue = asyncio.Queue() + worker_tasks: List[asyncio.Task] = [] + reporter_task: Optional[asyncio.Task] = None + + try: + await producer.connect() + print(f"Producer connected. 
Starting {args.num_workers} workers...") + + reporter_task = asyncio.create_task(stats_reporter(stats_queue)) + + for i in range(args.num_workers): + worker_tasks.append( + asyncio.create_task( + producer_worker( + producer, + args.topic, + args.message_size, + args.batch_size, + stats_queue, + ), + name=f"worker-{i}", + ) + ) + + main_work = asyncio.gather(reporter_task, *worker_tasks) + + if args.run_time: + print(f"--- Running test for {args.run_time} seconds ---") + await asyncio.wait_for(main_work, timeout=args.run_time) + else: + print("--- Running test indefinitely (Press Ctrl+C to stop) ---") + await main_work + + except asyncio.TimeoutError: + print(f"\n\nTest duration of {args.run_time} seconds complete. Stopping...") + except ConnectionRefusedError: + print("\n[ERROR] Could not connect to controller.") + except KeyboardInterrupt: + print("\n\nStopping load test... (Ctrl+C pressed)") + except Exception as e: + print(f"\nAn error occurred: {e}") + finally: + print("Shutting down workers...") + if reporter_task: + reporter_task.cancel() + for task in worker_tasks: + task.cancel() + + all_tasks = worker_tasks + ([reporter_task] if reporter_task else []) + await asyncio.gather(*all_tasks, return_exceptions=True) + print("All tasks stopped.") + + if await producer.is_connected(): + await producer.close() + print("Producer connection closed.") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="TinyStream Load Test") + + parser.add_argument( + "--controller-uri", + type=str, + default=env_default("TINYSTREAM_CONTROLLER_URI")(), + help="Controller RPC URI (e.g., localhost:9093). Overrides config.", + ) + parser.add_argument( + "--config", + type=str, + default=env_default("TINYSTREAM_CONFIG")(), + help="Path to a user config file. Overrides default config.", + ) + + parser.add_argument( + "--topic", type=str, default="load_test", help="Topic to produce to." 
+ ) + parser.add_argument( + "--num-workers", + type=int, + default=50, + help="Number of concurrent producer tasks to run.", + ) + parser.add_argument( + "--message-size", + type=int, + default=1024, + help="Size of each message payload in bytes.", + ) + parser.add_argument( + "--batch-size", + type=int, + default=100, + help="Number of messages each worker sends in a concurrent batch.", + ) + + parser.add_argument( + "--run-time", + type=int, + default=1200, + help="Duration to run the test in seconds. Set to 0 to run indefinitely. (Default: 1200)", + ) + + args = parser.parse_args() + + config_manager = ConfigManager(args, component_type="broker") + + print("Starting load test...") + print(f" Topic: {args.topic}") + print(f" Workers: {args.num_workers}") + print(f" Batch Size: {args.batch_size}") + print(f" Message Size: {args.message_size} bytes") + print(f" Run Time: {args.run_time or 'Indefinite'}") + print( + f"Controller URI: {config_manager.controller_config['host']}:{config_manager.controller_config['port']}" + ) + print("---") + + try: + asyncio.run(main(config=config_manager, args=args)) + except Exception as e: + print(f"FATAL: Load test crashed: {e}") + import traceback + + traceback.print_exc() + sys.exit(1) diff --git a/pyproject.toml b/pyproject.toml index 722a98f..77c427b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -8,6 +8,7 @@ dependencies = [ "aiosqlite>=0.21.0", "fastapi>=0.120.2", "httpx>=0.28.1", + "jinja2>=3.1.6", "msgpack>=1.1.2", "pydantic>=2.12.3", "uvicorn>=0.38.0", diff --git a/test_confs/brokers/0.ini b/test_confs/brokers/0.ini new file mode 100644 index 0000000..8011578 --- /dev/null +++ b/test_confs/brokers/0.ini @@ -0,0 +1,8 @@ +[broker] +partition_log_path = ./data/tinystream/log_partitions +partition_type = segmentedlogpartition +storage_type = singlelogstorage +host = localhost +port = 9095 +prefix_size = 8 +byte_order = big diff --git a/test_confs/controller.ini b/test_confs/controller.ini new file mode 100644 index 0000000..c23f07d
--- /dev/null +++ b/test_confs/controller.ini @@ -0,0 +1,11 @@ +[controller] +mode = cluster +host = localhost +port = 9093 +heartbeat_timeout = 10 +prefix_size = 8 +byte_order = little + +[metastore] +db_path = ./data/metastore/tinystream.meta.db +http_port = 3100 diff --git a/tests/test_brokers/test_storage.py b/tests/test_brokers/test_storage.py index 6cbf66c..12842d9 100644 --- a/tests/test_brokers/test_storage.py +++ b/tests/test_brokers/test_storage.py @@ -3,7 +3,7 @@ import tempfile from pathlib import Path -from tinystream.storage import FileLogStorage +from tinystream.storage import SegmentedLogStorage class TestFileLogStorage(unittest.TestCase): @@ -11,9 +11,7 @@ def setUp(self): self.temp_dir = tempfile.TemporaryDirectory() self.temp_path = Path(self.temp_dir.name) - self.log_file = self.temp_path / "test_partition.log" - - self.storage = FileLogStorage(log_file_path=self.log_file) + self.storage = SegmentedLogStorage(partition_path=self.temp_path) asyncio.run(self.storage.ensure_ready()) @@ -22,11 +20,9 @@ def tearDown(self): def test_storage_ensure_ready(self): new_log_dir = self.temp_path / "data" - new_log_file = new_log_dir / "test.log" - self.assertFalse(new_log_dir.exists()) - new_storage = FileLogStorage(log_file_path=new_log_file) + new_storage = SegmentedLogStorage(partition_path=new_log_dir) asyncio.run(new_storage.ensure_ready()) self.assertTrue(new_log_dir.exists()) @@ -110,7 +106,6 @@ def test_storage_read_at_offset_error(self): async def _test(): await self.storage.append(b"some data") - # Try to read from an invalid offset with self.assertRaises(EOFError): await self.storage.read_at(9999) diff --git a/tinystream/__init__.py b/tinystream/__init__.py index 0f70ecb..d5238d2 100644 --- a/tinystream/__init__.py +++ b/tinystream/__init__.py @@ -1,5 +1,9 @@ import os -DEFAULT_CONFIG_PATH = os.environ.get( - "TINYSTREAM_CONFIG_FILE", "tinystream/config/conf.ini" +DEFAULT_CONTROLLER_CONFIG_PATH = os.environ.get( + 
"TINYSTREAM_DEFAULT_CONTROLLER_CONFIG_FILE", "tinystream/config/controller.ini" +) + +DEFAULT_BROKER_CONFIG_PATH = os.environ.get( + "TINYSTREAM_DEFAULT_BROKER_CONFIG_FILE", "tinystream/config/broker.ini" ) diff --git a/tinystream/broker.py b/tinystream/broker.py index ad08a4f..c936d75 100644 --- a/tinystream/broker.py +++ b/tinystream/broker.py @@ -1,26 +1,22 @@ import json from pathlib import Path -from typing import Dict, Any, Optional, Type, Literal +from typing import Dict, Any, Optional, Literal import asyncio import sys import argparse -import copy -import pathlib from aiosqlite import Connection as DBConnection -from tinystream.metastore import Metastore -from tinystream.models import TopicMetadata, PartitionMetadata -from tinystream import DEFAULT_CONFIG_PATH from tinystream.client.connection import TinyStreamAPI from tinystream.client.base import BaseAsyncClient -from tinystream.client.topic_manager import TopicManager -from tinystream.config.parser import TinyStreamConfig +from tinystream.config.manager import ConfigManager from tinystream.controller import BrokerInfo from tinystream.partitions.base import BasePartition -from tinystream.partitions.partition import SingleLogPartition +from tinystream.partitions.segmented import SegmentedLogPartition +from tinystream.partitions.single import SingleLogPartition from tinystream.serializer.base import AbstractSerializer -from tinystream.storage.base import AbstractLogStorage +from tinystream.storage import SingleLogStorage, SegmentedLogStorage +from tinystream.utils.env import env_default from tinystream.utils.serlializer import init_serializer @@ -32,11 +28,10 @@ class Broker(BaseAsyncClient): correct partition. 
""" - def __init__(self, config: TinyStreamConfig, broker_id: Optional[int]) -> None: + def __init__(self, config: ConfigManager, broker_id: Optional[int]) -> None: self.config = config - self.mode = config.mode self.broker_id = broker_id - self.broker_config = self.config.get_broker_config() + self.broker_config = self.config.broker_config self.host = self.broker_config["host"] self.port = int(self.broker_config["port"]) self.base_log_dir = Path(self.broker_config["partition_log_path"]) @@ -45,24 +40,25 @@ def __init__(self, config: TinyStreamConfig, broker_id: Optional[int]) -> None: "byte_order", "little" ) - metastore_config = self.config.get_metastore_config() + metastore_config = self.config.metastore - self.metastore_db_path = Path(metastore_config.get("db_path")) - self.db_conn: Optional[DBConnection] = None + self.metastore_db_path = Path( + metastore_config.get("db_path", "./data/metastore/tinystream.meta.db") + ) + self.db_connection: Optional[DBConnection] = None - self.serializer_config = self.config.get_serialization_config() + self.serializer_config = self.config.serialization self.serializer: AbstractSerializer = init_serializer( self.serializer_config.get("type", "messagepack") ) - if self.mode == "cluster": - controller_config = config.get_controller_config() - self.controller_client = TinyStreamAPI( - host=controller_config.get("host"), # type: ignore - port=int(controller_config.get("port")), # type: ignore - serializer=self.serializer, - ) - self.heartbeat_task: Optional[asyncio.Task[Any]] = None + controller_config = config.controller_config + self.controller_client = TinyStreamAPI( + host=controller_config.get("host"), # type: ignore + port=int(controller_config.get("port")), # type: ignore + serializer=self.serializer, + ) + self.heartbeat_task: Optional[asyncio.Task[Any]] = None super().__init__( prefix_size=self.prefix_size, @@ -71,60 +67,15 @@ def __init__(self, config: TinyStreamConfig, broker_id: Optional[int]) -> None: host=self.host, 
port=self.port, ) - self.partition_class: Type[BasePartition] = self.init_partition_class( - self.broker_config.get("partition_type", "singlelogpartition") - ) self.brokers: Dict[int, BrokerInfo] = {} - self.topic_metadata: Dict[str, TopicMetadata] = {} self.metastore_task: Optional[asyncio.Task[Any]] = None - # In-memory mapping of actual partition objects: - # { topic_name -> { partition_id -> BasePartition } } self.partitions: Dict[str, Dict[int, BasePartition]] = {} self._lock = asyncio.Lock() self._server: Optional[asyncio.Server] = None - - if self.mode == "single": - print( - f"[Broker {broker_id}] Running in 'single' mode. Initializing topic manager." - ) - self.topic_manager = TopicManager( - db_connection=None, - brokers=self.brokers, - topics=self.topic_metadata, - lock=self._lock, - ) - - metastore_config = self.config.metastore - self.metastore_http_port = int(metastore_config.get("http_port", 6000)) - self.metastore = Metastore( - topic_manager=self.topic_manager, - topics=self.topic_metadata, - brokers=self.brokers, - lock=self._lock, - port=self.metastore_http_port, - ) - - @staticmethod - def init_partition_class(partition_name: str) -> Type[BasePartition]: - if partition_name == "singlelogpartition": - return SingleLogPartition - else: - raise ValueError(f"Unknown partition type: {partition_name}") - - @staticmethod - def init_storage_class(storage_name: str) -> type[AbstractLogStorage]: - """ - Initializes the storage *class* based on configuration. - """ - if storage_name == "filelogstorage": - from tinystream.storage.storage import FileLogStorage - - return FileLogStorage - else: - raise ValueError(f"Unknown storage type: {storage_name}") + self.retention_task: Optional[asyncio.Task] = None async def _create_new_partition( self, topic_name: str, partition_id: int @@ -134,20 +85,36 @@ async def _create_new_partition( This ensures all config (serializer, storage_class) is passed correctly and the partition is registered in the metastore. 
""" - print(f"Creating new partition: {topic_name}-{partition_id}") - log_file = Path(f"{self.base_log_dir}/{topic_name}/{partition_id}.log") + storage_type = self.broker_config.get("storage_type", "singlelogstorage") + + if storage_type == "singlelogstorage": + storage_class = SingleLogStorage + partition_path = Path( + f"{self.base_log_dir}/{topic_name}/{partition_id}.log" + ) + else: + storage_class = SegmentedLogStorage + partition_path = Path(f"{self.base_log_dir}/{topic_name}/{partition_id}") + + storage_class = storage_class( + partition_path=partition_path, + ) + + partition_name = self.broker_config.get("partition_type", "singlelogpartition") - storage_class = self.init_storage_class( - self.broker_config.get("storage_type", "filelogstorage") - )( - log_file_path=log_file, + if partition_name == "singlelogpartition": + partition_class = SingleLogPartition + else: + partition_class = SegmentedLogPartition + + print( + f"Creating new partition: {topic_name}-{partition_id} of type {partition_name} and storage {storage_type}" ) - partition = self.partition_class( + partition = partition_class( topic_name=topic_name, # type: ignore partition_id=partition_id, # type: ignore - base_log_dir=self.base_log_dir, # type: ignore serializer=self.serializer, # type: ignore storage=storage_class, # type: ignore ) @@ -158,9 +125,9 @@ async def _create_new_partition( self.partitions[topic_name] = {} self.partitions[topic_name][partition_id] = partition - if self.db_conn: + if self.db_connection: try: - await self.db_conn.execute( + await self.db_connection.execute( """ INSERT OR IGNORE @@ -169,7 +136,7 @@ async def _create_new_partition( """, (topic_name, partition_id, json.dumps([])), ) - await self.db_conn.commit() + await self.db_connection.commit() except Exception as exception: print( f"Warning: Failed to register {topic_name}-{partition_id} in metastore: {exception}" @@ -220,49 +187,31 @@ async def get_or_create_partition( async def start(self) -> None: """Starts the 
main broker server.""" - if self.mode == "cluster": - await self.controller_client.ensure_connected() - print(f"[Broker {self.broker_id}] Registering with controller...") - - response = await self.controller_client.send_request( - { - "command": "register_broker", - "broker_id": self.broker_id, - "host": self.host, - "port": self.port, - } - ) - if response and response.get("status") == "ok": - print(f"[Broker {self.broker_id}] Registered successfully.") - assignments = response.get("assignments", []) - await self._reconcile_partitions(assignments) - else: - raise Exception(f"Could not register with controller: {response}") + await self.load_partitions() - self.heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + await self.controller_client.ensure_connected() + print(f"[Broker {self.broker_id}] Registering with controller...") - else: - await self.init_metastore(db_path=self.metastore_db_path) - self.topic_manager.db_connection = self.db_conn - await self._load_metadata_from_db() - if self.broker_id is None: - raise ValueError("broker_id must not be None") - self_info = BrokerInfo( - broker_id=self.broker_id, host=self.host, port=self.port, is_alive=True - ) - async with self._lock: - self.brokers[self.broker_id] = self_info + response = await self.controller_client.send_request( + { + "command": "register_broker", + "broker_id": self.broker_id, + "host": self.host, + "port": self.port, + } + ) - self.metastore_task = asyncio.create_task( # type: ignore - self.metastore.start(), name="broker-metastore-api" - ) - print( - f"[Broker {self.broker_id}] Metastore API docs at http://localhost:{self.port}/docs" - ) + if response and response.get("status") == "ok": + print(f"[Broker {self.broker_id}] Registered successfully.") + assignments = response.get("assignments", []) + await self._reconcile_partitions(assignments) + else: + raise Exception(f"Could not register with controller: {response}") - print("Metastore initialized successfully.") + 
self.heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + self.retention_task = asyncio.create_task(self._retention_loop()) await self.load_partitions() await self.start_server() @@ -291,8 +240,11 @@ async def close(self): await self._server.wait_closed() print("Server socket closed.") - if self.db_conn: - await self.db_conn.close() + if self.retention_task: + self.retention_task.cancel() + + if self.db_connection: + await self.db_connection.close() print("Metastore connection closed.") async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: @@ -310,34 +262,6 @@ async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: elif command == "commit_offset": return await self._handle_commit_offset(request) - - elif command == "create_topic": - if self.mode == "single" and self.topic_manager: - try: - await self.topic_manager.create_topic( - request["name"], - request["partitions"], - request["replication_factor"], - ) - return { - "status": "success", - "message": f"Topic {request['name']} created.", - } - except ValueError as exception: - return {"status": "error", "message": str(exception)} - except Exception as exception: - print( - f"FATAL error in handle_create_topic_request: {exception}" - ) - return { - "status": "error", - "message": f"Internal server error: {exception}", - } - else: - return { - "status": "error", - "message": "create_topic is only allowed in single mode. 
Use admin client.", - } else: return {"status": "error", "message": "Unknown command"} @@ -347,35 +271,6 @@ async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: "message": f"Failed to process request: {exception}", } - async def _load_metadata_from_db(self): - print(f"[Broker {self.broker_id}] Loading metadata from metastore...") - async with self._lock: - # --- MODIFIED: Use self.topic_metadata --- - async with self.db_conn.execute("SELECT * FROM topics") as cursor: - async for row in cursor: - topic_name = row["topic_name"] - self.topic_metadata[topic_name] = TopicMetadata( - name=topic_name, partitions={} - ) - - async with self.db_conn.execute("SELECT * FROM partitions") as cursor: - async for row in cursor: - topic_name = row["topic_name"] - if topic_name in self.topic_metadata: - replicas_list = json.loads(row["replicas"]) - part_meta = PartitionMetadata( - partition_id=row["partition_id"], - leader=row["leader"], - replicas=replicas_list, - ) - self.topic_metadata[topic_name].partitions[ - part_meta.partition_id - ] = part_meta - - print( - f"[Broker {self.broker_id}] Loaded {len(self.topic_metadata)} topics from metastore." - ) - async def _handle_append(self, request: Dict[str, Any]) -> Dict[str, Any]: topic = request["topic"] partition_id = request["partition"] @@ -413,21 +308,44 @@ async def _reconcile_partitions(self, assignments: list[dict]): print( f"[Broker {self.broker_id}] Reconciling {len(assignments)} assignments..." 
) + current_assignments = set() + for assignment in assignments: topic = assignment["topic"] part_id = assignment["partition_id"] try: - await self.get_or_create_partition(topic, part_id) + partition = await self.get_or_create_partition(topic, part_id) + + partition.update_policy( + role=assignment["role"], + retention_ms=assignment["retention_ms"], + retention_bytes=assignment["retention_bytes"], + ) + + current_assignments.add((topic, part_id)) + + except Exception as exception: + print(f"Error reconciling partition {topic}/{part_id}: {exception}") + + async def _retention_loop(self): + """ + Runs periodically to enforce retention policies on all partitions. + """ + while True: + await asyncio.sleep(300) - # TODO: A full implementation would also: - # 1. Store the 'role' (leader/follower) - # 2. Trigger follower fetching logic if role == 'follower' - # 3. Handle partition *removals* (i.e., assignments that - # disappeared from the controller's list) + print(f"[Broker {self.broker_id}] Running retention policy check...") - except Exception as e: - print(f"Error reconciling partition {topic}/{part_id}: {e}") + async with self._lock: + for topic_name, partitions in self.partitions.items(): + for partition in partitions.values(): + try: + await partition.enforce_retention() + except Exception as e: + print( + f"Error enforcing retention on {topic_name}-{partition.partition_id}: {e}" + ) async def _heartbeat_loop(self): """Runs in the background, sending heartbeats AND processing assignments.""" @@ -476,10 +394,10 @@ async def _handle_commit_offset(self, request: Dict[str, Any]) -> Dict[str, Any] partition_id = request["partition"] offset = request["offset"] - if not self.db_conn: + if not self.db_connection: return {"status": "error", "message": "Metastore is not enabled."} - await self.db_conn.execute( + await self.db_connection.execute( """ INSERT OR REPLACE INTO consumer_group_offsets (group_id, topic_name, partition_id, committed_offset) @@ -488,7 +406,7 @@ async 
def _handle_commit_offset(self, request: Dict[str, Any]) -> Dict[str, Any] (group_id, topic, partition_id, offset), ) - await self.db_conn.commit() + await self.db_connection.commit() return {"status": "ok", "message": "Offset committed"} @@ -505,70 +423,18 @@ async def _handle_commit_offset(self, request: Dict[str, Any]) -> Dict[str, Any] async def main( - _broker_id: Optional[int] = 0, - mode: Optional[str] = "single", - config: Optional[str] = DEFAULT_CONFIG_PATH, + config: ConfigManager, + _broker_id: int, broker_number: Optional[int] = None, ): - if broker_number and mode == "cluster": - print(f"[Launcher] Starting {broker_number} brokers in test mode...") - broker_instances = [] - - base_config_path = config or DEFAULT_CONFIG_PATH - try: - base_config = TinyStreamConfig.from_ini(base_config_path) - base_port = int(base_config.broker_config.get("port", "9090")) - except Exception as exception: - print( - f"FATAL: Could not load base config from {base_config_path}: {exception}" - ) - return - - for i in range(broker_number): - broker_config = copy.deepcopy(base_config) - broker_config.mode = "cluster" - broker_config.broker_config["port"] = f"{base_port + i}" - - print( - f"[Launcher] Preparing Broker {i} on port {broker_config.broker_config['port']}..." 
- ) - broker_instances.append( - Broker( - config=broker_config, - broker_id=i, - ) - ) - start_tasks = [b.start() for b in broker_instances] - try: - await asyncio.gather(*start_tasks) - except KeyboardInterrupt: - print("\n[Launcher] Caught interrupt, shutting down all brokers...") - finally: - print("[Launcher] Closing all brokers...") - close_tasks = [b.close() for b in broker_instances] - await asyncio.gather(*close_tasks) + if broker_number: return else: + config_obj = config broker_id_to_use = _broker_id - try: - config_obj = TinyStreamConfig.from_ini(config or DEFAULT_CONFIG_PATH) - except Exception as exception: - print(f"FATAL: Could not load config from {config}: {exception}") - return - - config_obj.mode = mode # type: ignore - - try: - port_value = config_obj.broker_config.get("port") - if port_value is None: - print(f"FATAL: 'port' not found or is invalid in config file: {config}") - return - broker_port = int(port_value) - except (TypeError, ValueError): - print(f"FATAL: 'port' not found or is invalid in config file: {config}") - return + broker_port = int(config_obj.broker_config.get("port")) broker = Broker( config=config_obj, @@ -576,84 +442,74 @@ async def main( ) try: - if mode == "cluster": - print( - f"\n[Broker {broker_id_to_use}] Starting in CLUSTER mode on port {broker_port}..." - ) - else: - print(f"\n[Broker 0] Starting in SINGLE mode on port {broker_port}...") - + print( + f"\n[Broker {broker_id_to_use}] Starting in CLUSTER mode on port {broker_port}..." + ) await broker.start() - except KeyboardInterrupt: - if mode == "cluster": - print( - f"\n[Broker {broker_id_to_use}] Caught interrupt, shutting down..." 
- ) - else: - print("\n[Broker 0] Caught interrupt, shutting down...") + print(f"\n[Broker {broker_id_to_use}] Caught interrupt, shutting down...") finally: await broker.close() if __name__ == "__main__": - - def print_usage(parser_instance, message): - print(f"Error: {message}\n") - parser_instance.print_help() - sys.exit(1) - parser = argparse.ArgumentParser(description="Start TinyStream broker(s).") + parser.add_argument( - "--mode", - choices=["single", "cluster"], - default="single", - help="Broker mode. 'single' for standalone, 'cluster' to connect to a controller.", + "--controller-uri", + type=str, + default=env_default("TINYSTREAM_CONTROLLER_URI")(), + help="Controller RPC URI (e.g., localhost:9093). Overrides config.", + ) + parser.add_argument( + "--metastore-uri", + type=str, + default=env_default("TINYSTREAM_METASTORE_URI")(), + help="Metastore HTTP API URI. Overrides config.", ) + parser.add_argument( + "--port", + type=int, + default=env_default("TINYSTREAM_PORT")(), + help="Broker RPC port. Overrides config.", + ) + parser.add_argument( "--broker-id", type=int, - help="Broker ID (required in 'cluster' mode when starting a single broker).", + default=env_default("TINYSTREAM_BROKER_ID")(), + help="Broker ID (required). Overrides config.", ) + parser.add_argument( "--config", type=str, - default=DEFAULT_CONFIG_PATH, - help=f"Config file path (default: {DEFAULT_CONFIG_PATH})", + default=env_default("TINYSTREAM_CONFIG")(), + help="Path to a user config file. Overrides default config.", ) + parser.add_argument( - "--broker-number", - type=int, - help="[TESTING] Start N brokers in a single process. Overrides other settings.", + "--broker-number", type=int, help="[TESTING] Start N brokers in one process." 
) args = parser.parse_args() - _broker_id_to_pass = 0 + config_manager = ConfigManager(args, component_type="broker") + _broker_id_to_pass = 0 if args.broker_number: if args.broker_number <= 0: - print_usage(parser, "--broker-number must be greater than 0.") - - args.mode = "cluster" - _broker_id_to_pass = 0 - + sys.exit("Error: --broker-number must be greater than 0.") else: - if args.mode == "cluster" and args.broker_id is None: - print_usage(parser, "--broker_id is required in 'cluster' mode.") - - _broker_id_to_pass = args.broker_id if args.broker_id is not None else 0 - - config_path = pathlib.Path(args.config) - if not config_path.is_file(): - print(f"Error: Config file not found at {config_path}") - sys.exit(1) + _id = args.broker_id or config_manager.broker_config.get("id") + if _id is None: + sys.exit("Error: --broker-id is required (or set 'id' in [broker] config)") + _broker_id_to_pass = int(_id) # type: ignore try: asyncio.run( main( + config=config_manager, _broker_id=_broker_id_to_pass, - mode=args.mode, - config=str(config_path), broker_number=args.broker_number, ) ) diff --git a/tinystream/client/admin.py b/tinystream/client/admin.py index 2034d64..c3aaab7 100644 --- a/tinystream/client/admin.py +++ b/tinystream/client/admin.py @@ -10,11 +10,11 @@ class AdminClient: Connects to the Controller (or a single-mode Broker) via its HTTP API. 
""" - def __init__(self, controller_addr: str): - if not controller_addr.startswith(("http://", "https://")): - self.controller_addr = f"http://{controller_addr}" + def __init__(self, metastore_api_address: str): + if not metastore_api_address.startswith(("http://", "https://")): + self.controller_addr = f"http://{metastore_api_address}" else: - self.controller_addr = controller_addr + self.controller_addr = metastore_api_address self.api_base = f"{self.controller_addr}/api/v1/admin" @@ -32,27 +32,25 @@ async def _handle_response(response: httpx.Response, success_msg: str): except httpx.HTTPStatusError as e: try: error_data = e.response.json() - print(f"Error: {error_data}", file=sys.stderr) + print(f"Error: {error_data}") except Exception: - print( - f"HTTP Error: {e.response.status_code} - {e.response.text}", - file=sys.stderr, - ) + print(f"HTTP Error: {e.response.status_code} - {e.response.text}") except httpx.RequestError as e: - print( - f"Connection Error: Failed to connect to {e.request.url}.", - file=sys.stderr, - ) - print( - "Please ensure the Controller is running and the address is correct.", - file=sys.stderr, - ) + print(f"Connection Error: Failed to connect to {e.request.url}.") + print("Please ensure the Controller is running and the address is correct.") except Exception as e: - print(f"An unexpected error occurred: {e}", file=sys.stderr) + print(f"An unexpected error occurred: {e}") return None - async def create_topic(self, name: str, partitions: int, replication_factor: int): + async def create_topic( + self, + name: str, + partitions: int, + replication_factor: int, + retention_ms: int = 604800000, + retention_bytes: int = -1, + ): """ Sends a request to the controller to create a new topic. 
""" @@ -64,6 +62,8 @@ async def create_topic(self, name: str, partitions: int, replication_factor: int "topic_name": name, "partition_count": partitions, "replication_factor": replication_factor, + "retention_ms": retention_ms, + "retention_bytes": retention_bytes, } try: @@ -74,7 +74,7 @@ async def create_topic(self, name: str, partitions: int, replication_factor: int f"Connection Error: Could not connect to controller at {self.controller_addr}", file=sys.stderr, ) - print("Please ensure the controller is running.", file=sys.stderr) + print("Please ensure the controller is running.") sys.exit(1) async def list_topics(self): @@ -116,7 +116,7 @@ async def describe_cluster(self): print("\nRegistered Brokers:") for broker_id, info in brokers.items(): - status = "ALIVE" if info.get("is_alive") else "DEAD" + status = info.get("status") print( f" - Broker {broker_id} ({info.get('host')}:{info.get('port')}) - {status}" ) @@ -134,7 +134,7 @@ async def main(): parser = argparse.ArgumentParser(description="TinyStream Admin Client") parser.add_argument( - "--controller", + "--metastore", default="localhost:3200", help="Controller address (e.g., localhost:6000). 
(Default: %(default)s)", ) @@ -151,6 +151,15 @@ async def main(): create_parser.add_argument( "--replication-factor", required=True, type=int, help="Replication factor" ) + create_parser.add_argument( + "--retention-ms", + type=int, + default=1800, + help="Retention time in milliseconds", + ) + create_parser.add_argument( + "--retention-bytes", type=int, default=10000, help="Retention size in bytes" + ) subparsers.add_parser("list-topics", help="List all topics in the cluster") @@ -158,7 +167,7 @@ async def main(): args = parser.parse_args() - client = AdminClient(controller_addr=args.controller) + client = AdminClient(metastore_api_address=args.metastore) try: if args.action == "create-topic": @@ -166,20 +175,22 @@ async def main(): name=args.name, partitions=args.partitions, replication_factor=args.replication_factor, + retention_ms=args.retention_ms, + retention_bytes=args.retention_bytes, ) elif args.action == "list-topics": await client.list_topics() elif args.action == "describe-cluster": await client.describe_cluster() else: - print(f"Unknown action: {args.action}", file=sys.stderr) + print(f"Unknown action: {args.action}") parser.print_help() sys.exit(1) await client.close() except Exception as e: - print(f"\nAn unexpected critical error occurred: {e}", file=sys.stderr) + print(f"\nAn unexpected critical error occurred: {e}") sys.exit(1) finally: await client.close() diff --git a/tinystream/client/connection.py b/tinystream/client/connection.py index b9c5bf7..8341e40 100644 --- a/tinystream/client/connection.py +++ b/tinystream/client/connection.py @@ -43,7 +43,7 @@ async def _connect(self) -> None: if self.is_connected: return - print(f"Connecting to controller at {self.host}:{self.port}...") + print(f"Connecting to broker at {self.host}:{self.port}...") try: self._reader, self._writer = await asyncio.open_connection( self.host, self.port diff --git a/tinystream/client/consumer.py b/tinystream/client/consumer.py index ee45ec8..6fb86cc 100644 --- 
a/tinystream/client/consumer.py +++ b/tinystream/client/consumer.py @@ -1,227 +1,63 @@ import asyncio import uuid from typing import Any, Dict, List, Tuple, Optional - -from tinystream import DEFAULT_CONFIG_PATH -from tinystream.client.connection import TinyStreamAPI -from tinystream.config.parser import TinyStreamConfig +import argparse +from tinystream.cluster_manager import ClusterManager +from tinystream.config.manager import ConfigManager from tinystream.serializer.base import AbstractSerializer from tinystream.utils.serlializer import init_serializer +from tinystream.utils.env import env_default class Consumer: - """ - A stateful consumer that tracks its own offsets for assigned partitions. - Supports both "single" broker and "cluster" (controller-aware) modes. - - """ - def __init__( self, group_id: str, - config: TinyStreamConfig, + config: ConfigManager, ) -> None: self.group_id = group_id self.config = config - self.mode = self.config.mode - serializer_config = config.get_serialization_config() + serializer_config = config.serialization self.serializer: AbstractSerializer = init_serializer( serializer_config.get("type", "messagepack") ) - if self.mode == "cluster": - controller_config = config.get_controller_config() - self.controller_host = controller_config.get("host") - self.controller_port = controller_config.get("port") - self.controller_connection: Optional[TinyStreamAPI] = TinyStreamAPI( - self.controller_host, # type: ignore - self.controller_port, # type: ignore - serializer=self.serializer, # type: ignore - ) - - self._topic_metadata_cache: Dict[ - str, Dict[int, Any] - ] = {} # {topic -> {part_id -> {leader, replicas}}} - self._broker_info_cache: Dict[int, Any] = {} # {broker_id -> {host, port}} - self._broker_connections: Dict[ - Tuple[str, int], TinyStreamAPI - ] = {} # {(host, port) -> connection} - self._metadata_lock = asyncio.Lock() + self.cluster_manager = ClusterManager( + config=config, + serializer=self.serializer, + ) - else: - 
self.mode = "single" - broker_config = config.get_broker_config() - broker_host = broker_config.get("host") - broker_port = broker_config.get("port") - self._single_broker_connection = TinyStreamAPI( - broker_host, # type: ignore - broker_port, # type: ignore - serializer=self.serializer, # type: ignore - ) - - # { (topic, partition_id): next_offset } self._assignments: Dict[Tuple[str, int], int] = {} - self._hwms: Dict[Tuple[str, int], int] = {} async def connect(self) -> None: - """Explicitly connects to the controller or the single broker.""" - if self.mode == "cluster": - print("[Consumer] (Cluster Mode): Connecting to controller...") - await self.controller_connection.ensure_connected() # type: ignore - await self.refresh_cluster_metadata() - - elif self.mode == "single": - print("[Consumer] (Single Mode): Connecting to broker...") - await self._single_broker_connection.ensure_connected() + """Explicitly connects to the controller.""" + print("[Consumer]: Connecting...") + await self.cluster_manager.connect() async def close(self) -> None: """Closes all active connections.""" print("Closing consumer connections...") - if self.mode == "cluster": - if self.controller_connection: - await self.controller_connection.close() - for conn in self._broker_connections.values(): - await conn.close() - - elif self.mode == "single": - if self._single_broker_connection: - await self._single_broker_connection.close() + await self.cluster_manager.close() async def is_connected(self) -> bool: """Checks if the consumer is connected.""" - if self.mode == "cluster": - if not self.controller_connection.is_connected: # type: ignore - return False - for conn in self._broker_connections.values(): - if not conn.is_connected: - return False - return True - - elif self.mode == "single": - return self._single_broker_connection.is_connected - - return False + return await self.cluster_manager.is_connected() def assign(self, topic: str, partition: int = 0, start_offset: int = 0) -> None: - 
""" - Assigns this consumer to a specific partition, starting from a given - offset. Call this *before* polling. - """ key = (topic, partition) self._assignments[key] = start_offset self._hwms[key] = 0 print(f"[Consumer] assigned to {topic}-{partition} at offset {start_offset}") - async def refresh_cluster_metadata(self) -> None: - """Fetches the latest metadata from the Controller.""" - if self.mode != "cluster" or not self.controller_connection: - return - - print("[Consumer] Refreshing cluster metadata...") - async with self._metadata_lock: - try: - resp = await self.controller_connection.send_request( - {"command": "get_cluster_metadata"} - ) - if resp.get("status") == "ok": - metadata = resp.get("metadata", {}) - self._topic_metadata_cache = metadata.get("partitions", {}) - self._broker_info_cache = metadata.get("brokers", {}) - - for conn in self._broker_connections.values(): - await conn.close() - self._broker_connections.clear() - print("[Consumer] Metadata refreshed.") - else: - print(f"Failed to refresh metadata: {resp.get('message')}") - except Exception as e: - print(f"Error refreshing metadata: {e}") - - async def invalidate_caches(self, conn: Optional[TinyStreamAPI] = None) -> None: - """Invalidates metadata and connection caches, forcing a refresh.""" - print("[Consumer] Invalidating metadata caches...") - async with self._metadata_lock: - self._topic_metadata_cache.clear() - self._broker_info_cache.clear() - - if conn: - key_to_remove = None - for k, v in self._broker_connections.items(): - if v == conn: - key_to_remove = k - break - if key_to_remove: - await self._broker_connections[key_to_remove].close() - del self._broker_connections[key_to_remove] - else: - for c in self._broker_connections.values(): - await c.close() - self._broker_connections.clear() - - async def get_leader_connection(self, topic: str, partition: int) -> TinyStreamAPI: - """ - Gets the correct broker connection for a partition leader. 
- Handles metadata refresh and connection caching. - """ - if self.mode == "single": - await self._single_broker_connection.ensure_connected() # type: ignore - return self._single_broker_connection - - async with self._metadata_lock: - topic_info = self._topic_metadata_cache.get(topic) - if not topic_info: - print(f"No metadata for topic '{topic}', refreshing...") - await self.refresh_cluster_metadata() - topic_info = self._topic_metadata_cache.get(topic) - - if not topic_info: - raise Exception(f"Topic not found after refresh: {topic}") - - part_info = topic_info.get(partition) - if not part_info: - part_info = topic_info.get(partition) - - if not part_info: - raise Exception( - f"Partition not found after refresh: {topic}-{partition}" - ) - - leader_id = part_info.get("leader") - if leader_id is None: - raise Exception(f"No leader for partition: {topic}-{partition}") - - broker_info = self._broker_info_cache.get(leader_id) - if not broker_info: - raise Exception(f"Broker info not found for leader ID: {leader_id}") - - broker_host = broker_info.get("host") - broker_port = broker_info.get("data_port", broker_info.get("port")) - - conn_key = (broker_host, broker_port) - if conn_key not in self._broker_connections: - print( - f"Connecting to new broker for {topic}-{partition}: {broker_host}:{broker_port}" - ) - conn = TinyStreamAPI( - broker_host, # type: ignore - broker_port, # type: ignore - serializer=self.serializer, # type: ignore - ) - self._broker_connections[conn_key] = conn - - conn = self._broker_connections[conn_key] - await conn.ensure_connected() - return conn - async def _update_high_watermarks(self) -> None: """ Asks the correct broker for the latest HWM for all assigned partitions. 
""" for topic, part in self._assignments.keys(): try: - conn = await self.get_leader_connection(topic, part) + conn = await self.cluster_manager.get_leader_connection(topic, part) resp = await conn.send_request( {"command": "get_hwm", "topic": topic, "partition": part} @@ -232,13 +68,11 @@ async def _update_high_watermarks(self) -> None: print( f"Failed to get HWM for {topic}-{part}: {resp.get('message')}" ) - if self.mode == "cluster": - await self.invalidate_caches(conn) + await self.cluster_manager.invalidate_caches(conn) except Exception as e: print(f"Failed to get HWM for {topic}-{part}: {e}") - if self.mode == "cluster": - await self.invalidate_caches() + await self.cluster_manager.invalidate_caches() async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] @@ -256,7 +90,9 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: if next_offset < hwm: try: - conn = await self.get_leader_connection(topic, part) + conn = await self.cluster_manager.get_leader_connection( + topic, part + ) resp = await conn.send_request( { @@ -276,14 +112,12 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: f"Read error on {topic}-{part} at {next_offset}: {resp.get('message')}" ) self._hwms[(topic, part)] = 0 - if self.mode == "cluster": - await self.invalidate_caches(conn) + await self.cluster_manager.invalidate_caches(conn) except Exception as e: print(f"Failed to read from {topic}-{part}: {e}") self._hwms[(topic, part)] = 0 - if self.mode == "cluster": - await self.invalidate_caches() + await self.cluster_manager.invalidate_caches() if messages_polled_this_round == 0: break @@ -292,8 +126,7 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: async def commit(self) -> List[Dict[str, Any]]: """ - Commits the current tracked offsets for all assigned partitions - to the correct broker (leader or single). + Commits the current tracked offsets for all assigned partitions. 
""" print(f"Committing offsets for group '{self.group_id}'...") commit_tasks = [] @@ -318,36 +151,27 @@ async def _send_commit_request( ) -> Dict[str, Any]: """Helper to send a commit request to the correct broker.""" try: - conn = await self.get_leader_connection(topic, partition) + conn = await self.cluster_manager.get_leader_connection(topic, partition) return await conn.send_request(request) except Exception as e: print(f"Failed to commit for {topic}-{partition}: {e}") - if self.mode == "cluster": - await self.invalidate_caches() + await self.cluster_manager.invalidate_caches() return {"status": "error", "message": str(e)} async def main( - config_path: Optional[str] = DEFAULT_CONFIG_PATH, - topic: Optional[str] = None, + config: ConfigManager, + topic: str, group_id: Optional[str] = None, ) -> None: - config = TinyStreamConfig.from_ini(config_path or DEFAULT_CONFIG_PATH) # type: ignore - - mode = config.mode - print(f"[Consumer] Starting in '{mode}' mode (from config).") + print("[Consumer] Starting in cluster mode (from config).") if not group_id: print("[Consumer] No group_id provided, generating a random one.") group_id = f"consumer-{uuid.uuid4()}" - if not topic: - print( - "[Consumer] No topic provided. Will randomly pick between available test topics." - ) - topic = "clicks" consumer = Consumer( - config=config, # type: ignore + config=config, group_id=group_id, ) @@ -369,7 +193,10 @@ async def main( if batch: print("--- Received batch ---") for msg in batch: - print(f"Received: {msg}") + try: + print(f"Received: {msg.decode('utf-8')}") + except: + print(f"Received (raw): {msg}") print("----------------------") await consumer.commit() else: @@ -378,15 +205,12 @@ async def main( except ConnectionRefusedError: print("\n[ERROR] Could not connect to broker or controller.") print("Please ensure the broker/controller is running.") - except KeyboardInterrupt: print("\n\nStopping consumer... 
(Ctrl+C pressed)") - except Exception as e: print(f"\nAn error occurred: {e}") - finally: - if await consumer.is_connected(): # type: ignore + if await consumer.is_connected(): await consumer.close() print("Consumer connection closed.") else: @@ -394,24 +218,39 @@ async def main( if __name__ == "__main__": - import argparse - parser = argparse.ArgumentParser(description="TinyStream Consumer") + + parser.add_argument( + "--controller-uri", + type=str, + default=env_default("TINYSTREAM_CONTROLLER_URI"), + help="Controller RPC URI (e.g., localhost:9093). Overrides config.", + ) + parser.add_argument( + "--metastore-uri", + type=str, + default=env_default("TINYSTREAM_METASTORE_URI"), + help="Metastore HTTP API URI. Overrides config.", + ) + parser.add_argument( "--config", type=str, - default=DEFAULT_CONFIG_PATH, - help="Path to TinyStream configuration file", + default=env_default("TINYSTREAM_CONFIG"), + help="Path to a user config file. Overrides default config.", ) + parser.add_argument( "--topic", required=True, type=str, help="Topic to consume from" ) parser.add_argument("--group_id", type=str, help="Consumer group ID") + args = parser.parse_args() + config_manager = ConfigManager(args, component_type="broker") asyncio.run( main( - config_path=args.config, + config=config_manager, group_id=args.group_id, topic=args.topic, ) diff --git a/tinystream/client/producer.py b/tinystream/client/producer.py index fab979f..5808a3d 100644 --- a/tinystream/client/producer.py +++ b/tinystream/client/producer.py @@ -1,109 +1,43 @@ import asyncio import hashlib -from typing import Any, Optional, Dict, Tuple +import random +import string +import argparse +from typing import Any, Optional, Dict -from tinystream import DEFAULT_CONFIG_PATH -from tinystream.client.connection import TinyStreamAPI from tinystream.cluster_manager import ClusterManager -from tinystream.config.parser import TinyStreamConfig +from tinystream.config.manager import ConfigManager from 
tinystream.serializer.base import AbstractSerializer from tinystream.utils.serlializer import init_serializer +from tinystream.utils.env import env_default class Producer: - """ - Manages connections and provides a `send` method. - Supports two modes: - 1. "single": Connects directly to a single broker. - 2. "cluster": Connects to a controller to discover leader brokers. - """ - - def __init__(self, config: TinyStreamConfig) -> None: + def __init__(self, config: ConfigManager) -> None: self.config = config - self.mode = config.mode - - self.serializer_config = config.get_serialization_config() + self.serializer_config = config.serialization self.serializer: AbstractSerializer = init_serializer( self.serializer_config.get("type", "messagepack") ) - if self.mode == "cluster": - controller_config = config.get_controller_config() - self.controller_host = controller_config.get("host") - self.controller_port = controller_config.get("port") - self._controller_connection: Optional[TinyStreamAPI] = TinyStreamAPI( - self.controller_host, # type: ignore - self.controller_port, # type: ignore - serializer=self.serializer, # type: ignore - ) - - self._topic_metadata_cache: Dict[str, Dict[int, Any]] = {} - self._broker_info_cache: Dict[int, Any] = {} - self._broker_connections: Dict[Tuple[str, int], TinyStreamAPI] = {} - self._metadata_lock = asyncio.Lock() - self.cluster_manager = ClusterManager( - self.mode, - self._topic_metadata_cache, - self._broker_info_cache, - self._broker_connections, - self.serializer, - self._metadata_lock, - self._controller_connection, - ) - - else: - self.mode = "single" - broker_config = config.get_broker_config() - broker_host = broker_config.get("host") - broker_port = broker_config.get("port") - self._single_broker_connection = TinyStreamAPI( - broker_host, # type: ignore - broker_port, # type: ignore - serializer=self.serializer, # type: ignore - ) - self._default_partition_count = 1 + self.cluster_manager = ClusterManager( + config=config, + 
serializer=self.serializer, + ) async def is_connected(self) -> bool: - if self.mode == "cluster": - if ( - not self._controller_connection - or not self._controller_connection.is_connected - ): - return False - for conn in self._broker_connections.values(): - if not conn.is_connected: - return False - return True - else: - return self._single_broker_connection.is_connected + return await self.cluster_manager.is_connected() async def connect(self) -> None: - """Explicitly connects to the controller or the single broker.""" - if self.mode == "cluster": - print("[Producer] (Cluster Mode): Connecting to controller...") - await self._controller_connection.ensure_connected() # type: ignore - await self.cluster_manager.refresh_cluster_metadata() - - elif self.mode == "single": - print("[Producer] (Single Mode): Connecting to broker...") - await self._single_broker_connection.ensure_connected() + """Explicitly connects to the controller.""" + print("[Producer]: Connecting...") + await self.cluster_manager.connect() async def close(self) -> None: """Closes all active connections.""" print("Closing producer connections...") - if self.mode == "cluster": - if self._controller_connection: - await self._controller_connection.close() - for conn in self._broker_connections.values(): - await conn.close() - self._broker_connections.clear() - self._topic_metadata_cache.clear() - self._broker_info_cache.clear() - - elif self.mode == "single": - if self._single_broker_connection: - await self._single_broker_connection.close() + await self.cluster_manager.close() @staticmethod def _get_partition_id(key: Optional[bytes], partition_count: int) -> int: @@ -119,24 +53,15 @@ def _get_partition_id(key: Optional[bytes], partition_count: int) -> int: return hash_int % partition_count async def _get_partition_count(self, topic: str) -> int: - """Gets partition count for a topic based on mode.""" - if self.mode == "cluster": - if topic not in self._topic_metadata_cache: - await 
self.cluster_manager.refresh_cluster_metadata() - - topic_partitions = self.cluster_manager._topic_metadata_cache.get(topic) - if not topic_partitions: - raise ValueError(f"Topic '{topic}' not found in cluster metadata.") - return len(topic_partitions) - else: - return self._default_partition_count + """Gets partition count for a topic from the cluster.""" + topic_partitions = await self.cluster_manager.get_topic_metadata(topic) + return len(topic_partitions) async def send( self, topic: str, data: Any, key: Optional[str] = None, retries: int = 3 ) -> Dict[str, Any]: """ Sends a message to a topic. - Behavior depends on the producer's mode (single vs. cluster). """ key_bytes = key.encode("utf-8") if key else None @@ -149,49 +74,18 @@ async def send( "data": data, } - if self.mode == "cluster": - return await self._send_cluster(request, retries) - else: - return await self._send_single(request) + return await self._send_cluster(request, retries) async def create_topic( self, topic: str, partition_count: int, replication_factor: int = 1 ) -> Dict[str, Any]: """ Creates a new topic in the cluster. - Only applicable in cluster mode. 
""" - - request = { - "command": "create_topic", - "topic": topic, - "partition_count": partition_count, - "replication_factor": replication_factor, - } - - await self._controller_connection.ensure_connected() # type: ignore - response = await self._controller_connection.send_request( # type: ignore - request + return await self.cluster_manager.create_topic( + topic, partition_count, replication_factor ) - if response.get("status") == "ok" and self.mode == "cluster": - await self.cluster_manager.refresh_cluster_metadata() - - return response - - async def _send_single(self, request: Dict[str, Any]) -> Dict[str, Any]: - """Handles sending in single-broker mode.""" - try: - await self._single_broker_connection.ensure_connected() - response = await self._single_broker_connection.send_request(request) - return response - except (ConnectionError, asyncio.TimeoutError) as e: - print(f"[Producer] (Single Mode): Connection error: {e}") - raise e - except Exception as e: - print(f"[Producer] (Single Mode): Unexpected error: {e}") - raise e - async def _send_cluster( self, request: Dict[str, Any], retries: int ) -> Dict[str, Any]: @@ -229,25 +123,34 @@ async def _send_cluster( ) -async def main( - config: Optional[str] = DEFAULT_CONFIG_PATH, mode: str = "single" -) -> None: - config = TinyStreamConfig.from_ini(config or DEFAULT_CONFIG_PATH) # type: ignore - config.mode = mode # type: ignore - producer = Producer(config=config) # type: ignore +async def main(config: ConfigManager) -> None: + producer = Producer(config=config) - dummy_events = [{"user": "alice", "action": "clicks", "item": "item_A"}] + def _random_payload(size_bytes: int = 5_000) -> str: + return "".join( + random.choices(string.ascii_letters + string.digits, k=size_bytes) + ) + + dummy_events = [ + { + "user": f"user_{i}", + "action": "clicks", + "item": "item_A", + # "payload": _random_payload(), + } + for i in range(1) + ] try: await producer.connect() - print("Producer connected. Sending messages... 
(Press Ctrl+C to stop)") + print("Producer connected in cluster mode. Sending... (Press Ctrl+C to stop)") message_count = 0 while True: event = dummy_events[message_count % len(dummy_events)] event["message_id"] = str(message_count) - print(f"Sending: {event}") + print(f"Sending: {event['message_id']}") response = await producer.send( topic=event["action"], data=event, key=event["user"] ) @@ -256,9 +159,7 @@ async def main( message_count += 1 await asyncio.sleep(1) except ConnectionRefusedError: - print("\n[ERROR] Could not connect to broker.") - print("Please ensure the broker is running in another terminal:") - print(" python -m tinystream.broker") + print("\n[ERROR] Could not connect to controller.") except KeyboardInterrupt: print("\n\nStopping producer... (Ctrl+C pressed)") except Exception as e: @@ -272,25 +173,30 @@ async def main( if __name__ == "__main__": - import sys - import argparse - - def print_usage(): - print( - "Usage: python -m tinystream.client.producer --config --mode " - ) - sys.exit(1) - parser = argparse.ArgumentParser(description="TinyStream Producer") + parser.add_argument( - "--config", + "--controller-uri", type=str, - default=DEFAULT_CONFIG_PATH, - help="Path to TinyStream configuration file", + default=env_default("TINYSTREAM_CONTROLLER_URI")(), + help="Controller RPC URI (e.g., localhost:9093). Overrides config.", ) parser.add_argument( - "--mode", type=str, default="single", choices=["single", "cluster"] + "--metastore-uri", + type=str, + default=env_default("TINYSTREAM_METASTORE_URI")(), + help="Metastore HTTP API URI. Overrides config.", ) + + parser.add_argument( + "--config", + type=str, + default=env_default("TINYSTREAM_CONFIG")(), + help="Path to a user config file. 
Overrides default config.", + ) + args = parser.parse_args() - config_path = args.config - asyncio.run(main(config=config_path, mode=args.mode)) + + config_manager = ConfigManager(args, component_type="broker") + + asyncio.run(main(config=config_manager)) diff --git a/tinystream/client/topic_manager.py b/tinystream/client/topic_manager.py index 42ceaa2..904570b 100644 --- a/tinystream/client/topic_manager.py +++ b/tinystream/client/topic_manager.py @@ -1,6 +1,5 @@ import json from typing import Dict - from tinystream.models import PartitionMetadata, TopicMetadata, BrokerInfo @@ -17,7 +16,14 @@ def __init__( self.topics = topics self._lock = lock - async def create_topic(self, name: str, partitions: int, replication_factor: int): + async def create_topic( + self, + name: str, + partitions: int, + replication_factor: int, + retention_ms: int = 1800, + retention_bytes: int = 10000, + ): if not self.db_connection: raise Exception("Database not connected") @@ -28,7 +34,12 @@ async def create_topic(self, name: str, partitions: int, replication_factor: int f"to satisfy replication factor ({replication_factor})." 
) - new_topic_metadata = TopicMetadata(name=name, partitions={}) + new_topic_metadata = TopicMetadata( + name=name, + partitions={}, + retention_ms=retention_ms, + retention_bytes=retention_bytes, + ) partition_data_to_insert = [] for partition_id in range(partitions): @@ -38,7 +49,6 @@ async def create_topic(self, name: str, partitions: int, replication_factor: int replicas.append(brokers_alive[broker_index]) leader = replicas[0] - replicas_json = json.dumps(replicas) partition_metadata = PartitionMetadata( @@ -54,8 +64,18 @@ async def create_topic(self, name: str, partitions: int, replication_factor: int try: await self.db_connection.execute( - "INSERT INTO topics (topic_name, partition_count, replication_factor) VALUES (?, ?, ?)", - (name, partitions, replication_factor), + """ + INSERT INTO topics (topic_name, partition_count, replication_factor, + retention_ms, retention_bytes) + VALUES (?, ?, ?, ?, ?) + """, + ( + name, + partitions, + replication_factor, + retention_ms, + retention_bytes, + ), ) await self.db_connection.executemany( diff --git a/tinystream/cluster_manager.py b/tinystream/cluster_manager.py index fc73d0f..9661c2c 100644 --- a/tinystream/cluster_manager.py +++ b/tinystream/cluster_manager.py @@ -2,26 +2,66 @@ from typing import Dict, Any, Tuple, Optional from tinystream.client.connection import TinyStreamAPI from tinystream.serializer.base import AbstractSerializer +from tinystream.config.manager import ConfigManager # Added class ClusterManager: + """ + A self-contained manager for cluster metadata, broker connections, + and leader discovery. 
+ """ + def __init__( self, - mode: str, - topic_metadata_cache: Dict[str, Dict[int, Any]], - broker_info_cache: Dict[int, Any], - broker_connections: Dict[Tuple[str, int], TinyStreamAPI], + config: ConfigManager, serializer: AbstractSerializer, - metadata_lock: asyncio.Lock, - controller_connection: TinyStreamAPI, ) -> None: - self.mode = mode - self._topic_metadata_cache = topic_metadata_cache - self._broker_info_cache = broker_info_cache - self._broker_connections = broker_connections + self._topic_metadata_cache: Dict[str, Dict[int, Any]] = {} + self._broker_info_cache: Dict[int, Any] = {} + self._broker_connections: Dict[Tuple[str, int], TinyStreamAPI] = {} self.serializer = serializer - self._metadata_lock = metadata_lock - self._controller_connection = controller_connection + self._metadata_lock = asyncio.Lock() + + controller_config = config.controller_config + self._controller_connection = TinyStreamAPI( + controller_config.get("host"), + int(controller_config.get("port")), + serializer=self.serializer, + ) + + async def connect(self) -> None: + """Connects to the controller and performs initial metadata fetch.""" + print("[ClusterManager]: Connecting to controller...") + await self._controller_connection.ensure_connected() + await self.refresh_cluster_metadata() + + async def close(self) -> None: + """Closes all connections.""" + print("[ClusterManager]: Closing all connections...") + if self._controller_connection: + await self._controller_connection.close() + for conn in self._broker_connections.values(): + await conn.close() + self._broker_connections.clear() + self._topic_metadata_cache.clear() + self._broker_info_cache.clear() + + async def is_connected(self) -> bool: + """Checks if the connection to the controller is active.""" + return self._controller_connection.is_connected + + async def get_topic_metadata(self, topic: str) -> Dict[int, Any]: + """ + Gets the partition info for a topic, refreshing if not in cache. 
+ """ + async with self._metadata_lock: + topic_partitions = self._topic_metadata_cache.get(topic) + if not topic_partitions: + await self._do_refresh() + topic_partitions = self._topic_metadata_cache.get(topic) + if not topic_partitions: + raise ValueError(f"Topic '{topic}' not found after refresh.") + return topic_partitions async def get_leader_connection( self, topic: str, partition_id: int @@ -30,79 +70,110 @@ async def get_leader_connection( Gets an active connection to the leader broker for a given partition. Handles cache misses by querying the controller. """ - if self.mode != "cluster": - raise RuntimeError("Cannot get leader connection in single-broker mode.") - - topic_partitions = self._topic_metadata_cache.get(topic) - if not topic_partitions: - await self.refresh_cluster_metadata() + async with self._metadata_lock: topic_partitions = self._topic_metadata_cache.get(topic) if not topic_partitions: - raise ValueError(f"Topic '{topic}' not found after refresh.") + await self._do_refresh() + topic_partitions = self._topic_metadata_cache.get(topic) + if not topic_partitions: + raise ValueError(f"Topic '{topic}' not found after refresh.") - partition_info = topic_partitions.get(partition_id) - if not partition_info: - raise ValueError(f"Partition {partition_id} for topic '{topic}' not found.") + partition_info = topic_partitions.get( + partition_id, topic_partitions.get(partition_id) + ) + if not partition_info: + raise ValueError( + f"Partition {partition_id} for topic '{topic}' not found." + ) - leader_id = partition_info.get("leader") - if leader_id is None: - raise ConnectionError(f"No leader available for {topic}-{partition_id}.") + leader_id = partition_info.get("leader") + if leader_id is None: + raise ConnectionError( + f"No leader available for {topic}-{partition_id}." 
+ ) - broker_info = self._broker_info_cache.get(leader_id) - if not broker_info: - await self.refresh_cluster_metadata() - broker_info = self._broker_info_cache.get(leader_id) + broker_info = self._broker_info_cache.get(leader_id) if not broker_info: - raise ValueError(f"Broker {leader_id} not found after refresh.") + await self._do_refresh() + broker_info = self._broker_info_cache.get(leader_id) + if not broker_info: + raise ValueError(f"Broker {leader_id} not found after refresh.") - host, port = broker_info["host"], broker_info["port"] + host = broker_info["host"] + port = int(broker_info.get("data_port", broker_info["port"])) - conn = self._broker_connections.get((host, port)) - if not conn or not conn.is_connected: - print( - f"ClusterManager: Creating new connection to leader {leader_id} at {host}:{port}" - ) - conn = TinyStreamAPI(host, port, serializer=self.serializer) - self._broker_connections[(host, port)] = conn + conn_key = (host, port) + conn = self._broker_connections.get(conn_key) + + if not conn or not conn.is_connected: + print( + f"[ClusterManager]: Creating new connection to leader {leader_id} at {host}:{port}" + ) + conn = TinyStreamAPI(host, port, serializer=self.serializer) + self._broker_connections[conn_key] = conn await conn.ensure_connected() return conn async def invalidate_caches(self, connection: Optional[TinyStreamAPI] = None): """Invalidates all metadata and optionally closes a bad connection.""" - if self.mode != "cluster": - return - - print("ClusterManager: Invalidating caches due to error.") - - self._topic_metadata_cache.clear() - self._broker_info_cache.clear() - - if connection: - conn_key = (connection.host, connection.port) - if conn_key in self._broker_connections: - await self._broker_connections.pop(conn_key).close() - - await self.refresh_cluster_metadata() - - async def refresh_cluster_metadata(self) -> None: - if self.mode != 
"cluster": - return - + print("[ClusterManager]: Invalidating caches due to error.") async with self._metadata_lock: - try: - await self._controller_connection.ensure_connected() # type: ignore - response = await self._controller_connection.send_request( # type: ignore - {"command": "get_cluster_metadata"} + self._topic_metadata_cache.clear() + self._broker_info_cache.clear() + + if connection: + conn_key = (connection.host, connection.port) + if conn_key in self._broker_connections: + await self._broker_connections.pop(conn_key).close() + + await self._do_refresh() + + async def refresh_cluster_metadata(self, lock_acquired: bool = False): + """Public method to refresh, acquiring the lock if needed.""" + if lock_acquired: + await self._do_refresh() + else: + async with self._metadata_lock: + await self._do_refresh() + + async def _do_refresh(self) -> None: + """The actual refresh logic, assumes lock is held.""" + try: + await self._controller_connection.ensure_connected() + response = await self._controller_connection.send_request( + {"command": "get_cluster_metadata"} + ) + if response.get("status") == "ok": + metadata = response["metadata"] + self._broker_info_cache = metadata.get("brokers", {}) + self._topic_metadata_cache = metadata.get("partitions", {}) + print("[ClusterManager]: Metadata refreshed.") + else: + print( + f"[ClusterManager]: Failed to refresh metadata: {response.get('message')}" ) - if response.get("status") == "ok": - metadata = response["metadata"] - self._broker_info_cache = metadata.get("brokers", {}) - self._topic_metadata_cache = metadata.get("partitions", {}) - print("[ClusterManager]: Metadata refreshed.") - else: - print( - f"[ClusterManager]: Failed to refresh metadata: {response.get('message')}" - ) - except Exception as e: - print(f"[ClusterManager]: Error refreshing metadata: {e}") + except Exception as e: + print(f"[ClusterManager]: Error refreshing metadata: {e}") + + async def create_topic( + self, topic: str, partition_count: 
int, replication_factor: int = 1 + ) -> Dict[str, Any]: + """Sends a create_topic request to the controller.""" + request = { + "command": "create_topic", + "topic": topic, + "partition_count": partition_count, + "replication_factor": replication_factor, + } + await self._controller_connection.ensure_connected() + response = await self._controller_connection.send_request(request) + + if response.get("status") == "ok": + await self.refresh_cluster_metadata() + + return response diff --git a/tinystream/config/__init__.py b/tinystream/config/__init__.py index 9872b7d..16b0664 100644 --- a/tinystream/config/__init__.py +++ b/tinystream/config/__init__.py @@ -1,3 +1,3 @@ -from tinystream.config.parser import TinyStreamConfig +from tinystream.config.manager import ConfigManager -__all__ = ["TinyStreamConfig"] +__all__ = ["ConfigManager"] diff --git a/tinystream/config/broker.ini b/tinystream/config/broker.ini new file mode 100644 index 0000000..143db74 --- /dev/null +++ b/tinystream/config/broker.ini @@ -0,0 +1,7 @@ +[broker] +partition_log_path = ./data/tinystream/log_partitions +partition_type = segmentedlogpartition +storage_type = segmentedfilelogstorage +host = localhost +port = 9095 +prefix_size = 8 diff --git a/tinystream/config/conf.ini b/tinystream/config/conf.ini deleted file mode 100644 index ceacb1b..0000000 --- a/tinystream/config/conf.ini +++ /dev/null @@ -1,25 +0,0 @@ -[default] -mode = cluster - -[serialization] -type = messagepack - -[broker] -partition_log_path = ./data/tinystream/log_partitions -partition_type = singlelogpartition -log_storage_type = filelogstorage -host = localhost -port = 9094 -prefix_size = 8 -byte_order = little - -[controller] -host = localhost -port = 9093 -heartbeat_timeout = 10 -prefix_size = 8 -byte_order = little - -[metastore] -db_path = ./data/metastore/tinystream.meta.db -http_port = 3200 diff --git a/tinystream/config/controller.ini b/tinystream/config/controller.ini new file mode 100644 index 0000000..b6800f7 --- 
/dev/null +++ b/tinystream/config/controller.ini @@ -0,0 +1,14 @@ +[serialization] +type = messagepack + +[controller] +host = localhost +port = 9093 +heartbeat_timeout = 10 +prefix_size = 8 +byte_order = little + + +[metastore] +db_path = ./data/metastore/tinystream.meta.db +http_port = 3200 diff --git a/tinystream/config/manager.py b/tinystream/config/manager.py new file mode 100644 index 0000000..3d18ef5 --- /dev/null +++ b/tinystream/config/manager.py @@ -0,0 +1,92 @@ +import argparse +from pathlib import Path +from typing import Dict, Any, Tuple, Literal + +from tinystream import DEFAULT_CONTROLLER_CONFIG_PATH, DEFAULT_BROKER_CONFIG_PATH +from tinystream.config.parser import EnvConfigParser + + +def split_host_port(uri: str, default_host: str, default_port: int) -> Tuple[str, int]: + try: + host, port_str = uri.split(":") + return host, int(port_str) + except (ValueError, TypeError, AttributeError): + return default_host, default_port + + +class ConfigManager: + controller_config: Dict[str, Any] + broker_config: Dict[str, Any] + serialization: Dict[str, Any] + metastore: Dict[str, Any] + + def __init__( + self, args: argparse.Namespace, component_type: Literal["controller", "broker"] + ): + """ + Loads and resolves configuration. 
+ """ + + parser = EnvConfigParser() + + if Path(DEFAULT_CONTROLLER_CONFIG_PATH).is_file(): + print( + f"[ConfigManager] Loading default controller config: {DEFAULT_CONTROLLER_CONFIG_PATH}" + ) + parser.read(DEFAULT_CONTROLLER_CONFIG_PATH) + else: + raise FileNotFoundError( + f"Default controller config not found: {DEFAULT_CONTROLLER_CONFIG_PATH}" + ) + + if component_type == "broker": + if Path(DEFAULT_BROKER_CONFIG_PATH).is_file(): + print( + f"[ConfigManager] Loading default broker config: {DEFAULT_BROKER_CONFIG_PATH}" + ) + parser.read(DEFAULT_BROKER_CONFIG_PATH) + else: + raise FileNotFoundError( + f"Default broker config not found: {DEFAULT_BROKER_CONFIG_PATH}" + ) + + user_config_path = getattr(args, "config", None) + if user_config_path: + if Path(user_config_path).is_file(): # type: ignore + print(f"[ConfigManager] Loading user config: {user_config_path}") + parser.read(user_config_path) # type: ignore + else: + raise FileNotFoundError( + f"User-specified config file not found: {user_config_path}" + ) + + self.controller_config = dict(parser.items("controller")) + self.metastore = dict(parser.items("metastore")) + self.serialization = dict(parser.items("serialization")) + + if component_type == "broker": + self.broker_config = dict(parser.items("broker")) + + if getattr(args, "controller_uri", None): + host, port = split_host_port( + args.controller_uri, + self.controller_config["host"], + int(self.controller_config["port"]), + ) + self.controller_config["host"] = host + self.controller_config["port"] = port + + if getattr(args, "metastore_uri", None): + host, port = split_host_port( + args.metastore_uri, + self.metastore.get("host", "localhost"), + int(self.metastore.get("http_port", 3200)), + ) + self.metastore["host"] = host + self.metastore["http_port"] = port + + if getattr(args, "port", None): + if component_type == "controller": + self.controller_config["port"] = args.port + elif component_type == "broker": + self.broker_config["port"] = args.port 
diff --git a/tinystream/config/parser.py b/tinystream/config/parser.py index 8213263..5ce144e 100644 --- a/tinystream/config/parser.py +++ b/tinystream/config/parser.py @@ -1,126 +1,25 @@ -import configparser import os -from typing import Optional, Dict, Any, Literal - - -class EnvInterpolation(configparser.BasicInterpolation): - """ - Interpolation class that expands environment variables in config values. - Example: `host = $HOST` will be replaced by the value of the 'HOST' env var. - """ - - def before_get(self, parser, section, option, value, defaults): - value = os.path.expandvars(value) - if "$" in value: - return os.environ.get(value.replace("$", ""), value) - return value +import configparser +from typing import Any class EnvConfigParser(configparser.ConfigParser): - """ - A ConfigParser that automatically uses the EnvInterpolation class. - """ - - def __init__(self, *args, **kwargs): - super().__init__(*args, interpolation=EnvInterpolation(), **kwargs) + """Interpolates environment variables (e.g., $VAR or ${VAR}).""" - -class TinyStreamConfig: - """ - A unified config object for all TinyStream components. - - It can be initialized from an INI file (with env var support) - or a dictionary, and intelligently determines the operation - mode ("single" vs "cluster"). - """ - - controller_config: Dict[str, Any] - broker_config: Dict[str, Any] - serialization: Dict[str, Any] - metastore: Dict[str, Any] - - mode: Literal["single", "cluster"] - - def __init__( + def get( self, - controller_config: Optional[Dict[str, Any]] = None, - broker_config: Optional[Dict[str, Any]] = None, - serialization: Optional[Dict[str, Any]] = None, - metastore: Optional[Dict[str, Any]] = None, - ): - """ - Initializes the config object. - Use .from_ini() or .from_dict() factory methods instead. 
- """ - self.controller_config = controller_config or {} - self.broker_config = broker_config or {} - self.serialization = serialization or {} - self.metastore = metastore or {} - - if self.controller_config: - self.mode = "cluster" - elif self.broker_config: - self.mode = "single" - else: - raise ValueError( - "Config is empty. Must contain a [controller] or [broker] section." - ) - - @classmethod - def from_ini(cls, file_path: str) -> "TinyStreamConfig": - """ - Creates a TinyStreamConfig object from an INI file. - - This method now uses EnvConfigParser to support - environment variable interpolation. - """ - parser = EnvConfigParser() - - if not parser.read(file_path): - raise FileNotFoundError(f"Config file not found or empty: {file_path}") - - config_dict = {section: dict(parser[section]) for section in parser.sections()} - return cls.from_dict(config_dict) - - @classmethod - def from_dict(cls, config: Dict[str, Any]): - """Creates a TinyStreamConfig object from a dictionary.""" - controller_conf = config.get("controller") - broker_conf = config.get("broker") - serialization = config.get("serialization") - metastore = config.get("metastore") - - return cls( - controller_config=controller_conf, - broker_config=broker_conf, - serialization=serialization, - metastore=metastore, - ) - - @classmethod - def from_default_config_file(cls) -> "TinyStreamConfig": - """Creates a TinyStreamConfig object from the default config file path.""" - default_path = os.getenv("TINYSTREAM_CONFIG_PATH", "tinystream.ini") - return cls.from_ini(default_path) - - def get_controller_config(self) -> Dict[str, Any]: - """Returns the [controller] section. 
Raises error if in single mode.""" - if self.mode == "single": - raise ValueError("Cannot get controller config in 'single' mode.") - return self.controller_config - - def get_serialization_config(self) -> Dict[str, Any]: - """Returns the [serialization] section, or empty dict if not present.""" - return self.serialization - - def get_broker_config(self) -> Dict[str, Any]: - """ - Returns the [broker] section. - In cluster mode, this might be empty, which is fine. - In single mode, this is the primary config. - """ - return self.broker_config - - def get_metastore_config(self): - """Returns the [metastore] section, or empty dict if not present.""" - return self.metastore + section: str, + option: str, + *, + raw: bool = False, + vars: Any = None, + fallback: Any = None, + ) -> Any: + val = super().get(section, option, raw=raw, vars=vars, fallback=fallback) + + if val is None: + return fallback + + if isinstance(val, str): + return os.path.expandvars(val) + return val diff --git a/tinystream/controller.py b/tinystream/controller.py index 2a88788..9ae5a61 100644 --- a/tinystream/controller.py +++ b/tinystream/controller.py @@ -8,10 +8,10 @@ from tinystream.metastore import Metastore from tinystream.models import BrokerInfo, TopicMetadata, PartitionMetadata -from tinystream import DEFAULT_CONFIG_PATH +from tinystream import DEFAULT_CONTROLLER_CONFIG_PATH from tinystream.client.base import BaseAsyncClient from tinystream.client.topic_manager import TopicManager -from tinystream.config.parser import TinyStreamConfig +from tinystream.config.manager import ConfigManager from tinystream.serializer.base import AbstractSerializer @@ -20,10 +20,10 @@ class Controller(BaseAsyncClient): Manages cluster metadata, brokers, and leader elections. 
""" - def __init__(self, config: TinyStreamConfig): + def __init__(self, config: ConfigManager): self.config = config - controller_config = config.get_controller_config() + controller_config = config.controller_config self.host = controller_config["host"] self.port = int(controller_config["port"]) self.heartbeat_timeout = float(controller_config["heartbeat_timeout"]) @@ -97,8 +97,13 @@ async def _load_metadata(self): async with self.db_connection.execute("SELECT * FROM topics") as t_cursor: async for row in t_cursor: - name, p_count, r_factor = row - self.topics[name] = TopicMetadata(name=name, partitions={}) + name, p_count, r_factor, retention_ms, retention_bytes = row + self.topics[name] = TopicMetadata( + name=name, + partitions={}, + retention_ms=retention_ms, + retention_bytes=retention_bytes, + ) async with self.db_connection.execute( "SELECT * FROM partitions" @@ -227,6 +232,16 @@ async def register_broker(self, broker_id: int, host: str, port: int): broker_id=broker_id, host=host, port=port ) print(f"[Controller] Registered broker {broker_id} at {host}:{port}") + print( + f"[Controller] Checking for partitions to assign to new broker {broker_id}..." + ) + for topic in self.topics.values(): + for part in topic.partitions.values(): + if broker_id in part.replicas and part.leader is None: + print( + f"[Controller] Triggering election for {topic.name}-{part.partition_id}" + ) + await self._elect_leader(topic.name, part.partition_id) return self async def _handle_deregister(self, request: Dict[str, Any]) -> Dict[str, Any]: @@ -248,7 +263,6 @@ async def update_broker_heartbeat(self, broker_id: int): self.brokers[broker_id].is_alive = True # TODO: Trigger rebalancing - # --- NEW HELPER METHOD --- async def _get_assignments_for_broker(self, broker_id: int) -> list[dict]: """ Scans the in-memory state for all partitions assigned to a broker. 
@@ -261,12 +275,16 @@ async def _get_assignments_for_broker(self, broker_id: int) -> list[dict]: if broker_id in part_meta.replicas: role = "leader" if broker_id == part_meta.leader else "follower" assignments.append( - {"topic": topic_name, "partition_id": part_id, "role": role} + { + "topic": topic_name, + "partition_id": part_id, + "role": role, + "retention_ms": topic_meta.retention_ms, + "retention_bytes": topic_meta.retention_bytes, + } ) return assignments - # --- END NEW HELPER METHOD --- - async def _elect_leader(self, topic: str, partition_id: int) -> Optional[int]: if not self.db_connection: raise Exception("Database not connected") @@ -304,9 +322,12 @@ async def remove_dead_brokers(self): dead_broker_ids = [] for broker in self.brokers.values(): if broker.status == "ALIVE" and ( - current_time - broker.last_heartkey > self.heartbeat_timeout + current_time - broker.last_heartbeat > self.heartbeat_timeout ): print(f"[Controller] Broker {broker.broker_id} timed out.") + broker.status = "TIMED_OUT" + broker.failed_since = current_time + broker.is_alive = False dead_broker_ids.append(broker.broker_id) for broker_id in dead_broker_ids: @@ -380,11 +401,14 @@ async def get_leader(self, topic: str, partition_id: int) -> Optional[int]: return None -async def main(): +async def main(_config: str): """ Custom startup script to initialize, register brokers, and run the server. 
""" - config = TinyStreamConfig.from_ini(DEFAULT_CONFIG_PATH) + config = ConfigManager( + args=argparse.Namespace(config=_config), + component_type="controller", + ) controller = Controller(config=config) try: await controller.start() @@ -400,7 +424,18 @@ async def main(): if __name__ == "__main__": + import argparse + + parser = argparse.ArgumentParser(description="Start TinyStream controller") + parser.add_argument( + "--config", + type=str, + default=DEFAULT_CONTROLLER_CONFIG_PATH, + help=f"Config file path (default: {DEFAULT_CONTROLLER_CONFIG_PATH})", + ) + args = parser.parse_args() + try: - asyncio.run(main()) + asyncio.run(main(_config=args.config)) except KeyboardInterrupt: print("\n[Controller] Shutdown forced.") diff --git a/tinystream/metastore.py b/tinystream/metastore.py index 6f8f2c3..83d8ffc 100644 --- a/tinystream/metastore.py +++ b/tinystream/metastore.py @@ -1,6 +1,11 @@ +import time +from pathlib import Path + import uvicorn import asyncio -from fastapi import FastAPI, HTTPException +from fastapi import FastAPI, HTTPException, Request +from fastapi.responses import HTMLResponse +from fastapi.templating import Jinja2Templates from typing import Dict from tinystream.models import ( @@ -12,6 +17,9 @@ from tinystream.client.topic_manager import TopicManager from tinystream.models import BrokerInfo, TopicMetadata +template_dir = Path(__file__).parent.parent / "ui" +templates = Jinja2Templates(directory=str(template_dir)) + class Metastore: """ @@ -46,6 +54,9 @@ def __init__( def _setup_api_routes(self): """Attaches this class's methods to the FastAPI app routes.""" + self.api_app.get("/dashboard", response_class=HTMLResponse)( + self._api_serve_dashboard + ) self.api_app.post("/api/v1/admin/topics", status_code=201)( self._api_create_topic ) @@ -66,10 +77,12 @@ async def start(self): port=self.port, loop="asyncio", log_level="info", + reload=True, ) self.api_server = uvicorn.Server(config) try: + print(f"[MetastoreAPI] API docs at 
http://localhost:{self.port}/docs") await self.api_server.serve() except asyncio.CancelledError: print("[MetastoreAPI] Server task cancelled.") @@ -85,7 +98,11 @@ async def close(self): async def _api_create_topic(self, request: CreateTopicRequest): try: await self.topic_manager.create_topic( - request.topic_name, request.partition_count, request.replication_factor + request.topic_name, + request.partition_count, + request.replication_factor, + request.retention_ms, # type: ignore + request.retention_bytes, # type: ignore ) return { "status": "success", @@ -110,11 +127,41 @@ async def _api_list_topics(self) -> ListTopicsResponse: async def _api_describe_cluster(self) -> ClusterInfoResponse: async with self._lock: response_brokers = {} + current_time = time.time() for broker_id, info in self.brokers.items(): + last_heartbeat = current_time - info.last_heartbeat + failed_since = last_heartbeat - info.last_heartbeat response_brokers[broker_id] = BrokerInfo( broker_id=info.broker_id, host=info.host, port=info.port, is_alive=info.is_alive, + last_heartbeat=last_heartbeat, + failed_since=failed_since, + status=info.status, ) return ClusterInfoResponse(brokers=response_brokers) + + async def _api_serve_dashboard(self, request: Request): + """ + Gathers live data and renders the HTML dashboard. 
+ """ + async with self._lock: + brokers_data = self.brokers + topics_data = self.topics + + rendered_topics = {} + for name, meta in topics_data.items(): + print("name_name", name, meta) + rendered_topics[name] = { + "name": name, + "partitions": meta.partitions, + "replication_factor": meta.replication_factor, + "retention_ms": meta.retention_ms, + "retention_bytes": meta.retention_bytes, + } + + return templates.TemplateResponse( + "dashboard.html", + {"request": request, "brokers": brokers_data, "topics": topics_data}, + ) diff --git a/tinystream/models.py b/tinystream/models.py index a0127f7..a8b433c 100644 --- a/tinystream/models.py +++ b/tinystream/models.py @@ -12,6 +12,7 @@ class BrokerInfo: last_heartbeat: float = time.time() is_alive: bool = True status: Literal["ALIVE", "TIMED_OUT", "SHUTDOWN"] = "ALIVE" + failed_since: Optional[float] = None @dataclass @@ -25,6 +26,8 @@ class PartitionMetadata: class TopicMetadata: name: str partitions: Dict[int, PartitionMetadata] + retention_ms: int = 1800 + retention_bytes: int = 10000 class CreateTopicRequest(BaseModel): @@ -37,6 +40,12 @@ class CreateTopicRequest(BaseModel): replication_factor: int = Field( ..., gt=0, description="Replication factor (must be >= 1)." 
) + retention_ms: Optional[int] = Field( + gt=0, description="Retention time in ms.", default=1800 + ) + retention_bytes: Optional[int] = Field( + gt=-1, description="Retention time in bytes.", default=10000 + ) class TopicInfo(BaseModel): @@ -46,6 +55,9 @@ class TopicInfo(BaseModel): name: str partition_count: int + replication_factor: int + retention_ms: int = 1800 + retention_bytes: int = 10000 class ListTopicsResponse(BaseModel): diff --git a/tinystream/partitions/base.py b/tinystream/partitions/base.py index db19fdb..f7ccae0 100644 --- a/tinystream/partitions/base.py +++ b/tinystream/partitions/base.py @@ -4,7 +4,17 @@ class BasePartition(ABC): """Base class for all partition implementations.""" - pass + def __init__( + self, + topic_name: str, + partition_id: int, + storage=None, + serializer=None, + ): + self.topic_name = topic_name + self.partition_id = partition_id + self.storage = storage + self.serializer = serializer async def load(self): """Loads the partition data.""" @@ -21,3 +31,15 @@ async def read(self, logical_offset): def get_high_watermark(self): """Returns the high watermark (next write offset) of the partition.""" raise NotImplementedError + + def update_policy(self, role: str, retention_ms: int, retention_bytes: int): + """Updates the retention policy of the partition.""" + raise NotImplementedError + + async def enforce_retention(self): + """Enforces the retention policy by deleting old messages.""" + raise NotImplementedError + + async def close(self): + """Closes the partition and releases resources.""" + raise NotImplementedError diff --git a/tinystream/partitions/partition.py b/tinystream/partitions/partition.py deleted file mode 100644 index ab870fb..0000000 --- a/tinystream/partitions/partition.py +++ /dev/null @@ -1,104 +0,0 @@ -import asyncio -from typing import Any, List -from pathlib import Path - -from tinystream.partitions.base import BasePartition -from tinystream.serializer.msg_pack import MSGPackSerializer -from 
tinystream.storage.storage import FileLogStorage - - -class SingleLogPartition(BasePartition): - def __init__( - self, - topic_name: str, - partition_id: int, - base_log_dir: Path, - storage=None, - serializer=None, - ): - self.topic_name = topic_name - self.partition_id = partition_id - - log_file = base_log_dir / topic_name / f"{partition_id}.log" - self.storage = storage or FileLogStorage(log_file_path=log_file) - self.serializer = serializer or MSGPackSerializer() - - # The core of the partition: mapping logical offsets to physical offsets. - # index[0] = physical offset of the 1st message - # index[1] = physical offset of the 2nd message - self._index: List[int] = [] - self._lock = asyncio.Lock() - self._next_logical_offset = 0 - - async def load(self) -> None: - """ - Loads the partition by replaying its log file to rebuild - the in-memory offset index. This must be called before - the partition can be used. - """ - async with self._lock: - await self.storage.ensure_ready() - - print(f"Loading partition {self.topic_name}-{self.partition_id}...") - self._index = [] - - async for physical_offset, _ in self.storage.replay(): - self._index.append(physical_offset) - - self._next_logical_offset = len(self._index) - print( - f"Loaded {self._next_logical_offset} messages into index for {self.topic_name}-{self.partition_id}." - ) - - async def append(self, data: Any) -> int: - """ - Appends a new message to the partition. - - Args: - data: The Python object to append. - - Returns: - The logical offset (e.g., 0, 1, 2...) of the appended message. - """ - serialized_data = self.serializer.serialize(data) - - async with self._lock: - physical_offset, _ = await self.storage.append(serialized_data) - - self._index.append(physical_offset) - - current_logical_offset = self._next_logical_offset - self._next_logical_offset += 1 - - return current_logical_offset - - async def read(self, logical_offset: int) -> Any: - """ - Reads a message at a specific logical offset. 
- - Args: - logical_offset: The logical offset (0, 1, 2...) to read. - - Returns: - The deserialized Python object. - - Raises: - IndexError: If the logical offset is out of bounds. - """ - physical_offset: int - try: - physical_offset = self._index[logical_offset] - except IndexError: - raise IndexError( - f"Offset {logical_offset} out of range for partition {self.topic_name}-{self.partition_id}" - ) - - serialized_data = await self.storage.read_at(physical_offset) - - return self.serializer.deserialize(serialized_data) - - def get_high_watermark(self) -> int: - """ - Returns the next available logical offset (i.e., total message count). - """ - return self._next_logical_offset diff --git a/tinystream/partitions/segmented.py b/tinystream/partitions/segmented.py new file mode 100644 index 0000000..6719749 --- /dev/null +++ b/tinystream/partitions/segmented.py @@ -0,0 +1,139 @@ +import asyncio +import time +from typing import Any, Optional + +from tinystream.partitions.base import BasePartition +from tinystream.serializer.msg_pack import MSGPackSerializer +from tinystream.storage.base import AbstractLogStorage + + +class SegmentedLogPartition(BasePartition): + def __init__( + self, + topic_name: str, + partition_id: int, + storage: AbstractLogStorage, + serializer=None, + ): + super().__init__(topic_name, partition_id, storage, serializer) + self.topic_name = topic_name + self.partition_id = partition_id + + self.storage = storage + self.serializer = serializer or MSGPackSerializer() + + self._lock = asyncio.Lock() + + self._next_logical_offset = 0 + + self.role: str = "follower" + self.retention_ms: Optional[int] = None + self.retention_bytes: Optional[int] = None + + def update_policy(self, role: str, retention_ms: int, retention_bytes: int): + """ + Updates the partition's live policy from the controller. 
+ (This logic is correct and remains unchanged) + """ + self.role = role + self.retention_ms = retention_ms + self.retention_bytes = retention_bytes + + async def enforce_retention(self): + """ + Deletes old segments based on the partition's policy. + (This logic is correct and remains unchanged) + """ + if self.retention_ms is None and self.retention_bytes is None: + return + + inactive_segments = await self.storage.get_inactive_segments() + + if self.retention_ms is not None: + now = time.time() * 1000 + cutoff_time = now - self.retention_ms + + for segment in list(inactive_segments): + if segment.last_modified_timestamp < cutoff_time: + print( + f"[{self.topic_name}-{self.partition_id}] Deleting segment {segment.log_path.name} (Time Limit)" + ) + await self.storage.delete_segment(segment) + inactive_segments.remove(segment) + + if self.retention_bytes is not None and self.retention_bytes > 0: + total_size = await self.storage.get_total_size() + + segments_to_delete = sorted(inactive_segments, key=lambda s: s.base_offset) + + while total_size > self.retention_bytes: + if not segments_to_delete: + break + + segment = segments_to_delete.pop(0) + print( + f"[{self.topic_name}-{self.partition_id}] Deleting segment {segment.log_path.name} (Size Limit)" + ) + await self.storage.delete_segment(segment) + total_size -= segment.size + + async def load(self) -> None: + """ + Loads the partition by initializing the storage and finding + the next available logical offset. + """ + async with self._lock: + await self.storage.ensure_ready() + + print(f"Loading partition {self.topic_name}-{self.partition_id}...") + message_count = 0 + try: + print("Storage Path", self.storage.partition_path) + async for _, _ in self.storage.replay(): + message_count += 1 + except Exception as e: + print(f"Error during log replay for HWM: {e}") + + self._next_logical_offset = message_count + + print( + f"Loaded partition. Next offset (HWM) is {self._next_logical_offset}." 
+ ) + + async def append(self, data: Any) -> int: + """ + Appends a new message to the partition. + Passes the logical offset to the storage layer for indexing. + """ + serialized_data = self.serializer.serialize(data) + + async with self._lock: + current_logical_offset = self._next_logical_offset + + self._next_logical_offset += 1 + + await self.storage.append(current_logical_offset, serialized_data) + + return current_logical_offset + + async def read(self, logical_offset: int) -> Any: + """ + Reads a message at a specific logical offset. + Delegates the read (and index search) to the storage layer. + """ + + if logical_offset >= self._next_logical_offset: + raise IndexError( + f"Offset {logical_offset} out of range for partition {self.topic_name}-{self.partition_id}" + ) + + serialized_data = await self.storage.read(logical_offset) + + return self.serializer.deserialize(serialized_data) + + def get_high_watermark(self) -> int: + """ + Returns the next available logical offset (i.e., total message count). 
+ (This is unchanged and correct) + """ + return self._next_logical_offset diff --git a/tinystream/partitions/single.py b/tinystream/partitions/single.py new file mode 100644 index 0000000..1cc3c24 --- /dev/null +++ b/tinystream/partitions/single.py @@ -0,0 +1,161 @@ +import asyncio +import time +from typing import Any, List, Optional + +from tinystream.partitions.base import BasePartition +from tinystream.serializer.msg_pack import MSGPackSerializer +from tinystream.storage import SingleLogStorage + + +class SingleLogPartition(BasePartition): + def __init__( + self, + topic_name: str, + partition_id: int, + storage: SingleLogStorage, + serializer=None, + ): + self.topic_name = topic_name + self.partition_id = partition_id + + self.serializer = serializer or MSGPackSerializer() + self.storage = storage + + super().__init__( + topic_name=self.topic_name, + partition_id=self.partition_id, + storage=self.storage, + serializer=self.serializer, + ) + + self._index: List[int] = [] + self._lock = asyncio.Lock() + self._next_logical_offset = 0 + self.role: str = "follower" + self.retention_ms: Optional[int] = None + self.retention_bytes: Optional[int] = None + + def update_policy(self, role: str, retention_ms: int, retention_bytes: int): + """ + Updates the partition's live policy from the controller. 
+ """ + self.role = role + self.retention_ms = retention_ms + self.retention_bytes = retention_bytes + + async def enforce_retention(self): + if self.retention_ms is None and self.retention_bytes is None: + return + + inactive_segments = await self.storage.get_inactive_segments() + + if self.retention_ms is not None: + print("Checking time retention:", self.retention_ms) + now = time.time() * 1000 + cutoff_time = now - self.retention_ms + print("[Time Retention] Current time (ms):", cutoff_time) + + for segment in inactive_segments: + print( + "Segment:", + segment.log_path.name, + "Last Modified:", + segment.last_modified_timestamp, + ) + if segment.last_modified_timestamp < cutoff_time: + print( + f"[{self.topic_name}-{self.partition_id}] Deleting segment {segment.log_path.name} (Time Limit)" + ) + await self.storage.delete_segment(segment) + + if self.retention_bytes is not None and self.retention_bytes > 0: + print("Checking size retention:", self.retention_bytes) + total_size = await self.storage.get_total_size() + print("[Size Retention] Current total size:", total_size) + + segments_to_delete = sorted(inactive_segments, key=lambda s: s.base_offset) + + while total_size > self.retention_bytes: + if not segments_to_delete: + break + + segment = segments_to_delete.pop(0) + print( + f"[{self.topic_name}-{self.partition_id}] Deleting segment {segment.name} (Size Limit)" + ) + await self.storage.delete_segment(segment) + total_size -= segment.size + + async def load(self) -> None: + async with self._lock: + await self.storage.ensure_ready() + + print(f"Loading partition {self.topic_name}-{self.partition_id}...") + self._index = [] + + print("index_index_index", self._index) + + async for physical_offset, _ in self.storage.replay(): + print("physical_offset:", physical_offset, "_:", _) + self._index.append(physical_offset) + + self._next_logical_offset = len(self._index) + print( + f"Loaded {self._next_logical_offset} messages into index for 
{self.topic_name}-{self.partition_id}." + ) + + async def append(self, data: Any) -> int: + """ + Appends a new message to the partition. + + Args: + data: The Python object to append. + + Returns: + The logical offset (e.g., 0, 1, 2...) of the appended message. + """ + serialized_data = self.serializer.serialize(data) + + async with self._lock: + physical_offset, _ = await self.storage.append( + data=serialized_data, + logical_offset=None, + ) + + self._index.append(physical_offset) + + current_logical_offset = self._next_logical_offset + self._next_logical_offset += 1 + + return current_logical_offset + + async def read(self, logical_offset: int) -> Any: + """ + Reads a message at a specific logical offset. + + Args: + logical_offset: The logical offset (0, 1, 2...) to read. + + Returns: + The deserialized Python object. + + Raises: + IndexError: If the logical offset is out of bounds. + """ + physical_offset: int + try: + physical_offset = self._index[logical_offset] + except IndexError: + raise IndexError( + f"Offset {logical_offset} out of range for partition {self.topic_name}-{self.partition_id}" + ) + + serialized_data = await self.storage.read_at(physical_offset) + + return self.serializer.deserialize(serialized_data) + + def get_high_watermark(self) -> int: + """ + Returns the next available logical offset (i.e., total message count). 
+ """ + return self._next_logical_offset diff --git a/tinystream/storage/__init__.py b/tinystream/storage/__init__.py index d5ce737..a830ade 100644 --- a/tinystream/storage/__init__.py +++ b/tinystream/storage/__init__.py @@ -1,5 +1,4 @@ -from tinystream.storage.storage import FileLogStorage +from tinystream.storage.single import SingleLogStorage +from tinystream.storage.segemented import SegmentedLogStorage -__all__ = [ - "FileLogStorage", -] +__all__ = ["SegmentedLogStorage", "SingleLogStorage"] diff --git a/tinystream/storage/base.py b/tinystream/storage/base.py index 87f8094..b4747c5 100644 --- a/tinystream/storage/base.py +++ b/tinystream/storage/base.py @@ -1,22 +1,37 @@ from abc import ABC, abstractmethod -from typing import AsyncGenerator, Tuple +from typing import AsyncGenerator, Tuple, Literal, Optional from pathlib import Path class AbstractLogStorage(ABC): - def __init__(self, log_file_path: Path): - self.log_file = log_file_path + def __init__( + self, + partition_path: Path, + prefix_size: int = 8, + byte_order: Literal["little", "big"] = "little", + max_segment_bytes: int = 16 * 1024 * 1024, + ): + self.partition_path = partition_path + self.prefix_size = prefix_size + self.byte_order = byte_order + self.max_segment_bytes = max_segment_bytes @abstractmethod async def ensure_ready(self) -> None: pass @abstractmethod - async def append(self, data: bytes) -> Tuple[int, int]: + async def append( + self, logical_offset: Optional[int], data: bytes + ) -> Tuple[int, int]: pass @abstractmethod - async def read_at(self, offset: int) -> bytes: + async def read_at(self, logical_offset: int) -> bytes: + pass + + @abstractmethod + async def read(self, offset: int) -> bytes: pass @abstractmethod @@ -26,3 +41,15 @@ async def replay(self) -> AsyncGenerator[Tuple[int, bytes], None]: @abstractmethod async def get_current_offset(self) -> int: pass + + @abstractmethod + async def get_inactive_segments(self): + pass + + @abstractmethod + async def delete_segment(self, 
segment_index: int) -> None: + pass + + @abstractmethod + async def get_total_size(self) -> int: + pass diff --git a/tinystream/storage/log_segment.py b/tinystream/storage/log_segment.py new file mode 100644 index 0000000..62f3062 --- /dev/null +++ b/tinystream/storage/log_segment.py @@ -0,0 +1,205 @@ +import aiofiles +import os +import asyncio +import bisect +from typing import AsyncGenerator, Tuple, Literal, List +from pathlib import Path + + +class LogSegment: + """ + Manages a single pair of log/index files (e.g., 000000.log, 000000.index). + """ + + INDEX_ENTRY_SIZE = 16 + + def __init__( + self, + partition_path: Path, + base_offset: int, + prefix_size: int, + byte_order: Literal["little", "big"], + index_interval_bytes: int = 4096, + ): + self.partition_path = partition_path + self.base_offset = base_offset + self.prefix_size = prefix_size + self.byte_order = byte_order + self.index_interval_bytes = index_interval_bytes + + filename_base = f"{base_offset:020d}" + self.log_path = self.partition_path / f"{filename_base}.log" + self.index_path = self.partition_path / f"{filename_base}.index" + + self._write_lock = asyncio.Lock() + self.size = 0 + self.last_modified_timestamp = 0.0 + + self.index_entries: List[Tuple[int, int]] = [] + self._bytes_since_last_index = 0 + + async def load(self) -> None: + """Loads segment state and reads the .index file into memory.""" + self.partition_path.mkdir(parents=True, exist_ok=True) + try: + async with aiofiles.open(self.log_path, "xb"): + pass + async with aiofiles.open(self.index_path, "xb"): + pass + except FileExistsError: + pass + + self.size = os.path.getsize(self.log_path) + self.last_modified_timestamp = os.path.getmtime(self.log_path) * 1000 + + try: + async with aiofiles.open(self.index_path, "rb") as f: + while True: + entry_bytes = await f.read(self.INDEX_ENTRY_SIZE) + if len(entry_bytes) == 0: + break + if len(entry_bytes) < self.INDEX_ENTRY_SIZE: + print(f"Warning: Corrupted index file {self.index_path.name}") 
+ break + + offset = int.from_bytes(entry_bytes[0:8], self.byte_order) + pos = int.from_bytes(entry_bytes[8:16], self.byte_order) + self.index_entries.append((offset, pos)) + except FileNotFoundError: + pass + + if self.index_entries: + self._bytes_since_last_index = self.size - self.index_entries[-1][1] + else: + self._bytes_since_last_index = self.size + + async def _write_to_index(self, logical_offset: int, byte_position: int): + """Appends a new entry to the .index file and in-memory cache.""" + self.index_entries.append((logical_offset, byte_position)) + entry_bytes = logical_offset.to_bytes( + 8, self.byte_order + ) + byte_position.to_bytes(8, self.byte_order) + async with aiofiles.open(self.index_path, "ab") as f: + await f.write(entry_bytes) + + async def append(self, logical_offset: int, data: bytes) -> int: + """ + Appends data to this segment with a length prefix. + Returns total bytes written. + NOTE: Signature has changed. + """ + async with self._write_lock: + payload_len = len(data) + len_prefix = payload_len.to_bytes(self.prefix_size, self.byte_order) + total_bytes = self.prefix_size + payload_len + + async with aiofiles.open(self.log_path, "ab") as f: + file_offset = await f.tell() + + if ( + not self.index_entries + or self._bytes_since_last_index > self.index_interval_bytes + ): + await self._write_to_index(logical_offset, file_offset) + self._bytes_since_last_index = 0 + else: + self._bytes_since_last_index += total_bytes + + await f.write(len_prefix) + await f.write(data) + + self.size += total_bytes + return total_bytes + + async def _find_position_from_index(self, target_offset: int) -> int: + """Finds the *byte position* to start scanning from for a logical offset.""" + if not self.index_entries: + return 0 + + idx = bisect.bisect_right(self.index_entries, (target_offset, float("inf"))) + + if idx == 0: + return 0 + + _, byte_position = self.index_entries[idx - 1] + return byte_position + + async def read(self, logical_offset_to_find: int) 
-> bytes: + """ + Reads a single message payload by LOGICAL offset. + Uses the index to perform a fast, sparse scan. + NOTE: Replaces read_at(). + """ + start_position = await self._find_position_from_index(logical_offset_to_find) + + async with aiofiles.open(self.log_path, "rb") as f: + await f.seek(start_position) + + logical_offset_counter = -1 + if self.index_entries: + idx = bisect.bisect_right( + self.index_entries, (logical_offset_to_find, float("inf")) + ) + if idx > 0: + logical_offset_counter = self.index_entries[idx - 1][0] + + while True: + len_prefix_bytes = await f.read(self.prefix_size) + if not len_prefix_bytes: + break + + payload_len = int.from_bytes(len_prefix_bytes, self.byte_order) + + # Need to infer the logical offset. + # This assumes offsets are sequential (e.g., 100, 101, 102) + # This is a placeholder for a real implementation that + # would store the offset *in the log message* + if logical_offset_counter != -1: + logical_offset_counter += 1 + + if logical_offset_counter == logical_offset_to_find: + payload = await f.read(payload_len) + if len(payload) != payload_len: + raise IOError("Log file corrupted.") + return payload + else: + await f.seek(payload_len, 1) + + raise IndexError(f"Offset {logical_offset_to_find} not found in segment.") + + async def replay(self) -> AsyncGenerator[Tuple[int, bytes], None]: # type: ignore + """Reads and yields all messages from this segment.""" + print("Warning: Replay is not accurate without offsets in log") + logical_offset_counter = self.base_offset + try: + async with aiofiles.open(self.log_path, "rb") as f: + while True: + len_prefix_bytes = await f.read(self.prefix_size) + if not len_prefix_bytes: + break + payload_len = int.from_bytes(len_prefix_bytes, self.byte_order) + payload = await f.read(payload_len) + if len(payload) != payload_len: + break + + yield logical_offset_counter, payload + logical_offset_counter += 1 + except FileNotFoundError: + pass + + async def get_current_offset(self) -> 
int: + """Gets the current end of the file (the next write offset).""" + async with self._write_lock: + return self.base_offset + self.size + + async def delete_files(self): + """Deletes the .log and .index files for this segment.""" + print(f"Deleting segment: {self.log_path.name}") + try: + os.remove(self.log_path) + except OSError as e: + print(f"Error deleting {self.log_path}: {e}") + try: + os.remove(self.index_path) + except OSError as e: + print(f"Error deleting {self.index_path}: {e}") diff --git a/tinystream/storage/segemented.py b/tinystream/storage/segemented.py new file mode 100644 index 0000000..c6b9b5a --- /dev/null +++ b/tinystream/storage/segemented.py @@ -0,0 +1,147 @@ +import asyncio +from pathlib import Path +from typing import List, Literal, Optional, AsyncGenerator, Tuple + +from tinystream.storage.base import AbstractLogStorage +from tinystream.storage.log_segment import LogSegment + + +class SegmentedLogStorage(AbstractLogStorage): + """ + Manages a directory of log segments for a single partition. 
+ """ + + def __init__( + self, + partition_path: Path, + prefix_size: int = 8, + byte_order: Literal["little", "big"] = "little", + max_segment_bytes: int = 16 * 1024 * 1024, + index_interval_bytes: int = 4096, + ): + self.partition_path = partition_path + self.prefix_size = prefix_size + self.byte_order = byte_order + self.max_segment_bytes = max_segment_bytes + self.index_interval_bytes = index_interval_bytes + + super().__init__( + partition_path=self.partition_path, + prefix_size=self.prefix_size, + byte_order=self.byte_order, + max_segment_bytes=self.max_segment_bytes, + ) + + self.segments: List[LogSegment] = [] + self.active_segment: Optional[LogSegment] = None + self._roll_lock = asyncio.Lock() + + async def ensure_ready(self) -> None: + """Scans the directory for .log files and loads them as segments.""" + self.partition_path.mkdir(parents=True, exist_ok=True) + log_files = sorted(self.partition_path.glob("*.log")) + + if not log_files: + await self._roll_segment(base_offset=0) + return + + for log_path in log_files: + try: + base_offset = int(log_path.stem) + segment = LogSegment( + self.partition_path, + base_offset, + self.prefix_size, + self.byte_order, + self.index_interval_bytes, + ) + await segment.load() + self.segments.append(segment) + except ValueError: + print(f"Skipping unknown file: {log_path.name}") + + self.active_segment = self.segments[-1] + print( + f"Loaded {len(self.segments)} segments. 
Active: {self.active_segment.log_path.name}" + ) + + async def _roll_segment(self, base_offset: int): + """Closes the current segment and starts a new one.""" + new_segment = LogSegment( + self.partition_path, + base_offset, + self.prefix_size, + self.byte_order, + self.index_interval_bytes, + ) + await new_segment.load() + + self.segments.append(new_segment) + self.active_segment = new_segment + print(f"Rolled to new segment: {new_segment.log_path.name}") + + async def append(self, logical_offset: int, data: bytes) -> int: + """ + Appends data to the active segment, rolling if necessary. + NOTE: Signature has changed to accept logical_offset. + Returns total bytes written. + """ + if not self.active_segment: + raise Exception("Storage not initialized. Call ensure_ready().") + + if self.active_segment.size > self.max_segment_bytes: + async with self._roll_lock: + if self.active_segment.size > self.max_segment_bytes: + await self._roll_segment(base_offset=logical_offset) + + return await self.active_segment.append(logical_offset, data) + + async def read(self, logical_offset: int) -> bytes: + """ + Finds the correct segment and reads by logical offset. + NOTE: Replaces read_at(). + """ + segment_to_read = None + for segment in reversed(self.segments): + if logical_offset >= segment.base_offset: + segment_to_read = segment + break + + if not segment_to_read: + raise IndexError(f"Offset {logical_offset} is before the first segment.") + + return await segment_to_read.read(logical_offset) + + async def replay(self) -> AsyncGenerator[Tuple[int, bytes], None]: # type: ignore + """Reads and yields all messages from all segments, in order.""" + for segment in self.segments: + async for logical_offset, payload in segment.replay(): + yield logical_offset, payload + + async def get_current_offset(self) -> int: + """Gets the next logical offset.""" + # This is a problem. The storage layer doesn't know the + # next logical offset, only the next byte offset. 
+ # This needs to be managed by the Partition class. + # For now, I just return the base offset of the next segment. + if not self.active_segment: + return 0 + return await self.active_segment.get_current_offset() + + async def get_inactive_segments(self) -> List[LogSegment]: + if not self.active_segment: + return [] + return self.segments[:-1] + + async def get_total_size(self) -> int: + return sum(s.size for s in self.segments) + + async def delete_segment(self, segment: LogSegment): + await segment.delete_files() + try: + self.segments.remove(segment) + except ValueError: + print(f"Warning: Segment {segment.log_path.name} not in list.") + + async def read_at(self, logical_offset: int) -> bytes: + ... diff --git a/tinystream/storage/storage.py b/tinystream/storage/single.py similarity index 56% rename from tinystream/storage/storage.py rename to tinystream/storage/single.py index 70492dd..c20eb91 100644 --- a/tinystream/storage/storage.py +++ b/tinystream/storage/single.py @@ -1,27 +1,34 @@ import aiofiles import os -from typing import AsyncGenerator, Tuple, Literal +from typing import AsyncGenerator, Tuple, Literal, List, Any, Optional from pathlib import Path from tinystream.storage.base import AbstractLogStorage -class FileLogStorage(AbstractLogStorage): +class SingleLogStorage(AbstractLogStorage): """ - The log file format is a sequence of records: - [ 8-byte length ][ N-byte payload ] + Manages a single log file for a partition. + Format: [ 8-byte length ][ N-byte payload ] + + NOTE: This storage class DOES NOT support retention policies, + as it cannot delete old data without destroying the entire log. 
""" def __init__( self, - log_file_path: Path, + partition_path: Path, prefix_size: int = 8, byte_order: Literal["little"] = "little", + max_segment_bytes: int = 0, ): - super().__init__(log_file_path=log_file_path) + super().__init__( + partition_path=partition_path, + prefix_size=prefix_size, + byte_order=byte_order, + max_segment_bytes=max_segment_bytes, + ) self.prefix_size = prefix_size self.byte_order = byte_order - self.log_file = log_file_path - self._log_dir = log_file_path.parent self._write_lock = None async def _get_lock(self): @@ -32,20 +39,23 @@ async def _get_lock(self): return self._write_lock async def ensure_ready(self) -> None: - if not os.path.exists(self._log_dir): - os.makedirs(self._log_dir, exist_ok=True) + parent_dir = self.partition_path.parent + if not os.path.exists(parent_dir): + os.makedirs(parent_dir, exist_ok=True) - async def append(self, data: bytes) -> Tuple[int, int]: + async def append( + self, logical_offset: Optional[int], data: bytes + ) -> Tuple[int, int]: """ Appends data to the log file with an 8-byte length prefix. - Format: [ 8-byte length ][ N-byte payload ] + Returns (physical_offset, bytes_written) """ lock = await self._get_lock() async with lock: payload_len = len(data) len_prefix = payload_len.to_bytes(self.prefix_size, self.byte_order) - async with aiofiles.open(self.log_file, "ab") as f: + async with aiofiles.open(self.partition_path, "ab") as f: offset = await f.tell() await f.write(len_prefix) @@ -56,34 +66,27 @@ async def append(self, data: bytes) -> Tuple[int, int]: async def read_at(self, offset: int) -> bytes: """ - Reads a single message payload starting at a specific offset. + Reads a single message payload starting at a specific physical offset. 
""" - async with aiofiles.open(self.log_file, "rb") as f: + async with aiofiles.open(self.partition_path, "rb") as f: await f.seek(offset) len_prefix_bytes = await f.read(self.prefix_size) if not len_prefix_bytes: - raise EOFError( - "Reached end of file while trying to read length prefix." - ) + raise EOFError("Reached end of file.") payload_len = int.from_bytes(len_prefix_bytes, self.byte_order) payload = await f.read(payload_len) if len(payload) != payload_len: - raise IOError( - f"Log file corrupted. Expected {payload_len} bytes, got {len(payload)}." - ) + raise IOError("Log file corrupted.") return payload async def replay(self) -> AsyncGenerator[Tuple[int, bytes], None]: # type: ignore - """ - AsyncGenerator - Reads and yields all messages from the log file. - """ + """Reads and yields all messages from the log file.""" try: - async with aiofiles.open(self.log_file, "rb") as f: + async with aiofiles.open(self.partition_path, "rb") as f: while True: current_offset = await f.tell() @@ -92,12 +95,10 @@ async def replay(self) -> AsyncGenerator[Tuple[int, bytes], None]: # type: igno break payload_len = int.from_bytes(len_prefix_bytes, self.byte_order) - payload = await f.read(payload_len) + if len(payload) != payload_len: - print( - f"Warning: Log file may be truncated. Expected {payload_len}, got {len(payload)}." - ) + print("Warning: Log file may be truncated.") break yield current_offset, payload @@ -107,6 +108,18 @@ async def replay(self) -> AsyncGenerator[Tuple[int, bytes], None]: # type: igno async def get_current_offset(self) -> int: """Gets the current size of the log file.""" try: - return os.path.getsize(self.log_file) + return os.path.getsize(self.partition_path) except FileNotFoundError: return 0 + + async def get_inactive_segments(self) -> List: + return [] + + async def get_total_size(self) -> int: + return await self.get_current_offset() + + async def delete_segment(self, segment: Any): + ... + + async def read(self, offset: int) -> bytes: + ... 
diff --git a/tinystream/utils/env.py b/tinystream/utils/env.py new file mode 100644 index 0000000..f619225 --- /dev/null +++ b/tinystream/utils/env.py @@ -0,0 +1,20 @@ +import os + + +def env_default(env_var, *, default=None): + """Return a function that fetches an env var or falls back to default.""" + + def _factory(): + return os.getenv(env_var, default) + + return _factory + + +def split_host_port(s: str): + host, port_str = s.rsplit(":", 1) + try: + port = int(port_str) + except ValueError: + return s, None + else: + return host, port diff --git a/uv.lock b/uv.lock index 994b2c7..3dc4b69 100644 --- a/uv.lock +++ b/uv.lock @@ -1160,6 +1160,7 @@ dependencies = [ { name = "aiosqlite" }, { name = "fastapi" }, { name = "httpx" }, + { name = "jinja2" }, { name = "msgpack" }, { name = "pydantic" }, { name = "uvicorn" }, @@ -1180,6 +1181,7 @@ requires-dist = [ { name = "aiosqlite", specifier = ">=0.21.0" }, { name = "fastapi", specifier = ">=0.120.2" }, { name = "httpx", specifier = ">=0.28.1" }, + { name = "jinja2", specifier = ">=3.1.6" }, { name = "msgpack", specifier = ">=1.1.2" }, { name = "pydantic", specifier = ">=2.12.3" }, { name = "uvicorn", specifier = ">=0.38.0" },