diff --git a/pyproject.toml b/pyproject.toml index 90693af..722a98f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -6,7 +6,11 @@ requires-python = ">=3.12" dependencies = [ "aiofiles>=25.1.0", "aiosqlite>=0.21.0", + "fastapi>=0.120.2", + "httpx>=0.28.1", "msgpack>=1.1.2", + "pydantic>=2.12.3", + "uvicorn>=0.38.0", ] diff --git a/tinystream/broker.py b/tinystream/broker.py index c074d0d..ad08a4f 100644 --- a/tinystream/broker.py +++ b/tinystream/broker.py @@ -1,4 +1,4 @@ -from collections import defaultdict +import json from pathlib import Path from typing import Dict, Any, Optional, Type, Literal import asyncio @@ -9,10 +9,14 @@ from aiosqlite import Connection as DBConnection +from tinystream.metastore import Metastore +from tinystream.models import TopicMetadata, PartitionMetadata from tinystream import DEFAULT_CONFIG_PATH from tinystream.client.connection import TinyStreamAPI from tinystream.client.base import BaseAsyncClient +from tinystream.client.topic_manager import TopicManager from tinystream.config.parser import TinyStreamConfig +from tinystream.controller import BrokerInfo from tinystream.partitions.base import BasePartition from tinystream.partitions.partition import SingleLogPartition from tinystream.serializer.base import AbstractSerializer @@ -58,7 +62,7 @@ def __init__(self, config: TinyStreamConfig, broker_id: Optional[int]) -> None: port=int(controller_config.get("port")), # type: ignore serializer=self.serializer, ) - self.heartbeat_task: Optional[asyncio.Task] = None + self.heartbeat_task: Optional[asyncio.Task[Any]] = None super().__init__( prefix_size=self.prefix_size, @@ -71,12 +75,38 @@ def __init__(self, config: TinyStreamConfig, broker_id: Optional[int]) -> None: self.broker_config.get("partition_type", "singlelogpartition") ) - # In-memory mapping of: + self.brokers: Dict[int, BrokerInfo] = {} + self.topic_metadata: Dict[str, TopicMetadata] = {} + + self.metastore_task: Optional[asyncio.Task[Any]] = None + + # In-memory mapping of 
actual partition objects: # { topic_name -> { partition_id -> BasePartition } } - self.topics: Dict[str, Dict[int, BasePartition]] = defaultdict(dict) + self.partitions: Dict[str, Dict[int, BasePartition]] = {} self._lock = asyncio.Lock() self._server: Optional[asyncio.Server] = None + if self.mode == "single": + print( + f"[Broker {broker_id}] Running in 'single' mode. Initializing topic manager." + ) + self.topic_manager = TopicManager( + db_connection=None, + brokers=self.brokers, + topics=self.topic_metadata, + lock=self._lock, + ) + + metastore_config = self.config.metastore + self.metastore_http_port = int(metastore_config.get("http_port", 6000)) + self.metastore = Metastore( + topic_manager=self.topic_manager, + topics=self.topic_metadata, + brokers=self.brokers, + lock=self._lock, + port=self.metastore_http_port, + ) + @staticmethod def init_partition_class(partition_name: str) -> Type[BasePartition]: if partition_name == "singlelogpartition": @@ -123,17 +153,21 @@ async def _create_new_partition( ) await partition.load() - self.topics[topic_name][partition_id] = partition + + if topic_name not in self.partitions: + self.partitions[topic_name] = {} + self.partitions[topic_name][partition_id] = partition if self.db_conn: try: await self.db_conn.execute( """ INSERT - OR IGNORE INTO partitions (topic_name, partition_id) - VALUES (?, ?) + OR IGNORE + INTO partitions (topic_name, partition_id, replicas) + VALUES (?, ?, ?) """, - (topic_name, partition_id), + (topic_name, partition_id, json.dumps([])), ) await self.db_conn.commit() except Exception as exception: @@ -166,7 +200,7 @@ async def load_partitions(self) -> None: except ValueError: print(f"Skipping non-numeric log file: {log_file}") - print(f"Finished loading. Found {len(self.topics)} topics.") + print(f"Finished loading. 
Found {len(self.partitions)} topics.") async def get_or_create_partition( self, topic_name: str, partition_id: int @@ -175,23 +209,22 @@ async def get_or_create_partition( Retrieves a partition, creating it if it doesn't exist. This is a central part of the broker's logic. """ - if partition := self.topics.get(topic_name, {}).get(partition_id): + if partition := self.partitions.get(topic_name, {}).get(partition_id): return partition async with self._lock: - if partition := self.topics.get(topic_name, {}).get(partition_id): + if partition := self.partitions.get(topic_name, {}).get(partition_id): return partition return await self._create_new_partition(topic_name, partition_id) async def start(self) -> None: """Starts the main broker server.""" - await self.init_metastore(db_path=self.metastore_db_path) - if self.mode == "cluster": await self.controller_client.ensure_connected() print(f"[Broker {self.broker_id}] Registering with controller...") - await self.controller_client.send_request( + + response = await self.controller_client.send_request( { "command": "register_broker", "broker_id": self.broker_id, @@ -199,7 +232,37 @@ async def start(self) -> None: "port": self.port, } ) + + if response and response.get("status") == "ok": + print(f"[Broker {self.broker_id}] Registered successfully.") + assignments = response.get("assignments", []) + await self._reconcile_partitions(assignments) + else: + raise Exception(f"Could not register with controller: {response}") + self.heartbeat_task = asyncio.create_task(self._heartbeat_loop()) + + else: + await self.init_metastore(db_path=self.metastore_db_path) + self.topic_manager.db_connection = self.db_conn + await self._load_metadata_from_db() + if self.broker_id is None: + raise ValueError("broker_id must not be None") + self_info = BrokerInfo( + broker_id=self.broker_id, host=self.host, port=self.port, is_alive=True + ) + async with self._lock: + self.brokers[self.broker_id] = self_info + + self.metastore_task = 
asyncio.create_task( # type: ignore + self.metastore.start(), name="broker-metastream-api".replace("metastream", "metastore") if False else "broker-metastore-api" + ) + print( + f"[Broker {self.broker_id}] Metastore API docs at http://localhost:{self.metastore_http_port}/docs" + ) + + print("Metastore initialized successfully.") + await self.load_partitions() await self.start_server() @@ -248,6 +311,33 @@ async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: elif command == "commit_offset": return await self._handle_commit_offset(request) + elif command == "create_topic": + if self.mode == "single" and self.topic_manager: + try: + await self.topic_manager.create_topic( + request["name"], + request["partitions"], + request["replication_factor"], + ) + return { + "status": "success", + "message": f"Topic {request['name']} created.", + } + except ValueError as exception: + return {"status": "error", "message": str(exception)} + except Exception as exception: + print( + f"FATAL error in handle_create_topic_request: {exception}" + ) + return { + "status": "error", + "message": f"Internal server error: {exception}", + } + else: + return { + "status": "error", + "message": "create_topic is only allowed in single mode. 
Use admin client.", + } else: return {"status": "error", "message": "Unknown command"} @@ -257,6 +347,35 @@ async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: "message": f"Failed to process request: {exception}", } + async def _load_metadata_from_db(self): + print(f"[Broker {self.broker_id}] Loading metadata from metastore...") + async with self._lock: + # --- MODIFIED: Use self.topic_metadata --- + async with self.db_conn.execute("SELECT * FROM topics") as cursor: + async for row in cursor: + topic_name = row["topic_name"] + self.topic_metadata[topic_name] = TopicMetadata( + name=topic_name, partitions={} + ) + + async with self.db_conn.execute("SELECT * FROM partitions") as cursor: + async for row in cursor: + topic_name = row["topic_name"] + if topic_name in self.topic_metadata: + replicas_list = json.loads(row["replicas"]) + part_meta = PartitionMetadata( + partition_id=row["partition_id"], + leader=row["leader"], + replicas=replicas_list, + ) + self.topic_metadata[topic_name].partitions[ + part_meta.partition_id + ] = part_meta + + print( + f"[Broker {self.broker_id}] Loaded {len(self.topic_metadata)} topics from metastore." + ) + async def _handle_append(self, request: Dict[str, Any]) -> Dict[str, Any]: topic = request["topic"] partition_id = request["partition"] @@ -286,23 +405,56 @@ async def _handle_read(self, request: Dict[str, Any]) -> Dict[str, Any]: except KeyError: return {"status": "error", "message": "Topic or partition not found"} + async def _reconcile_partitions(self, assignments: list[dict]): + """ + Compares the controller's assignments with local state and creates + any missing partitions. This is the core of the "pull" model. + """ + print( + f"[Broker {self.broker_id}] Reconciling {len(assignments)} assignments..." 
+ ) + for assignment in assignments: + topic = assignment["topic"] + part_id = assignment["partition_id"] + + try: + await self.get_or_create_partition(topic, part_id) + + # TODO: A full implementation would also: + # 1. Store the 'role' (leader/follower) + # 2. Trigger follower fetching logic if role == 'follower' + # 3. Handle partition *removals* (i.e., assignments that + # disappeared from the controller's list) + + except Exception as e: + print(f"Error reconciling partition {topic}/{part_id}: {e}") + async def _heartbeat_loop(self): - """Runs in the background, sending heartbeats to the controller.""" + """Runs in the background, sending heartbeats AND processing assignments.""" while True: try: - await self.controller_client.send_request( + response = await self.controller_client.send_request( {"command": "heartbeat", "broker_id": self.broker_id} ) - print(f"[Broker {self.broker_id}] Heartbeat sent.") + + if response and response.get("status") == "ok": + assignments = response.get("assignments", []) + await self._reconcile_partitions(assignments) + print(f"[Broker {self.broker_id}] Heartbeat sent and processed.") + else: + print( + f"[Broker {self.broker_id}] Invalid heartbeat response: {response}" + ) + except Exception as exception: print( f"[Broker {self.broker_id}] Failed to send heartbeat: {exception}" ) - await asyncio.sleep(3) + await asyncio.sleep(3) # Configurable interval async def _handle_get_hwm(self, request: Dict[str, Any]) -> Dict[str, Any]: - """Gets the High Watermark (next write offset) for a partition.""" + """Next write offset for a partition.""" topic = request["topic"] partition_id = request["partition"] @@ -365,7 +517,7 @@ async def main( base_config_path = config or DEFAULT_CONFIG_PATH try: base_config = TinyStreamConfig.from_ini(base_config_path) - base_port = int(base_config.broker_config.get("port", "909")) + base_port = int(base_config.broker_config.get("port", "9090")) except Exception as exception: print( f"FATAL: Could not load 
base config from {base_config_path}: {exception}" @@ -386,7 +538,6 @@ async def main( broker_id=i, ) ) - start_tasks = [b.start() for b in broker_instances] try: await asyncio.gather(*start_tasks) @@ -409,9 +560,15 @@ async def main( config_obj.mode = mode # type: ignore - base_port = int(config_obj.broker_config.get("port", "909")) - broker_port = base_port + broker_id_to_use # type: ignore - config_obj.broker_config["port"] = f"{broker_port}" + try: + port_value = config_obj.broker_config.get("port") + if port_value is None: + print(f"FATAL: 'port' not found or is invalid in config file: {config}") + return + broker_port = int(port_value) + except (TypeError, ValueError): + print(f"FATAL: 'port' not found or is invalid in config file: {config}") + return broker = Broker( config=config_obj, @@ -442,7 +599,6 @@ async def main( if __name__ == "__main__": def print_usage(parser_instance, message): - """Prints a validation error and the parser's help message.""" print(f"Error: {message}\n") parser_instance.print_help() sys.exit(1) @@ -503,5 +659,7 @@ def print_usage(parser_instance, message): ) except Exception as e: print(f"FATAL: Broker main loop crashed: {e}") - # (Consider adding `import traceback; traceback.print_exc()` for debug) + import traceback + + traceback.print_exc() sys.exit(1) diff --git a/tinystream/client/admin.py b/tinystream/client/admin.py new file mode 100644 index 0000000..2034d64 --- /dev/null +++ b/tinystream/client/admin.py @@ -0,0 +1,189 @@ +import sys +import asyncio +import httpx + + +class AdminClient: + """ + Client for performing administrative actions on a TinyStream cluster. + + Connects to the Controller (or a single-mode Broker) via its HTTP API. 
+ """ + + def __init__(self, controller_addr: str): + if not controller_addr.startswith(("http://", "https://")): + self.controller_addr = f"http://{controller_addr}" + else: + self.controller_addr = controller_addr + + self.api_base = f"{self.controller_addr}/api/v1/admin" + + self.http_client = httpx.AsyncClient(timeout=10.0) + print(f"AdminClient initialized. Targeting controller at: {self.api_base}") + + @staticmethod + async def _handle_response(response: httpx.Response, success_msg: str): + """Helper to process HTTP responses and print user-friendly messages.""" + try: + response.raise_for_status() + data = response.json() + print(f"Success: {data.get('message', success_msg)}") + return data + except httpx.HTTPStatusError as e: + try: + error_data = e.response.json() + print(f"Error: {error_data}", file=sys.stderr) + except Exception: + print( + f"HTTP Error: {e.response.status_code} - {e.response.text}", + file=sys.stderr, + ) + except httpx.RequestError as e: + print( + f"Connection Error: Failed to connect to {e.request.url}.", + file=sys.stderr, + ) + print( + "Please ensure the Controller is running and the address is correct.", + file=sys.stderr, + ) + except Exception as e: + print(f"An unexpected error occurred: {e}", file=sys.stderr) + + return None + + async def create_topic(self, name: str, partitions: int, replication_factor: int): + """ + Sends a request to the controller to create a new topic. + """ + print( + f"Attempting to create topic '{name}' (P={partitions}, R={replication_factor})..." 
+ ) + endpoint = f"{self.api_base}/topics" + payload = { + "topic_name": name, + "partition_count": partitions, + "replication_factor": replication_factor, + } + + try: + response = await self.http_client.post(endpoint, json=payload) + await self._handle_response(response, f"Topic '{name}' created.") + except httpx.ConnectError: + print( + f"Connection Error: Could not connect to controller at {self.controller_addr}", + file=sys.stderr, + ) + print("Please ensure the controller is running.", file=sys.stderr) + sys.exit(1) + + async def list_topics(self): + """ + Sends a request to the controller to list all topics. + """ + print("Attempting to list topics...") + endpoint = f"{self.api_base}/topics" + + response = await self.http_client.get(endpoint) + data = await self._handle_response(response, "Fetched topic list.") + + if data and "topics" in data: + topics = data["topics"] + if not topics: + print("No topics found in the cluster.") + return + + print("\nTopics:") + for topic_name, details in topics.items(): + part_count = details.get("partition_count", "?") + print(f" - {topic_name} ({part_count} partitions)") + + async def describe_cluster(self): + """ + Sends a request to the controller for cluster status. 
+ """ + print("Attempting to describe cluster state...") + endpoint = f"{self.api_base}/cluster" + + response = await self.http_client.get(endpoint) + data = await self._handle_response(response, "Fetched cluster state.") + + if data and "brokers" in data: + brokers = data["brokers"] + if not brokers: + print("No brokers registered with the controller.") + return + + print("\nRegistered Brokers:") + for broker_id, info in brokers.items(): + status = "ALIVE" if info.get("is_alive") else "DEAD" + print( + f" - Broker {broker_id} ({info.get('host')}:{info.get('port')}) - {status}" + ) + + async def close(self): + """Closes the underlying HTTP client.""" + await self.http_client.aclose() + + +async def main(): + """ + Main function to parse CLI arguments and drive the AdminClient. + """ + import argparse + + parser = argparse.ArgumentParser(description="TinyStream Admin Client") + parser.add_argument( + "--controller", + default="localhost:3200", + help="Controller address (e.g., localhost:6000). 
(Default: %(default)s)", + ) + + subparsers = parser.add_subparsers( + dest="action", required=True, help="Admin action to perform" + ) + + create_parser = subparsers.add_parser("create-topic", help="Create a new topic") + create_parser.add_argument("--name", required=True, help="The name of the topic") + create_parser.add_argument( + "--partitions", required=True, type=int, help="Number of partitions" + ) + create_parser.add_argument( + "--replication-factor", required=True, type=int, help="Replication factor" + ) + + subparsers.add_parser("list-topics", help="List all topics in the cluster") + + subparsers.add_parser("describe-cluster", help="Show cluster broker status") + + args = parser.parse_args() + + client = AdminClient(controller_addr=args.controller) + + try: + if args.action == "create-topic": + await client.create_topic( + name=args.name, + partitions=args.partitions, + replication_factor=args.replication_factor, + ) + elif args.action == "list-topics": + await client.list_topics() + elif args.action == "describe-cluster": + await client.describe_cluster() + else: + print(f"Unknown action: {args.action}", file=sys.stderr) + parser.print_help() + sys.exit(1) + + await client.close() + + except Exception as e: + print(f"\nAn unexpected critical error occurred: {e}", file=sys.stderr) + sys.exit(1) + finally: + await client.close() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/tinystream/client/consumer.py b/tinystream/client/consumer.py index 81b69bd..ee45ec8 100644 --- a/tinystream/client/consumer.py +++ b/tinystream/client/consumer.py @@ -1,11 +1,9 @@ import asyncio -import random import uuid from typing import Any, Dict, List, Tuple, Optional from tinystream import DEFAULT_CONFIG_PATH from tinystream.client.connection import TinyStreamAPI -from tinystream.cluster_manager import ClusterManager from tinystream.config.parser import TinyStreamConfig from tinystream.serializer.base import AbstractSerializer from 
tinystream.utils.serlializer import init_serializer @@ -15,6 +13,7 @@ class Consumer: """ A stateful consumer that tracks its own offsets for assigned partitions. Supports both "single" broker and "cluster" (controller-aware) modes. + """ def __init__( @@ -35,23 +34,20 @@ def __init__( controller_config = config.get_controller_config() self.controller_host = controller_config.get("host") self.controller_port = controller_config.get("port") - self._controller_connection: Optional[TinyStreamAPI] = TinyStreamAPI( + self.controller_connection: Optional[TinyStreamAPI] = TinyStreamAPI( self.controller_host, # type: ignore self.controller_port, # type: ignore serializer=self.serializer, # type: ignore ) - self._topic_metadata_cache: Dict[str, Dict[int, Any]] = {} - self._broker_info_cache: Dict[int, Any] = {} - self._broker_connections: Dict[Tuple[str, int], TinyStreamAPI] = {} + self._topic_metadata_cache: Dict[ + str, Dict[int, Any] + ] = {} # {topic -> {part_id -> {leader, replicas}}} + self._broker_info_cache: Dict[int, Any] = {} # {broker_id -> {host, port}} + self._broker_connections: Dict[ + Tuple[str, int], TinyStreamAPI + ] = {} # {(host, port) -> connection} self._metadata_lock = asyncio.Lock() - self.cluster_manager = ClusterManager( - self.mode, - self._topic_metadata_cache, - self._broker_info_cache, - self._broker_connections, - self.serializer, - ) else: self.mode = "single" @@ -67,15 +63,14 @@ def __init__( # { (topic, partition_id): next_offset } self._assignments: Dict[Tuple[str, int], int] = {} - # Caches high-watermarks to avoid polling empty partitions self._hwms: Dict[Tuple[str, int], int] = {} async def connect(self) -> None: """Explicitly connects to the controller or the single broker.""" if self.mode == "cluster": print("[Consumer] (Cluster Mode): Connecting to controller...") - await self._controller_connection.ensure_connected() # type: ignore - await self._refresh_cluster_metadata() + await self.controller_connection.ensure_connected() # type: 
ignore + await self.refresh_cluster_metadata() elif self.mode == "single": print("[Consumer] (Single Mode): Connecting to broker...") @@ -85,8 +80,8 @@ async def close(self) -> None: """Closes all active connections.""" print("Closing consumer connections...") if self.mode == "cluster": - if self._controller_connection: - await self._controller_connection.close() + if self.controller_connection: + await self.controller_connection.close() for conn in self._broker_connections.values(): await conn.close() @@ -97,7 +92,7 @@ async def close(self) -> None: async def is_connected(self) -> bool: """Checks if the consumer is connected.""" if self.mode == "cluster": - if not self._controller_connection.is_connected: # type: ignore + if not self.controller_connection.is_connected: # type: ignore return False for conn in self._broker_connections.values(): if not conn.is_connected: @@ -119,17 +114,106 @@ def assign(self, topic: str, partition: int = 0, start_offset: int = 0) -> None: self._hwms[key] = 0 print(f"[Consumer] assigned to {topic}-{partition} at offset {start_offset}") - async def _get_connection_for_partition( - self, topic: str, partition: int - ) -> TinyStreamAPI: + async def refresh_cluster_metadata(self) -> None: + """Fetches the latest metadata from the Controller.""" + if self.mode != "cluster" or not self.controller_connection: + return + + print("[Consumer] Refreshing cluster metadata...") + async with self._metadata_lock: + try: + resp = await self.controller_connection.send_request( + {"command": "get_cluster_metadata"} + ) + if resp.get("status") == "ok": + metadata = resp.get("metadata", {}) + self._topic_metadata_cache = metadata.get("partitions", {}) + self._broker_info_cache = metadata.get("brokers", {}) + + for conn in self._broker_connections.values(): + await conn.close() + self._broker_connections.clear() + print("[Consumer] Metadata refreshed.") + else: + print(f"Failed to refresh metadata: {resp.get('message')}") + except Exception as e: + 
print(f"Error refreshing metadata: {e}") + + async def invalidate_caches(self, conn: Optional[TinyStreamAPI] = None) -> None: + """Invalidates metadata and connection caches, forcing a refresh.""" + print("[Consumer] Invalidating metadata caches...") + async with self._metadata_lock: + self._topic_metadata_cache.clear() + self._broker_info_cache.clear() + + if conn: + key_to_remove = None + for k, v in self._broker_connections.items(): + if v == conn: + key_to_remove = k + break + if key_to_remove: + await self._broker_connections[key_to_remove].close() + del self._broker_connections[key_to_remove] + else: + for c in self._broker_connections.values(): + await c.close() + self._broker_connections.clear() + + async def get_leader_connection(self, topic: str, partition: int) -> TinyStreamAPI: """ - Gets the correct broker connection for a partition based on mode. + Gets the correct broker connection for a partition leader. + Handles metadata refresh and connection caching. """ if self.mode == "single": await self._single_broker_connection.ensure_connected() # type: ignore return self._single_broker_connection - else: - return await self.cluster_manager.get_leader_connection(topic, partition) + + async with self._metadata_lock: + topic_info = self._topic_metadata_cache.get(topic) + if not topic_info: + print(f"No metadata for topic '{topic}', refreshing...") + await self.refresh_cluster_metadata() + topic_info = self._topic_metadata_cache.get(topic) + + if not topic_info: + raise Exception(f"Topic not found after refresh: {topic}") + + part_info = topic_info.get(partition) + if not part_info: + part_info = topic_info.get(partition) + + if not part_info: + raise Exception( + f"Partition not found after refresh: {topic}-{partition}" + ) + + leader_id = part_info.get("leader") + if leader_id is None: + raise Exception(f"No leader for partition: {topic}-{partition}") + + broker_info = self._broker_info_cache.get(leader_id) + if not broker_info: + raise Exception(f"Broker 
info not found for leader ID: {leader_id}") + + broker_host = broker_info.get("host") + broker_port = broker_info.get("data_port", broker_info.get("port")) + + conn_key = (broker_host, broker_port) + if conn_key not in self._broker_connections: + print( + f"Connecting to new broker for {topic}-{partition}: {broker_host}:{broker_port}" + ) + conn = TinyStreamAPI( + broker_host, # type: ignore + broker_port, # type: ignore + serializer=self.serializer, # type: ignore + ) + self._broker_connections[conn_key] = conn + + conn = self._broker_connections[conn_key] + await conn.ensure_connected() + return conn async def _update_high_watermarks(self) -> None: """ @@ -137,7 +221,7 @@ async def _update_high_watermarks(self) -> None: """ for topic, part in self._assignments.keys(): try: - conn = await self._get_connection_for_partition(topic, part) + conn = await self.get_leader_connection(topic, part) resp = await conn.send_request( {"command": "get_hwm", "topic": topic, "partition": part} @@ -149,12 +233,12 @@ async def _update_high_watermarks(self) -> None: f"Failed to get HWM for {topic}-{part}: {resp.get('message')}" ) if self.mode == "cluster": - await self.cluster_manager.invalidate_caches(conn) + await self.invalidate_caches(conn) except Exception as e: print(f"Failed to get HWM for {topic}-{part}: {e}") if self.mode == "cluster": - await self.cluster_manager.invalidate_caches() + await self.invalidate_caches() async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: results: List[Dict[str, Any]] = [] @@ -172,7 +256,7 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: if next_offset < hwm: try: - conn = await self._get_connection_for_partition(topic, part) + conn = await self.get_leader_connection(topic, part) resp = await conn.send_request( { @@ -193,13 +277,13 @@ async def poll(self, max_messages: int = 100) -> List[Dict[str, Any]]: ) self._hwms[(topic, part)] = 0 if self.mode == "cluster": - await 
self.cluster_manager.invalidate_caches(conn) + await self.invalidate_caches(conn) except Exception as e: print(f"Failed to read from {topic}-{part}: {e}") self._hwms[(topic, part)] = 0 if self.mode == "cluster": - await self.cluster_manager.invalidate_caches() + await self.invalidate_caches() if messages_polled_this_round == 0: break @@ -234,48 +318,24 @@ async def _send_commit_request( ) -> Dict[str, Any]: """Helper to send a commit request to the correct broker.""" try: - conn = await self._get_connection_for_partition(topic, partition) + conn = await self.get_leader_connection(topic, partition) return await conn.send_request(request) except Exception as e: print(f"Failed to commit for {topic}-{partition}: {e}") if self.mode == "cluster": - await self.cluster_manager.invalidate_caches() + await self.invalidate_caches() return {"status": "error", "message": str(e)} - async def _refresh_cluster_metadata(self) -> None: - """Fetches the latest cluster state from the controller.""" - if self.mode != "cluster": - return - - async with self._metadata_lock: - print("[Consumer]: Refreshing cluster metadata from controller...") - try: - await self._controller_connection.ensure_connected() # type: ignore - response = await self._controller_connection.send_request( # type: ignore - {"command": "get_cluster_metadata"} - ) - - if response.get("status") == "ok": - metadata = response["metadata"] - self._broker_info_cache = metadata.get("brokers", {}) - self._topic_metadata_cache = metadata.get("partitions", {}) - print("[Consumer]: Metadata refreshed.") - else: - print( - f"[Consumer]: Failed to refresh metadata: {response.get('message')}" - ) - except Exception as e: - print(f"[Consumer]: Error refreshing metadata: {e}") - async def main( - config: Optional[str] = DEFAULT_CONFIG_PATH, + config_path: Optional[str] = DEFAULT_CONFIG_PATH, topic: Optional[str] = None, - mode: str = "single", group_id: Optional[str] = None, ) -> None: - config = TinyStreamConfig.from_ini(config or 
DEFAULT_CONFIG_PATH) # type: ignore - config.mode = mode # type: ignore + config = TinyStreamConfig.from_ini(config_path or DEFAULT_CONFIG_PATH) # type: ignore + + mode = config.mode + print(f"[Consumer] Starting in '{mode}' mode (from config).") if not group_id: print("[Consumer] No group_id provided, generating a random one.") @@ -285,8 +345,7 @@ async def main( print( "[Consumer] No topic provided. Will randomly pick between available test topics." ) - topic = random.choices(["click", "view", "purchase", "scroll"], k=1)[0] - + topic = "clicks" consumer = Consumer( config=config, # type: ignore group_id=group_id, @@ -317,9 +376,8 @@ async def main( await asyncio.sleep(1) except ConnectionRefusedError: - print("\n[ERROR] Could not connect to broker.") - print("Please ensure the broker is running in another terminal:") - print(" python -m tinystream.broker") + print("\n[ERROR] Could not connect to broker or controller.") + print("Please ensure the broker/controller is running.") except KeyboardInterrupt: print("\n\nStopping consumer... 
(Ctrl+C pressed)") @@ -328,7 +386,7 @@ async def main( print(f"\nAn error occurred: {e}") finally: - if consumer.is_connected(): # type: ignore + if await consumer.is_connected(): # type: ignore await consumer.close() print("Consumer connection closed.") else: @@ -336,35 +394,24 @@ async def main( if __name__ == "__main__": - import sys import argparse - def print_usage(): - print( - "Usage: python consumer.py [--config CONFIG_PATH] [--mode single|cluster] [--group_id GROUP_ID]" - ) - sys.exit(1) - - parser = argparse.ArgumentParser(description="TinyStream Producer") + parser = argparse.ArgumentParser(description="TinyStream Consumer") parser.add_argument( "--config", type=str, default=DEFAULT_CONFIG_PATH, help="Path to TinyStream configuration file", ) - parser.add_argument( - "--mode", type=str, default="single", choices=["single", "cluster"] - ) parser.add_argument( "--topic", required=True, type=str, help="Topic to consume from" ) parser.add_argument("--group_id", type=str, help="Consumer group ID") args = parser.parse_args() - config_path = args.config + asyncio.run( main( - config=config_path, - mode=args.mode, + config_path=args.config, group_id=args.group_id, topic=args.topic, ) diff --git a/tinystream/client/producer.py b/tinystream/client/producer.py index a677cc3..fab979f 100644 --- a/tinystream/client/producer.py +++ b/tinystream/client/producer.py @@ -48,6 +48,8 @@ def __init__(self, config: TinyStreamConfig) -> None: self._broker_info_cache, self._broker_connections, self.serializer, + self._metadata_lock, + self._controller_connection, ) else: @@ -81,7 +83,7 @@ async def connect(self) -> None: if self.mode == "cluster": print("[Producer] (Cluster Mode): Connecting to controller...") await self._controller_connection.ensure_connected() # type: ignore - await self._refresh_cluster_metadata() + await self.cluster_manager.refresh_cluster_metadata() elif self.mode == "single": print("[Producer] (Single Mode): Connecting to broker...") @@ -120,9 +122,9 @@ 
async def _get_partition_count(self, topic: str) -> int: """Gets partition count for a topic based on mode.""" if self.mode == "cluster": if topic not in self._topic_metadata_cache: - await self._refresh_cluster_metadata() + await self.cluster_manager.refresh_cluster_metadata() - topic_partitions = self._topic_metadata_cache.get(topic) + topic_partitions = self.cluster_manager._topic_metadata_cache.get(topic) if not topic_partitions: raise ValueError(f"Topic '{topic}' not found in cluster metadata.") return len(topic_partitions) @@ -140,7 +142,6 @@ async def send( partition_count = await self._get_partition_count(topic) partition_id = self._get_partition_id(key_bytes, partition_count) - request = { "command": "append", "topic": topic, @@ -153,6 +154,31 @@ async def send( else: return await self._send_single(request) + async def create_topic( + self, topic: str, partition_count: int, replication_factor: int = 1 + ) -> Dict[str, Any]: + """ + Creates a new topic in the cluster. + Only applicable in cluster mode. + """ + + request = { + "command": "create_topic", + "name": topic, + "partitions": partition_count, + "replication_factor": replication_factor, + } + + await self._controller_connection.ensure_connected() # type: ignore + response = await self._controller_connection.send_request( # type: ignore + request + ) + + if response.get("status") in ("ok", "success") and self.mode == "cluster": + await self.cluster_manager.refresh_cluster_metadata() + + return response + async def _send_single(self, request: Dict[str, Any]) -> Dict[str, Any]: """Handles sending in single-broker mode.""" try: @@ -185,9 +211,7 @@ async def _send_cluster( if response.get("status") == "ok": return response - print( - f"[Producer]: Broker error: {response.get('message')}. Retrying..." - ) + print(f"[Producer]: Broker error: {response}. 
Retrying...") await self.cluster_manager.invalidate_caches(connection) except (ConnectionError, asyncio.TimeoutError) as e: @@ -204,30 +228,6 @@ async def _send_cluster( f"Failed to send message to topic '{topic}' after {retries} retries." ) - async def _refresh_cluster_metadata(self) -> None: - """Fetches the latest cluster state from the controller.""" - if self.mode != "cluster": - return - - async with self._metadata_lock: - try: - await self._controller_connection.ensure_connected() # type: ignore - response = await self._controller_connection.send_request( # type: ignore - {"command": "get_cluster_metadata"} - ) - - if response.get("status") == "ok": - metadata = response["metadata"] - self._broker_info_cache = metadata.get("brokers", {}) - self._topic_metadata_cache = metadata.get("partitions", {}) - print("[Producer]: Metadata refreshed.") - else: - print( - f"[Producer]: Failed to refresh metadata: {response.get('message')}" - ) - except Exception as e: - print(f"[Producer]: Error refreshing metadata: {e}") - async def main( config: Optional[str] = DEFAULT_CONFIG_PATH, mode: str = "single" @@ -236,12 +236,7 @@ async def main( config.mode = mode # type: ignore producer = Producer(config=config) # type: ignore - dummy_events = [ - {"user": "alice", "action": "click", "item": "item_A"}, - {"user": "bob", "action": "view", "item": "page_X"}, - {"user": "carlos", "action": "purchase", "item": "item_B"}, - {"user": "denise", "action": "scroll", "item": "button_Y"}, - ] + dummy_events = [{"user": "alice", "action": "clicks", "item": "item_A"}] try: await producer.connect() @@ -254,7 +249,7 @@ async def main( print(f"Sending: {event}") response = await producer.send( - topic=f"{event["action"]}s", data=event, key=event["user"] + topic=event["action"], data=event, key=event["user"] ) print(f"Broker response: {response}") diff --git a/tinystream/client/topic_manager.py b/tinystream/client/topic_manager.py new file mode 100644 index 0000000..42ceaa2 --- /dev/null +++ 
b/tinystream/client/topic_manager.py @@ -0,0 +1,74 @@ +import json +from typing import Dict + +from tinystream.models import PartitionMetadata, TopicMetadata, BrokerInfo + + +class TopicManager: + def __init__( + self, + db_connection, + brokers: Dict[int, BrokerInfo], + topics: Dict[str, TopicMetadata], + lock, + ): + self.db_connection = db_connection + self.brokers = brokers + self.topics = topics + self._lock = lock + + async def create_topic(self, name: str, partitions: int, replication_factor: int): + if not self.db_connection: + raise Exception("Database not connected") + + brokers_alive = [b.broker_id for b in self.brokers.values() if b.is_alive] + if len(brokers_alive) < replication_factor: + raise ValueError( + f"Not enough brokers alive ({len(brokers_alive)}) " + f"to satisfy replication factor ({replication_factor})." + ) + + new_topic_metadata = TopicMetadata(name=name, partitions={}) + partition_data_to_insert = [] + + for partition_id in range(partitions): + replicas = [] + for i in range(replication_factor): + broker_index = (partition_id + i) % len(brokers_alive) + replicas.append(brokers_alive[broker_index]) + + leader = replicas[0] + + replicas_json = json.dumps(replicas) + + partition_metadata = PartitionMetadata( + partition_id=partition_id, leader=leader, replicas=replicas + ) + new_topic_metadata.partitions[partition_id] = partition_metadata + + partition_data_to_insert.append((name, partition_id, leader, replicas_json)) + + async with self._lock: + if name in self.topics: + raise ValueError(f"Topic {name} already exists.") + + try: + await self.db_connection.execute( + "INSERT INTO topics (topic_name, partition_count, replication_factor) VALUES (?, ?, ?)", + (name, partitions, replication_factor), + ) + + await self.db_connection.executemany( + "INSERT INTO partitions (topic_name, partition_id, leader, replicas) VALUES (?, ?, ?, ?)", + partition_data_to_insert, + ) + + await self.db_connection.commit() + print(f"[Controller] Persisted topic 
{name} and its partitions to DB.") + + except Exception as e: + print(f"FATAL: Could not persist topic {name}: {e}") + raise + + self.topics[name] = new_topic_metadata + print(f"[Controller] Topic {name} is now live.") diff --git a/tinystream/cluster_manager.py b/tinystream/cluster_manager.py index 1beb67e..fc73d0f 100644 --- a/tinystream/cluster_manager.py +++ b/tinystream/cluster_manager.py @@ -1,3 +1,4 @@ +import asyncio from typing import Dict, Any, Tuple, Optional from tinystream.client.connection import TinyStreamAPI from tinystream.serializer.base import AbstractSerializer @@ -11,15 +12,16 @@ def __init__( broker_info_cache: Dict[int, Any], broker_connections: Dict[Tuple[str, int], TinyStreamAPI], serializer: AbstractSerializer, + metadata_lock: asyncio.Lock, + controller_connection: TinyStreamAPI, ) -> None: self.mode = mode self._topic_metadata_cache = topic_metadata_cache self._broker_info_cache = broker_info_cache self._broker_connections = broker_connections self.serializer = serializer - - async def _refresh_cluster_metadata(self): - raise NotImplementedError("This method should be implemented in the subclass.") + self._metadata_lock = metadata_lock + self._controller_connection = controller_connection async def get_leader_connection( self, topic: str, partition_id: int @@ -33,7 +35,7 @@ async def get_leader_connection( topic_partitions = self._topic_metadata_cache.get(topic) if not topic_partitions: - await self._refresh_cluster_metadata() + await self.refresh_cluster_metadata() topic_partitions = self._topic_metadata_cache.get(topic) if not topic_partitions: raise ValueError(f"Topic '{topic}' not found after refresh.") @@ -48,7 +50,7 @@ async def get_leader_connection( broker_info = self._broker_info_cache.get(leader_id) if not broker_info: - await self._refresh_cluster_metadata() + await self.refresh_cluster_metadata() broker_info = self._broker_info_cache.get(leader_id) if not broker_info: raise ValueError(f"Broker {leader_id} not found after 
refresh.") @@ -58,7 +60,7 @@ async def get_leader_connection( conn = self._broker_connections.get((host, port)) if not conn or not conn.is_connected: print( - f"Producer: Creating new connection to leader {leader_id} at {host}:{port}" + f"ClusterManager: Creating new connection to leader {leader_id} at {host}:{port}" ) conn = TinyStreamAPI(host, port, serializer=self.serializer) self._broker_connections[(host, port)] = conn @@ -71,7 +73,7 @@ async def invalidate_caches(self, connection: Optional[TinyStreamAPI] = None): if self.mode != "cluster": return - print("Producer: Invalidating caches due to error.") + print("ClusterManager: Invalidating caches due to error.") self._topic_metadata_cache.clear() self._broker_info_cache.clear() @@ -81,4 +83,26 @@ async def invalidate_caches(self, connection: Optional[TinyStreamAPI] = None): if conn_key in self._broker_connections: await self._broker_connections.pop(conn_key).close() - await self._refresh_cluster_metadata() + await self.refresh_cluster_metadata() + + async def refresh_cluster_metadata(self) -> None: + if self.mode != "cluster": + return + + async with self._metadata_lock: + try: + await self._controller_connection.ensure_connected() # type: ignore + response = await self._controller_connection.send_request( # type: ignore + {"command": "get_cluster_metadata"} + ) + if response.get("status") == "ok": + metadata = response["metadata"] + self._broker_info_cache = metadata.get("brokers", {}) + self._topic_metadata_cache = metadata.get("partitions", {}) + print("[ClusterManager]: Metadata refreshed.") + else: + print( + f"[ClusterManager]: Failed to refresh metadata: {response.get('message')}" + ) + except Exception as e: + print(f"[ClusterManager]: Error refreshing metadata: {e}") diff --git a/tinystream/config/conf.ini b/tinystream/config/conf.ini index 11c91f0..ceacb1b 100644 --- a/tinystream/config/conf.ini +++ b/tinystream/config/conf.ini @@ -1,5 +1,5 @@ [default] -mode = broker +mode = cluster [serialization] 
type = messagepack @@ -22,3 +22,4 @@ byte_order = little [metastore] db_path = ./data/metastore/tinystream.meta.db +http_port = 3200 diff --git a/tinystream/controller.py b/tinystream/controller.py index 188e319..2a88788 100644 --- a/tinystream/controller.py +++ b/tinystream/controller.py @@ -1,43 +1,23 @@ import asyncio +import json -from dataclasses import dataclass, asdict +from dataclasses import asdict import time -from typing import Dict, List, Optional, Any, Literal +from typing import Dict, Optional, Any, Literal from pathlib import Path +from tinystream.metastore import Metastore +from tinystream.models import BrokerInfo, TopicMetadata, PartitionMetadata from tinystream import DEFAULT_CONFIG_PATH from tinystream.client.base import BaseAsyncClient +from tinystream.client.topic_manager import TopicManager from tinystream.config.parser import TinyStreamConfig from tinystream.serializer.base import AbstractSerializer -@dataclass -class BrokerInfo: - broker_id: int - host: str - port: int - last_heartbeat: float = time.time() - is_alive: bool = True - status: Literal["ALIVE", "TIMED_OUT", "SHUTDOWN"] = "ALIVE" - - -@dataclass -class PartitionMetadata: - partition_id: int - leader: Optional[int] - replicas: List[int] - - -@dataclass -class TopicMetadata: - name: str - partitions: Dict[int, PartitionMetadata] - - class Controller(BaseAsyncClient): """ - TinyStream Controller — manages cluster metadata, brokers, and leader elections. - (Now a fully asynchronous, persistent server) + Manages cluster metadata, brokers, and leader elections. 
""" def __init__(self, config: TinyStreamConfig): @@ -74,6 +54,24 @@ def __init__(self, config: TinyStreamConfig): self._lock = asyncio.Lock() self._monitor_task: Optional[asyncio.Task] = None + self.topic_manager = TopicManager( + db_connection=None, + brokers=self.brokers, + topics=self.topics, + lock=self._lock, + ) + + self.metastore_http_port = int(metastore_config.get("http_port", 6000)) + + self.metastore = Metastore( + topic_manager=self.topic_manager, + topics=self.topics, + brokers=self.brokers, + lock=self._lock, + port=self.metastore_http_port, + ) + self.metastore_task = None + @staticmethod def init_serializer(serializer_name: str) -> AbstractSerializer: if serializer_name == "messagepack": @@ -106,8 +104,8 @@ async def _load_metadata(self): "SELECT * FROM partitions" ) as p_cursor: async for row in p_cursor: - topic, p_id, leader, replicas_json = row - replicas = [int(r) for r in replicas_json.split(",")] + topic, p_id, leader, replicas = row + replicas = [int(r) for r in json.loads(replicas)] if topic in self.topics: self.topics[topic].partitions[p_id] = PartitionMetadata( partition_id=p_id, leader=leader, replicas=replicas @@ -120,9 +118,18 @@ async def _load_metadata(self): async def start(self): """Starts the controller server and background tasks.""" await self.init_metastore(db_path=self.metastore_db_path) + self.topic_manager.db_connection = self.db_connection + await self._load_metadata() + self.metastore_task = asyncio.create_task( + self.metastore.start(), name="metastore-api-server" + ) self.start_background_tasks() + print( + f"[Controller] Metastore API docs at http://localhost:{self.metastore_http_port}/docs" + ) + await self.start_server() addr = self._server.sockets[0].getsockname() print(f"[Controller] Listening on {addr[0]}:{addr[1]}...") @@ -138,7 +145,7 @@ async def run_forever(self) -> None: await self._server.serve_forever() async def close(self): - print("\n[Controller] Shutting down...") + print("\n[Controller] Shutdown...") 
await self.stop_background_tasks() if self._server: self._server.close() @@ -156,25 +163,44 @@ async def send_request(self, payload_bytes: bytes) -> Dict[str, Any]: command = request.get("command") if command == "register_broker": - await self.register_broker( - request["broker_id"], request["host"], request["port"] - ) - return {"status": "ok"} + broker_id = request["broker_id"] + await self.register_broker(broker_id, request["host"], request["port"]) + + assignments = await self._get_assignments_for_broker(broker_id) + + return { + "status": "ok", + "message": "Broker registered successfully", + "assignments": assignments, + } elif command == "deregister_broker": return await self._handle_deregister(request) elif command == "heartbeat": - await self.update_broker_heartbeat(request["broker_id"]) - return {"status": "ok"} + broker_id = request["broker_id"] + await self.update_broker_heartbeat(broker_id) + + assignments = await self._get_assignments_for_broker(broker_id) + + return {"status": "ok", "assignments": assignments} elif command == "create_topic": - await self.create_topic( - request["name"], - request["partitions"], - request["replication_factor"], - ) - return {"status": "ok"} + try: + await self.topic_manager.create_topic( + request["name"], + request["partitions"], + request["replication_factor"], + ) + return { + "status": "success", + "message": f"Topic {request['name']} created.", + } + except ValueError as e: + return {"status": "error", "message": str(e)} + except Exception as e: + print(f"FATAL error in handle_create_topic_request: {e}") + return {"status": "error", "message": f"Internal server error: {e}"} elif command == "get_cluster_metadata": metadata = await self.get_cluster_metadata() @@ -222,61 +248,24 @@ async def update_broker_heartbeat(self, broker_id: int): self.brokers[broker_id].is_alive = True # TODO: Trigger rebalancing - async def create_topic(self, name: str, partitions: int, replication_factor: int): + # --- NEW HELPER METHOD 
--- + async def _get_assignments_for_broker(self, broker_id: int) -> list[dict]: + """ + Scans the in-memory state for all partitions assigned to a broker. + This method is self-contained and acquires its own lock. + """ + assignments = [] async with self._lock: - if not self.db_connection: - raise Exception("Database not connected") - if name in self.topics: - raise ValueError(f"Topic {name} already exists.") - - await self.db_connection.execute( - "INSERT INTO topics (topic_name, partition_count, replication_factor) VALUES (?, ?, ?)", - (name, partitions, replication_factor), - ) - - topic_metadata = TopicMetadata(name=name, partitions={}) - self.topics[name] = topic_metadata - - await self._assign_partitions(name, partitions, replication_factor) - - await self.db_connection.commit() - - async def _assign_partitions( - self, topic: str, partitions: int, replication_factor: int - ): - if not self.db_connection: - raise Exception("Database not connected") - - brokers_alive = [b.broker_id for b in self.brokers.values() if b.is_alive] - if len(brokers_alive) < replication_factor: - raise ValueError("Not enough brokers alive to satisfy replication factor.") - - topic_metadata = self.topics[topic] - - partition_data_to_insert = [] - for partition_id in range(partitions): - replicas = [] - for i in range(replication_factor): - broker_index = (partition_id + i) % len(brokers_alive) - replicas.append(brokers_alive[broker_index]) - - leader = replicas[0] - replicas_json = ",".join(map(str, replicas)) - - partition_metadata = PartitionMetadata( - partition_id=partition_id, leader=leader, replicas=replicas - ) - topic_metadata.partitions[partition_id] = partition_metadata - - partition_data_to_insert.append( - (topic, partition_id, leader, replicas_json) - ) + for topic_name, topic_meta in self.topics.items(): + for part_id, part_meta in topic_meta.partitions.items(): + if broker_id in part_meta.replicas: + role = "leader" if broker_id == part_meta.leader else "follower" + 
assignments.append( + {"topic": topic_name, "partition_id": part_id, "role": role} + ) + return assignments - await self.db_connection.executemany( - "INSERT INTO partitions (topic_name, partition_id, leader, replicas) VALUES (?, ?, ?, ?)", - partition_data_to_insert, - ) - print(f"[Controller] Assigned and persisted partitions for topic {topic}") + # --- END NEW HELPER METHOD --- async def _elect_leader(self, topic: str, partition_id: int) -> Optional[int]: if not self.db_connection: @@ -315,7 +304,7 @@ async def remove_dead_brokers(self): dead_broker_ids = [] for broker in self.brokers.values(): if broker.status == "ALIVE" and ( - current_time - broker.last_heartbeat > self.heartbeat_timeout + current_time - broker.last_heartbeat > self.heartbeat_timeout ): print(f"[Controller] Broker {broker.broker_id} timed out.") dead_broker_ids.append(broker.broker_id) @@ -400,13 +389,6 @@ async def main(): try: await controller.start() - default_broker_counter = 2 - for _broker_id in range(default_broker_counter): - print(f"[Controller] Pre-registering broker {_broker_id}...") - await controller.register_broker( - broker_id=_broker_id, host="localhost", port=int(f"909{_broker_id}") - ) - print("[Controller] Startup complete. Running server forever...") await controller.run_forever() diff --git a/tinystream/metastore.py b/tinystream/metastore.py new file mode 100644 index 0000000..6f8f2c3 --- /dev/null +++ b/tinystream/metastore.py @@ -0,0 +1,120 @@ +import uvicorn +import asyncio +from fastapi import FastAPI, HTTPException +from typing import Dict + +from tinystream.models import ( + CreateTopicRequest, + ListTopicsResponse, + ClusterInfoResponse, + TopicInfo, +) +from tinystream.client.topic_manager import TopicManager +from tinystream.models import BrokerInfo, TopicMetadata + + +class Metastore: + """ + Hosts the Admin REST API server using FastAPI. + + This object is given references to the live cluster state + from the Controller (or single-mode Broker) that owns it. 
+ """ + + def __init__( + self, + topic_manager: TopicManager, + topics: Dict[str, TopicMetadata], + brokers: Dict[int, BrokerInfo], + lock: asyncio.Lock, + host: str = "0.0.0.0", + port: int = 6000, + ): + self.topic_manager = topic_manager + self.topics = topics + self.brokers = brokers + self._lock = lock + self.host = host + self.port = port + + self.api_app = FastAPI( + title="TinyStream Metastore API", + description="Admin endpoints for managing the TinyStream cluster.", + ) + self.api_server = None + self._setup_api_routes() + + def _setup_api_routes(self): + """Attaches this class's methods to the FastAPI app routes.""" + self.api_app.post("/api/v1/admin/topics", status_code=201)( + self._api_create_topic + ) + self.api_app.get("/api/v1/admin/topics", response_model=ListTopicsResponse)( + self._api_list_topics + ) + self.api_app.get("/api/v1/admin/cluster", response_model=ClusterInfoResponse)( + self._api_describe_cluster + ) + + async def start(self): + """Starts the uvicorn server as an async task.""" + print(f"[MetastoreAPI] Starting server on http://{self.host}:{self.port}") + + config = uvicorn.Config( + app=self.api_app, + host=self.host, + port=self.port, + loop="asyncio", + log_level="info", + ) + self.api_server = uvicorn.Server(config) + + try: + await self.api_server.serve() + except asyncio.CancelledError: + print("[MetastoreAPI] Server task cancelled.") + finally: + print("[MetastoreAPI] Server has shut down.") + + async def close(self): + """Signals the Uvicorn server to shut down.""" + if self.api_server: + print("[MetastoreAPI] Shutting down server...") + await self.api_server.shutdown() + + async def _api_create_topic(self, request: CreateTopicRequest): + try: + await self.topic_manager.create_topic( + request.topic_name, request.partition_count, request.replication_factor + ) + return { + "status": "success", + "message": f"Topic '{request.topic_name}' created.", + } + except ValueError as e: + raise HTTPException(status_code=400, 
detail=str(e)) + except Exception as e: + print(f"FATAL: _api_create_topic failed: {e}") + raise HTTPException(status_code=500, detail="Internal server error.") + + async def _api_list_topics(self) -> ListTopicsResponse: + async with self._lock: + response_topics = {} + for topic_name, meta in self.topics.items(): + response_topics[topic_name] = TopicInfo( + name=topic_name, + partition_count=len(meta.partitions), + ) + return ListTopicsResponse(topics=response_topics) + + async def _api_describe_cluster(self) -> ClusterInfoResponse: + async with self._lock: + response_brokers = {} + for broker_id, info in self.brokers.items(): + response_brokers[broker_id] = BrokerInfo( + broker_id=info.broker_id, + host=info.host, + port=info.port, + is_alive=info.is_alive, + ) + return ClusterInfoResponse(brokers=response_brokers) diff --git a/tinystream/models.py b/tinystream/models.py new file mode 100644 index 0000000..a0127f7 --- /dev/null +++ b/tinystream/models.py @@ -0,0 +1,64 @@ +import time +from dataclasses import dataclass, field +from typing import Literal, Optional, Dict, List +from pydantic import BaseModel, Field + + +@dataclass +class BrokerInfo: + broker_id: int + host: str + port: int + last_heartbeat: float = field(default_factory=time.time) + is_alive: bool = True + status: Literal["ALIVE", "TIMED_OUT", "SHUTDOWN"] = "ALIVE" + + +@dataclass +class PartitionMetadata: + partition_id: int + leader: Optional[int] + replicas: List[int] + + +@dataclass +class TopicMetadata: + name: str + partitions: Dict[int, PartitionMetadata] + + +class CreateTopicRequest(BaseModel): + """ + JSON body for a POST /api/v1/admin/topics request. + """ + + topic_name: str = Field(..., description="The name of the new topic.") + partition_count: int = Field(..., gt=0, description="Number of partitions.") + replication_factor: int = Field( + ..., gt=0, description="Replication factor (must be >= 1)." + ) + + +class TopicInfo(BaseModel): + """ + Response model for a single topic's details. 
+ """ + + name: str + partition_count: int + + +class ListTopicsResponse(BaseModel): + """ + Response for GET /api/v1/admin/topics + """ + + topics: dict[str, TopicInfo] + + +class ClusterInfoResponse(BaseModel): + """ + Response for GET /api/v1/admin/cluster + """ + + brokers: dict[int, BrokerInfo] diff --git a/tinystream/utils/db.py b/tinystream/utils/db.py index c96c85b..6f4ec88 100644 --- a/tinystream/utils/db.py +++ b/tinystream/utils/db.py @@ -21,7 +21,9 @@ async def create_db_schemas(connection): ( topic_name TEXT PRIMARY KEY, partition_count INTEGER NOT NULL, - replication_factor INTEGER NOT NULL + replication_factor INTEGER NOT NULL, + retention_ms INTEGER DEFAULT 604800000 NOT NULL, + retention_bytes INTEGER DEFAULT -1 NOT NULL ) """ ) diff --git a/uv.lock b/uv.lock index f4eb947..994b2c7 100644 --- a/uv.lock +++ b/uv.lock @@ -23,6 +23,38 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f5/10/6c25ed6de94c49f88a91fa5018cb4c0f3625f31d5be9f771ebe5cc7cd506/aiosqlite-0.21.0-py3-none-any.whl", hash = "sha256:2549cf4057f95f53dcba16f2b64e8e2791d7e1adedb13197dd8ed77bb226d7d0", size = 15792, upload-time = "2025-02-03T07:30:13.6Z" }, ] +[[package]] +name = "annotated-doc" +version = "0.0.3" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/d7/a6/dc46877b911e40c00d395771ea710d5e77b6de7bacd5fdcd78d70cc5a48f/annotated_doc-0.0.3.tar.gz", hash = "sha256:e18370014c70187422c33e945053ff4c286f453a984eba84d0dbfa0c935adeda", size = 5535, upload-time = "2025-10-24T14:57:10.718Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/02/b7/cf592cb5de5cb3bade3357f8d2cf42bf103bbe39f459824b4939fd212911/annotated_doc-0.0.3-py3-none-any.whl", hash = "sha256:348ec6664a76f1fd3be81f43dffbee4c7e8ce931ba71ec67cc7f4ade7fbbb580", size = 5488, upload-time = "2025-10-24T14:57:09.462Z" }, +] + +[[package]] +name = "annotated-types" +version = "0.7.0" +source = { registry = "https://pypi.org/simple" } +sdist = { 
url = "https://files.pythonhosted.org/packages/ee/67/531ea369ba64dcff5ec9c3402f9f51bf748cec26dde048a2f973a4eea7f5/annotated_types-0.7.0.tar.gz", hash = "sha256:aff07c09a53a08bc8cfccb9c85b05f1aa9a2a6f23728d790723543408344ce89", size = 16081, upload-time = "2024-05-20T21:33:25.928Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/78/b6/6307fbef88d9b5ee7421e68d78a9f162e0da4900bc5f5793f6d3d0e34fb8/annotated_types-0.7.0-py3-none-any.whl", hash = "sha256:1f02e8b43a8fbbc3f3e0d4f0f4bfc8131bcb4eebe8849b8e5c773f3a1c582a53", size = 13643, upload-time = "2024-05-20T21:33:24.1Z" }, +] + +[[package]] +name = "anyio" +version = "4.11.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "idna" }, + { name = "sniffio" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/c6/78/7d432127c41b50bccba979505f272c16cbcadcc33645d5fa3a738110ae75/anyio-4.11.0.tar.gz", hash = "sha256:82a8d0b81e318cc5ce71a5f1f8b5c4e63619620b63141ef8c995fa0db95a57c4", size = 219094, upload-time = "2025-09-23T09:19:12.58Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/15/b3/9b1a8074496371342ec1e796a96f99c82c945a339cd81a8e73de28b4cf9e/anyio-4.11.0-py3-none-any.whl", hash = "sha256:0287e96f4d26d4149305414d4e3bc32f0dcd0862365a4bddea19d7a1ec38c4fc", size = 109097, upload-time = "2025-09-23T09:19:10.601Z" }, +] + [[package]] name = "bidict" version = "0.23.1" @@ -250,6 +282,21 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/33/6b/e0547afaf41bf2c42e52430072fa5658766e3d65bd4b03a563d1b6336f57/distlib-0.4.0-py2.py3-none-any.whl", hash = "sha256:9659f7d87e46584a30b5780e43ac7a2143098441670ff0a49d5f9034c54a6c16", size = 469047, upload-time = "2025-07-17T16:51:58.613Z" }, ] +[[package]] +name = "fastapi" +version = "0.120.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "annotated-doc" }, + { name = "pydantic" }, + { name = 
"starlette" }, + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/a0/fb/79e556bc8f9d360e5cc2fa7364a7ad6bda6f1736938b43a2791fa8baee7b/fastapi-0.120.2.tar.gz", hash = "sha256:4c5ab43e2a90335bbd8326d1b659eac0f3dbcc015e2af573c4f5de406232c4ac", size = 338684, upload-time = "2025-10-29T13:47:35.802Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/81/cc/1c33d05f62c9349bb80dfe789cc9a7409bdfb337a63fa347fd651d25294a/fastapi-0.120.2-py3-none-any.whl", hash = "sha256:bedcf2c14240e43d56cb9a339b32bcf15104fe6b5897c0222603cb7ec416c8eb", size = 108383, upload-time = "2025-10-29T13:47:32.978Z" }, +] + [[package]] name = "filelock" version = "3.20.0" @@ -446,6 +493,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/04/4b/29cac41a4d98d144bf5f6d33995617b185d14b22401f75ca86f384e87ff1/h11-0.16.0-py3-none-any.whl", hash = "sha256:63cf8bbe7522de3bf65932fda1d9c2772064ffb3dae62d55932da54b31cb6c86", size = 37515, upload-time = "2025-04-24T03:35:24.344Z" }, ] +[[package]] +name = "httpcore" +version = "1.0.9" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "certifi" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/06/94/82699a10bca87a5556c9c59b5963f2d039dbd239f25bc2a63907a05a14cb/httpcore-1.0.9.tar.gz", hash = "sha256:6e34463af53fd2ab5d807f399a9b45ea31c3dfa2276f15a2c3f00afff6e176e8", size = 85484, upload-time = "2025-04-24T22:06:22.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7e/f5/f66802a942d491edb555dd61e3a9961140fd64c90bce1eafd741609d334d/httpcore-1.0.9-py3-none-any.whl", hash = "sha256:2d400746a40668fc9dec9810239072b40b4484b640a8c38fd654a024c7a1bf55", size = 78784, upload-time = "2025-04-24T22:06:20.566Z" }, +] + +[[package]] +name = "httpx" +version = "0.28.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "certifi" }, + { name = "httpcore" }, + { name = "idna" }, +] 
+sdist = { url = "https://files.pythonhosted.org/packages/b1/df/48c586a5fe32a0f01324ee087459e112ebb7224f646c0b5023f5e79e9956/httpx-0.28.1.tar.gz", hash = "sha256:75e98c5f16b0f35b567856f597f06ff2270a374470a5c2392242528e3e3e42fc", size = 141406, upload-time = "2024-12-06T15:37:23.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/2a/39/e50c7c3a983047577ee07d2a9e53faf5a69493943ec3f6a384bdc792deb2/httpx-0.28.1-py3-none-any.whl", hash = "sha256:d909fcccc110f8c7faf814ca82a9a4d816bc5a6dbfea25d6591d6985b8ba59ad", size = 73517, upload-time = "2024-12-06T15:37:21.509Z" }, +] + [[package]] name = "identify" version = "2.6.15" @@ -749,6 +824,88 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/e3/59cd50310fc9b59512193629e1984c1f95e5c8ae6e5d8c69532ccc65a7fe/pycparser-2.23-py3-none-any.whl", hash = "sha256:e5c6e8d3fbad53479cab09ac03729e0a9faf2bee3db8208a550daf5af81a5934", size = 118140, upload-time = "2025-09-09T13:23:46.651Z" }, ] +[[package]] +name = "pydantic" +version = "2.12.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "annotated-types" }, + { name = "pydantic-core" }, + { name = "typing-extensions" }, + { name = "typing-inspection" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/f3/1e/4f0a3233767010308f2fd6bd0814597e3f63f1dc98304a9112b8759df4ff/pydantic-2.12.3.tar.gz", hash = "sha256:1da1c82b0fc140bb0103bc1441ffe062154c8d38491189751ee00fd8ca65ce74", size = 819383, upload-time = "2025-10-17T15:04:21.222Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a1/6b/83661fa77dcefa195ad5f8cd9af3d1a7450fd57cc883ad04d65446ac2029/pydantic-2.12.3-py3-none-any.whl", hash = "sha256:6986454a854bc3bc6e5443e1369e06a3a456af9d339eda45510f517d9ea5c6bf", size = 462431, upload-time = "2025-10-17T15:04:19.346Z" }, +] + +[[package]] +name = "pydantic-core" +version = "2.41.4" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { 
url = "https://files.pythonhosted.org/packages/df/18/d0944e8eaaa3efd0a91b0f1fc537d3be55ad35091b6a87638211ba691964/pydantic_core-2.41.4.tar.gz", hash = "sha256:70e47929a9d4a1905a67e4b687d5946026390568a8e952b92824118063cee4d5", size = 457557, upload-time = "2025-10-14T10:23:47.909Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/81/d3b3e95929c4369d30b2a66a91db63c8ed0a98381ae55a45da2cd1cc1288/pydantic_core-2.41.4-cp312-cp312-macosx_10_12_x86_64.whl", hash = "sha256:ab06d77e053d660a6faaf04894446df7b0a7e7aba70c2797465a0a1af00fc887", size = 2099043, upload-time = "2025-10-14T10:20:28.561Z" }, + { url = "https://files.pythonhosted.org/packages/58/da/46fdac49e6717e3a94fc9201403e08d9d61aa7a770fab6190b8740749047/pydantic_core-2.41.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:c53ff33e603a9c1179a9364b0a24694f183717b2e0da2b5ad43c316c956901b2", size = 1910699, upload-time = "2025-10-14T10:20:30.217Z" }, + { url = "https://files.pythonhosted.org/packages/1e/63/4d948f1b9dd8e991a5a98b77dd66c74641f5f2e5225fee37994b2e07d391/pydantic_core-2.41.4-cp312-cp312-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:304c54176af2c143bd181d82e77c15c41cbacea8872a2225dd37e6544dce9999", size = 1952121, upload-time = "2025-10-14T10:20:32.246Z" }, + { url = "https://files.pythonhosted.org/packages/b2/a7/e5fc60a6f781fc634ecaa9ecc3c20171d238794cef69ae0af79ac11b89d7/pydantic_core-2.41.4-cp312-cp312-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:025ba34a4cf4fb32f917d5d188ab5e702223d3ba603be4d8aca2f82bede432a4", size = 2041590, upload-time = "2025-10-14T10:20:34.332Z" }, + { url = "https://files.pythonhosted.org/packages/70/69/dce747b1d21d59e85af433428978a1893c6f8a7068fa2bb4a927fba7a5ff/pydantic_core-2.41.4-cp312-cp312-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:b9f5f30c402ed58f90c70e12eff65547d3ab74685ffe8283c719e6bead8ef53f", size = 2219869, upload-time = "2025-10-14T10:20:35.965Z" }, + { url = 
"https://files.pythonhosted.org/packages/83/6a/c070e30e295403bf29c4df1cb781317b6a9bac7cd07b8d3acc94d501a63c/pydantic_core-2.41.4-cp312-cp312-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:dd96e5d15385d301733113bcaa324c8bcf111275b7675a9c6e88bfb19fc05e3b", size = 2345169, upload-time = "2025-10-14T10:20:37.627Z" }, + { url = "https://files.pythonhosted.org/packages/f0/83/06d001f8043c336baea7fd202a9ac7ad71f87e1c55d8112c50b745c40324/pydantic_core-2.41.4-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:98f348cbb44fae6e9653c1055db7e29de67ea6a9ca03a5fa2c2e11a47cff0e47", size = 2070165, upload-time = "2025-10-14T10:20:39.246Z" }, + { url = "https://files.pythonhosted.org/packages/14/0a/e567c2883588dd12bcbc110232d892cf385356f7c8a9910311ac997ab715/pydantic_core-2.41.4-cp312-cp312-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:ec22626a2d14620a83ca583c6f5a4080fa3155282718b6055c2ea48d3ef35970", size = 2189067, upload-time = "2025-10-14T10:20:41.015Z" }, + { url = "https://files.pythonhosted.org/packages/f4/1d/3d9fca34273ba03c9b1c5289f7618bc4bd09c3ad2289b5420481aa051a99/pydantic_core-2.41.4-cp312-cp312-musllinux_1_1_aarch64.whl", hash = "sha256:3a95d4590b1f1a43bf33ca6d647b990a88f4a3824a8c4572c708f0b45a5290ed", size = 2132997, upload-time = "2025-10-14T10:20:43.106Z" }, + { url = "https://files.pythonhosted.org/packages/52/70/d702ef7a6cd41a8afc61f3554922b3ed8d19dd54c3bd4bdbfe332e610827/pydantic_core-2.41.4-cp312-cp312-musllinux_1_1_armv7l.whl", hash = "sha256:f9672ab4d398e1b602feadcffcdd3af44d5f5e6ddc15bc7d15d376d47e8e19f8", size = 2307187, upload-time = "2025-10-14T10:20:44.849Z" }, + { url = "https://files.pythonhosted.org/packages/68/4c/c06be6e27545d08b802127914156f38d10ca287a9e8489342793de8aae3c/pydantic_core-2.41.4-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:84d8854db5f55fead3b579f04bda9a36461dab0730c5d570e1526483e7bb8431", size = 2305204, upload-time = "2025-10-14T10:20:46.781Z" }, + { url = 
"https://files.pythonhosted.org/packages/b0/e5/35ae4919bcd9f18603419e23c5eaf32750224a89d41a8df1a3704b69f77e/pydantic_core-2.41.4-cp312-cp312-win32.whl", hash = "sha256:9be1c01adb2ecc4e464392c36d17f97e9110fbbc906bcbe1c943b5b87a74aabd", size = 1972536, upload-time = "2025-10-14T10:20:48.39Z" }, + { url = "https://files.pythonhosted.org/packages/1e/c2/49c5bb6d2a49eb2ee3647a93e3dae7080c6409a8a7558b075027644e879c/pydantic_core-2.41.4-cp312-cp312-win_amd64.whl", hash = "sha256:d682cf1d22bab22a5be08539dca3d1593488a99998f9f412137bc323179067ff", size = 2031132, upload-time = "2025-10-14T10:20:50.421Z" }, + { url = "https://files.pythonhosted.org/packages/06/23/936343dbcba6eec93f73e95eb346810fc732f71ba27967b287b66f7b7097/pydantic_core-2.41.4-cp312-cp312-win_arm64.whl", hash = "sha256:833eebfd75a26d17470b58768c1834dfc90141b7afc6eb0429c21fc5a21dcfb8", size = 1969483, upload-time = "2025-10-14T10:20:52.35Z" }, + { url = "https://files.pythonhosted.org/packages/13/d0/c20adabd181a029a970738dfe23710b52a31f1258f591874fcdec7359845/pydantic_core-2.41.4-cp313-cp313-macosx_10_12_x86_64.whl", hash = "sha256:85e050ad9e5f6fe1004eec65c914332e52f429bc0ae12d6fa2092407a462c746", size = 2105688, upload-time = "2025-10-14T10:20:54.448Z" }, + { url = "https://files.pythonhosted.org/packages/00/b6/0ce5c03cec5ae94cca220dfecddc453c077d71363b98a4bbdb3c0b22c783/pydantic_core-2.41.4-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e7393f1d64792763a48924ba31d1e44c2cfbc05e3b1c2c9abb4ceeadd912cced", size = 1910807, upload-time = "2025-10-14T10:20:56.115Z" }, + { url = "https://files.pythonhosted.org/packages/68/3e/800d3d02c8beb0b5c069c870cbb83799d085debf43499c897bb4b4aaff0d/pydantic_core-2.41.4-cp313-cp313-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:94dab0940b0d1fb28bcab847adf887c66a27a40291eedf0b473be58761c9799a", size = 1956669, upload-time = "2025-10-14T10:20:57.874Z" }, + { url = 
"https://files.pythonhosted.org/packages/60/a4/24271cc71a17f64589be49ab8bd0751f6a0a03046c690df60989f2f95c2c/pydantic_core-2.41.4-cp313-cp313-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:de7c42f897e689ee6f9e93c4bec72b99ae3b32a2ade1c7e4798e690ff5246e02", size = 2051629, upload-time = "2025-10-14T10:21:00.006Z" }, + { url = "https://files.pythonhosted.org/packages/68/de/45af3ca2f175d91b96bfb62e1f2d2f1f9f3b14a734afe0bfeff079f78181/pydantic_core-2.41.4-cp313-cp313-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:664b3199193262277b8b3cd1e754fb07f2c6023289c815a1e1e8fb415cb247b1", size = 2224049, upload-time = "2025-10-14T10:21:01.801Z" }, + { url = "https://files.pythonhosted.org/packages/af/8f/ae4e1ff84672bf869d0a77af24fd78387850e9497753c432875066b5d622/pydantic_core-2.41.4-cp313-cp313-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:d95b253b88f7d308b1c0b417c4624f44553ba4762816f94e6986819b9c273fb2", size = 2342409, upload-time = "2025-10-14T10:21:03.556Z" }, + { url = "https://files.pythonhosted.org/packages/18/62/273dd70b0026a085c7b74b000394e1ef95719ea579c76ea2f0cc8893736d/pydantic_core-2.41.4-cp313-cp313-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a1351f5bbdbbabc689727cb91649a00cb9ee7203e0a6e54e9f5ba9e22e384b84", size = 2069635, upload-time = "2025-10-14T10:21:05.385Z" }, + { url = "https://files.pythonhosted.org/packages/30/03/cf485fff699b4cdaea469bc481719d3e49f023241b4abb656f8d422189fc/pydantic_core-2.41.4-cp313-cp313-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:1affa4798520b148d7182da0615d648e752de4ab1a9566b7471bc803d88a062d", size = 2194284, upload-time = "2025-10-14T10:21:07.122Z" }, + { url = "https://files.pythonhosted.org/packages/f9/7e/c8e713db32405dfd97211f2fc0a15d6bf8adb7640f3d18544c1f39526619/pydantic_core-2.41.4-cp313-cp313-musllinux_1_1_aarch64.whl", hash = "sha256:7b74e18052fea4aa8dea2fb7dbc23d15439695da6cbe6cfc1b694af1115df09d", size = 2137566, upload-time = 
"2025-10-14T10:21:08.981Z" }, + { url = "https://files.pythonhosted.org/packages/04/f7/db71fd4cdccc8b75990f79ccafbbd66757e19f6d5ee724a6252414483fb4/pydantic_core-2.41.4-cp313-cp313-musllinux_1_1_armv7l.whl", hash = "sha256:285b643d75c0e30abda9dc1077395624f314a37e3c09ca402d4015ef5979f1a2", size = 2316809, upload-time = "2025-10-14T10:21:10.805Z" }, + { url = "https://files.pythonhosted.org/packages/76/63/a54973ddb945f1bca56742b48b144d85c9fc22f819ddeb9f861c249d5464/pydantic_core-2.41.4-cp313-cp313-musllinux_1_1_x86_64.whl", hash = "sha256:f52679ff4218d713b3b33f88c89ccbf3a5c2c12ba665fb80ccc4192b4608dbab", size = 2311119, upload-time = "2025-10-14T10:21:12.583Z" }, + { url = "https://files.pythonhosted.org/packages/f8/03/5d12891e93c19218af74843a27e32b94922195ded2386f7b55382f904d2f/pydantic_core-2.41.4-cp313-cp313-win32.whl", hash = "sha256:ecde6dedd6fff127c273c76821bb754d793be1024bc33314a120f83a3c69460c", size = 1981398, upload-time = "2025-10-14T10:21:14.584Z" }, + { url = "https://files.pythonhosted.org/packages/be/d8/fd0de71f39db91135b7a26996160de71c073d8635edfce8b3c3681be0d6d/pydantic_core-2.41.4-cp313-cp313-win_amd64.whl", hash = "sha256:d081a1f3800f05409ed868ebb2d74ac39dd0c1ff6c035b5162356d76030736d4", size = 2030735, upload-time = "2025-10-14T10:21:16.432Z" }, + { url = "https://files.pythonhosted.org/packages/72/86/c99921c1cf6650023c08bfab6fe2d7057a5142628ef7ccfa9921f2dda1d5/pydantic_core-2.41.4-cp313-cp313-win_arm64.whl", hash = "sha256:f8e49c9c364a7edcbe2a310f12733aad95b022495ef2a8d653f645e5d20c1564", size = 1973209, upload-time = "2025-10-14T10:21:18.213Z" }, + { url = "https://files.pythonhosted.org/packages/36/0d/b5706cacb70a8414396efdda3d72ae0542e050b591119e458e2490baf035/pydantic_core-2.41.4-cp313-cp313t-macosx_11_0_arm64.whl", hash = "sha256:ed97fd56a561f5eb5706cebe94f1ad7c13b84d98312a05546f2ad036bafe87f4", size = 1877324, upload-time = "2025-10-14T10:21:20.363Z" }, + { url = 
"https://files.pythonhosted.org/packages/de/2d/cba1fa02cfdea72dfb3a9babb067c83b9dff0bbcb198368e000a6b756ea7/pydantic_core-2.41.4-cp313-cp313t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a870c307bf1ee91fc58a9a61338ff780d01bfae45922624816878dce784095d2", size = 1884515, upload-time = "2025-10-14T10:21:22.339Z" }, + { url = "https://files.pythonhosted.org/packages/07/ea/3df927c4384ed9b503c9cc2d076cf983b4f2adb0c754578dfb1245c51e46/pydantic_core-2.41.4-cp313-cp313t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d25e97bc1f5f8f7985bdc2335ef9e73843bb561eb1fa6831fdfc295c1c2061cf", size = 2042819, upload-time = "2025-10-14T10:21:26.683Z" }, + { url = "https://files.pythonhosted.org/packages/6a/ee/df8e871f07074250270a3b1b82aad4cd0026b588acd5d7d3eb2fcb1471a3/pydantic_core-2.41.4-cp313-cp313t-win_amd64.whl", hash = "sha256:d405d14bea042f166512add3091c1af40437c2e7f86988f3915fabd27b1e9cd2", size = 1995866, upload-time = "2025-10-14T10:21:28.951Z" }, + { url = "https://files.pythonhosted.org/packages/fc/de/b20f4ab954d6d399499c33ec4fafc46d9551e11dc1858fb7f5dca0748ceb/pydantic_core-2.41.4-cp313-cp313t-win_arm64.whl", hash = "sha256:19f3684868309db5263a11bace3c45d93f6f24afa2ffe75a647583df22a2ff89", size = 1970034, upload-time = "2025-10-14T10:21:30.869Z" }, + { url = "https://files.pythonhosted.org/packages/54/28/d3325da57d413b9819365546eb9a6e8b7cbd9373d9380efd5f74326143e6/pydantic_core-2.41.4-cp314-cp314-macosx_10_12_x86_64.whl", hash = "sha256:e9205d97ed08a82ebb9a307e92914bb30e18cdf6f6b12ca4bedadb1588a0bfe1", size = 2102022, upload-time = "2025-10-14T10:21:32.809Z" }, + { url = "https://files.pythonhosted.org/packages/9e/24/b58a1bc0d834bf1acc4361e61233ee217169a42efbdc15a60296e13ce438/pydantic_core-2.41.4-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:82df1f432b37d832709fbcc0e24394bba04a01b6ecf1ee87578145c19cde12ac", size = 1905495, upload-time = "2025-10-14T10:21:34.812Z" }, + { url = 
"https://files.pythonhosted.org/packages/fb/a4/71f759cc41b7043e8ecdaab81b985a9b6cad7cec077e0b92cff8b71ecf6b/pydantic_core-2.41.4-cp314-cp314-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:fc3b4cc4539e055cfa39a3763c939f9d409eb40e85813257dcd761985a108554", size = 1956131, upload-time = "2025-10-14T10:21:36.924Z" }, + { url = "https://files.pythonhosted.org/packages/b0/64/1e79ac7aa51f1eec7c4cda8cbe456d5d09f05fdd68b32776d72168d54275/pydantic_core-2.41.4-cp314-cp314-manylinux_2_17_armv7l.manylinux2014_armv7l.whl", hash = "sha256:b1eb1754fce47c63d2ff57fdb88c351a6c0150995890088b33767a10218eaa4e", size = 2052236, upload-time = "2025-10-14T10:21:38.927Z" }, + { url = "https://files.pythonhosted.org/packages/e9/e3/a3ffc363bd4287b80f1d43dc1c28ba64831f8dfc237d6fec8f2661138d48/pydantic_core-2.41.4-cp314-cp314-manylinux_2_17_ppc64le.manylinux2014_ppc64le.whl", hash = "sha256:e6ab5ab30ef325b443f379ddb575a34969c333004fca5a1daa0133a6ffaad616", size = 2223573, upload-time = "2025-10-14T10:21:41.574Z" }, + { url = "https://files.pythonhosted.org/packages/28/27/78814089b4d2e684a9088ede3790763c64693c3d1408ddc0a248bc789126/pydantic_core-2.41.4-cp314-cp314-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:31a41030b1d9ca497634092b46481b937ff9397a86f9f51bd41c4767b6fc04af", size = 2342467, upload-time = "2025-10-14T10:21:44.018Z" }, + { url = "https://files.pythonhosted.org/packages/92/97/4de0e2a1159cb85ad737e03306717637842c88c7fd6d97973172fb183149/pydantic_core-2.41.4-cp314-cp314-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a44ac1738591472c3d020f61c6df1e4015180d6262ebd39bf2aeb52571b60f12", size = 2063754, upload-time = "2025-10-14T10:21:46.466Z" }, + { url = "https://files.pythonhosted.org/packages/0f/50/8cb90ce4b9efcf7ae78130afeb99fd1c86125ccdf9906ef64b9d42f37c25/pydantic_core-2.41.4-cp314-cp314-manylinux_2_5_i686.manylinux1_i686.whl", hash = "sha256:d72f2b5e6e82ab8f94ea7d0d42f83c487dc159c5240d8f83beae684472864e2d", size = 2196754, 
upload-time = "2025-10-14T10:21:48.486Z" }, + { url = "https://files.pythonhosted.org/packages/34/3b/ccdc77af9cd5082723574a1cc1bcae7a6acacc829d7c0a06201f7886a109/pydantic_core-2.41.4-cp314-cp314-musllinux_1_1_aarch64.whl", hash = "sha256:c4d1e854aaf044487d31143f541f7aafe7b482ae72a022c664b2de2e466ed0ad", size = 2137115, upload-time = "2025-10-14T10:21:50.63Z" }, + { url = "https://files.pythonhosted.org/packages/ca/ba/e7c7a02651a8f7c52dc2cff2b64a30c313e3b57c7d93703cecea76c09b71/pydantic_core-2.41.4-cp314-cp314-musllinux_1_1_armv7l.whl", hash = "sha256:b568af94267729d76e6ee5ececda4e283d07bbb28e8148bb17adad93d025d25a", size = 2317400, upload-time = "2025-10-14T10:21:52.959Z" }, + { url = "https://files.pythonhosted.org/packages/2c/ba/6c533a4ee8aec6b812c643c49bb3bd88d3f01e3cebe451bb85512d37f00f/pydantic_core-2.41.4-cp314-cp314-musllinux_1_1_x86_64.whl", hash = "sha256:6d55fb8b1e8929b341cc313a81a26e0d48aa3b519c1dbaadec3a6a2b4fcad025", size = 2312070, upload-time = "2025-10-14T10:21:55.419Z" }, + { url = "https://files.pythonhosted.org/packages/22/ae/f10524fcc0ab8d7f96cf9a74c880243576fd3e72bd8ce4f81e43d22bcab7/pydantic_core-2.41.4-cp314-cp314-win32.whl", hash = "sha256:5b66584e549e2e32a1398df11da2e0a7eff45d5c2d9db9d5667c5e6ac764d77e", size = 1982277, upload-time = "2025-10-14T10:21:57.474Z" }, + { url = "https://files.pythonhosted.org/packages/b4/dc/e5aa27aea1ad4638f0c3fb41132f7eb583bd7420ee63204e2d4333a3bbf9/pydantic_core-2.41.4-cp314-cp314-win_amd64.whl", hash = "sha256:557a0aab88664cc552285316809cab897716a372afaf8efdbef756f8b890e894", size = 2024608, upload-time = "2025-10-14T10:21:59.557Z" }, + { url = "https://files.pythonhosted.org/packages/3e/61/51d89cc2612bd147198e120a13f150afbf0bcb4615cddb049ab10b81b79e/pydantic_core-2.41.4-cp314-cp314-win_arm64.whl", hash = "sha256:3f1ea6f48a045745d0d9f325989d8abd3f1eaf47dd00485912d1a3a63c623a8d", size = 1967614, upload-time = "2025-10-14T10:22:01.847Z" }, + { url = 
"https://files.pythonhosted.org/packages/0d/c2/472f2e31b95eff099961fa050c376ab7156a81da194f9edb9f710f68787b/pydantic_core-2.41.4-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:6c1fe4c5404c448b13188dd8bd2ebc2bdd7e6727fa61ff481bcc2cca894018da", size = 1876904, upload-time = "2025-10-14T10:22:04.062Z" }, + { url = "https://files.pythonhosted.org/packages/4a/07/ea8eeb91173807ecdae4f4a5f4b150a520085b35454350fc219ba79e66a3/pydantic_core-2.41.4-cp314-cp314t-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:523e7da4d43b113bf8e7b49fa4ec0c35bf4fe66b2230bfc5c13cc498f12c6c3e", size = 1882538, upload-time = "2025-10-14T10:22:06.39Z" }, + { url = "https://files.pythonhosted.org/packages/1e/29/b53a9ca6cd366bfc928823679c6a76c7a4c69f8201c0ba7903ad18ebae2f/pydantic_core-2.41.4-cp314-cp314t-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5729225de81fb65b70fdb1907fcf08c75d498f4a6f15af005aabb1fdadc19dfa", size = 2041183, upload-time = "2025-10-14T10:22:08.812Z" }, + { url = "https://files.pythonhosted.org/packages/c7/3d/f8c1a371ceebcaf94d6dd2d77c6cf4b1c078e13a5837aee83f760b4f7cfd/pydantic_core-2.41.4-cp314-cp314t-win_amd64.whl", hash = "sha256:de2cfbb09e88f0f795fd90cf955858fc2c691df65b1f21f0aa00b99f3fbc661d", size = 1993542, upload-time = "2025-10-14T10:22:11.332Z" }, + { url = "https://files.pythonhosted.org/packages/8a/ac/9fc61b4f9d079482a290afe8d206b8f490e9fd32d4fc03ed4fc698214e01/pydantic_core-2.41.4-cp314-cp314t-win_arm64.whl", hash = "sha256:d34f950ae05a83e0ede899c595f312ca976023ea1db100cd5aa188f7005e3ab0", size = 1973897, upload-time = "2025-10-14T10:22:13.444Z" }, + { url = "https://files.pythonhosted.org/packages/c4/48/ae937e5a831b7c0dc646b2ef788c27cd003894882415300ed21927c21efa/pydantic_core-2.41.4-graalpy312-graalpy250_312_native-macosx_10_12_x86_64.whl", hash = "sha256:4f5d640aeebb438517150fdeec097739614421900e4a08db4a3ef38898798537", size = 2112087, upload-time = "2025-10-14T10:22:56.818Z" }, + { url = 
"https://files.pythonhosted.org/packages/5e/db/6db8073e3d32dae017da7e0d16a9ecb897d0a4d92e00634916e486097961/pydantic_core-2.41.4-graalpy312-graalpy250_312_native-macosx_11_0_arm64.whl", hash = "sha256:4a9ab037b71927babc6d9e7fc01aea9e66dc2a4a34dff06ef0724a4049629f94", size = 1920387, upload-time = "2025-10-14T10:22:59.342Z" }, + { url = "https://files.pythonhosted.org/packages/0d/c1/dd3542d072fcc336030d66834872f0328727e3b8de289c662faa04aa270e/pydantic_core-2.41.4-graalpy312-graalpy250_312_native-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:e4dab9484ec605c3016df9ad4fd4f9a390bc5d816a3b10c6550f8424bb80b18c", size = 1951495, upload-time = "2025-10-14T10:23:02.089Z" }, + { url = "https://files.pythonhosted.org/packages/2b/c6/db8d13a1f8ab3f1eb08c88bd00fd62d44311e3456d1e85c0e59e0a0376e7/pydantic_core-2.41.4-graalpy312-graalpy250_312_native-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bd8a5028425820731d8c6c098ab642d7b8b999758e24acae03ed38a66eca8335", size = 2139008, upload-time = "2025-10-14T10:23:04.539Z" }, +] + [[package]] name = "pygments" version = "2.19.2" @@ -972,6 +1129,28 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/52/59/0782e51887ac6b07ffd1570e0364cf901ebc36345fea669969d2084baebb/simple_websocket-1.1.0-py3-none-any.whl", hash = "sha256:4af6069630a38ed6c561010f0e11a5bc0d4ca569b36306eb257cd9a192497c8c", size = 13842, upload-time = "2024-10-10T22:39:29.645Z" }, ] +[[package]] +name = "sniffio" +version = "1.3.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/a2/87/a6771e1546d97e7e041b6ae58d80074f81b7d5121207425c964ddf5cfdbd/sniffio-1.3.1.tar.gz", hash = "sha256:f4324edc670a0f49750a81b895f35c3adb843cca46f0530f79fc1babb23789dc", size = 20372, upload-time = "2024-02-25T23:20:04.057Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/e9/44/75a9c9421471a6c4805dbf2356f7c181a29c1879239abab1ea2cc8f38b40/sniffio-1.3.1-py3-none-any.whl", hash 
= "sha256:2f6da418d1f1e0fddd844478f41680e794e6051915791a034ff65e5f100525a2", size = 10235, upload-time = "2024-02-25T23:20:01.196Z" }, +] + +[[package]] +name = "starlette" +version = "0.49.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "anyio" }, + { name = "typing-extensions", marker = "python_full_version < '3.13'" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/1b/3f/507c21db33b66fb027a332f2cb3abbbe924cc3a79ced12f01ed8645955c9/starlette-0.49.1.tar.gz", hash = "sha256:481a43b71e24ed8c43b11ea02f5353d77840e01480881b8cb5a26b8cae64a8cb", size = 2654703, upload-time = "2025-10-28T17:34:10.928Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/51/da/545b75d420bb23b5d494b0517757b351963e974e79933f01e05c929f20a6/starlette-0.49.1-py3-none-any.whl", hash = "sha256:d92ce9f07e4a3caa3ac13a79523bd18e3bc0042bb8ff2d759a8e7dd0e1859875", size = 74175, upload-time = "2025-10-28T17:34:09.13Z" }, +] + [[package]] name = "tinystream" version = "0.1.0" @@ -979,7 +1158,11 @@ source = { editable = "." 
} dependencies = [ { name = "aiofiles" }, { name = "aiosqlite" }, + { name = "fastapi" }, + { name = "httpx" }, { name = "msgpack" }, + { name = "pydantic" }, + { name = "uvicorn" }, ] [package.dev-dependencies] @@ -995,7 +1178,11 @@ dev = [ requires-dist = [ { name = "aiofiles", specifier = ">=25.1.0" }, { name = "aiosqlite", specifier = ">=0.21.0" }, + { name = "fastapi", specifier = ">=0.120.2" }, + { name = "httpx", specifier = ">=0.28.1" }, { name = "msgpack", specifier = ">=1.1.2" }, + { name = "pydantic", specifier = ">=2.12.3" }, + { name = "uvicorn", specifier = ">=0.38.0" }, ] [package.metadata.requires-dev] @@ -1016,6 +1203,18 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, ] +[[package]] +name = "typing-inspection" +version = "0.4.2" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "typing-extensions" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/55/e3/70399cb7dd41c10ac53367ae42139cf4b1ca5f36bb3dc6c9d33acdb43655/typing_inspection-0.4.2.tar.gz", hash = "sha256:ba561c48a67c5958007083d386c3295464928b01faa735ab8547c5692e87f464", size = 75949, upload-time = "2025-10-01T02:14:41.687Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/dc/9b/47798a6c91d8bdb567fe2698fe81e0c6b7cb7ef4d13da4114b41d239f65d/typing_inspection-0.4.2-py3-none-any.whl", hash = "sha256:4ed1cacbdc298c220f1bd249ed5287caa16f34d44ef4e9c3d0cbad5b521545e7", size = 14611, upload-time = "2025-10-01T02:14:40.154Z" }, +] + [[package]] name = "urllib3" version = "2.5.0" @@ -1025,6 +1224,19 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a7/c2/fe1e52489ae3122415c51f387e221dd0773709bad6c6cdaa599e8a2c5185/urllib3-2.5.0-py3-none-any.whl", hash = 
"sha256:e6b01673c0fa6a13e374b50871808eb3bf7046c4b125b216f6bf1cc604cff0dc", size = 129795, upload-time = "2025-06-18T14:07:40.39Z" }, ] +[[package]] +name = "uvicorn" +version = "0.38.0" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "click" }, + { name = "h11" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/cb/ce/f06b84e2697fef4688ca63bdb2fdf113ca0a3be33f94488f2cadb690b0cf/uvicorn-0.38.0.tar.gz", hash = "sha256:fd97093bdd120a2609fc0d3afe931d4d4ad688b6e75f0f929fde1bc36fe0e91d", size = 80605, upload-time = "2025-10-18T13:46:44.63Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ee/d9/d88e73ca598f4f6ff671fb5fde8a32925c2e08a637303a1d12883c7305fa/uvicorn-0.38.0-py3-none-any.whl", hash = "sha256:48c0afd214ceb59340075b4a052ea1ee91c16fbc2a9b1469cca0e54566977b02", size = 68109, upload-time = "2025-10-18T13:46:42.958Z" }, +] + [[package]] name = "virtualenv" version = "20.35.3"