-
Notifications
You must be signed in to change notification settings - Fork 2
feat(adapter): Add Quicksilver POC Adapter #211
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
- Introduced a complete adapter for integrating Quicksilver REST API data into TrufNetwork, including essential components such as `QuicksilverBlock`, `QuicksilverProvider`, and `QuicksilverDataTransformer`. - Implemented a three-step data flow: fetching data from the Quicksilver API, transforming it to TrufNetwork format, and inserting it into TrufNetwork. - Added example usage in `poc_example.py` to demonstrate the integration process. - Included environment utilities for loading configuration from a `.env` file. - Enhanced logging for better traceability during data processing.
…ticker-based approach
WalkthroughAdds a Quicksilver example package, environment utilities, a mock TN block, a PoC script, a new Quicksilver Prefect Block, and a Quicksilver tasks package (provider, transformer, flow, types, and README). Implements data fetching from an HTTP API, transformation to TN format, and an optional dry-run insertion flow. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
participant POC as PoC Script
participant QB as QuicksilverBlock
participant API as HTTP API
POC->>QB: fetch_data(endpoint_path, ticker, params)
QB->>QB: Build QuicksilverEndpoint
QB->>API: GET base_url + path + query
API-->>QB: JSON { data: [...] }
QB->>QB: Validate/parse to DataFrame[QuicksilverDataModel]
QB-->>POC: DataFrame
note over QB: Logs and raises on HTTP/JSON errors
sequenceDiagram
autonumber
participant Flow as quicksilver_flow
participant Prov as QuicksilverProvider
participant QB as QuicksilverBlock
participant Trans as QuicksilverDataTransformer
participant TN as TN Block / Mock
Flow->>Prov: task_fetch_quicksilver_data()
Prov->>QB: fetch_data(endpoint_path, ticker, params)
QB-->>Prov: QuicksilverDataDF
Prov-->>Flow: QuicksilverDataDF
Flow->>Trans: task_transform_quicksilver_data(data, stream_id, provider)
Trans-->>Flow: DataFrame[TnDataRowModel]
alt dry_run == true
Flow-->>Flow: Skip insert
else Insert enabled
Flow->>TN: insert(records, batch_size)
TN-->>Flow: Insert result
end
Flow-->>Flow: Aggregate success, counts, errors
Flow-->>Flow: Return QuicksilverFlowResult
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✨ Finishing touches
🧪 Generate unit tests
Tip 👮 Agentic pre-merge checks are now available in preview!Pro plan users can now enable pre-merge checks in their settings to enforce checklists before merging PRs.
Please see the documentation for more information. Example: reviews:
pre_merge_checks:
custom_checks:
- name: "Undocumented Breaking Changes"
mode: "warning"
instructions: |
Pass/fail criteria: All breaking changes to public APIs, CLI flags, environment variables, configuration keys, database schemas, or HTTP/GraphQL endpoints must be documented in the "Breaking Change" section of the PR description and in CHANGELOG.md. Exclude purely internal or private changes (e.g., code not exported from package entry points or explicitly marked as internal).Please share your feedback with us on this Discord post. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 3
🧹 Nitpick comments (16)
src/tsn_adapters/tasks/quicksilver/types.py (1)
7-9: Adopt backend-specific pandera import (pa) to avoid future deprecations.Switch to
pandera.pandas as paand referencepa.DataFrameModel/pa.Field. This aligns with 0.26.x guidance and future top‑level API deprecation.Apply:
-from pandera import DataFrameModel, Field -from pandera.typing import DataFrame, Series +import pandera.pandas as pa +from pandera.typing import DataFrame, Series @@ -class QuicksilverDataModel(DataFrameModel): +class QuicksilverDataModel(pa.DataFrameModel): @@ - ticker: Series[str] = Field(description="Ticker symbol") - price: Series[str] = Field(description="Current price") + ticker: Series[str] = pa.Field(description="Ticker symbol") + price: Series[str] = pa.Field(description="Current price") @@ - class Config(DataFrameModel.Config): + class Config(pa.DataFrameModel.Config): strict = "filter" coerce = TrueBased on learnings
Also applies to: 13-21
src/examples/quicksilver/env_utils.py (1)
7-9: Use python-dotenv for robust .env parsing (quotes, whitespace, comments).The manual parser is brittle.
load_dotenvhandles edge cases and is simpler.Apply:
import os from pathlib import Path from typing import Optional +from dotenv import load_dotenv @@ - with open(env_path, 'r') as f: - for line in f: - line = line.strip() - if line and not line.startswith('#') and '=' in line: - key, value = line.split('=', 1) - os.environ[key] = value - - return True + # Returns True if file found and loaded, False otherwise + return bool(load_dotenv(dotenv_path=env_path, override=False))Based on learnings
Also applies to: 28-35
src/examples/quicksilver/mock_tn_block.py (1)
12-12: Prefer logging over print for library code (even in examples).Switch to a logger to avoid polluting stdout in integrations.
Apply:
+import logging @@ - print("🧪 Mock TN Block initialized (DRY RUN MODE)") + logging.getLogger(__name__).info("🧪 Mock TN Block initialized (DRY RUN MODE)")src/examples/quicksilver/poc_example.py (3)
1-1: Shebang present but file likely not executable.Either make the file executable in git or remove the shebang.
Apply:
-#!/usr/bin/env python3
25-30: Remove f-strings without placeholders.These trigger Ruff F541 and add noise.
Apply:
- print(f"This POC demonstrates:") - print(f"1. 📡 Connect to Quicksilver API") + print("This POC demonstrates:") + print("1. 📡 Connect to Quicksilver API") - print(f"3. 🔄 Transform data to TrufNetwork format") - print(f"4. 🧪 Run complete flow in dry-run mode") + print("3. 🔄 Transform data to TrufNetwork format") + print("4. 🧪 Run complete flow in dry-run mode") @@ - print(f"\n🔄 Step 3: Testing complete flow (dry-run mode)...") + print("\n🔄 Step 3: Testing complete flow (dry-run mode)...") @@ - print(f"\n📊 Flow Results:") + print("\n📊 Flow Results:") @@ - print(f"\n🎯 POC completed! The adapter is ready to:") - print(f" • Fetch data from any Quicksilver endpoint") - print(f" • Transform data for TrufNetwork") - print(f" • Handle different tickers by changing QUICKSILVER_TICKER") + print("\n🎯 POC completed! The adapter is ready to:") + print(" • Fetch data from any Quicksilver endpoint") + print(" • Transform data for TrufNetwork") + print(" • Handle different tickers by changing QUICKSILVER_TICKER")Also applies to: 59-59, 73-73, 83-86
33-38: Avoid catching bare Exception.Catching
Exceptionhides actionable errors and trips Ruff BLE001. Narrow the exceptions (e.g., HTTP/URL errors from the block) or re-raise after logging.Also applies to: 55-57, 88-90
src/tsn_adapters/tasks/quicksilver/__init__.py (1)
13-20: Sort all for consistency (Ruff RUF022).Apply:
__all__ = [ - "quicksilver_flow", - "QuicksilverFlowResult", - "QuicksilverProvider", - "QuicksilverDataTransformer", - "QuicksilverKey", - "QuicksilverDataDF", + "QuicksilverDataDF", + "QuicksilverDataTransformer", + "QuicksilverFlowResult", + "QuicksilverKey", + "QuicksilverProvider", + "quicksilver_flow", ]src/tsn_adapters/tasks/quicksilver/transformer.py (1)
31-38: Castvalueto numeric for downstream consumers.Transform price strings to numeric to avoid stringly-typed metrics and ease validation against TN schemas.
Apply:
- # Clean price data - clean_prices = data['price'].astype(str).str.replace('$', '', regex=False).str.replace(',', '', regex=False) + # Clean and cast price data to numeric + clean_prices = ( + data['price'].astype(str) + .str.replace('$', '', regex=False) + .str.replace(',', '', regex=False) + ) + numeric_prices = pd.to_numeric(clean_prices, errors="coerce") @@ - 'value': clean_prices, + 'value': numeric_prices,Also consider dropping rows with NaN prices if appropriate.
src/tsn_adapters/tasks/quicksilver/flow.py (3)
12-16: Import cleanup and prep for TypedDict-safe returnsRemove unused os import and add typing.cast for safe TypedDict returns.
-import os -from typing import Any, Optional, TypedDict +from typing import Any, Optional, TypedDict, cast
28-30: Silence unused args in fallback stubRename to underscore-prefixed to satisfy linters and intent.
- def task_split_and_insert_records(*args, **kwargs): + def task_split_and_insert_records(*_args, **_kwargs): """Mock implementation - should not be called in dry_run mode.""" raise RuntimeError("Real TN insertion attempted without trufnetwork-sdk-py installed. Use dry_run=True.")
103-104: Avoid blind Exception catch; log traceback and return typed resultUse logger.exception and return a cast TypedDict.
- except Exception as e: - return QuicksilverFlowResult(success=False, records_fetched=records_fetched, records_inserted=records_inserted, errors=[str(e)]) + except Exception as e: + logger.exception("Quicksilver flow failed") + return cast(QuicksilverFlowResult, { + "success": False, + "records_fetched": records_fetched, + "records_inserted": records_inserted, + "errors": [str(e)], + })As per coding guidelines
src/tsn_adapters/tasks/quicksilver/provider.py (1)
35-42: Defensive copy of params to avoid downstream mutationQuicksilverBlock.fetch_data mutates the params dict by adding "where". Passing self.default_params directly can mutate provider state. Pass a copy.
- return self.quicksilver_block.fetch_data( + return self.quicksilver_block.fetch_data( endpoint_path=self.endpoint_path, ticker=self.ticker, - params=self.default_params + params=dict(self.default_params) )src/tsn_adapters/blocks/quicksilver.py (4)
30-35: Support list-valued query paramsUse doseq=True to correctly encode list values.
- if self.params: - url += f"?{urlencode(self.params)}" + if self.params: + url += f"?{urlencode(self.params, doseq=True)}"
107-113: Avoid mutating caller’s params dict; remove redundant importCopy params before adding "where" and use the top-level json import.
- query_params = params or {} + query_params = dict(params) if params else {} @@ - if ticker: - import json + if ticker: where_clause = {"ticker": ticker} query_params["where"] = json.dumps(where_clause)
138-141: Log traceback for fetch errorsUse exception-level logging to capture stack trace.
- self.logger.error(error_msg) + self.logger.exception(error_msg) raise Exception(error_msg) from eAs per coding guidelines
92-93: Byte count log is misleadinglen(str(parsed_data)) counts characters of a repr, not bytes. The diff above switches to len(raw_bytes).
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (11)
src/examples/quicksilver/__init__.py(1 hunks)src/examples/quicksilver/env_utils.py(1 hunks)src/examples/quicksilver/mock_tn_block.py(1 hunks)src/examples/quicksilver/poc_example.py(1 hunks)src/tsn_adapters/blocks/quicksilver.py(1 hunks)src/tsn_adapters/tasks/quicksilver/README.md(1 hunks)src/tsn_adapters/tasks/quicksilver/__init__.py(1 hunks)src/tsn_adapters/tasks/quicksilver/flow.py(1 hunks)src/tsn_adapters/tasks/quicksilver/provider.py(1 hunks)src/tsn_adapters/tasks/quicksilver/transformer.py(1 hunks)src/tsn_adapters/tasks/quicksilver/types.py(1 hunks)
🧰 Additional context used
🧬 Code graph analysis (6)
src/tsn_adapters/tasks/quicksilver/flow.py (4)
src/tsn_adapters/blocks/quicksilver.py (2)
QuicksilverBlock(40-141)logger(56-60)src/tsn_adapters/blocks/tn_access.py (1)
TNAccessBlock(261-655)src/tsn_adapters/tasks/quicksilver/provider.py (2)
QuicksilverProvider(15-45)get_latest_data(44-45)src/tsn_adapters/tasks/quicksilver/transformer.py (2)
QuicksilverDataTransformer(16-41)transform(24-41)
src/tsn_adapters/tasks/quicksilver/provider.py (3)
src/tsn_adapters/blocks/quicksilver.py (3)
QuicksilverBlock(40-141)logger(56-60)fetch_data(99-141)src/tsn_adapters/common/interfaces/provider.py (1)
IProviderGetter(14-38)src/tsn_adapters/utils/logging.py (1)
get_logger_safe(14-39)
src/examples/quicksilver/poc_example.py (4)
src/examples/quicksilver/env_utils.py (2)
load_environment(38-49)get_quicksilver_config(52-64)src/tsn_adapters/blocks/quicksilver.py (2)
QuicksilverBlock(40-141)fetch_data(99-141)src/tsn_adapters/tasks/quicksilver/flow.py (1)
quicksilver_flow(62-104)src/examples/quicksilver/mock_tn_block.py (1)
MockTNAccessBlock(7-12)
src/tsn_adapters/blocks/quicksilver.py (2)
src/tsn_adapters/utils/logging.py (1)
get_logger_safe(14-39)src/tsn_adapters/tasks/quicksilver/types.py (1)
QuicksilverDataModel(13-20)
src/tsn_adapters/tasks/quicksilver/transformer.py (3)
src/tsn_adapters/common/interfaces/transformer.py (1)
IDataTransformer(15-29)src/tsn_adapters/common/trufnetwork/models/tn_models.py (1)
TnDataRowModel(40-54)src/tsn_adapters/utils/logging.py (1)
get_logger_safe(14-39)
src/tsn_adapters/tasks/quicksilver/__init__.py (3)
src/tsn_adapters/tasks/quicksilver/flow.py (2)
quicksilver_flow(62-104)QuicksilverFlowResult(39-43)src/tsn_adapters/tasks/quicksilver/provider.py (1)
QuicksilverProvider(15-45)src/tsn_adapters/tasks/quicksilver/transformer.py (1)
QuicksilverDataTransformer(16-41)
🪛 Ruff (0.13.1)
src/tsn_adapters/tasks/quicksilver/flow.py
28-28: Unused function argument: args
(ARG001)
28-28: Unused function argument: kwargs
(ARG001)
30-30: Avoid specifying long messages outside the exception class
(TRY003)
72-72: Local variable logger is assigned to but never used
Remove assignment to unused variable logger
(F841)
103-103: Do not catch blind exception: Exception
(BLE001)
src/tsn_adapters/tasks/quicksilver/provider.py
36-36: Avoid specifying long messages outside the exception class
(TRY003)
src/examples/quicksilver/poc_example.py
1-1: Shebang is present but file is not executable
(EXE001)
25-25: f-string without any placeholders
Remove extraneous f prefix
(F541)
26-26: f-string without any placeholders
Remove extraneous f prefix
(F541)
28-28: f-string without any placeholders
Remove extraneous f prefix
(F541)
29-29: f-string without any placeholders
Remove extraneous f prefix
(F541)
36-36: Do not catch blind exception: Exception
(BLE001)
55-55: Do not catch blind exception: Exception
(BLE001)
59-59: f-string without any placeholders
Remove extraneous f prefix
(F541)
73-73: f-string without any placeholders
Remove extraneous f prefix
(F541)
83-83: f-string without any placeholders
Remove extraneous f prefix
(F541)
84-84: f-string without any placeholders
Remove extraneous f prefix
(F541)
85-85: f-string without any placeholders
Remove extraneous f prefix
(F541)
86-86: f-string without any placeholders
Remove extraneous f prefix
(F541)
88-88: Do not catch blind exception: Exception
(BLE001)
src/tsn_adapters/blocks/quicksilver.py
78-78: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.
(S310)
88-88: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.
(S310)
93-93: Consider moving this statement to an else block
(TRY300)
96-96: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
123-123: Abstract raise to an inner function
(TRY301)
123-123: Avoid specifying long messages outside the exception class
(TRY003)
127-127: Prefer TypeError exception for invalid type
(TRY004)
127-127: Abstract raise to an inner function
(TRY301)
127-127: Avoid specifying long messages outside the exception class
(TRY003)
140-140: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
141-141: Create your own exception
(TRY002)
src/tsn_adapters/tasks/quicksilver/__init__.py
13-20: __all__ is not sorted
Apply an isort-style sorting to __all__
(RUF022)
🔇 Additional comments (8)
src/examples/quicksilver/__init__.py (1)
1-3: LGTM — clear module docstring.src/tsn_adapters/tasks/quicksilver/types.py (1)
15-16: Confirm raw dtype forprice.Upstream API may return numeric prices; here it’s modeled as
str. Please confirm this matches the raw payload; if numeric, considerSeries[float]and coercion.src/tsn_adapters/tasks/quicksilver/README.md (1)
1-112: Docs look solid and consistent with the PoC.src/tsn_adapters/tasks/quicksilver/transformer.py (1)
24-41: Verify TN schema expectations forvaluedtype.If
TnDataRowModel.valueis numeric in your TN schema, ensure this transformer matches it (and add validation where it’s enforced).src/tsn_adapters/tasks/quicksilver/__init__.py (1)
8-11: TypedDict instantiation is valid – no change required
Calling a TypedDict class at runtime returns a plain dict per PEP 589, soreturn QuicksilverFlowResult(...)is already correct.Likely an incorrect or invalid review comment.
src/tsn_adapters/tasks/quicksilver/flow.py (2)
46-49: Fetch task looks goodSimple delegation with retries is appropriate.
51-59: Transform task looks goodInstantiation + transform with retries is fine.
src/tsn_adapters/tasks/quicksilver/provider.py (1)
31-33: LGTM: list_available_keysReturning a single configured key matches the single-asset scope in this POC.
| import os | ||
| from env_utils import load_environment, get_quicksilver_config | ||
|
|
||
| env_status = load_environment() | ||
| config = get_quicksilver_config() | ||
|
|
||
| TICKER = config['ticker'] | ||
| STREAM_ID = config['stream_id'] | ||
|
|
||
| from tsn_adapters.blocks.quicksilver import QuicksilverBlock | ||
| from tsn_adapters.tasks.quicksilver.flow import quicksilver_flow | ||
| from mock_tn_block import MockTNAccessBlock | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix imports for src/ layout so “run from repo root” works.
Running python src/examples/quicksilver/poc_example.py won’t find tsn_adapters unless PYTHONPATH=src or we add it at runtime.
Apply:
import os
-from env_utils import load_environment, get_quicksilver_config
+from env_utils import load_environment, get_quicksilver_config
+import sys
+from pathlib import Path
+# Ensure src/ is on sys.path for src-layout imports when running as a script
+sys.path.insert(0, str(Path(__file__).resolve().parents[2]))Alternatively: run with PYTHONPATH=src python src/examples/quicksilver/poc_example.py.
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| import os | |
| from env_utils import load_environment, get_quicksilver_config | |
| env_status = load_environment() | |
| config = get_quicksilver_config() | |
| TICKER = config['ticker'] | |
| STREAM_ID = config['stream_id'] | |
| from tsn_adapters.blocks.quicksilver import QuicksilverBlock | |
| from tsn_adapters.tasks.quicksilver.flow import quicksilver_flow | |
| from mock_tn_block import MockTNAccessBlock | |
| import os | |
| from env_utils import load_environment, get_quicksilver_config | |
| import sys | |
| from pathlib import Path | |
| # Ensure src/ is on sys.path for src-layout imports when running as a script | |
| sys.path.insert(0, str(Path(__file__).resolve().parents[2])) | |
| env_status = load_environment() | |
| config = get_quicksilver_config() | |
| TICKER = config['ticker'] | |
| STREAM_ID = config['stream_id'] | |
| from tsn_adapters.blocks.quicksilver import QuicksilverBlock | |
| from tsn_adapters.tasks.quicksilver.flow import quicksilver_flow | |
| from mock_tn_block import MockTNAccessBlock |
🤖 Prompt for AI Agents
In src/examples/quicksilver/poc_example.py around lines 8-20 the imports assume
the repo's src/ directory is on sys.path, so running the script directly from
the repo root fails to find tsn_adapters; before any other imports, prepend the
project's src directory to sys.path (e.g. compute
Path(__file__).resolve().parents[2] to get the src folder and insert it at
sys.path[0]) so the relative package imports work when running python
src/examples/quicksilver/poc_example.py (alternatively document using
PYTHONPATH=src when running).
| url = endpoint.get_url(self.base_url) | ||
|
|
||
| # Create request with headers | ||
| request = Request(url) | ||
| request.add_header("Accept", "application/json") | ||
| request.add_header("User-Agent", "TSN-Adapters/1.0") | ||
|
|
||
| try: | ||
| self.logger.debug(f"Making request to: {url}") | ||
|
|
||
| # Create SSL context for secure connections | ||
| ssl_context = ssl.create_default_context(cafile=certifi.where()) | ||
|
|
||
| response = urlopen(request, timeout=self.timeout, context=ssl_context) | ||
| data = response.read().decode("utf-8") | ||
| parsed_data = json.loads(data) | ||
|
|
||
| self.logger.debug(f"Successfully received {len(str(parsed_data))} bytes of data") | ||
| return parsed_data | ||
|
|
||
| except Exception as e: | ||
| self.logger.error(f"Request failed for URL {url}: {e}") | ||
| raise |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Secure scheme validation, close response, and better error logging
- Validate http(s) scheme to reduce SSRF/unsafe scheme risk.
- Use context manager to close the response.
- Log exceptions with traceback.
- Log actual byte length.
@@
- url = endpoint.get_url(self.base_url)
+ url = endpoint.get_url(self.base_url)
@@
- try:
+ try:
self.logger.debug(f"Making request to: {url}")
-
+ # Validate scheme
+ from urllib.parse import urlsplit
+ parsed = urlsplit(url)
+ if parsed.scheme not in ("http", "https"):
+ raise ValueError(f"Unsupported URL scheme: {parsed.scheme}")
+
# Create SSL context for secure connections
ssl_context = ssl.create_default_context(cafile=certifi.where())
-
- response = urlopen(request, timeout=self.timeout, context=ssl_context)
- data = response.read().decode("utf-8")
- parsed_data = json.loads(data)
-
- self.logger.debug(f"Successfully received {len(str(parsed_data))} bytes of data")
+ with urlopen(request, timeout=self.timeout, context=ssl_context) as response:
+ raw_bytes = response.read()
+ data = raw_bytes.decode("utf-8")
+ parsed_data = json.loads(data)
+ self.logger.debug(f"Successfully received {len(raw_bytes)} bytes of data")
return parsed_data
except Exception as e:
- self.logger.error(f"Request failed for URL {url}: {e}")
+ self.logger.exception(f"Request failed for URL {url}: {e}")
raiseAs per coding guidelines
🧰 Tools
🪛 Ruff (0.13.1)
78-78: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.
(S310)
88-88: Audit URL open for permitted schemes. Allowing use of file: or custom schemes is often unexpected.
(S310)
93-93: Consider moving this statement to an else block
(TRY300)
96-96: Use logging.exception instead of logging.error
Replace with exception
(TRY400)
🤖 Prompt for AI Agents
In src/tsn_adapters/blocks/quicksilver.py around lines 75 to 97, validate that
endpoint.get_url(self.base_url) returns an http or https URL (raise a ValueError
if the scheme is not 'http' or 'https') to prevent unsafe schemes, then open the
request using a context manager (with urlopen(...) as response:) so the response
is always closed, read raw bytes and log the actual byte length
(len(data_bytes)) before decoding/JSON parsing, and in the except block log the
full traceback (use logger.exception or include traceback.format_exc()) and
re-raise the original exception.
| quicksilver_data = task_fetch_quicksilver_data(provider) | ||
| records_fetched = len(quicksilver_data) | ||
|
|
||
| if quicksilver_data.empty: | ||
| return QuicksilverFlowResult(success=False, records_fetched=0, records_inserted=0, errors=["No data"]) | ||
|
|
||
| tn_data = task_transform_quicksilver_data(quicksilver_data, stream_id, data_provider) | ||
|
|
||
| if tn_data.empty: | ||
| return QuicksilverFlowResult(success=False, records_fetched=records_fetched, records_inserted=0, errors=["Transform failed"]) | ||
|
|
||
| if dry_run: | ||
| records_inserted = len(tn_data) | ||
| else: | ||
| insertion_results = task_split_and_insert_records(tn_block=tn_block, data=tn_data, max_batch_size=max_batch_size) | ||
| records_inserted = insertion_results.get("records_inserted", 0) | ||
|
|
||
| return QuicksilverFlowResult(success=True, records_fetched=records_fetched, records_inserted=records_inserted, errors=errors) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TypedDict misuse: returning QuicksilverFlowResult(...) will raise at runtime
TypedDicts are not callables. Replace constructor-like calls with dicts and cast.
- if quicksilver_data.empty:
- return QuicksilverFlowResult(success=False, records_fetched=0, records_inserted=0, errors=["No data"])
+ if quicksilver_data.empty:
+ return cast(QuicksilverFlowResult, {
+ "success": False,
+ "records_fetched": 0,
+ "records_inserted": 0,
+ "errors": ["No data"],
+ })
@@
- if tn_data.empty:
- return QuicksilverFlowResult(success=False, records_fetched=records_fetched, records_inserted=0, errors=["Transform failed"])
+ if tn_data.empty:
+ return cast(QuicksilverFlowResult, {
+ "success": False,
+ "records_fetched": records_fetched,
+ "records_inserted": 0,
+ "errors": ["Transform failed"],
+ })
@@
- return QuicksilverFlowResult(success=True, records_fetched=records_fetched, records_inserted=records_inserted, errors=errors)
+ return cast(QuicksilverFlowResult, {
+ "success": True,
+ "records_fetched": records_fetched,
+ "records_inserted": records_inserted,
+ "errors": errors,
+ })📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| quicksilver_data = task_fetch_quicksilver_data(provider) | |
| records_fetched = len(quicksilver_data) | |
| if quicksilver_data.empty: | |
| return QuicksilverFlowResult(success=False, records_fetched=0, records_inserted=0, errors=["No data"]) | |
| tn_data = task_transform_quicksilver_data(quicksilver_data, stream_id, data_provider) | |
| if tn_data.empty: | |
| return QuicksilverFlowResult(success=False, records_fetched=records_fetched, records_inserted=0, errors=["Transform failed"]) | |
| if dry_run: | |
| records_inserted = len(tn_data) | |
| else: | |
| insertion_results = task_split_and_insert_records(tn_block=tn_block, data=tn_data, max_batch_size=max_batch_size) | |
| records_inserted = insertion_results.get("records_inserted", 0) | |
| return QuicksilverFlowResult(success=True, records_fetched=records_fetched, records_inserted=records_inserted, errors=errors) | |
| quicksilver_data = task_fetch_quicksilver_data(provider) | |
| records_fetched = len(quicksilver_data) | |
| if quicksilver_data.empty: | |
| return cast(QuicksilverFlowResult, { | |
| "success": False, | |
| "records_fetched": 0, | |
| "records_inserted": 0, | |
| "errors": ["No data"], | |
| }) | |
| tn_data = task_transform_quicksilver_data(quicksilver_data, stream_id, data_provider) | |
| if tn_data.empty: | |
| return cast(QuicksilverFlowResult, { | |
| "success": False, | |
| "records_fetched": records_fetched, | |
| "records_inserted": 0, | |
| "errors": ["Transform failed"], | |
| }) | |
| if dry_run: | |
| records_inserted = len(tn_data) | |
| else: | |
| insertion_results = task_split_and_insert_records( | |
| tn_block=tn_block, | |
| data=tn_data, | |
| max_batch_size=max_batch_size | |
| ) | |
| records_inserted = insertion_results.get("records_inserted", 0) | |
| return cast(QuicksilverFlowResult, { | |
| "success": True, | |
| "records_fetched": records_fetched, | |
| "records_inserted": records_inserted, | |
| "errors": errors, | |
| }) |
Time Submission Status
|
Summary
This PR introduces a Proof of Concept (POC) adapter for integrating data from the Quicksilver API aggregator into TrufNetwork. It provides a complete, configurable data pipeline from data fetching to transformation, demonstrated by a runnable example.
Motivation
This adapter serves as a template for a flexible integration pattern where new data sources can be added via the Quicksilver aggregator without requiring adapter code changes. The design prioritizes modularity, configurability through environment variables, and ease of use.
Implementation
QuicksilverBlock: A new Prefect block to handle all communication with the Quicksilver API.QuicksilverProvider: ImplementsIProviderGetterto fetch data for a specific ticker.QuicksilverDataTransformer: Transforms raw Quicksilver data into theTnDataRowModel.quicksilver_flow: A Prefect flow that orchestrates the fetch-transform-load pipeline.poc_example.py: A standalone script to demonstrate the full flow in adry-runmode.How to Test
.envfile insrc/examples/quicksilver/with API and ticker details.Architecture
High-Level Data Flow
graph TD subgraph DePIN Data Sources Yahoo[Yahoo! Finance] CoinGecko[CoinGecko] Sensors[IoT Devices/Sensors] end subgraph Quicksilver QS_API(Quicksilver API Gateway) end Adapter[TSN Adapter] subgraph Consumer Clients TrufNetwork[TrufNetwork Platform] AIAgents[AI Agents] end Yahoo & CoinGecko & Sensors --> QS_API QS_API --> Adapter --> TrufNetwork QS_API --> AIAgents classDef truf_highlight fill:#d4f0ff,stroke:#007bff,stroke-width:2px,color:#000 class Adapter,TrufNetwork truf_highlightAdapter Logic Sequence
sequenceDiagram participant User participant POC_Script as poc_example.py participant Flow as quicksilver_flow participant Provider as QuicksilverProvider participant Block as QuicksilverBlock participant Transformer as QuicksilverDataTransformer User->>POC_Script: Run POC_Script->>Flow: Execute Flow->>Provider: Get data Provider->>Block: fetch_data() Block-->>Provider: DataFrame Provider-->>Flow: DataFrame Flow->>Transformer: transform() Transformer-->>Flow: TnDataRow DataFrame Flow-->>POC_Script: FlowResult POC_Script-->>User: Display resultsSummary by CodeRabbit
New Features
Documentation