from collections.abc import AsyncIterator
from dani_demo.python_derivations.enriched_shipments import (
    IDerivation,
    Document,
    Request,
    Response
)
from pydantic import BaseModel, Field


class CustomerTier(BaseModel):
    """Reference data for a customer's tier (the enrichment / right side of the join)."""
    tier: str | None = None
    region: str | None = None
    account_manager: str | None = None


class ShipmentData(BaseModel):
    """Core shipment fields carried into the join output (left side of the join)."""
    shipment_id: int
    customer_id: int
    shipment_status: str | None = None
    is_priority: bool = False
    city: str | None = None
    expected_delivery_date: str | None = None


class JoinState(BaseModel):
    """State for a single join key (customer_id).

    Both sides of the join live here. This is a one-to-many join (one
    customer tier to many shipments), so the tier is stored once and the
    shipments are kept in a dict keyed by shipment_id.
    """
    # Right side: customer tier (at most one per customer).
    tier: CustomerTier | None = None

    # Left side: shipments, keyed by shipment_id for O(1) update/removal.
    # Idiom: pass the `dict` constructor directly rather than `lambda: {}`.
    shipments: dict[int, ShipmentData] = Field(default_factory=dict)


class State(BaseModel):
    """Root state container mapping customer_id to its join state."""
    customers: dict[int, JoinState] = Field(default_factory=dict)


class Derivation(IDerivation):
    """Derivation that left-joins shipments with customer tier reference data.

    LEFT JOIN semantics:
      - Shipments are always emitted (left side).
      - Customer tier data is included when available (right side).
      - Shipments without matching tier data still appear, with null tier fields.
    """

    def __init__(self, open: Request.Open):
        """Initialize and restore persisted state.

        Args:
            open: Runtime Open message; open.state holds whatever the previous
                run returned from start_commit() (empty dict on the first run).
        """
        super().__init__(open)
        self.state = State(**open.state)
        # Customers modified in the current transaction. start_commit()
        # persists only these rather than the whole state map.
        self.touched_customers: set[int] = set()

    async def shipments(
        self,
        read: Request.ReadShipments
    ) -> AsyncIterator[Document]:
        """Process shipment events (left side of the join).

        Always emits the shipment; tier fields are filled in when we already
        hold tier data for the customer, and null otherwise.

        Args:
            read: Contains the shipment CDC document.

        Yields:
            The joined output document (nothing for deletions).
        """
        doc = read.doc
        if doc.customer_id is None:
            return  # Cannot join without a join key.

        customer_id = doc.customer_id
        shipment_id = doc.id

        # Get or create join state for this customer.
        join_state = self.state.customers.setdefault(customer_id, JoinState())

        if doc.m_meta.op == 'd':
            # Deletion: drop the shipment from state; emit nothing.
            # NOTE(review): with merge_patch=True below, a popped key is NOT
            # removed from *persisted* state — JSON merge-patch (RFC 7386)
            # deletes only keys explicitly set to null. Confirm the runtime's
            # merge semantics; stale shipments may survive a restart.
            join_state.shipments.pop(shipment_id, None)
            self.touched_customers.add(customer_id)
            return

        # Create/update: store the shipment in state.
        shipment_data = ShipmentData(
            shipment_id=shipment_id,
            customer_id=customer_id,
            shipment_status=doc.shipment_status,
            is_priority=doc.is_priority or False,
            city=doc.city,
            expected_delivery_date=doc.expected_delivery_date
        )
        join_state.shipments[shipment_id] = shipment_data
        self.touched_customers.add(customer_id)

        # Left join: always emit, with whatever tier data we currently hold.
        yield self._build_joined_document(shipment_data, join_state.tier)

    async def customer_tiers(
        self,
        read: Request.ReadCustomerTiers
    ) -> AsyncIterator[Document]:
        """Process customer tier updates (right side of the join).

        When tier data arrives, changes, or is deleted, re-emit every stored
        shipment for that customer with the new enrichment.

        Args:
            read: Contains the tier row document.

        Yields:
            One joined document per stored shipment of the customer.
        """
        doc = read.doc
        # Source schema carries customer_id as a string; int() raises
        # ValueError on a malformed row (intentional fail-fast).
        customer_id = int(doc.customer_id)

        join_state = self.state.customers.setdefault(customer_id, JoinState())

        if doc.m_meta.op == 'd':
            # Row removed (e.g. deleted from the Google Sheet): clear the tier
            # and re-emit this customer's shipments un-enriched.
            join_state.tier = None
            self.touched_customers.add(customer_id)
            for shipment in join_state.shipments.values():
                yield self._build_joined_document(shipment, None)
            return

        # Store/update the tier data.
        join_state.tier = CustomerTier(
            tier=doc.tier,
            region=doc.region,
            account_manager=doc.account_manager
        )
        self.touched_customers.add(customer_id)

        # Re-emit all of this customer's shipments with the fresh tier data.
        for shipment in join_state.shipments.values():
            yield self._build_joined_document(shipment, join_state.tier)

    def _build_joined_document(
        self,
        shipment: ShipmentData,
        tier: CustomerTier | None
    ) -> Document:
        """Construct the joined output document (tier fields null when absent)."""
        return Document(
            shipment_id=shipment.shipment_id,
            customer_id=shipment.customer_id,
            shipment_status=shipment.shipment_status,
            is_priority=shipment.is_priority,
            city=shipment.city,
            expected_delivery_date=shipment.expected_delivery_date,
            customer_tier=tier.tier if tier else None,
            customer_region=tier.region if tier else None,
            account_manager=tier.account_manager if tier else None
        )

    def start_commit(
        self,
        start_commit: Request.StartCommit
    ) -> Response.StartedCommit:
        """Persist state for only the customers touched in this transaction.

        Returns:
            A StartedCommit whose state is a partial update merged into the
            previously persisted state (merge_patch=True).
        """
        updated_state = {
            "customers": {
                str(cid): self.state.customers[cid].model_dump()
                for cid in self.touched_customers
                if cid in self.state.customers
            }
        }
        self.touched_customers = set()

        return Response.StartedCommit(
            state=Response.StartedCommit.State(
                updated=updated_state,
                merge_patch=True,  # Merge into existing state; don't replace.
            )
        )

    async def reset(self):
        """Reset all state between catalog tests so runs don't leak state."""
        self.state = State()
        self.touched_customers = set()
dani-demo/customer-tiers/Sheet1 + shuffle: + key: + - /customer_id \ No newline at end of file diff --git a/python-derivations/shipments-joins/flow_generated/python/dani-demo/__init__.py b/python-derivations/shipments-joins/flow_generated/python/dani-demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-derivations/shipments-joins/flow_generated/python/dani-demo/python-derivations/__init__.py b/python-derivations/shipments-joins/flow_generated/python/dani-demo/python-derivations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-derivations/shipments-joins/flow_generated/python/dani_demo/python_derivations/enriched_shipments/__init__.py b/python-derivations/shipments-joins/flow_generated/python/dani_demo/python_derivations/enriched_shipments/__init__.py new file mode 100644 index 0000000..91e8af2 --- /dev/null +++ b/python-derivations/shipments-joins/flow_generated/python/dani_demo/python_derivations/enriched_shipments/__init__.py @@ -0,0 +1,264 @@ +from abc import ABC, abstractmethod +import typing +import collections.abc +import pydantic + + +# Generated for published documents of derived collection dani-demo/python-derivations/enriched-shipments +class Document(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + account_manager: typing.Optional[str] = None + """Assigned account manager for this customer""" + city: typing.Optional[str] = None + """Delivery city""" + customer_id: int + """Customer who owns this shipment""" + customer_region: typing.Optional[str] = None + """Customer's geographic region""" + customer_tier: typing.Optional[str] = None + """Customer's service tier (Gold, Silver, Bronze)""" + expected_delivery_date: typing.Optional[str] = None + """Expected delivery date""" + is_priority: typing.Optional[bool] = None + """Whether this is a priority shipment""" + shipment_id: int + """Unique identifier for the shipment""" + shipment_status: typing.Optional[str] = None + 
"""Current status of the shipment""" + + + +# Generated for read documents of sourced collection Artificial-Industries/postgres-shipments/public/shipments +class SourceShipmentsPublicShipments(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + +class SourceShipments(pydantic.BaseModel): + class MMeta(pydantic.BaseModel): + class Before(pydantic.BaseModel): + """Record state immediately before this change was applied.""" + model_config = pydantic.ConfigDict(extra='allow') + + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] 
= None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + + class Source(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + loc: tuple[int, int, int] + """Location of this WAL event as [last Commit.EndLSN; event LSN; current Begin.FinalLSN]. 
See https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html""" + schema: str + """Database schema (namespace) of the event.""" + snapshot: typing.Optional[bool] = None + """Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log.""" + table: str + """Database table of the event.""" + ts_ms: typing.Optional[int] = None + """Unix timestamp (in millis) at which this event was recorded by the database.""" + txid: typing.Optional[int] = None + """The 32-bit transaction ID assigned by Postgres to the commit which produced this change.""" + + model_config = pydantic.ConfigDict(extra='allow') + + before: typing.Optional["Before"] = None + """Record state immediately before this change was applied.""" + op: typing.Literal["c", "d", "u"] + """Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete.""" + source: "Source" + + model_config = pydantic.ConfigDict(extra='allow') + + m_meta: "MMeta" = pydantic.Field(alias="""_meta""") + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: 
enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + + + +# Generated for read documents of sourced collection dani-demo/customer-tiers/Sheet1 +class SourceCustomerTiers(pydantic.BaseModel): + """Row""" + class MMeta(pydantic.BaseModel): + """Meta Document metadata""" + op: str + """Op Operation type (c: Create, u: Update, d: Delete)""" + row_id: int + """Row Id Row ID of the Document, counting up from zero, or -1 if not known""" + uuid: str + + m_meta: "MMeta" = pydantic.Field(alias="""_meta""") + """Meta Document metadata""" + account_manager: str + customer_id: str + region: str + tier: str + + + +class Request(pydantic.BaseModel): + + class Open(pydantic.BaseModel): + state: dict[str, typing.Any] + + class Flush(pydantic.BaseModel): + pass + + class Reset(pydantic.BaseModel): + pass + + class StartCommit(pydantic.BaseModel): + runtime_checkpoint: typing.Any = pydantic.Field(default=None, alias='runtimeCheckpoint') + + open: typing.Optional[Open] = None + flush: typing.Optional[Flush] = None + reset: typing.Optional[Reset] = None + start_commit: typing.Optional[StartCommit] = pydantic.Field(default=None, alias='startCommit') + + + class ReadShipments(pydantic.BaseModel): + doc: SourceShipments + transform: typing.Literal[0] + + + class ReadCustomerTiers(pydantic.BaseModel): + doc: SourceCustomerTiers + transform: typing.Literal[1] + + read : typing.Annotated[ReadShipments | ReadCustomerTiers, pydantic.Field(discriminator='transform')] | None = None + + @pydantic.model_validator(mode='before') + @classmethod + def inject_default_transform(cls, data: dict[str, typing.Any]) -> dict[str, typing.Any]: + if 'read' in data and 'transform' not in data['read']: + data['read']['transform'] = 0 # Make implicit default explicit + return data + + +class Response(pydantic.BaseModel): + class Opened(pydantic.BaseModel): + pass + 
+ class Published(pydantic.BaseModel): + doc: Document + + class Flushed(pydantic.BaseModel): + pass + + class StartedCommit(pydantic.BaseModel): + + class State(pydantic.BaseModel): + updated: dict[str, typing.Any] + merge_patch: bool = False + + state: typing.Optional[State] = None + + opened: typing.Optional[Opened] = None + published: typing.Optional[Published] = None + flushed: typing.Optional[Flushed] = None + started_commit: typing.Optional[StartedCommit] = pydantic.Field(default=None, alias='startedCommit') + +class IDerivation(ABC): + """Abstract base class for derivation implementations.""" + + def __init__(self, open: Request.Open): + """Initialize the derivation with an Open message.""" + pass + + @abstractmethod + async def shipments(self, read: Request.ReadShipments) -> collections.abc.AsyncIterator[Document]: + """Transform method for 'shipments' source.""" + if False: + yield # Mark as a generator. + + @abstractmethod + async def customer_tiers(self, read: Request.ReadCustomerTiers) -> collections.abc.AsyncIterator[Document]: + """Transform method for 'customer_tiers' source.""" + if False: + yield # Mark as a generator. + + async def flush(self) -> collections.abc.AsyncIterator[Document]: + """Flush any buffered documents. Override to implement pipelining.""" + if False: + yield # Mark as a generator. + + def start_commit(self, start_commit: Request.StartCommit) -> Response.StartedCommit: + """Return state updates to persist. Override to implement stateful derivations.""" + return Response.StartedCommit() + + async def reset(self): + """Reset internal state for testing. 
Override if needed.""" + pass diff --git a/python-derivations/shipments-joins/pyproject.toml b/python-derivations/shipments-joins/pyproject.toml new file mode 100644 index 0000000..2dc51f9 --- /dev/null +++ b/python-derivations/shipments-joins/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "dani-demo-python-derivations-enriched-shipments" +version = "0.1.0" +requires-python = ">=3.12" +dependencies = [ + "pydantic>=2.0", + "pyright>=1.1", +] + +[tool.pylsp-mypy] +enabled = true + +# IDE configuration is in pyrightconfig.json diff --git a/python-derivations/shipments-joins/pyrightconfig.json b/python-derivations/shipments-joins/pyrightconfig.json new file mode 100644 index 0000000..7f39867 --- /dev/null +++ b/python-derivations/shipments-joins/pyrightconfig.json @@ -0,0 +1,6 @@ +{ + "extraPaths": [ + "flow_generated/python" + ], + "typeCheckingMode": "strict" +} \ No newline at end of file diff --git a/python-derivations/shipments-stateful/customer-metrics.flow.py b/python-derivations/shipments-stateful/customer-metrics.flow.py new file mode 100644 index 0000000..82283f3 --- /dev/null +++ b/python-derivations/shipments-stateful/customer-metrics.flow.py @@ -0,0 +1,320 @@ +"""Stateful derivation that maintains per-customer shipping metrics. + +This derivation demonstrates how to: +1. Load persisted state when the derivation initializes +2. Maintain in-memory state as documents are processed +3. Persist state updates efficiently via start_commit() +4. Handle state recovery across restarts and failures + +The key insight is that state is partitioned by customer_id (via shuffle.key), +so each worker only manages state for its assigned subset of customers. 
+""" +from collections.abc import AsyncIterator +from datetime import datetime, date +from dani_demo.python_derivations.customer_metrics import ( + IDerivation, + Document, + Request, + Response, + SourceShipments +) +from pydantic import BaseModel, Field + + +class CustomerState(BaseModel): + """Metrics we track for each customer. + + This model defines the shape of our per-customer state. Using Pydantic + gives us automatic serialization to/from JSON for persistence, plus + validation and clear documentation of our state structure. + """ + total_shipments: int = 0 + on_time_count: int = 0 + late_count: int = 0 + active_shipments: int = 0 + total_delivery_days: int = 0 # Running sum for computing average + delivered_count: int = 0 # Denominator for average calculation + last_shipment_date: str | None = None + + # Track which shipments we've seen and their current status. + # This is essential for CDC: when a shipment transitions from + # "In Transit" to "Delivered", we need to know it was previously + # active so we can decrement active_shipments correctly. + known_shipments: dict[int, str] = Field(default_factory=lambda: {}) + + +class State(BaseModel): + """Root state container mapping customer IDs to their metrics. + + The runtime persists this entire structure. On restart, it's passed + back to us via Request.Open so we can resume where we left off. + """ + customers: dict[int, CustomerState] = Field(default_factory=lambda: {}) + + +class Derivation(IDerivation): + """Derivation that aggregates shipment events into customer profiles.""" + + # Customers with this many shipments or more are considered VIPs. + VIP_THRESHOLD = 10 + + def __init__(self, open: Request.Open): + """Initialize the derivation and restore persisted state. + + The runtime calls this when the derivation starts (or restarts). + The open.state dictionary contains whatever we returned from + start_commit() in our previous run. On the very first run, + open.state is an empty dictionary. 
+ + Args: + open: Contains runtime configuration and persisted state + """ + super().__init__(open) + + # Restore state from the previous transaction. If this is our + # first run, State() creates empty default structures. + self.state = State(**open.state) + + # Track which customers we've modified in the current transaction. + # This optimization lets us persist only changed state rather than + # the entire state dictionary, which matters when you have millions + # of customers but only thousands change per transaction. + self.touched_customers: dict[int, CustomerState] = {} + + async def shipments( + self, + read: Request.ReadShipments + ) -> AsyncIterator[Document]: + """Process a shipment event and update customer metrics. + + This method is called for each document from our source collection. + We update the relevant customer's state in memory, then yield + their current metrics as an output document. + + Args: + read: Contains the shipment document and metadata + + Yields: + Updated customer metrics document + """ + doc = read.doc + + # Ensure required fields are not None + assert doc.customer_id is not None, "customer_id cannot be None" + assert doc.shipment_status is not None, "shipment_status cannot be None" + + customer_id = doc.customer_id + shipment_id = doc.id + current_status = doc.shipment_status + + # Get or create state for this customer. The setdefault pattern + # handles both existing and new customers elegantly. + customer = self.state.customers.setdefault( + customer_id, + CustomerState() + ) + + # Check if we've seen this shipment before and what its status was. + # This is crucial for CDC processing—we need to handle transitions + # like "In Transit" -> "Delivered" without double-counting. + previous_status = customer.known_shipments.get(shipment_id) + + # Handle the CDC operation type. The _meta.op field tells us whether + # this is a create, update, or delete operation. 
+ op = doc.m_meta.op + + if op == 'd': + # Deletion: remove the shipment from our tracking. + # In practice, you might handle this differently depending + # on your business logic—perhaps archive rather than forget. + self._handle_deletion(customer, shipment_id, previous_status) + else: + # Create or update: process the shipment event + self._process_shipment( + customer, + shipment_id, + current_status, + previous_status, + doc + ) + + # Update the last shipment date if this one is more recent + shipment_date = doc.created_at[:10] if doc.created_at else None + if shipment_date: + if not customer.last_shipment_date or shipment_date > customer.last_shipment_date: + customer.last_shipment_date = shipment_date + + # Mark this customer as touched so start_commit knows to persist them + self.touched_customers[customer_id] = customer + + # Emit the current state of this customer's metrics. + # Because our collection is keyed by customer_id, this document + # will replace any previous version for this customer. + yield self._build_output_document(customer_id, customer) + + def _handle_deletion( + self, + customer: CustomerState, + shipment_id: int, + previous_status: str | None + ) -> None: + """Handle a deleted shipment by reversing its contribution to metrics.""" + if previous_status is None: + # We never saw this shipment, nothing to undo + return + + # Decrement the appropriate counters based on what we knew + customer.total_shipments = max(0, customer.total_shipments - 1) + + if previous_status in ('In Transit', 'At Checkpoint', 'Out for Delivery'): + customer.active_shipments = max(0, customer.active_shipments - 1) + elif previous_status == 'Delivered': + customer.delivered_count = max(0, customer.delivered_count - 1) + # Note: We can't perfectly undo the delivery time contribution + # without storing more state. This is a trade-off. 
+ + # Remove from our tracking + del customer.known_shipments[shipment_id] + + def _process_shipment( + self, + customer: CustomerState, + shipment_id: int, + current_status: str, + previous_status: str | None, + doc: SourceShipments + ) -> None: + """Process a create or update event for a shipment.""" + + # If this is a new shipment we haven't seen before, increment total + if previous_status is None: + customer.total_shipments += 1 + + # Handle status transitions. The key insight is that we need to + # properly account for a shipment moving between states. + is_active_status = current_status in ( + 'In Transit', 'At Checkpoint', 'Out for Delivery' + ) + was_active_status = previous_status in ( + 'In Transit', 'At Checkpoint', 'Out for Delivery' + ) if previous_status else False + + # Update active shipment count based on status transition + if is_active_status and not was_active_status: + # Became active (new shipment or status changed to active) + customer.active_shipments += 1 + elif was_active_status and not is_active_status: + # Was active, now isn't (delivered, cancelled, etc.) 
+ customer.active_shipments = max(0, customer.active_shipments - 1) + + # Handle delivery completion + if current_status == 'Delivered' and previous_status != 'Delivered': + customer.delivered_count += 1 + + # Calculate delivery time and update on-time/late counts + self._record_delivery(customer, doc) + + # Update our tracking of this shipment's status + customer.known_shipments[shipment_id] = current_status + + def _record_delivery(self, customer: CustomerState, doc: SourceShipments) -> None: + """Record delivery metrics when a shipment is delivered.""" + # Calculate days from creation to now (delivery time) + if doc.created_at: + try: + created = datetime.fromisoformat( + doc.created_at.replace('Z', '+00:00') + ) + delivery_days = (date.today() - created.date()).days + customer.total_delivery_days += max(0, delivery_days) + except (ValueError, AttributeError): + pass # Skip if date parsing fails + + # Determine if delivery was on time + if doc.expected_delivery_date: + try: + expected = datetime.fromisoformat( + doc.expected_delivery_date.replace('Z', '+00:00') + ).date() + if date.today() <= expected: + customer.on_time_count += 1 + else: + customer.late_count += 1 + except (ValueError, AttributeError): + pass # Skip if date parsing fails + + def _build_output_document( + self, + customer_id: int, + customer: CustomerState + ) -> Document: + """Construct the output document from current customer state.""" + # Calculate average delivery days, handling division by zero + avg_delivery_days = None + if customer.delivered_count > 0: + avg_delivery_days = round( + customer.total_delivery_days / customer.delivered_count, + 1 + ) + + return Document( + customer_id=customer_id, + total_shipments=customer.total_shipments, + on_time_count=customer.on_time_count, + late_count=customer.late_count, + active_shipments=customer.active_shipments, + avg_delivery_days=avg_delivery_days, + is_vip=customer.total_shipments >= self.VIP_THRESHOLD, + 
last_shipment_date=customer.last_shipment_date + ) + + def start_commit( + self, + start_commit: Request.StartCommit + ) -> Response.StartedCommit: + """Persist state changes at the end of the transaction. + + The runtime calls this method when it's ready to commit the current + transaction. We return the state that should be durably persisted. + + The key optimization here is merge_patch=True, which tells the runtime + to merge our updates with the existing state rather than replacing it + entirely. This means we only need to return the customers that changed, + not the entire state dictionary. + + Args: + start_commit: Contains metadata about the commit + + Returns: + State updates to persist, with merge semantics + """ + # Build the partial state update containing only touched customers. + # This is a JSON merge patch: keys present in our update replace + # the corresponding keys in the persisted state; keys we don't + # mention are left unchanged. + updated_state = { + "customers": { + str(cid): customer.model_dump() + for cid, customer in self.touched_customers.items() + } + } + + # Clear the touched set for the next transaction + self.touched_customers = {} + + return Response.StartedCommit( + state=Response.StartedCommit.State( + updated=updated_state, + merge_patch=True, # Merge with existing state, don't replace + ) + ) + + async def reset(self): + """Reset all state for testing. + + The runtime calls this between catalog tests to ensure each test + starts with a clean slate. Without this, state from one test would + leak into the next, causing flaky and confusing test failures. 
+ """ + self.state = State() + self.touched_customers = {} \ No newline at end of file diff --git a/python-derivations/shipments-stateful/customer-metrics.schema.yaml b/python-derivations/shipments-stateful/customer-metrics.schema.yaml new file mode 100644 index 0000000..1db4ba8 --- /dev/null +++ b/python-derivations/shipments-stateful/customer-metrics.schema.yaml @@ -0,0 +1,30 @@ +type: object +properties: + customer_id: + type: integer + description: Unique identifier for the customer + total_shipments: + type: integer + description: Total number of shipments for this customer + on_time_count: + type: integer + description: Number of shipments delivered on or before expected date + late_count: + type: integer + description: Number of shipments delivered after expected date + active_shipments: + type: integer + description: Number of shipments currently in transit + avg_delivery_days: + type: [number, "null"] + description: Average days between shipment creation and delivery + is_vip: + type: boolean + description: Whether customer qualifies for VIP status (10+ shipments) + last_shipment_date: + type: [string, "null"] + format: date + description: Date of most recent shipment +required: + - customer_id + - total_shipments \ No newline at end of file diff --git a/python-derivations/shipments-stateful/flow.yaml b/python-derivations/shipments-stateful/flow.yaml new file mode 100644 index 0000000..3cb6009 --- /dev/null +++ b/python-derivations/shipments-stateful/flow.yaml @@ -0,0 +1,15 @@ +collections: + dani-demo/python-derivations/customer-metrics: + schema: customer-metrics.schema.yaml + key: + - /customer_id + derive: + using: + python: + module: customer-metrics.flow.py + transforms: + - name: shipments + source: Artificial-Industries/postgres-shipments/public/shipments + shuffle: + key: + - /customer_id \ No newline at end of file diff --git a/python-derivations/shipments-stateful/flow_generated/python/dani-demo/__init__.py 
b/python-derivations/shipments-stateful/flow_generated/python/dani-demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-derivations/shipments-stateful/flow_generated/python/dani-demo/python-derivations/__init__.py b/python-derivations/shipments-stateful/flow_generated/python/dani-demo/python-derivations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-derivations/shipments-stateful/flow_generated/python/dani_demo/python_derivations/customer_metrics/__init__.py b/python-derivations/shipments-stateful/flow_generated/python/dani_demo/python_derivations/customer_metrics/__init__.py new file mode 100644 index 0000000..2f67aad --- /dev/null +++ b/python-derivations/shipments-stateful/flow_generated/python/dani_demo/python_derivations/customer_metrics/__init__.py @@ -0,0 +1,231 @@ +from abc import ABC, abstractmethod +import typing +import collections.abc +import pydantic + + +# Generated for published documents of derived collection dani-demo/python-derivations/customer-metrics +class Document(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + active_shipments: typing.Optional[int] = None + """Number of shipments currently in transit""" + avg_delivery_days: typing.Optional[typing.Union[int, float]] = None + """Average days between shipment creation and delivery""" + customer_id: int + """Unique identifier for the customer""" + is_vip: typing.Optional[bool] = None + """Whether customer qualifies for VIP status (10+ shipments)""" + last_shipment_date: typing.Optional[str] = None + """Date of most recent shipment""" + late_count: typing.Optional[int] = None + """Number of shipments delivered after expected date""" + on_time_count: typing.Optional[int] = None + """Number of shipments delivered on or before expected date""" + total_shipments: int + """Total number of shipments for this customer""" + + + +# Generated for read documents of sourced collection 
Artificial-Industries/postgres-shipments/public/shipments +class SourceShipmentsPublicShipments(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + +class SourceShipments(pydantic.BaseModel): + class MMeta(pydantic.BaseModel): + class Before(pydantic.BaseModel): + """Record state immediately before this change was applied.""" + model_config = pydantic.ConfigDict(extra='allow') + + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + 
"""(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + + class Source(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + loc: tuple[int, int, int] + """Location of this WAL event as [last Commit.EndLSN; event LSN; current Begin.FinalLSN]. See https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html""" + schema: str + """Database schema (namespace) of the event.""" + snapshot: typing.Optional[bool] = None + """Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log.""" + table: str + """Database table of the event.""" + ts_ms: typing.Optional[int] = None + """Unix timestamp (in millis) at which this event was recorded by the database.""" + txid: typing.Optional[int] = None + """The 32-bit transaction ID assigned by Postgres to the commit which produced this change.""" + + model_config = pydantic.ConfigDict(extra='allow') + + before: typing.Optional["Before"] = None + """Record state immediately before this change was applied.""" + op: typing.Literal["c", "d", "u"] + """Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete.""" + source: "Source" + + model_config = pydantic.ConfigDict(extra='allow') + + m_meta: "MMeta" = pydantic.Field(alias="""_meta""") + city: 
typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + + + +class Request(pydantic.BaseModel): + + class Open(pydantic.BaseModel): + state: dict[str, typing.Any] + + class Flush(pydantic.BaseModel): + pass + + class Reset(pydantic.BaseModel): + pass + + class StartCommit(pydantic.BaseModel): + runtime_checkpoint: typing.Any = pydantic.Field(default=None, alias='runtimeCheckpoint') + + open: typing.Optional[Open] = None + flush: typing.Optional[Flush] = None + reset: typing.Optional[Reset] = None + start_commit: typing.Optional[StartCommit] = pydantic.Field(default=None, alias='startCommit') + + + class ReadShipments(pydantic.BaseModel): + doc: SourceShipments + transform: typing.Literal[0] + + read : typing.Annotated[ReadShipments, pydantic.Field(discriminator='transform')] | None = None + + @pydantic.model_validator(mode='before') + @classmethod + def 
inject_default_transform(cls, data: dict[str, typing.Any]) -> dict[str, typing.Any]: + if 'read' in data and 'transform' not in data['read']: + data['read']['transform'] = 0 # Make implicit default explicit + return data + + +class Response(pydantic.BaseModel): + class Opened(pydantic.BaseModel): + pass + + class Published(pydantic.BaseModel): + doc: Document + + class Flushed(pydantic.BaseModel): + pass + + class StartedCommit(pydantic.BaseModel): + + class State(pydantic.BaseModel): + updated: dict[str, typing.Any] + merge_patch: bool = False + + state: typing.Optional[State] = None + + opened: typing.Optional[Opened] = None + published: typing.Optional[Published] = None + flushed: typing.Optional[Flushed] = None + started_commit: typing.Optional[StartedCommit] = pydantic.Field(default=None, alias='startedCommit') + +class IDerivation(ABC): + """Abstract base class for derivation implementations.""" + + def __init__(self, open: Request.Open): + """Initialize the derivation with an Open message.""" + pass + + @abstractmethod + async def shipments(self, read: Request.ReadShipments) -> collections.abc.AsyncIterator[Document]: + """Transform method for 'shipments' source.""" + if False: + yield # Mark as a generator. + + async def flush(self) -> collections.abc.AsyncIterator[Document]: + """Flush any buffered documents. Override to implement pipelining.""" + if False: + yield # Mark as a generator. + + def start_commit(self, start_commit: Request.StartCommit) -> Response.StartedCommit: + """Return state updates to persist. Override to implement stateful derivations.""" + return Response.StartedCommit() + + async def reset(self): + """Reset internal state for testing. 
Override if needed.""" + pass diff --git a/python-derivations/shipments-stateful/pyproject.toml b/python-derivations/shipments-stateful/pyproject.toml new file mode 100644 index 0000000..5b09fa3 --- /dev/null +++ b/python-derivations/shipments-stateful/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "dani-demo-python-derivations-customer-metrics" +version = "0.1.0" +requires-python = ">=3.12" +dependencies = [ + "pydantic>=2.0", + "pyright>=1.1", +] + +[tool.pylsp-mypy] +enabled = true + +# IDE configuration is in pyrightconfig.json diff --git a/python-derivations/shipments-stateful/pyrightconfig.json b/python-derivations/shipments-stateful/pyrightconfig.json new file mode 100644 index 0000000..7f39867 --- /dev/null +++ b/python-derivations/shipments-stateful/pyrightconfig.json @@ -0,0 +1,6 @@ +{ + "extraPaths": [ + "flow_generated/python" + ], + "typeCheckingMode": "strict" +} \ No newline at end of file diff --git a/python-derivations/shipments-stateless/flow.yaml b/python-derivations/shipments-stateless/flow.yaml new file mode 100644 index 0000000..df7b263 --- /dev/null +++ b/python-derivations/shipments-stateless/flow.yaml @@ -0,0 +1,14 @@ +collections: + dani-demo/python-derivations/processed-shipments: + schema: processed-shipments.schema.yaml + key: + - /id + derive: + using: + python: + module: processed-shipments.flow.py + transforms: + - name: shipments + source: Artificial-Industries/postgres-shipments/public/shipments + shuffle: any + backfill: 1 diff --git a/python-derivations/shipments-stateless/flow_generated/python/dani-demo/__init__.py b/python-derivations/shipments-stateless/flow_generated/python/dani-demo/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/python-derivations/shipments-stateless/flow_generated/python/dani-demo/python-derivations/__init__.py b/python-derivations/shipments-stateless/flow_generated/python/dani-demo/python-derivations/__init__.py new file mode 100644 index 0000000..e69de29 diff --git 
a/python-derivations/shipments-stateless/flow_generated/python/dani_demo/python_derivations/processed_shipments/__init__.py b/python-derivations/shipments-stateless/flow_generated/python/dani_demo/python_derivations/processed_shipments/__init__.py new file mode 100644 index 0000000..08cc774 --- /dev/null +++ b/python-derivations/shipments-stateless/flow_generated/python/dani_demo/python_derivations/processed_shipments/__init__.py @@ -0,0 +1,220 @@ +from abc import ABC, abstractmethod +import typing +import collections.abc +import pydantic + + +# Generated for published documents of derived collection dani-demo/python-derivations/processed-shipments +class Document(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + days_until_delivery: typing.Optional[int] = None + full_address: str + id: int + is_urgent: typing.Optional[bool] = None + status_summary: str + + + +# Generated for read documents of sourced collection Artificial-Industries/postgres-shipments/public/shipments +class SourceShipmentsPublicShipments(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: 
uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + +class SourceShipments(pydantic.BaseModel): + class MMeta(pydantic.BaseModel): + class Before(pydantic.BaseModel): + """Record state immediately before this change was applied.""" + model_config = pydantic.ConfigDict(extra='allow') + + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + + class Source(pydantic.BaseModel): + model_config = pydantic.ConfigDict(extra='allow') + + loc: tuple[int, int, int] + """Location of this WAL event as [last Commit.EndLSN; event LSN; current Begin.FinalLSN]. 
See https://www.postgresql.org/docs/current/protocol-logicalrep-message-formats.html""" + schema: str + """Database schema (namespace) of the event.""" + snapshot: typing.Optional[bool] = None + """Snapshot is true if the record was produced from an initial table backfill and unset if produced from the replication log.""" + table: str + """Database table of the event.""" + ts_ms: typing.Optional[int] = None + """Unix timestamp (in millis) at which this event was recorded by the database.""" + txid: typing.Optional[int] = None + """The 32-bit transaction ID assigned by Postgres to the commit which produced this change.""" + + model_config = pydantic.ConfigDict(extra='allow') + + before: typing.Optional["Before"] = None + """Record state immediately before this change was applied.""" + op: typing.Literal["c", "d", "u"] + """Change operation type: 'c' Create/Insert, 'u' Update, 'd' Delete.""" + source: "Source" + + model_config = pydantic.ConfigDict(extra='allow') + + m_meta: "MMeta" = pydantic.Field(alias="""_meta""") + city: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + created_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + current_location: typing.Optional[typing.Any] = None + """(source type: composite)""" + customer_id: typing.Optional[typing.Union[int, None]] = None + """(source type: int4)""" + delivery_coordinates: typing.Optional[typing.Any] = None + """(source type: composite)""" + delivery_name: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + expected_delivery_date: typing.Optional[typing.Union[str, None]] = None + """(source type: date)""" + id: int + """(source type: non-nullable int4)""" + is_priority: typing.Optional[typing.Union[bool, None]] = None + """(source type: bool)""" + order_id: typing.Optional[typing.Union[str, None]] = None + """(source type: uuid)""" + shipment_status: typing.Optional[typing.Union[str, None]] = None + """(source type: 
enum)""" + street_address: typing.Optional[typing.Union[str, None]] = None + """(source type: varchar)""" + updated_at: typing.Optional[typing.Union[str, None]] = None + """(source type: timestamp)""" + + + +class Request(pydantic.BaseModel): + + class Open(pydantic.BaseModel): + state: dict[str, typing.Any] + + class Flush(pydantic.BaseModel): + pass + + class Reset(pydantic.BaseModel): + pass + + class StartCommit(pydantic.BaseModel): + runtime_checkpoint: typing.Any = pydantic.Field(default=None, alias='runtimeCheckpoint') + + open: typing.Optional[Open] = None + flush: typing.Optional[Flush] = None + reset: typing.Optional[Reset] = None + start_commit: typing.Optional[StartCommit] = pydantic.Field(default=None, alias='startCommit') + + + class ReadShipments(pydantic.BaseModel): + doc: SourceShipments + transform: typing.Literal[0] + + read : typing.Annotated[ReadShipments, pydantic.Field(discriminator='transform')] | None = None + + @pydantic.model_validator(mode='before') + @classmethod + def inject_default_transform(cls, data: dict[str, typing.Any]) -> dict[str, typing.Any]: + if 'read' in data and 'transform' not in data['read']: + data['read']['transform'] = 0 # Make implicit default explicit + return data + + +class Response(pydantic.BaseModel): + class Opened(pydantic.BaseModel): + pass + + class Published(pydantic.BaseModel): + doc: Document + + class Flushed(pydantic.BaseModel): + pass + + class StartedCommit(pydantic.BaseModel): + + class State(pydantic.BaseModel): + updated: dict[str, typing.Any] + merge_patch: bool = False + + state: typing.Optional[State] = None + + opened: typing.Optional[Opened] = None + published: typing.Optional[Published] = None + flushed: typing.Optional[Flushed] = None + started_commit: typing.Optional[StartedCommit] = pydantic.Field(default=None, alias='startedCommit') + +class IDerivation(ABC): + """Abstract base class for derivation implementations.""" + + def __init__(self, open: Request.Open): + """Initialize the 
derivation with an Open message.""" + pass + + @abstractmethod + async def shipments(self, read: Request.ReadShipments) -> collections.abc.AsyncIterator[Document]: + """Transform method for 'shipments' source.""" + if False: + yield # Mark as a generator. + + async def flush(self) -> collections.abc.AsyncIterator[Document]: + """Flush any buffered documents. Override to implement pipelining.""" + if False: + yield # Mark as a generator. + + def start_commit(self, start_commit: Request.StartCommit) -> Response.StartedCommit: + """Return state updates to persist. Override to implement stateful derivations.""" + return Response.StartedCommit() + + async def reset(self): + """Reset internal state for testing. Override if needed.""" + pass diff --git a/python-derivations/shipments-stateless/processed-shipments.flow.py b/python-derivations/shipments-stateless/processed-shipments.flow.py new file mode 100644 index 0000000..df206e1 --- /dev/null +++ b/python-derivations/shipments-stateless/processed-shipments.flow.py @@ -0,0 +1,34 @@ +"""Derivation implementation for dani-demo/python-derivations/processed-shipments.""" +from collections.abc import AsyncIterator +from dani_demo.python_derivations.processed_shipments import IDerivation, Document, Request + +from datetime import datetime, date + +# Implementation for derivation dani-demo/python-derivations/processed-shipments. 
+class Derivation(IDerivation): + async def shipments(self, read: Request.ReadShipments) -> AsyncIterator[Document]: + # Simple transformation: combine address fields and add calculated fields + doc = read.doc + + # Combine street address and city into full address + full_address = f"{doc.street_address or 'Unknown'}, {doc.city or 'Unknown'}" + + # Determine if urgent (priority shipments or certain statuses) + is_urgent = doc.is_priority or doc.shipment_status in ["delayed", "critical"] + + # Create status summary + status_summary = f"{doc.shipment_status or 'unknown'} - {'Priority' if doc.is_priority else 'Standard'}" + + # Calculate days until delivery + days_until_delivery = 0 # Default to 0 if no delivery date + if doc.expected_delivery_date: + expected = datetime.fromisoformat(doc.expected_delivery_date.replace('Z', '+00:00')) + days_until_delivery = (expected.date() - date.today()).days + + yield Document( + id=doc.id, + full_address=full_address, + is_urgent=is_urgent, + status_summary=status_summary, + days_until_delivery=days_until_delivery + ) diff --git a/python-derivations/shipments-stateless/processed-shipments.schema.yaml b/python-derivations/shipments-stateless/processed-shipments.schema.yaml new file mode 100644 index 0000000..0cdbf2c --- /dev/null +++ b/python-derivations/shipments-stateless/processed-shipments.schema.yaml @@ -0,0 +1,16 @@ +type: object +properties: + id: + type: integer + full_address: + type: string + is_urgent: + type: boolean + status_summary: + type: string + days_until_delivery: + type: integer +required: + - id + - full_address + - status_summary diff --git a/python-derivations/shipments-stateless/pyproject.toml b/python-derivations/shipments-stateless/pyproject.toml new file mode 100644 index 0000000..70bd777 --- /dev/null +++ b/python-derivations/shipments-stateless/pyproject.toml @@ -0,0 +1,13 @@ +[project] +name = "dani-demo-python-derivations-processed-shipments" +version = "0.1.0" +requires-python = ">=3.12" 
+dependencies = [ + "pydantic>=2.0", + "pyright>=1.1", +] + +[tool.pylsp-mypy] +enabled = true + +# IDE configuration is in pyrightconfig.json diff --git a/python-derivations/shipments-stateless/pyrightconfig.json b/python-derivations/shipments-stateless/pyrightconfig.json new file mode 100644 index 0000000..7f39867 --- /dev/null +++ b/python-derivations/shipments-stateless/pyrightconfig.json @@ -0,0 +1,6 @@ +{ + "extraPaths": [ + "flow_generated/python" + ], + "typeCheckingMode": "strict" +} \ No newline at end of file