diff --git a/elx/extensions/dagster/assets.py b/elx/extensions/dagster/assets.py index e142f57..88de115 100644 --- a/elx/extensions/dagster/assets.py +++ b/elx/extensions/dagster/assets.py @@ -1,3 +1,4 @@ +from collections.abc import Mapping from typing import Generator, Iterable, List, Sequence from elx import Runner from dagster import ( @@ -22,6 +23,7 @@ def load_assets( deps: Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None = None, key_prefix: str | Sequence[str] | None = None, group_name: str | None = None, + tags: Mapping[str, str] | None = None, ) -> List[AssetsDefinition]: """ Load the assets for a runner, each asset represents one tap target combination. @@ -31,6 +33,7 @@ def load_assets( deps (Iterable[AssetKey | str | Sequence[str] | AssetsDefinition | SourceAsset | AssetDep] | None): Upstream assets upon which the assets depend. key_prefix (str | Sequence[str] | None): Key prefix for the assets. If not provided, defaults to the tap executable name. group_name (str | None): Group name for the assets. If not provided, defaults to the tap executable name. + tags (Mapping[str, str] | None): Tags for filtering and organizing. These tags are not attached to runs of the asset. Returns: List[AssetsDefinition]: The assets. @@ -85,6 +88,7 @@ def run(context: OpExecutionContext) -> Generator[Output, None, None]: description=generate_description(runner=runner, stream=stream), key_prefix=key_prefix or dagster_safe_name(runner.tap.executable), code_version=runner.tap.hash_key, + tags=tags, ) for stream in runner.tap.catalog.streams if stream.is_selected