diff --git a/docs/learn/feature-discovery.md b/docs/learn/feature-discovery.md index 6816d4dd7..53a4c1e23 100644 --- a/docs/learn/feature-discovery.md +++ b/docs/learn/feature-discovery.md @@ -1,142 +1,14 @@ # Feature Discovery -Metaxy provides automatic feature discovery through Python's entrypoint system. This enables modular architecture patterns essential for scaling Metaxy projects. +In Metaxy, feature definitions are associated with Metaxy projects. +Feature definitions may come from two sources: -## Why Feature Discovery? +- all feature classes from the current Python project -Manual feature registration doesn't scale. As your system grows, you need: +- feature definitions previously pushed to the metadata store (the project they belong to is recorded at push time) -- **Plugin architectures** - Third-party teams contribute features without modifying core code -- **Feature collections** - Package and distribute related features as installable units -- **Monorepo support** - Discover features across multiple packages in a monorepo -- **Internal packages** - Share features between projects via private package registries +!!! tip -Feature discovery solves these problems through automatic registration at import time. + To push your project, use the `metaxy graph push` CLI command. -## Package Entry Points - -The most powerful discovery mechanism uses Python's standard entry point system via a well-known `"metaxy.project"` entrypoint group in the package metadata. - -### Creating a Feature Plugin - -Structure your feature package: - -``` -my-video-features/ -├── pyproject.toml -└── src/ - └── my_video_features/ - ├── __init__.py - ├── detection.py - └── transcription.py -``` - -Declare entry points in `pyproject.toml`: - -```toml -[project] -name = "my-video-features" -version = "1.0.0" -dependencies = ["metaxy"] - -[project.entry-points."metaxy.project"] -my-video-features = "my_video_features" -``` - -The entry point name is your project name. 
The value can be either: - -- **Function syntax** (`module:function`) - Points to a callable function that will be invoked to load features. Useful when you need conditional loading or setup logic. -- **Module syntax** (`module`) - Points directly to a module containing Feature definitions. Simply importing the module registers the features. - -!!! warning "One Entry Point Per Package" - - Each package can only declare **one** entry point in the `metaxy.project` group, since `metaxy.toml` only supports a single `project` field. - - To organize features into logical groups within a package, use submodules and import them from your entry point function. - -### Installing and Using Feature Plugins - -Install the package: - -```bash -pip install my-video-features -# Or in a monorepo: -pip install -e ./packages/my-video-features -``` - -!!! warning "UV Package Manager: Entry Point Changes" - - If you're using `uv` and modify entry points in `pyproject.toml`, `uv sync` will **not** recreate the editable package metadata. 
You must explicitly reinstall: - - ```bash - uv sync --reinstall-package my-video-features my-video-features - ``` - -## Monorepo Patterns - -In monorepos, use entry points to manage feature collections across teams: - -### Team-Owned Feature Packages - -``` -monorepo/ -├── packages/ -│ ├── core-features/ -│ │ └── pyproject.toml # [project.entry-points."metaxy.features"] -│ ├── ml-features/ -│ │ └── pyproject.toml # [project.entry-points."metaxy.features"] -│ └── experimental-features/ -│ └── pyproject.toml # [project.entry-points."metaxy.features"] -└── apps/ - └── main-pipeline/ - └── pyproject.toml # depends on feature packages -``` - -Each team maintains their features independently: - -```toml -# packages/ml-features/pyproject.toml -[project.entry-points."metaxy.project"] -ml-features = "ml_features.load" -``` - -```toml -# packages/core-features/pyproject.toml -[project.entry-points."metaxy.project"] -core-features = "core_features.load" -``` - -The main application imports features from all installed packages, and each feature automatically knows its project based on the entry point. - -## Config-Based Discovery - -For simpler use cases that don't require distribution, you can specify module paths directly in configuration: - -=== "metaxy.toml" - - ```toml - project = "my-project" - entrypoints = [ - "myapp.features.video", - "myapp.features.audio", - ] - ``` - -=== "pyproject.toml" - - ```toml - [tool.metaxy] - project = "my-project" - entrypoints = [ - "myapp.features.video", - "myapp.features.audio", - ] - ``` - -## Best Practices - -1. **Use entry points for distribution** - Any features intended for reuse should use entry points -2. **Version your feature packages** - Use semantic versioning for feature collections -3. **Test in isolation** - Load feature packages into test graphs to verify behavior - -The entry point system transforms feature management from a manual process to an automatic, scalable system that grows with your organization. 
+This means that the Metaxy project name should always match the Python project name. diff --git a/src/metaxy/metadata_store/base.py b/src/metaxy/metadata_store/base.py index ff61477c0..0ef731068 100644 --- a/src/metaxy/metadata_store/base.py +++ b/src/metaxy/metadata_store/base.py @@ -44,7 +44,8 @@ METAXY_PROVENANCE_BY_FIELD, METAXY_SNAPSHOT_VERSION, ) -from metaxy.models.feature import BaseFeature, FeatureGraph, current_graph +from metaxy.models.feature import BaseFeature, FeatureGraph +from metaxy.models.feature_spec import FeatureSpec from metaxy.models.plan import FeaturePlan from metaxy.models.types import ( CoercibleToFeatureKey, @@ -209,6 +210,12 @@ def __init__( ) self.versioning_engine_cls = versioning_engine_cls + # Hydrated graph: the active graph enriched with FeatureDefinitions from this store + self._hydrated_graph: FeatureGraph | None = None + self._hydrated_graph_source_uid: str | None = ( + None # Track which graph instance was hydrated + ) + # Resolve auto_create_tables from global config if not explicitly provided if auto_create_tables is None: from metaxy.config import MetaxyConfig @@ -225,6 +232,73 @@ def __init__( self.fallback_stores = fallback_stores or [] + @property + def graph(self) -> FeatureGraph: + """Get the hydrated graph for this store. + + Returns a FeatureGraph that has been enriched with FeatureDefinitions + from this store's system storage. The hydration happens lazily on first access + and is cached until the active graph changes. + + This graph should be used for all operations instead of FeatureGraph.get_active() + to ensure external feature definitions are available. + + Raises: + StoreNotOpenError: If the store is not open. Hydration requires + reading from the system storage. 
+ """ + self._check_open() + + active_graph = FeatureGraph.get_active() + + # Check if we need to hydrate (new graph instance or not hydrated yet) + if ( + self._hydrated_graph is None + or self._hydrated_graph_source_uid != active_graph.instance_uid + ): + self._hydrate_graph(active_graph) + + assert self._hydrated_graph is not None + return self._hydrated_graph + + def _hydrate_graph(self, source_graph: FeatureGraph) -> None: + """Hydrate a copy of the source graph with FeatureDefinitions from this store. + + Loads all feature definitions from this store's system storage (not fallback stores) + and adds them to a copy of the source graph. + + Args: + source_graph: The active FeatureGraph to use as a base + """ + # Create a new graph and populate it using add_feature() + hydrated = FeatureGraph() + hydrated.instance_uid = source_graph.instance_uid # Preserve the source UID + + # Add all features from source graph + # Use Feature classes where available (they create proper definitions) + # Otherwise use the definitions directly + for key, definition in source_graph.definitions_by_key.items(): + if key in source_graph._feature_classes: + hydrated.add_feature(source_graph._feature_classes[key]) + else: + hydrated.add_feature(definition) + + # Load feature definitions from this store's system storage + # Only if the store is open and has the system table + assert self._is_open, "_hydrate_graph must be called within an open store" + from metaxy.metadata_store.system.storage import SystemTableStorage + + system_storage = SystemTableStorage(self) + # Load all definitions without filters - hydrate from THIS store only + stored_definitions = system_storage.load_external_feature_definitions() + + for definition in stored_definitions: + # add_feature handles duplicates gracefully (warns and skips) + hydrated.add_feature(definition) + + self._hydrated_graph = hydrated + self._hydrated_graph_source_uid = source_graph.instance_uid + @classmethod @abstractmethod def 
config_model(cls) -> type[MetadataStoreConfig]: @@ -408,28 +482,29 @@ def resolve_update( # Convert global_filters to a list for easy concatenation global_filter_list = list(global_filters) if global_filters else [] - graph = current_graph() - plan = graph.get_feature_plan(feature.spec().key) + # Resolve feature key once and use throughout + feature_key = self._resolve_feature_key(feature) + plan = self.graph.get_feature_plan(feature_key) # Root features without samples: error (samples required) if not plan.deps and samples_nw is None: raise ValueError( - f"Feature {feature.spec().key} has no upstream dependencies (root feature). " + f"Feature {feature_key.to_string()} has no upstream dependencies (root feature). " f"Must provide 'samples' parameter with sample_uid and {METAXY_PROVENANCE_BY_FIELD} columns. " f"Root features require manual {METAXY_PROVENANCE_BY_FIELD} computation." ) # Combine feature-specific filters with global filters current_feature_filters = [ - *normalized_filters.get(feature.spec().key, []), + *normalized_filters.get(feature_key, []), *global_filter_list, ] current_metadata = self.read_metadata_in_store( - feature, + feature_key, filters=[ nw.col(METAXY_FEATURE_VERSION) - == graph.get_feature_version(feature.spec().key), + == self.graph.get_feature_version(feature_key), *current_feature_filters, ], ) @@ -617,13 +692,17 @@ def read_metadata( Read metadata with optional fallback to upstream stores. Args: - feature: Feature to read metadata for + feature: Feature to read metadata for. Can be: + - A feature key (string, tuple, FeatureKey) + - A Feature class + - A FeatureSpec (for direct spec access - no graph lookup needed if spec has no deps) feature_version: Explicit feature_version to filter by (mutually exclusive with current_only=True) filters: Sequence of Narwhals filter expressions to apply to this feature. Example: `[nw.col("x") > 10, nw.col("y") < 5]` columns: Subset of columns to include. Metaxy's system columns are always included. 
allow_fallback: If `True`, check fallback stores on local miss - current_only: If `True`, only return rows with current feature_version + current_only: If `True`, only return rows with current feature_version. + For FeatureSpec with no deps, this should be False (no graph to look up version). latest_only: Whether to deduplicate samples within `id_columns` groups ordered by `metaxy_created_at`. Returns: @@ -657,7 +736,7 @@ def read_metadata( # Add feature_version filter only when needed if current_only or feature_version is not None and not is_system_table: version_filter = nw.col(METAXY_FEATURE_VERSION) == ( - current_graph().get_feature_version(feature_key) + self.graph.get_feature_version(feature_key) if current_only else feature_version ) @@ -676,7 +755,7 @@ def read_metadata( lazy_frame = None try: lazy_frame = self.read_metadata_in_store( - feature, filters=filters, columns=read_columns + feature_key, filters=filters, columns=read_columns ) except FeatureNotFoundError as e: # do not read system features from fallback stores @@ -694,12 +773,14 @@ def read_metadata( if lazy_frame is not None and not is_system_table and latest_only: from metaxy.models.constants import METAXY_CREATED_AT + # Get id_columns from plan - _resolve_feature_plan handles FeatureSpec with no deps directly + plan = self._resolve_feature_plan(feature) + id_columns = list(plan.feature.id_columns) + # Apply deduplication lazy_frame = self.versioning_engine_cls.keep_latest_by_group( df=lazy_frame, - group_columns=list( - self._resolve_feature_plan(feature_key).feature.id_columns - ), + group_columns=id_columns, timestamp_column=METAXY_CREATED_AT, ) @@ -861,8 +942,7 @@ def write_metadata_multi( } # Get reverse topological order (dependents first) - graph = current_graph() - sorted_keys = graph.topological_sort_features( + sorted_keys = self.graph.topological_sort_features( list(resolved_metadata.keys()), descending=True ) @@ -1111,12 +1191,19 @@ def _resolve_feature_key(self, feature: 
CoercibleToFeatureKey) -> FeatureKey: return ValidatedFeatureKeyAdapter.validate_python(feature) def _resolve_feature_plan(self, feature: CoercibleToFeatureKey) -> FeaturePlan: - """Resolve to FeaturePlan for dependency resolution.""" + """Resolve to FeaturePlan for dependency resolution. + + If a FeatureSpec with no dependencies is passed directly, creates + a plan without graph lookup. Otherwise looks up in the graph. + """ + # If it's a FeatureSpec with no deps, create plan directly (no graph lookup) + if isinstance(feature, FeatureSpec) and not feature.deps: + return FeaturePlan(feature=feature, deps=None) + # First resolve to FeatureKey feature_key = self._resolve_feature_key(feature) - # Then get the plan - graph = current_graph() - return graph.get_feature_plan(feature_key) + # Then get the plan from the graph + return self.graph.get_feature_plan(feature_key) # ========== Core CRUD Operations ========== @@ -1248,25 +1335,11 @@ def _add_system_columns( if columns_to_drop: df = df.drop(*columns_to_drop) - # Get current feature version and snapshot_version from code and add them - # Use duck typing to avoid Ray serialization issues with issubclass - if ( - isinstance(feature, type) - and hasattr(feature, "feature_version") - and callable(feature.feature_version) - ): - current_feature_version = feature.feature_version() - else: - from metaxy import get_feature_by_key - - feature_cls = get_feature_by_key(feature_key) - current_feature_version = feature_cls.feature_version() - - # Get snapshot_version from active graph - from metaxy.models.feature import FeatureGraph - - graph = FeatureGraph.get_active() - current_snapshot_version = graph.snapshot_version + # Get current feature version and snapshot_version from the graph + # Use FeatureDefinition from the graph - this works for both local and external features + # Get feature_version from the definition in the graph + current_feature_version = self.graph.get_feature_version(feature_key) + 
current_snapshot_version = self.graph.snapshot_version df = df.with_columns( [ diff --git a/src/metaxy/metadata_store/system/keys.py b/src/metaxy/metadata_store/system/keys.py index eb07e99c6..790b2ffc4 100644 --- a/src/metaxy/metadata_store/system/keys.py +++ b/src/metaxy/metadata_store/system/keys.py @@ -1,5 +1,8 @@ """System table keys and constants.""" +from metaxy.models.feature_spec import FeatureSpec +from metaxy.models.field import FieldKey, FieldSpec +from metaxy.models.plan import FeaturePlan from metaxy.models.types import FeatureKey METAXY_SYSTEM_KEY_PREFIX = "metaxy-system" @@ -7,3 +10,31 @@ # System table keys FEATURE_VERSIONS_KEY = FeatureKey([METAXY_SYSTEM_KEY_PREFIX, "feature_versions"]) EVENTS_KEY = FeatureKey([METAXY_SYSTEM_KEY_PREFIX, "events"]) + + +def _create_system_spec(key: FeatureKey, id_columns: tuple[str, ...]) -> FeatureSpec: + """Create a minimal FeatureSpec for a system table.""" + return FeatureSpec( + key=key, + id_columns=id_columns, + fields=[FieldSpec(key=FieldKey([col]), code_version="1") for col in id_columns], + # No deps - system tables are root features + ) + + +def _create_system_plan(spec: FeatureSpec) -> FeaturePlan: + """Create a minimal FeaturePlan for a system table (no dependencies).""" + return FeaturePlan(feature=spec, deps=None) + + +# System FeatureSpecs (for versioning engine operations) +# feature_versions uses compound ID: (feature_key, full_definition_version) +# This preserves history while allowing latest lookups +FEATURE_VERSIONS_SPEC = _create_system_spec( + FEATURE_VERSIONS_KEY, ("feature_key", "metaxy_full_definition_version") +) +EVENTS_SPEC = _create_system_spec(EVENTS_KEY, ("event_id",)) + +# System FeaturePlans (for versioning engine context) +FEATURE_VERSIONS_PLAN = _create_system_plan(FEATURE_VERSIONS_SPEC) +EVENTS_PLAN = _create_system_plan(EVENTS_SPEC) diff --git a/src/metaxy/metadata_store/system/storage.py b/src/metaxy/metadata_store/system/storage.py index cf9f83163..5aa6e0c30 100644 --- 
a/src/metaxy/metadata_store/system/storage.py +++ b/src/metaxy/metadata_store/system/storage.py @@ -36,6 +36,7 @@ if TYPE_CHECKING: from metaxy.metadata_store import MetadataStore + from metaxy.models.feature_definition import FeatureDefinition class SystemTableStorage: @@ -919,3 +920,112 @@ def load_graph_from_snapshot( class_path_overrides=class_path_overrides, force_reload=force_reload, ) + + def load_external_feature_definitions( + self, + feature_keys: set[FeatureKey] | None = None, + projects: set[str] | None = None, + ) -> list[FeatureDefinition]: + """Load feature definitions from the metadata store for external features. + + This method loads FeatureDefinition objects for external features that + are not available as Python classes in the current environment. It queries + the feature_versions system table and reconstructs definitions from stored + metadata. + + **Important**: Always filters out the current project - external features + should not include features from the current project. + + This is used to load external dependencies when: + - A feature in the current project depends on features from other projects + - The external feature's Python class is not importable + - Only the stored metadata is available + + Args: + feature_keys: Optional set of specific feature keys to load. + If provided, only loads definitions for these features. + projects: Optional set of project names to filter by. + If provided, only loads definitions from these projects. + The current project is always excluded regardless of this filter. + + Returns: + List of FeatureDefinition objects loaded from storage. + Returns empty list if no matching definitions are found. + + Note: + The store must already be open when calling this method. 
+ """ + from metaxy.config import MetaxyConfig + from metaxy.models.feature_definition import FeatureDefinition + from metaxy.models.feature_spec import FeatureSpec + + # Get current project to exclude it + current_project = MetaxyConfig.get().project + + # Read system metadata + sys_meta = self._read_system_metadata(FEATURE_VERSIONS_KEY) + + # Always filter out the current project - we only want external features + sys_meta = sys_meta.filter(nw.col("project") != current_project) + + # Apply additional project filter if specified + if projects is not None and len(projects) > 0: + sys_meta = sys_meta.filter(nw.col("project").is_in(list(projects))) + + # Apply feature key filter if specified + if feature_keys is not None and len(feature_keys) > 0: + feature_key_strings = [k.to_string() for k in feature_keys] + sys_meta = sys_meta.filter(nw.col("feature_key").is_in(feature_key_strings)) + + # Deduplicate - keep latest recorded_at per feature_key using versioning engine + from metaxy.metadata_store.system.keys import FEATURE_VERSIONS_PLAN + + with self.store.create_versioning_engine( + plan=FEATURE_VERSIONS_PLAN, implementation=sys_meta.implementation + ) as engine: + sys_meta = engine.keep_latest_by_group( + df=sys_meta, + group_columns=["feature_key"], + timestamp_column="recorded_at", + ) + + # Collect at the very end + features_df = sys_meta.collect().to_polars() + + if features_df.height == 0: + return [] + + # Convert to FeatureDefinition objects + definitions: list[FeatureDefinition] = [] + for row in features_df.iter_rows(named=True): + # Parse the stored data using pydantic for feature_spec + feature_spec_json = row["feature_spec"] + spec = ( + FeatureSpec.model_validate_json(feature_spec_json) + if isinstance(feature_spec_json, str) + else FeatureSpec.model_validate(feature_spec_json) + ) + + feature_schema_json = row["feature_schema"] + if feature_schema_json is None: + feature_schema: dict[str, Any] = {} + elif isinstance(feature_schema_json, str): + import 
json + + feature_schema = json.loads(feature_schema_json) + else: + feature_schema = feature_schema_json + + # Build FeatureDefinition directly - fail fast on missing fields + definition = FeatureDefinition( + spec=spec, + feature_schema=feature_schema, + project_name=row["project"], + feature_version=row["metaxy_feature_version"], + feature_code_version=spec.code_version, + feature_definition_version=row["metaxy_full_definition_version"], + feature_class_path=row["feature_class_path"], + ) + definitions.append(definition) + + return definitions diff --git a/src/metaxy/models/feature.py b/src/metaxy/models/feature.py index 0702ab95f..e3fca298a 100644 --- a/src/metaxy/models/feature.py +++ b/src/metaxy/models/feature.py @@ -76,126 +76,150 @@ def get_feature_by_key(key: CoercibleToFeatureKey) -> type["BaseFeature"]: class FeatureGraph: def __init__(self): - # Feature class storage (when Python class is available) - self.features_by_key: dict[FeatureKey, type[BaseFeature]] = {} - # Feature definition storage (central data structure for all features) + import uuid + + # Unique identifier for this graph instance + self.instance_uid: str = str(uuid.uuid4()) + # Primary storage: FeatureDefinition objects for ALL features self.definitions_by_key: dict[FeatureKey, FeatureDefinition] = {} - # Feature specs (derived from definitions for convenience) - self.feature_specs_by_key: dict[FeatureKey, FeatureSpec] = {} - # Standalone specs registered without Feature classes (for migrations) - # Note: This is being phased out in favor of definitions_by_key - self.standalone_specs_by_key: dict[FeatureKey, FeatureSpec] = {} + # Feature class references (for get_feature_by_key) - will be removed later + self._feature_classes: dict[FeatureKey, type[BaseFeature]] = {} + # Pending specs: temporarily stores specs during add_feature before definition is created + # This is needed because version computation requires the spec to be accessible + self._pending_specs: dict[FeatureKey, 
FeatureSpec] = {} @property - def all_specs_by_key(self) -> dict[FeatureKey, FeatureSpec]: - return {**self.feature_specs_by_key, **self.standalone_specs_by_key} - - def add_feature(self, feature: type["BaseFeature"]) -> None: - """Add a feature to the graph. - - Creates and stores a FeatureDefinition internally, along with the - Feature class reference. + def features_by_key(self) -> dict[FeatureKey, type["BaseFeature"]]: + """Access to feature classes (for backward compatibility). - Args: - feature: Feature class to register + Deprecated: Use get_feature_by_key() instead. Will be removed later. """ - import warnings - - key = feature.spec().key - if key in self.features_by_key: - existing = self.features_by_key[key] - warnings.warn( - f"Feature with key {key.to_string()} already registered. " - f"Existing: {existing.__name__}, New: {feature.__name__}. " - f"Ignoring duplicate registration.", - stacklevel=2, - ) - return + return self._feature_classes - # Validate that there are no duplicate column names across dependencies after renaming - if feature.spec().deps: - self._validate_no_duplicate_columns(feature.spec()) - - # Store Feature class first - self.features_by_key[key] = feature - # Store spec before creating definition (needed for version computation) - self.feature_specs_by_key[key] = feature.spec() - # Create and store FeatureDefinition (requires spec to be in graph for version computation) - self.definitions_by_key[key] = feature.to_definition() - - def add_feature_spec(self, spec: FeatureSpec) -> None: - import warnings + @property + def feature_specs_by_key(self) -> dict[FeatureKey, FeatureSpec]: + """Get specs derived from definitions (for backward compatibility). - # Check if a Feature class already exists for this key - if spec.key in self.features_by_key: - warnings.warn( - f"Feature class already exists for key {spec.key.to_string()}. 
" - f"Standalone spec will be ignored - Feature class takes precedence.", - stacklevel=2, - ) - return + Also includes pending specs that are being processed during add_feature. + """ + result = {key: defn.spec for key, defn in self.definitions_by_key.items()} + # Include pending specs (during add_feature before definition is created) + result.update(self._pending_specs) + return result - # Check if a standalone spec already exists - if spec.key in self.standalone_specs_by_key: - existing = self.standalone_specs_by_key[spec.key] - # Only warn if it's a different spec (by comparing feature_spec_version) - if existing.feature_spec_version != spec.feature_spec_version: - raise ValueError( - f"Standalone spec for key {spec.key.to_string()} already exists " - f"with a different version." - ) + @property + def all_specs_by_key(self) -> dict[FeatureKey, FeatureSpec]: + """Alias for feature_specs_by_key (backward compatibility).""" + return self.feature_specs_by_key - # Validate that there are no duplicate columns across dependencies after renaming - if spec.deps: - self._validate_no_duplicate_columns(spec) + @property + def standalone_specs_by_key(self) -> dict[FeatureKey, FeatureSpec]: + """Specs without Feature classes (backward compatibility). - # Store standalone spec - self.standalone_specs_by_key[spec.key] = spec - # Also add to feature_specs_by_key for methods that only need the spec - self.feature_specs_by_key[spec.key] = spec + Returns specs for definitions that don't have a Feature class. + """ + return { + key: defn.spec + for key, defn in self.definitions_by_key.items() + if key not in self._feature_classes + } - def add_definition(self, definition: "FeatureDefinition") -> None: - """Add an external feature definition (no Feature class available). + def add_feature( + self, + feature: "type[BaseFeature] | FeatureDefinition | FeatureSpec", + ) -> None: + """Add a feature to the graph. 
- This is used for loading feature definitions from external projects - or from stored metadata when the Feature class is not importable. + Unified method that accepts: + - Feature class (type[BaseFeature]) - creates FeatureDefinition and stores both + - FeatureDefinition - stores directly (for external features) + - FeatureSpec - creates minimal FeatureDefinition (for migrations) - Local features (those with Feature classes) always take precedence. + Local features (Feature classes) always take precedence over external definitions. Args: - definition: FeatureDefinition to add to the graph + feature: Feature class, FeatureDefinition, or FeatureSpec to register """ import warnings - key = definition.key + from metaxy.models.feature_definition import FeatureDefinition - # Local features always take precedence - if key in self.features_by_key: - warnings.warn( - f"Feature class already exists for key {key.to_string()}. " - f"External definition will be ignored - local features take precedence.", - stacklevel=2, + # Determine the key and what we're adding + if isinstance(feature, FeatureDefinition): + key = feature.key + definition = feature + feature_class = None + elif isinstance(feature, FeatureSpec): + key = feature.key + # Create minimal FeatureDefinition from spec + definition = FeatureDefinition( + spec=feature, + feature_schema={}, # No schema for standalone specs + project_name="", # Unknown project + feature_version=feature.feature_spec_version, # Use spec version + feature_code_version=feature.code_version, + feature_definition_version=feature.feature_spec_version, + feature_class_path="", # No class path ) - return + feature_class = None + else: + # It's a Feature class - need to temporarily add spec for version computation + key = feature.spec().key + spec = feature.spec() - # Check if a definition already exists + # Add spec to pending before creating definition (needed for version computation) + self._pending_specs[key] = spec + try: + definition = 
feature.to_definition() + finally: + # Remove from pending after definition is created + self._pending_specs.pop(key, None) + + feature_class = feature + + # Check for duplicates if key in self.definitions_by_key: - warnings.warn( - f"Definition for key {key.to_string()} already exists. " - f"Ignoring duplicate definition.", - stacklevel=2, - ) - return + # If we're adding a Feature class and one already exists + if feature_class is not None and key in self._feature_classes: + existing = self._feature_classes[key] + warnings.warn( + f"Feature with key {key.to_string()} already registered. " + f"Existing: {existing.__name__}, New: {feature_class.__name__}. " + f"Ignoring duplicate registration.", + stacklevel=2, + ) + return + # If we're adding a Feature class and only definition exists, class takes precedence + elif feature_class is not None: + # Update definition and add class + pass # Continue to store + # If we're adding a definition/spec and class already exists, class takes precedence + elif key in self._feature_classes: + warnings.warn( + f"Feature class already exists for key {key.to_string()}. " + f"External definition will be ignored - local features take precedence.", + stacklevel=2, + ) + return + # If we're adding a definition/spec and one already exists + else: + warnings.warn( + f"Definition for key {key.to_string()} already exists. 
" + f"Ignoring duplicate definition.", + stacklevel=2, + ) + return - # Validate that there are no duplicate columns across dependencies after renaming + # Validate that there are no duplicate column names across dependencies if definition.spec.deps: self._validate_no_duplicate_columns(definition.spec) - # Store the definition + # Store the definition (primary storage) self.definitions_by_key[key] = definition - # Also add spec for convenience - self.feature_specs_by_key[key] = definition.spec - # Note: features_by_key is NOT populated for external definitions + + # Store the Feature class reference if available (secondary storage) + if feature_class is not None: + self._feature_classes[key] = feature_class def get_definition(self, key: CoercibleToFeatureKey) -> "FeatureDefinition": """Get FeatureDefinition for a feature key. @@ -208,13 +232,17 @@ def get_definition(self, key: CoercibleToFeatureKey) -> "FeatureDefinition": FeatureDefinition for the requested feature Raises: - KeyError: If no feature definition exists for the given key + KeyError: If no feature definition exists for the given key. + This can happen when the feature is from an external project + and hasn't been loaded. Use a MetadataStore to access external + feature definitions. """ validated_key = ValidatedFeatureKeyAdapter.validate_python(key) if validated_key not in self.definitions_by_key: raise KeyError( - f"No feature definition for {validated_key.to_string()}. " - f"Available keys: {[k.to_string() for k in self.definitions_by_key.keys()]}" + f"No feature definition for '{validated_key.to_string()}' in this graph. " + f"If this is an external feature, use MetadataStore.graph to access " + f"feature definitions that include external dependencies loaded from storage." ) return self.definitions_by_key[validated_key] @@ -234,7 +262,76 @@ def has_feature_class(self, key: CoercibleToFeatureKey) -> bool: FeatureDefinition is available. 
""" validated_key = ValidatedFeatureKeyAdapter.validate_python(key) - return validated_key in self.features_by_key + return validated_key in self._feature_classes + + def get_external_dependency_projects(self, current_project: str) -> set[str]: + """Get the set of external project names that features in this graph depend on. + + This identifies which external projects need to have their feature definitions + loaded from the metadata store before version computation can proceed. + + Args: + current_project: The current project name (features from this project are not external) + + Returns: + Set of project names for external dependencies + """ + from metaxy.models.feature_spec import FeatureDep + + external_projects: set[str] = set() + + for key, spec in self.feature_specs_by_key.items(): + if not spec.deps: + continue + + for dep in spec.deps: + if not isinstance(dep, FeatureDep): + continue + + dep_key = dep.feature + + # Check if the dependency is in the graph + if dep_key in self.definitions_by_key: + # Get the project from the definition + dep_definition = self.definitions_by_key[dep_key] + if dep_definition.project_name != current_project: + external_projects.add(dep_definition.project_name) + elif dep_key not in self.feature_specs_by_key: + # Dependency not in graph at all - this is an unresolved external dependency + # We don't know the project yet, but we'll need to load it + # For now, mark it as needing external resolution + # The project will be determined when we query the system table + pass + + return external_projects + + def get_unresolved_dependencies(self) -> set[FeatureKey]: + """Get feature keys for dependencies that are not in the graph. + + These are dependencies that need to be loaded from external sources. 
+ + Returns: + Set of FeatureKey objects for unresolved dependencies + """ + from metaxy.models.feature_spec import FeatureDep + + unresolved: set[FeatureKey] = set() + + for key, spec in self.feature_specs_by_key.items(): + if not spec.deps: + continue + + for dep in spec.deps: + if not isinstance(dep, FeatureDep): + continue + + dep_key = dep.feature + + # Check if the dependency is in the graph + if dep_key not in self.definitions_by_key: + unresolved.add(dep_key) + + return unresolved def _validate_no_duplicate_columns(self, spec: "FeatureSpec") -> None: """Validate that there are no duplicate column names across dependencies after renaming. @@ -377,10 +474,10 @@ def _validate_no_duplicate_columns(self, spec: "FeatureSpec") -> None: def remove_feature(self, key: CoercibleToFeatureKey) -> None: """Remove a feature from the graph. - Removes Feature class, FeatureDefinition, or standalone spec (whichever exists). + Removes the FeatureDefinition and Feature class reference (if any). Args: - key: Feature key to remove. Accepts types that can be converted into a feature key.. + key: Feature key to remove. Accepts types that can be converted into a feature key. Raises: KeyError: If no feature with the given key is registered @@ -388,40 +485,31 @@ def remove_feature(self, key: CoercibleToFeatureKey) -> None: # Validate and coerce the key validated_key = ValidatedFeatureKeyAdapter.validate_python(key) - # Check all possible storage locations - combined = { - **self.feature_specs_by_key, - **self.standalone_specs_by_key, - **{k: v for k, v in self.definitions_by_key.items()}, - } - - if validated_key not in combined: + if validated_key not in self.definitions_by_key: raise KeyError( f"No feature with key {validated_key.to_string()} found in graph. 
" - f"Available keys: {[k.to_string() for k in combined]}" + f"Available keys: {[k.to_string() for k in self.definitions_by_key]}" ) - # Remove from all relevant dicts - if validated_key in self.features_by_key: - del self.features_by_key[validated_key] - if validated_key in self.definitions_by_key: - del self.definitions_by_key[validated_key] - if validated_key in self.standalone_specs_by_key: - del self.standalone_specs_by_key[validated_key] - if validated_key in self.feature_specs_by_key: - del self.feature_specs_by_key[validated_key] + # Remove from primary storage + del self.definitions_by_key[validated_key] + + # Remove from secondary storage (Feature class reference) if exists + if validated_key in self._feature_classes: + del self._feature_classes[validated_key] def get_feature_by_key(self, key: CoercibleToFeatureKey) -> type["BaseFeature"]: """Get a feature class by its key. Args: - key: Feature key to look up. Accepts types that can be converted into a feature key.. + key: Feature key to look up. Accepts types that can be converted into a feature key. Returns: Feature class Raises: - KeyError: If no feature with the given key is registered + KeyError: If no feature class with the given key is registered + (note: this only returns Feature classes, not external definitions) Example: ```py @@ -436,12 +524,12 @@ def get_feature_by_key(self, key: CoercibleToFeatureKey) -> type["BaseFeature"]: # Validate and coerce the key validated_key = ValidatedFeatureKeyAdapter.validate_python(key) - if validated_key not in self.features_by_key: + if validated_key not in self._feature_classes: raise KeyError( - f"No feature with key {validated_key.to_string()} found in graph. " - f"Available keys: {[k.to_string() for k in self.features_by_key.keys()]}" + f"No feature class with key {validated_key.to_string()} found in graph. 
" + f"Available feature classes: {[k.to_string() for k in self._feature_classes.keys()]}" ) - return self.features_by_key[validated_key] + return self._feature_classes[validated_key] def list_features( self, @@ -482,8 +570,8 @@ def list_features( ``` """ if not only_current_project: - # Return all features - return list(self.features_by_key.keys()) + # Return all features (including external definitions) + return list(self.definitions_by_key.keys()) # Normalize projects to list project_list: list[str] @@ -497,17 +585,17 @@ def list_features( except RuntimeError: # Config not initialized - in tests or non-CLI usage # Return all features (can't determine project) - return list(self.features_by_key.keys()) + return list(self.definitions_by_key.keys()) elif isinstance(projects, str): project_list = [projects] else: project_list = projects - # Filter by project(s) using Feature.project attribute + # Filter by project(s) using FeatureDefinition.project_name return [ key - for key in self.features_by_key.keys() - if self.features_by_key[key].project in project_list + for key, defn in self.definitions_by_key.items() + if defn.project_name in project_list ] def get_feature_plan(self, key: CoercibleToFeatureKey) -> FeaturePlan: @@ -970,7 +1058,7 @@ def _to_definition( # Add the FeatureDefinition (either no class path or import failed) # Skip if already added by metaclass during import if definition.key not in graph.definitions_by_key: - graph.add_definition(definition) + graph.add_feature(definition) return graph diff --git a/src/metaxy/models/types.py b/src/metaxy/models/types.py index 73b12c0ed..3017a7cca 100644 --- a/src/metaxy/models/types.py +++ b/src/metaxy/models/types.py @@ -3,7 +3,7 @@ from __future__ import annotations from collections.abc import Iterator, Sequence -from typing import TYPE_CHECKING, Annotated, Any, NamedTuple, TypeAlias, overload +from typing import TYPE_CHECKING, Annotated, Any, NamedTuple, TypeAlias, Union, overload from pydantic import ( 
BeforeValidator, @@ -366,6 +366,8 @@ def _coerce_to_feature_key(value: Any) -> FeatureKey: - `type[BaseFeature]`: extracts .spec().key + - `FeatureSpec`: extracts .key + Args: value: Value to coerce to `FeatureKey` @@ -378,6 +380,12 @@ def _coerce_to_feature_key(value: Any) -> FeatureKey: if isinstance(value, FeatureKey): return value + # Check if it's a FeatureSpec + from metaxy.models.feature_spec import FeatureSpec + + if isinstance(value, FeatureSpec): + return value.key + # Check if it's a BaseFeature class # Import here to avoid circular dependency at module level from metaxy.models.feature import BaseFeature @@ -418,11 +426,14 @@ def _coerce_to_field_key(value: Any) -> FieldKey: if TYPE_CHECKING: from metaxy.models.feature import BaseFeature + from metaxy.models.feature_spec import FeatureSpec # Type unions - what inputs are accepted -CoercibleToFeatureKey: TypeAlias = ( - str | Sequence[str] | FeatureKey | type["BaseFeature"] -) +# Note: FeatureSpec is also accepted but handled at runtime in _coerce_to_feature_key +# We use Union for compatibility with Python 3.10 runtime (can't use | with forward refs) +CoercibleToFeatureKey: TypeAlias = Union[ + str, Sequence[str], FeatureKey, type["BaseFeature"], "FeatureSpec" +] CoercibleToFieldKey: TypeAlias = str | Sequence[str] | FieldKey # Annotated types for Pydantic field annotations - automatically validate