diff --git a/.gitignore b/.gitignore
index 81220b2..8e7a515 100644
--- a/.gitignore
+++ b/.gitignore
@@ -93,11 +93,6 @@ ipython_config.py
 # install all needed dependencies.
 Dockerfile
 
-# celery beat schedule file
-celerybeat-schedule.db
-celerybeat-schedule
-celerybeat.pid
-
 # SageMath parsed files
 *.sage.py
 
diff --git a/pytest.ini b/pytest.ini
index dfbde25..f8dc50b 100644
--- a/pytest.ini
+++ b/pytest.ini
@@ -5,7 +5,6 @@ python_classes = Test*
 python_functions = test_*
 markers =
-  celery: marks tests that require celery
   integration: marks integration tests
   unit: marks unit tests
   asyncio: marks tests that use asyncio
diff --git a/robosystems_client/api/agent/auto_select_agent.py b/robosystems_client/api/agent/auto_select_agent.py
index ba6eb8b..e54ee3c 100644
--- a/robosystems_client/api/agent/auto_select_agent.py
+++ b/robosystems_client/api/agent/auto_select_agent.py
@@ -134,7 +134,7 @@ def sync_detailed(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
@@ -228,7 +228,7 @@ def sync(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
@@ -317,7 +317,7 @@ async def asyncio_detailed(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
@@ -409,7 +409,7 @@ async def asyncio(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
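Reviewer note: the hunks above only rename the long-running strategy in the docstrings (Celery is no longer named); the selection logic and the `?mode=sync|async` override are untouched. For context, a minimal sketch of how a caller might force a strategy with the generated client — the request-body model and the `mode` keyword are assumptions for illustration, not taken from this diff:

```python
# Hedged sketch: `AuthenticatedClient` and the sync_detailed/sync/asyncio entry
# points are standard for openapi-python-client packages; the body type and the
# exact spelling of the mode parameter are assumptions.
from robosystems_client import AuthenticatedClient
from robosystems_client.api.agent import auto_select_agent

client = AuthenticatedClient(
  base_url="https://api.robosystems.ai",
  token="your-api-key",
)

response = auto_select_agent.sync_detailed(
  graph_id="kg_abc123",  # example identifier
  client=client,
  body=...,              # agent request model (not shown in this diff)
  mode="async",          # assumed keyword for the `?mode=sync|async` override
)
print(response.status_code)
```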
diff --git a/robosystems_client/api/agent/execute_specific_agent.py b/robosystems_client/api/agent/execute_specific_agent.py
index 636f67d..eb3374f 100644
--- a/robosystems_client/api/agent/execute_specific_agent.py
+++ b/robosystems_client/api/agent/execute_specific_agent.py
@@ -124,7 +124,7 @@ def sync_detailed(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
@@ -179,7 +179,7 @@ def sync(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
@@ -229,7 +229,7 @@ async def asyncio_detailed(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
@@ -282,7 +282,7 @@ async def asyncio(
   **Execution Strategies (automatic):**
   - Fast operations (<5s): Immediate synchronous response
   - Medium operations (5-30s): SSE streaming with progress updates
-  - Long operations (>30s): Async Celery worker with operation tracking
+  - Long operations (>30s): Background queue with operation tracking
 
   **Response Mode Override:**
   Use query parameter `?mode=sync|async` to override automatic strategy selection.
diff --git a/robosystems_client/api/files/update_file.py b/robosystems_client/api/files/update_file.py
index 2b90aef..0dfcc7c 100644
--- a/robosystems_client/api/files/update_file.py
+++ b/robosystems_client/api/files/update_file.py
@@ -103,7 +103,7 @@ def sync_detailed(
   **What Happens (status='uploaded'):**
   1. File validated in S3
   2. Row count calculated
-  3. DuckDB staging triggered immediately (Celery task)
+  3. DuckDB staging triggered immediately (background task)
   4. If ingest_to_graph=true, graph ingestion queued
   5. File queryable in DuckDB within seconds
@@ -165,7 +165,7 @@ def sync(
   **What Happens (status='uploaded'):**
   1. File validated in S3
   2. Row count calculated
-  3. DuckDB staging triggered immediately (Celery task)
+  3. DuckDB staging triggered immediately (background task)
   4. If ingest_to_graph=true, graph ingestion queued
   5. File queryable in DuckDB within seconds
@@ -222,7 +222,7 @@ async def asyncio_detailed(
   **What Happens (status='uploaded'):**
   1. File validated in S3
   2. Row count calculated
-  3. DuckDB staging triggered immediately (Celery task)
+  3. DuckDB staging triggered immediately (background task)
   4. If ingest_to_graph=true, graph ingestion queued
   5. File queryable in DuckDB within seconds
@@ -282,7 +282,7 @@ async def asyncio(
   **What Happens (status='uploaded'):**
   1. File validated in S3
   2. Row count calculated
-  3. DuckDB staging triggered immediately (Celery task)
+  3. DuckDB staging triggered immediately (background task)
   4. If ingest_to_graph=true, graph ingestion queued
   5. File queryable in DuckDB within seconds
diff --git a/robosystems_client/extensions/README.md b/robosystems_client/extensions/README.md
index 1078177..983540c 100644
--- a/robosystems_client/extensions/README.md
+++ b/robosystems_client/extensions/README.md
@@ -1,6 +1,6 @@
 # RoboSystems Python Client Extensions
 
-🚀 **Production-Ready Extensions** for the RoboSystems Financial Knowledge Graph API
+**Production-Ready Extensions** for the RoboSystems Financial Knowledge Graph API
 
 [![Python 3.7+](https://img.shields.io/badge/python-3.7+-blue.svg)](https://www.python.org/downloads/)
 [![License: MIT](https://img.shields.io/badge/License-MIT-yellow.svg)](https://opensource.org/licenses/MIT)
@@ -17,7 +17,7 @@ The RoboSystems Python Client Extensions provide enhanced functionality for the
 - **Caching** with TTL and LRU eviction
 - **Full Async/Await Support** throughout
 
-## 🚀 Quick Start
+## Quick Start
 
 ### Installation
@@ -84,7 +84,7 @@ async def main():
 
 asyncio.run(main())
 ```
 
-## 🔐 Authentication
+## Authentication
 
 ### API Key Authentication (Recommended)
@@ -139,7 +139,7 @@ dev_ext = create_extensions(
 )
 ```
 
-## 🛠 Advanced Features
+## Advanced Features
 
 ### Query Builder
@@ -182,7 +182,7 @@ print(f"Complexity: {cost['complexity_category']} (score: {cost['complexity_scor
 
 # Get optimization recommendations
 for rec in cost['recommendations']:
-  print(f"💡 {rec}")
+  print(f"Tip: {rec}")
 ```
 
 ### Result Processing
@@ -271,7 +271,7 @@ client.connect("operation_id")
 client.close()
 ```
 
-## 📊 Examples
+## Examples
 
 ### Financial Data Analysis
@@ -332,7 +332,7 @@ if final_batch:
   process_transaction_batch(final_batch)
   total_processed += len(final_batch)
 
-print(f"✅ Processed {total_processed:,} transactions total")
+print(f"Processed {total_processed:,} transactions total")
 ```
 
 ### Error Handling
@@ -361,7 +361,7 @@ except Exception as e:
   print(f"Query failed: {e}")
 ```
 
-## ⚡ Performance Optimization
+## Performance Optimization
 
 ### Connection Pooling
@@ -391,21 +391,21 @@ query = "MATCH (c:Company) WHERE c.revenue > 1000000 RETURN c"
 
 # Check syntax
 validation = validate_cypher_query(query)
 if not validation['valid']:
-  print("❌ Query has syntax errors:", validation['issues'])
+  print("Query has syntax errors:", validation['issues'])
 
 # Estimate cost
 cost = estimate_query_cost(query)
-print(f"📊 Query complexity: {cost['complexity_category']}")
+print(f"Query complexity: {cost['complexity_category']}")
 
 # Follow recommendations
 for rec in cost['recommendations']:
-  print(f"💡 Optimization tip: {rec}")
+  print(f"Optimization tip: {rec}")
 
 # Execute only if reasonable complexity
 if cost['complexity_category'] in ['low', 'medium']:
   result = extensions.execute_query("graph_id", query)
 else:
-  print("⚠️ Query may be too expensive - consider optimization")
+  print("Query may be too expensive - consider optimization")
 ```
 
 ### Caching Strategy
@@ -425,7 +425,7 @@ else:
   cache = results_cache
 ```
 
-## 🧪 Testing
+## Testing
 
 Run the test suite:
@@ -457,7 +457,7 @@ def test_query_execution():
   assert result["data"] == [{"count": 100}]
 ```
 
-## 🔧 Configuration
+## Configuration
 
 ### Environment Variables
@@ -492,7 +492,7 @@ config = RoboSystemsExtensionConfig(
 
 extensions = RoboSystemsExtensions(config)
 ```
 
-## 📚 API Reference
+## API Reference
 
 ### Core Classes
@@ -528,7 +528,7 @@ extensions = RoboSystemsExtensions(config)
 - **`format_duration(milliseconds)`** - Human-readable time formatting
 - **`create_extensions(method, **kwargs)`** - Extensions factory
 
-## 🐛 Troubleshooting
+## Troubleshooting
 
 ### Common Issues
@@ -545,9 +545,9 @@ pip install pandas  # For DataFrame conversion (optional)
 extensions = AuthenticatedExtensions("your-api-key")
 try:
   result = extensions.execute_query("graph_id", "MATCH (n) RETURN count(n) LIMIT 1")
-  print("✅ Authentication successful")
+  print("Authentication successful")
 except Exception as e:
-  print(f"❌ Auth failed: {e}")
+  print(f"Auth failed: {e}")
 ```
 
 **Connection Issues**
@@ -585,11 +585,7 @@ logging.basicConfig(level=logging.DEBUG)
 extensions = AuthenticatedExtensions("your-key")
 ```
 
-## 📄 License
-
-MIT License - see [LICENSE](LICENSE) file for details.
-
-## 🤝 Contributing
+## Contributing
 
 1. Fork the repository
 2. Create a feature branch
@@ -597,7 +593,7 @@ MIT License - see [LICENSE](LICENSE) file for details.
 4. Run the test suite: `python run_tests.py`
 5. Submit a pull request
 
-## 📞 Support
+## Support
 
 - **API Reference**: [api.robosystems.ai](https://api.robosystems.ai)
 - **Issues**: [GitHub Issues](https://github.com/RoboFinSystems/robosystems-python-client/issues)
diff --git a/robosystems_client/extensions/agent_client.py b/robosystems_client/extensions/agent_client.py
index eb4c563..c704933 100644
--- a/robosystems_client/extensions/agent_client.py
+++ b/robosystems_client/extensions/agent_client.py
@@ -177,7 +177,7 @@ def execute_query(
       else datetime.now().isoformat(),
     )
 
-    # Check if this is a queued response (async Celery execution)
+    # Check if this is a queued response (async background task execution)
     is_queued = False
     queued_response = None
diff --git a/robosystems_client/extensions/materialization_client.py b/robosystems_client/extensions/materialization_client.py
index e423dbe..f2b91e7 100644
--- a/robosystems_client/extensions/materialization_client.py
+++ b/robosystems_client/extensions/materialization_client.py
@@ -15,6 +15,7 @@
   sync_detailed as get_materialization_status,
 )
 from ..models.materialize_request import MaterializeRequest
+from .operation_client import OperationClient, OperationProgress, MonitorOptions
 
 logger = logging.getLogger(__name__)
@@ -27,6 +28,7 @@ class MaterializationOptions:
   rebuild: bool = False
   force: bool = False
   on_progress: Optional[Callable[[str], None]] = None
+  timeout: Optional[int] = 600  # 10 minute default timeout
 
 
 @dataclass
@@ -66,6 +68,14 @@ def __init__(self, config: Dict[str, Any]):
     self.base_url = config["base_url"]
     self.headers = config.get("headers", {})
     self.token = config.get("token")
+    self._operation_client = None
+
+  @property
+  def operation_client(self) -> OperationClient:
+    """Get or create the operation client for SSE monitoring."""
+    if self._operation_client is None:
+      self._operation_client = OperationClient(self.config)
+    return self._operation_client
 
   def materialize(
     self,
@@ -75,13 +85,13 @@
     """
     Materialize graph from DuckDB staging tables.
 
-    Rebuilds the complete graph database from the current state of DuckDB
-    staging tables. Automatically discovers all tables, materializes them in
-    the correct order (nodes before relationships), and clears the staleness flag.
+    Submits a materialization job to Dagster and monitors progress via SSE.
+    The operation runs asynchronously on the server but this method waits
+    for completion and returns the final result.
 
     Args:
       graph_id: Graph database identifier
-      options: Materialization options (ignore_errors, rebuild, force)
+      options: Materialization options (ignore_errors, rebuild, force, timeout)
 
     Returns:
       MaterializationResult with detailed execution information
@@ -96,7 +106,7 @@ def materialize(
     try:
       if options.on_progress:
-        options.on_progress("Starting graph materialization...")
+        options.on_progress("Submitting materialization job...")
 
       request = MaterializeRequest(
         ignore_errors=options.ignore_errors,
@@ -125,6 +135,7 @@ def materialize(
 
       response = materialize_graph(**kwargs)
 
+      # Handle non-200 status codes
      if response.status_code != 200 or not response.parsed:
        error_msg = f"Materialization failed: {response.status_code}"
        if hasattr(response, "content"):
@@ -148,25 +159,68 @@ def materialize(
          error=error_msg,
        )
 
+      # Get the operation_id from the queued response
       result_data = response.parsed
+      operation_id = result_data.operation_id
 
       if options.on_progress:
-        options.on_progress(
-          f"✅ Materialization complete: {len(result_data.tables_materialized)} tables, "
-          f"{result_data.total_rows:,} rows in {result_data.execution_time_ms:.2f}ms"
-        )
-
-      return MaterializationResult(
-        status=result_data.status,
-        was_stale=result_data.was_stale,
-        stale_reason=result_data.stale_reason,
-        tables_materialized=result_data.tables_materialized,
-        total_rows=result_data.total_rows,
-        execution_time_ms=result_data.execution_time_ms,
-        message=result_data.message,
-        success=True,
+        options.on_progress(f"Materialization queued (operation: {operation_id})")
+
+      # Monitor the operation via SSE until completion
+      def on_sse_progress(progress: OperationProgress):
+        if options.on_progress:
+          msg = progress.message
+          if progress.percentage is not None:
+            msg += f" ({progress.percentage:.0f}%)"
+          options.on_progress(msg)
+
+      monitor_options = MonitorOptions(
+        on_progress=on_sse_progress,
+        timeout=options.timeout,
       )
+      op_result = self.operation_client.monitor_operation(operation_id, monitor_options)
+
+      # Convert operation result to materialization result
+      if op_result.status.value == "completed":
+        # Extract details from SSE completion event result
+        sse_result = op_result.result or {}
+
+        if options.on_progress:
+          tables = sse_result.get("tables_materialized", [])
+          rows = sse_result.get("total_rows", 0)
+          time_ms = sse_result.get("execution_time_ms", 0)
+          options.on_progress(
+            f"✅ Materialization complete: {len(tables)} tables, "
+            f"{rows:,} rows in {time_ms:.2f}ms"
+          )
+
+        return MaterializationResult(
+          status="success",
+          was_stale=sse_result.get("was_stale", False),
+          stale_reason=sse_result.get("stale_reason"),
+          tables_materialized=sse_result.get("tables_materialized", []),
+          total_rows=sse_result.get("total_rows", 0),
+          execution_time_ms=sse_result.get(
+            "execution_time_ms", op_result.execution_time_ms or 0
+          ),
+          message=sse_result.get("message", "Graph materialized successfully"),
+          success=True,
+        )
+      else:
+        # Operation failed or was cancelled
+        return MaterializationResult(
+          status=op_result.status.value,
+          was_stale=False,
+          stale_reason=None,
+          tables_materialized=[],
+          total_rows=0,
+          execution_time_ms=op_result.execution_time_ms or 0,
+          message=op_result.error or f"Operation {op_result.status.value}",
+          success=False,
+          error=op_result.error,
+        )
+
     except Exception as e:
       logger.error(f"Materialization failed: {e}")
       return MaterializationResult(
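With this change, `materialize()` blocks until the queued operation finishes (or the SSE monitor times out). A usage sketch against the dataclasses shown above — the client class name and its construction are assumptions, since only the config keys (`base_url`, `headers`, `token`) appear in this diff:

```python
# Hedged usage sketch for the new queued flow. `MaterializationClient` is an
# assumed export name; MaterializationOptions/MaterializationResult fields are
# taken from the hunks above.
from robosystems_client.extensions.materialization_client import (
  MaterializationClient,  # assumed class name
  MaterializationOptions,
)

client = MaterializationClient({
  "base_url": "https://api.robosystems.ai",
  "headers": {"X-API-Key": "your-api-key"},
  "token": "your-api-key",
})

options = MaterializationOptions(
  rebuild=False,
  timeout=600,  # seconds to wait on the SSE stream before giving up
  on_progress=lambda msg: print(f"[materialize] {msg}"),
)

result = client.materialize("kg_abc123", options)
if result.success:
  print(f"{len(result.tables_materialized)} tables, {result.total_rows:,} rows")
else:
  print(f"Materialization {result.status}: {result.error}")
```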
diff --git a/robosystems_client/extensions/sse_client.py b/robosystems_client/extensions/sse_client.py
index 6251b51..1befd84 100644
--- a/robosystems_client/extensions/sse_client.py
+++ b/robosystems_client/extensions/sse_client.py
@@ -104,8 +104,10 @@ def connect(self, operation_id: str, from_sequence: int = 0) -> None:
     try:
       self.client = httpx.Client(timeout=self.config.timeout)
-      self._response = self.client.stream("GET", url, params=params, headers=headers)
-      self._response.__enter__()
+      self._context_manager = self.client.stream(
+        "GET", url, params=params, headers=headers
+      )
+      self._response = self._context_manager.__enter__()
 
       self.reconnect_attempts = 0
       self.emit("connected", None)
@@ -124,11 +126,9 @@ def _process_events(self) -> None:
     try:
       event_buffer = {"event": None, "data": [], "id": None, "retry": None}
 
-      print("[SSE DEBUG] Starting to process events...")
       for line in self._response.iter_lines():
         if self.closed:
-          print("[SSE DEBUG] Stream closed, breaking out of loop")
           break
 
         line = line.strip()
@@ -136,7 +136,6 @@ def _process_events(self) -> None:
         # Empty line indicates end of event
         if not line:
           if event_buffer["data"] or event_buffer["event"]:
-            print(f"[SSE DEBUG] Dispatching event: {event_buffer.get('event')}")
             self._dispatch_event(event_buffer)
             event_buffer = {"event": None, "data": [], "id": None, "retry": None}
           continue
@@ -172,13 +171,9 @@ def _process_events(self) -> None:
       # Handle final event if stream ends without empty line
       if event_buffer["data"] or event_buffer["event"]:
-        print("[SSE DEBUG] Dispatching final event after stream end")
         self._dispatch_event(event_buffer)
 
-      print("[SSE DEBUG] Event processing loop ended")
-
     except Exception as error:
-      print(f"[SSE DEBUG] Exception in event processing: {error}")
       if not self.closed:
         self.emit("error", error)
@@ -214,13 +209,14 @@ def _dispatch_event(self, event_buffer: Dict[str, Any]) -> None:
       # Emit typed event
       self.emit(event_type, parsed_data)
 
-      # Check for completion events
+      # Check for completion events - just set flag, don't close from within loop
+      # The loop will break on next iteration and close() will be called in finally
       if event_type in [
         EventType.OPERATION_COMPLETED.value,
         EventType.OPERATION_ERROR.value,
         EventType.OPERATION_CANCELLED.value,
       ]:
-        self.close()
+        self.closed = True
 
   def _handle_error(
     self, error: Exception, operation_id: str, from_sequence: int
@@ -285,12 +281,13 @@ def close(self):
     """Close the SSE connection"""
     self.closed = True
 
-    if self._response:
+    if hasattr(self, "_context_manager") and self._context_manager:
       try:
-        self._response.__exit__(None, None, None)
+        self._context_manager.__exit__(None, None, None)
       except Exception:
         pass
-      self._response = None
+      self._context_manager = None
+      self._response = None
 
     if self.client:
       self.client.close()
@@ -334,10 +331,10 @@ async def connect(self, operation_id: str, from_sequence: int = 0) -> None:
     try:
       self.client = httpx.AsyncClient(timeout=self.config.timeout)
-      self._response = await self.client.stream(
+      self._context_manager = self.client.stream(
         "GET", url, params=params, headers=headers
       )
-      await self._response.__aenter__()
+      self._response = await self._context_manager.__aenter__()
 
       self.reconnect_attempts = 0
       self.emit("connected", None)
@@ -401,13 +398,9 @@ async def _process_events(self) -> None:
       # Handle final event if stream ends without empty line
       if event_buffer["data"] or event_buffer["event"]:
-        print("[SSE DEBUG] Dispatching final event after stream end")
         self._dispatch_event(event_buffer)
 
-      print("[SSE DEBUG] Event processing loop ended")
-
     except Exception as error:
-      print(f"[SSE DEBUG] Exception in event processing: {error}")
       if not self.closed:
         self.emit("error", error)
@@ -443,13 +436,14 @@ def _dispatch_event(self, event_buffer: Dict[str, Any]):
       # Emit typed event
       self.emit(event_type, parsed_data)
 
-      # Check for completion events
+      # Check for completion events - just set flag, don't close from within loop
+      # The loop will break on next iteration and close() will be called in finally
       if event_type in [
         EventType.OPERATION_COMPLETED.value,
         EventType.OPERATION_ERROR.value,
         EventType.OPERATION_CANCELLED.value,
       ]:
-        asyncio.create_task(self.close())
+        self.closed = True
 
   async def _handle_error(
     self, error: Exception, operation_id: str, from_sequence: int
@@ -512,12 +506,13 @@ async def close(self):
     """Close the SSE connection (async)"""
     self.closed = True
 
-    if self._response:
+    if hasattr(self, "_context_manager") and self._context_manager:
       try:
-        await self._response.__aexit__(None, None, None)
+        await self._context_manager.__aexit__(None, None, None)
       except Exception:
         pass
-      self._response = None
+      self._context_manager = None
+      self._response = None
 
     if self.client:
       await self.client.aclose()
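The core of the fix above is an httpx detail: `Client.stream()` returns a context manager, not a `Response`, so the old code's `self._response.__enter__()` never yielded a usable response object. A standalone illustration of the corrected pattern (placeholder URL, manual enter/exit as in the client above):

```python
import httpx

client = httpx.Client(timeout=30.0)
# stream() returns a context manager; entering it yields the httpx.Response.
cm = client.stream("GET", "https://api.example.com/v1/operations/op-123/stream")
response = cm.__enter__()
try:
  for line in response.iter_lines():
    print(line)
finally:
  # Mirrors close() above: exit the context manager, then close the client.
  cm.__exit__(None, None, None)
  client.close()
```

The companion change — setting `self.closed = True` on completion events instead of calling `close()` — avoids tearing down the stream from inside the very loop that is still iterating over it; the loop exits on its next iteration and cleanup happens once, in `close()`.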
diff --git a/robosystems_client/extensions/subgraph_workspace_client.py b/robosystems_client/extensions/subgraph_workspace_client.py
index 34b080c..1fe89a0 100644
--- a/robosystems_client/extensions/subgraph_workspace_client.py
+++ b/robosystems_client/extensions/subgraph_workspace_client.py
@@ -197,7 +197,9 @@ async def create_workspace_with_fork(
     sse_url = f"{self.api._base_url}/v1/operations/{operation_id}/stream"
     headers = {"X-API-Key": self.api.token}
 
-    async with httpx.AsyncClient() as client:
+    # Use longer timeout for SSE streaming (Dagster jobs can take time)
+    timeout = httpx.Timeout(connect=30.0, read=120.0, write=30.0, pool=30.0)
+    async with httpx.AsyncClient(timeout=timeout) as client:
       async with client.stream("GET", sse_url, headers=headers) as sse_response:
         async for line in sse_response.aiter_lines():
           if line.startswith("data: "):
diff --git a/robosystems_client/models/materialize_response.py b/robosystems_client/models/materialize_response.py
index ef83c58..9ed6d62 100644
--- a/robosystems_client/models/materialize_response.py
+++ b/robosystems_client/models/materialize_response.py
@@ -1,7 +1,7 @@
 from __future__ import annotations
 
 from collections.abc import Mapping
-from typing import Any, TypeVar, cast
+from typing import Any, TypeVar
 
 from attrs import define as _attrs_define
 from attrs import field as _attrs_field
@@ -13,102 +13,64 @@
 @_attrs_define
 class MaterializeResponse:
-  """
+  """Response for queued materialization operation.
+
+  Example:
+    {'graph_id': 'kg_abc123', 'message': 'Materialization queued. Monitor via SSE stream.', 'operation_id':
+    '550e8400-e29b-41d4-a716-446655440000', 'status': 'queued'}
+
   Attributes:
-    status (str): Materialization status
     graph_id (str): Graph database identifier
-    was_stale (bool): Whether graph was stale before materialization
-    tables_materialized (list[str]): List of tables successfully materialized
-    total_rows (int): Total rows materialized across all tables
-    execution_time_ms (float): Total materialization time
+    operation_id (str): SSE operation ID for progress tracking
     message (str): Human-readable status message
-    stale_reason (None | str | Unset): Reason graph was stale
+    status (str | Unset): Operation status Default: 'queued'.
""" - status: str graph_id: str - was_stale: bool - tables_materialized: list[str] - total_rows: int - execution_time_ms: float + operation_id: str message: str - stale_reason: None | str | Unset = UNSET + status: str | Unset = "queued" additional_properties: dict[str, Any] = _attrs_field(init=False, factory=dict) def to_dict(self) -> dict[str, Any]: - status = self.status - graph_id = self.graph_id - was_stale = self.was_stale - - tables_materialized = self.tables_materialized - - total_rows = self.total_rows - - execution_time_ms = self.execution_time_ms + operation_id = self.operation_id message = self.message - stale_reason: None | str | Unset - if isinstance(self.stale_reason, Unset): - stale_reason = UNSET - else: - stale_reason = self.stale_reason + status = self.status field_dict: dict[str, Any] = {} field_dict.update(self.additional_properties) field_dict.update( { - "status": status, "graph_id": graph_id, - "was_stale": was_stale, - "tables_materialized": tables_materialized, - "total_rows": total_rows, - "execution_time_ms": execution_time_ms, + "operation_id": operation_id, "message": message, } ) - if stale_reason is not UNSET: - field_dict["stale_reason"] = stale_reason + if status is not UNSET: + field_dict["status"] = status return field_dict @classmethod def from_dict(cls: type[T], src_dict: Mapping[str, Any]) -> T: d = dict(src_dict) - status = d.pop("status") - graph_id = d.pop("graph_id") - was_stale = d.pop("was_stale") - - tables_materialized = cast(list[str], d.pop("tables_materialized")) - - total_rows = d.pop("total_rows") - - execution_time_ms = d.pop("execution_time_ms") + operation_id = d.pop("operation_id") message = d.pop("message") - def _parse_stale_reason(data: object) -> None | str | Unset: - if data is None: - return data - if isinstance(data, Unset): - return data - return cast(None | str | Unset, data) - - stale_reason = _parse_stale_reason(d.pop("stale_reason", UNSET)) + status = d.pop("status", UNSET) materialize_response = cls( - status=status, graph_id=graph_id, - was_stale=was_stale, - tables_materialized=tables_materialized, - total_rows=total_rows, - execution_time_ms=execution_time_ms, + operation_id=operation_id, message=message, - stale_reason=stale_reason, + status=status, ) materialize_response.additional_properties = d