Fix post-merge issues and broken reference links #38
Conversation
- Fix `get_latest_timestamp()`: query `<= now` and return the last result instead of `>= now`, which always returns an empty set
- Fix `ml_inference` field type: `dict[str, ...]` -> `dict[UUID, ...]` to match actual UUID key usage in `ml_reports.py`
- Fix broken reference links in README (closes #36): replace Aalto research portal URLs (403) with stable DOI links
Use `**dict` for Pydantic model init instead of a positional arg, and clear stale error outputs from the committed notebook.
…ocs, examples
- Fix AMQP connection timeout (#28): add `heartbeat=600` keepalive, check the connection before send instead of reconnecting on every call, auto-retry on `AMQPConnectionError`/`AMQPChannelError`
- Add `Field` descriptions to all config models (#35) for mkdocstrings documentation extraction
- Add metrics documentation page (#31) with a full catalog of data quality, ML model, service, and resource utilization metrics
- Add report examples (#33): `basic_report.py` with debug connector, `contract_example.py` showing constraint creation
- Add dataquality examples (#32): `image_quality_example.py` and `array_quality_example.py` with synthetic data
- Exclude `example/bts/` from ruff (legacy naming conventions)
- Remove trivial `simple_test.py` (it tested a Python `inc` function, not project code)
- Add connector tests (56 tests): AmqpConnector (reconnect, retry, heartbeat), SocketConnector, KafkaConnector, PromConnector
- Add collector tests (18 tests): AmqpCollector, SocketCollector
- Add probe tests (33 tests): SystemMonitoringProbe, ProcessMonitoringProbe, DockerMonitoringProbe with mocked psutil/docker
- Add QoaClient tests (36 tests): init, observe_metric, timer, import_previous_report, report, asyn_report, all categories
- Add RoheReport tests (58 tests): all public methods, graph building, previous report merging, edge cases
- Add observability tests (31 tests): EmbeddedDatabase insert/query, NodeAggregator unit conversion and report processing

Total: 428 tests (up from 199), 2 skipped (docker/gpu)
…y cleanup
- Replace pickle with JSON in socket connector/collector (security fix)
- Replace unmaintained `flatten_dict` with an inline implementation
- Remove `flatten_dict` from dependencies
- Relax pydantic pin: `>=2.7.4,<2.8` -> `>=2.7.4,<3`
- Relax docker pin: `==7.1.0` -> `>=7.1.0,<8`
- Add 0.5s backoff delay before AMQP reconnect attempts
- Add thread safety: wrap `observe_metric`, `observe_inference`, `observe_inference_metric`, `timer`, `import_previous_report`, and `report` with `self.lock` in QoaClient
- Fix all 41 mypy type errors: proper annotations for dict keys, return types, variable types, None guards, attribute access
- Reduce mypy `disable_error_code` from 10 entries to 1 (`import-untyped` only)
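The thread-safety item above describes one shared lock guarding every method that touches report state. A minimal sketch of that pattern, using a hypothetical `ClientSketch` class rather than the real QoaClient:

```python
import threading


class ClientSketch:
    """Hypothetical minimal client illustrating the single-lock pattern."""

    def __init__(self):
        self.lock = threading.Lock()
        self.metrics = {}

    def observe_metric(self, name, value):
        # Every method that mutates shared report state takes the same lock
        with self.lock:
            self.metrics[name] = value

    def report(self):
        # Readers take the lock too, so they never see a half-updated dict
        with self.lock:
            return dict(self.metrics)
```

Using one lock for all mutating methods trades a little contention for the guarantee that `report()` never observes a partially applied update from a concurrent `observe_metric()`.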
- Add `__version__` to the package (via `importlib.metadata`)
- Add Dependabot config for automated GitHub Actions and pip updates
- Add pytest-cov with coverage config in `pyproject.toml`
- Add `--cov` to the CI test step for coverage reporting
- Replace `lazy-import` with stdlib importlib/try-except patterns
- Remove `lazy-import` from dependencies
- Update CHANGELOG.txt with 0.3.18 release notes
Pull request overview
This PR addresses multiple post-merge regressions across observability/reporting and documentation, including fixing timestamp queries in the embedded DB, aligning ML inference typing with UUID-key usage, and repairing broken reference links.
Changes:
- Fix `EmbeddedDatabase.get_latest_timestamp()` query direction/return behavior.
- Align ML inference report typing (`ml_inference`) with UUID-key usage in `MLReport`.
- Replace pickle-based socket transport with JSON, remove `flatten_dict`/`lazy-import`, and add/expand tests + docs (metrics page, examples, DOI reference links).
Reviewed changes
Copilot reviewed 50 out of 54 changed files in this pull request and generated 6 comments.
| File | Description |
|---|---|
| uv.lock | Adds coverage tooling deps; removes some transitive deps after refactor. |
| pyproject.toml | Updates dependency pins, adds pytest-cov + coverage config, adjusts ruff excludes. |
| .github/workflows/python-ci.yml | Runs tests with coverage reporting enabled. |
| .github/dependabot.yml | Enables Dependabot for GitHub Actions and Python deps. |
| src/qoa4ml/observability/odop_obs/embedded_database.py | Fixes timestamp query logic and changes return shape. |
| src/qoa4ml/observability/odop_obs/node_aggregator.py | Switches off flatten_dict/lazy-import, uses local flatten/unflatten, tweaks DB path type. |
| src/qoa4ml/collector/socket_collector.py | Switches from pickle to JSON parsing for incoming socket data. |
| src/qoa4ml/connector/socket_connector.py | Switches from pickle to UTF-8 bytes for outgoing socket data. |
| src/qoa4ml/collector/amqp_collector.py | Adds explicit heartbeat configuration for AMQP connections. |
| src/qoa4ml/connector/amqp_connector.py | Adds heartbeat handling + reconnect/backoff and connection checks. |
| src/qoa4ml/utils/qoa_utils.py | Adds local flatten/unflatten helpers and improves type annotations. |
| src/qoa4ml/reports/ml_report_model.py | Changes ml_inference key type to UUID to match runtime usage. |
| src/qoa4ml/reports/rohe_reports.py | Adds safety checks for None previous endpoints and config loading. |
| src/qoa4ml/qoa_client.py | Improves typing, adjusts instance_id handling, adds locking around report mutations, changes report() return type to dict. |
| src/qoa4ml/probes/system_monitoring_probe.py | Removes lazy-import pattern and imports report model directly. |
| src/qoa4ml/probes/process_monitoring_probe.py | Removes lazy-import pattern and tightens typing/asserts. |
| src/qoa4ml/probes/docker_monitoring_probe.py | Adds explicit config typing and safety assert. |
| src/qoa4ml/probes/probe.py | Avoids calling make_folder() when latency path is unset. |
| src/qoa4ml/probes/mlquality.py | Replaces lazy imports with optional imports. |
| src/qoa4ml/connector/mqtt_connector.py | Replaces lazy imports with optional import guard. |
| src/qoa4ml/utils/docker_util.py | Improves typing of intermediate container stats dict. |
| src/qoa4ml/utils/dataquality_utils.py | Tightens typing/casts for eva_none results. |
| src/qoa4ml/config/configs.py | Adds Field descriptions and restructures many config model fields. |
| `src/qoa4ml/__init__.py` | Exposes `__version__` via package metadata. |
| docs/metrics.md | Adds a metrics documentation page. |
| mkdocs.yml | Adds “Metrics” page to nav. |
| README.md | Replaces broken Aalto links with stable DOI links. |
| CHANGELOG.txt | Adds 0.3.18 changelog entry describing the refactor. |
| tests/test_observability/test_embedded_database.py | Adds coverage for embedded DB insert/query behavior. |
| tests/test_observability/test_node_aggregator.py | Adds unit tests for NodeAggregator conversion and processing paths. |
| tests/test_qoa_client.py | Adds tests for QoaClient init/connector/report/timer/inference behavior. |
| tests/test_reports/test_rohe_reports.py | Adds comprehensive tests for RoheReport. |
| tests/test_connector/test_socket_connector.py | Adds tests for socket connector serialization and optional latency logging. |
| tests/test_connector/test_amqp_connector.py | Adds tests for AMQP heartbeat/reconnect/send behavior. |
| tests/test_connector/test_kafka_connector.py | Adds tests for Kafka connector behavior with mocked module. |
| tests/test_connector/test_prom_connector.py | Adds tests for PromConnector behavior with mocked module. |
| tests/test_collector/test_socket_collector.py | Adds tests for socket collector message receive/parse behavior. |
| tests/test_collector/test_amqp_collector.py | Adds tests for AMQP collector initialization and callbacks. |
| tests/test_probes/test_system_probe.py | Adds tests for system probe behavior with patched system calls. |
| tests/test_probes/test_process_probe.py | Adds tests for process probe behavior with patched system calls. |
| tests/test_probes/test_docker_probe.py | Adds tests for docker probe behavior with patched docker/stats. |
| tests/simple_test.py | Removes a legacy placeholder test. |
| example/simple/observation_demo/rohe_agent.ipynb | Clears execution outputs and fixes pydantic init usage in example. |
| example/reports/basic_report.py | Adds a runnable reporting example. |
| example/reports/contract_example.py | Adds a runnable contract/constraint example. |
| example/reports/config/client.yaml | Adds config for report examples. |
| example/reports/README.md | Documents the new report examples. |
| example/dataquality/image_quality_example.py | Adds a runnable image quality example. |
| example/dataquality/array_quality_example.py | Adds a runnable array/tabular quality example. |
| example/bts/bts_server/predictionService/powerGrid/LSTM_Prediction.py | Removes unused timer return value assignment. |
| example/bts/bts_server/predictionServerV2.py | Replaces != None with is not None checks. |
```python
    registration_data = self.registration(registration_url)
else:
    assert self.configuration.registration_url is not None
    registration_data = self.registration(
        self.configuration.registration_url
```
Using assert self.configuration.registration_url is not None for runtime control flow is unsafe because asserts can be disabled with python -O. Prefer an explicit check that raises a clear exception (e.g., ValueError) when no registration URL is configured.
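A sketch of the suggested replacement, using a hypothetical `Config` dataclass and `resolve_registration_url` helper in place of the real configuration model and call site:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Config:
    # Hypothetical stand-in for the real configuration model
    registration_url: Optional[str] = None


def resolve_registration_url(configuration: Config) -> str:
    # An explicit check keeps working under `python -O`, unlike an assert
    if configuration.registration_url is None:
        raise ValueError("No registration URL configured for this client")
    return configuration.registration_url
```

The caller then gets a clear `ValueError` at the point of misconfiguration instead of either a bare `AssertionError` or, with optimizations enabled, a confusing failure further downstream.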
```python
def test_process_hpc_system_report(self, dot_aggregator):
    dot_aggregator.embedded_database = MagicMock()
    report = {
        "type": "system",
        "metadata": {"node_name": "edge1"},
        "timestamp": 1700000000.0,
        "cpu": {"percent": 45.0},
    }
    dot_aggregator.process_report(json.dumps(report))
    dot_aggregator.embedded_database.insert.assert_called_once()
    call_args = dot_aggregator.embedded_database.insert.call_args
    assert call_args[0][0] == 1700000000.0
    assert call_args[0][1]["type"] == "node"


def test_process_hpc_process_report(self, dot_aggregator):
    dot_aggregator.embedded_database = MagicMock()
    report = {
        "type": "process",
        "metadata": {"pid": 1234},
        "timestamp": 1700000000.0,
        "cpu": {"percent": 12.0},
    }
    dot_aggregator.process_report(json.dumps(report))
    dot_aggregator.embedded_database.insert.assert_called_once()
    call_args = dot_aggregator.embedded_database.insert.call_args
```
These process_report tests pass json.dumps(report) (a JSON string) directly to NodeAggregator.process_report(). After switching SocketCollector to JSON decoding, the callback will receive an already-parsed Python object (dict/str/etc), not a JSON string. Update the tests (and process_report implementation) to use the same input type the collector provides, otherwise this won’t catch the current integration break.
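The contract mismatch is easy to demonstrate in isolation. Below, `process_report` is a hypothetical callback written against the old contract (it expects a JSON string and parses it itself), and `report` is a sample payload:

```python
import json


def process_report(payload):
    # A callback written against the old contract: expects a JSON *string*
    data = json.loads(payload)
    return data["type"]


report = {"type": "process", "metadata": {"pid": 1234}}

# Passing the already-parsed dict (what the changed collector would now provide)
# crashes, because json.loads() only accepts str/bytes/bytearray:
try:
    process_report(report)
except TypeError:
    print("TypeError: json.loads() cannot take a dict")

# Passing the JSON string preserves the contract:
print(process_report(json.dumps(report)))  # process
```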
```python
def test_get_latest_timestamp_returns_list(self, dot_aggregator):
    mock_point = MagicMock()
    mock_point.tags = {"type": "node"}
    mock_point.fields = {"cpu": 45.0}
    from datetime import datetime

    mock_point.time = datetime(2024, 1, 1)
    dot_aggregator.embedded_database = MagicMock()
    dot_aggregator.embedded_database.get_latest_timestamp.return_value = [
        mock_point
    ]
    result = dot_aggregator.get_latest_timestamp()
    assert isinstance(result, list)
    assert len(result) == 1
```
EmbeddedDatabase.get_latest_timestamp() now returns a single Point | None, but this test mocks it as a list ([mock_point]) and asserts the result is a list of length 1. Adjust the mock and assertions (or revert the DB API change) so the test matches the actual return type and catches real regressions.
```diff
@@ -19,4 +19,5 @@ def insert(self, timestamp: float, tags: dict, fields: dict):
     def get_latest_timestamp(self):
         time_query = TimeQuery()
         timestamp = datetime.fromtimestamp(math.floor(time.time()))
```
get_latest_timestamp() floors the current time to whole seconds (math.floor(time.time())). Since points are inserted with sub-second timestamps (datetime.fromtimestamp(timestamp) keeps microseconds), a datapoint from the current second (e.g., now-0.1s) can be > the floored timestamp and be incorrectly excluded, returning None. Consider querying against datetime.fromtimestamp(time.time()) / datetime.now() (no flooring) so recent points are included.
Suggested change:

```diff
-    timestamp = datetime.fromtimestamp(math.floor(time.time()))
+    timestamp = datetime.fromtimestamp(time.time())
```
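The exclusion described above is deterministic for any timestamp with a fractional second; a small standalone check (using a fixed timestamp rather than the live clock, so the result is reproducible):

```python
import math
from datetime import datetime

# A fixed "current time" with a fractional second
now = 1700000000.4

point_time = datetime.fromtimestamp(now)                  # insert keeps microseconds
floored_cutoff = datetime.fromtimestamp(math.floor(now))  # query cutoff drops them

print(point_time <= floored_cutoff)               # False: the point is excluded
print(point_time <= datetime.fromtimestamp(now))  # True: the un-floored cutoff includes it
```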
```diff
     timestamp = datetime.fromtimestamp(math.floor(time.time()))
-    return self.db.search(time_query >= timestamp)
+    results = self.db.search(time_query <= timestamp)
+    return results[-1] if results else None
```
get_latest_timestamp() now returns a single Point | None, which is a breaking change from returning a list of points. Callers like NodeAggregator.get_latest_timestamp() (which iterates over the result) will fail unless updated in the same PR. Either keep returning a list (possibly length 0/1), or update all call sites to handle None / a single Point.
Suggested change:

```diff
-    return results[-1] if results else None
+    return results[-1:] if results else []
```
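The `[-1:]` slice in the suggestion keeps the list-returning API: unlike `[-1]` indexing, it yields a one-element list for non-empty input and an empty list (rather than an `IndexError`) for empty input:

```python
results = [10, 20, 30]
print(results[-1])    # 30: a single item
print(results[-1:])   # [30]: a one-element list

empty = []
print(empty[-1:])     # []: slicing past the end of an empty list is safe
# empty[-1] would raise IndexError
```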
```diff
             break
         data += packet

-    report = pickle.loads(data)
+    report = json.loads(data.decode("utf-8"))
     self.process_report(report)
     client_socket.close()
```
SocketCollector now JSON-decodes the received bytes before calling process_report(). This changes the callback contract: existing process_report implementations that expect a JSON string (and call json.loads() themselves) will now crash with TypeError. Consider passing the decoded UTF-8 string to process_report (and let the callback decide how to parse), or update all process_report implementations accordingly in this PR.
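A sketch of the first alternative (decode only, let the callback parse), using a hypothetical `handle_payload` function in place of the collector's receive loop:

```python
import json


def handle_payload(data: bytes, process_report):
    # Decode bytes to a UTF-8 string only; the callback decides how to parse,
    # which preserves the "process_report receives a JSON string" contract.
    process_report(data.decode("utf-8"))


received = json.dumps({"type": "system", "cpu": {"percent": 45.0}}).encode("utf-8")
handle_payload(received, lambda s: print(json.loads(s)["type"]))  # system
```

This keeps existing `process_report` implementations (which call `json.loads` themselves) working unchanged, at the cost of every callback doing its own parsing.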
- Replace assert with ValueError for the registration URL check
- Fix `get_latest_timestamp()`: remove `math.floor` to include sub-second points; return a list (not a single item) for API compatibility with NodeAggregator, which iterates over the result
- Fix SocketCollector: pass the decoded UTF-8 string to `process_report` instead of parsed JSON, preserving the callback contract where `process_report` implementations call `json.loads` themselves
- Update tests to match the corrected APIs
Summary
- `get_latest_timestamp()` in `embedded_database.py`: query `<= now` and return the last result instead of `>= now`, which always returns an empty set
- `ml_inference` field type in `ml_report_model.py`: `dict[str, InferenceInstance]` → `dict[UUID, InferenceInstance]` to match actual UUID key usage in `ml_reports.py`

Issues addressed
Test plan