Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions elx/extensions/dagster/assets.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from collections.abc import Mapping
from typing import Generator, Iterable, List, Sequence
from elx import Runner
from dagster import (
Expand All @@ -22,6 +23,7 @@ def load_assets(
deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None,
key_prefix: str | Sequence[str] | None = None,
group_name: str | None = None,
tags: Mapping[str, str] | None = None,
) -> List[AssetsDefinition]:
"""
Load the assets for a runner, each asset represents one tap target combination.
Expand All @@ -31,6 +33,7 @@ def load_assets(
deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend.
key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name.
group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name.
tags (Mapping[str, str] | None): Tags for filtering and organizing. These tags are not attached to runs of the asset.

Returns:
List[AssetsDefinition]: The assets.
Expand Down Expand Up @@ -85,6 +88,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]:
description=generate_description(runner=runner, stream=stream),
key_prefix=key_prefix or dagster_safe_name(runner.tap.executable),
code_version=runner.tap.hash_key,
tags=tags,
)
for stream in runner.tap.catalog.streams
if stream.is_selected
Expand Down
Loading