Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Mapping
from typing import Generator, Iterable, List, Mapping, Sequence
from elx import Runner
from dagster import (
Expand All @@ -22,6 +23,7 @@ def load_assets(
deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None,
key_prefix: str | Sequence[str] | None = None,
group_name: str | None = None,
tags: Mapping[str, str] | None = None,
) -> List[AssetsDefinition]:
"""
Load the assets for a runner, each asset represents one tap target combination.
Expand All @@ -31,6 +33,7 @@ def load_assets(
deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend.
key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name.
group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name.
tags (Mapping[str, str] | None): Tags for filtering and organizing. These tags are not attached to runs of the asset.

Returns:
List[AssetsDefinition]: The assets.
Expand Down Expand Up @@ -97,8 +100,9 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
dagster_safe_name(stream.name): AssetOut(
is_required=False,
description=generate_description(runner=runner, stream=stream),
key_prefix=key_prefix or dagster_safe_name(runner.tap.executable),
key_prefix=key_prefix,
code_version=runner.tap.hash_key,
tags=tags,
)
for stream in runner.tap.catalog.streams
if stream.is_selected
Expand Down
4 changes: 2 additions & 2 deletions elx/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,13 @@ def state_client_factory(base_path: str) -> StateClient:


class StateManager:
def __init__(self, base_path: str = ".") -> None:
def __init__(self, base_path: str = ".", state_client: StateClient | None = None) -> None:
"""
Args:
base_path (str): The base path to store state files in. Defaults to "./state".
"""
self.base_path = base_path
self.state_client = state_client_factory(base_path)
self.state_client = state_client or state_client_factory(base_path)

def load(self, state_file_name: str) -> dict:
"""
Expand Down
Loading
Loading