diff --git a/deployment/cli/main.py b/deployment/cli/main.py index 3a1906148..76e3e94db 100644 --- a/deployment/cli/main.py +++ b/deployment/cli/main.py @@ -11,6 +11,7 @@ import importlib import pkgutil import sys +import traceback from typing import List import deployment.projects as projects_pkg @@ -51,11 +52,13 @@ def build_parser() -> argparse.ArgumentParser: subparsers = parser.add_subparsers(dest="project", required=True) # Discover projects and import them so they can contribute args. + failed_projects: List[str] = [] for project_name in _discover_project_packages(): try: _import_and_register_project(project_name) - except Exception: - # Skip broken/incomplete project bundles rather than breaking the whole CLI. + except Exception as e: + tb = traceback.format_exc() + failed_projects.append(f"- {project_name}: {e}\n{tb}") continue try: @@ -68,6 +71,12 @@ def build_parser() -> argparse.ArgumentParser: adapter.add_args(sub) sub.set_defaults(_adapter_name=project_name) + if not project_registry.list_projects(): + details = "\n".join(failed_projects) if failed_projects else "(no project packages discovered)" + raise RuntimeError( + "No deployment projects were registered. This usually means project imports failed.\n" f"{details}" + ) + return parser diff --git a/deployment/core/__init__.py b/deployment/core/__init__.py index afe64e8b4..1afb0be1a 100644 --- a/deployment/core/__init__.py +++ b/deployment/core/__init__.py @@ -1,15 +1,21 @@ """Core components for deployment framework.""" -from deployment.core.artifacts import Artifact +from deployment.core.artifacts import ( + Artifact, + get_component_files, + resolve_artifact_path, + resolve_engine_path, + resolve_onnx_path, +) from deployment.core.backend import Backend from deployment.core.config.base_config import ( - BackendConfig, BaseDeploymentConfig, DeviceConfig, EvaluationConfig, ExportConfig, ExportMode, RuntimeConfig, + TensorRTConfig, VerificationConfig, VerificationScenario, parse_base_args, @@ -26,6 +32,7 @@ BaseEvaluator, EvalResultDict, EvaluationDefaults, + InferenceInput, ModelSpec, TaskProfile, VerifyResultDict, @@ -56,7 +63,7 @@ "ExportConfig", "ExportMode", "RuntimeConfig", - "BackendConfig", + "TensorRTConfig", "DeviceConfig", "EvaluationConfig", "VerificationConfig", @@ -71,11 +78,16 @@ # Evaluation "BaseEvaluator", "TaskProfile", + "InferenceInput", "EvalResultDict", "VerifyResultDict", "VerificationMixin", # Artifacts "Artifact", + "resolve_artifact_path", + "resolve_onnx_path", + "resolve_engine_path", + "get_component_files", "ModelSpec", # Preprocessing "build_preprocessing_pipeline", diff --git a/deployment/core/artifacts.py b/deployment/core/artifacts.py index 985aa3bb1..aa30d3239 100644 --- a/deployment/core/artifacts.py +++ b/deployment/core/artifacts.py @@ -1,18 +1,231 @@ -"""Artifact descriptors for deployment outputs.""" +""" +Artifact Path Resolution for Deployment Pipelines. + +This module provides: +1. Artifact dataclass - represents an exported model artifact +2. Path resolution functions - resolve artifact paths from deploy config + +Supports: +- Single-component models (YOLOX, Calibration): use component="model" +- Multi-component models (CenterPoint): use component="voxel_encoder", "backbone_head", etc. 
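+
+Example (illustrative sketch; directories and the `components` dict are placeholders taken
+from the function docstrings below):
+
+    from deployment.core.artifacts import resolve_engine_path, resolve_onnx_path
+
+    # Single-component model: components = {"model": {"onnx_file": "yolox.onnx"}}
+    onnx_path = resolve_onnx_path("work_dirs/yolox/onnx", components_cfg=components)
+
+    # Multi-component model: select the component explicitly
+    engine_path = resolve_engine_path(
+        "work_dirs/centerpoint/tensorrt",
+        components_cfg=components,
+        component="voxel_encoder",
+    )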
+""" from __future__ import annotations +import logging +import os import os.path as osp from dataclasses import dataclass +from typing import Any, Dict, Mapping, Optional + +logger = logging.getLogger(__name__) + + +# ============================================================================ +# Artifact Dataclass +# ============================================================================ @dataclass(frozen=True) class Artifact: - """Represents a produced deployment artifact such as ONNX or TensorRT outputs.""" + """ + Represents an exported model artifact (ONNX file, TensorRT engine, etc.). + + Attributes: + path: Filesystem path to the artifact (file or directory). + multi_file: True if artifact is a directory containing multiple files + (e.g., CenterPoint has voxel_encoder.onnx + backbone_head.onnx). + """ path: str multi_file: bool = False + @property def exists(self) -> bool: - """Return True if the artifact path currently exists on disk.""" - return osp.exists(self.path) + """Whether the artifact exists on disk.""" + return os.path.exists(self.path) + + @property + def is_directory(self) -> bool: + """Whether the artifact is a directory.""" + return os.path.isdir(self.path) + + def __str__(self) -> str: + return self.path + + +# ============================================================================ +# Path Resolution Functions +# ============================================================================ + +# File extension mapping +FILE_EXTENSIONS: Dict[str, str] = { + "onnx_file": ".onnx", + "engine_file": ".engine", +} + + +def resolve_artifact_path( + *, + base_dir: str, + components_cfg: Optional[Mapping[str, Any]], + component: str, + file_key: str, +) -> str: + """Resolve artifact path for any component. + + This is the entry point for artifact path resolution. + + Args: + base_dir: Base directory for artifacts (onnx_dir or tensorrt_dir), + or direct path to an artifact file. + components_cfg: The `components` dict from deploy_config. + Can be None for backwards compatibility. + component: Component name (e.g., 'model', 'voxel_encoder', 'backbone_head') + file_key: Key to look up ('onnx_file' or 'engine_file') + + Returns: + Resolved path to the artifact file + + Resolution strategy (single supported mode): + 1. `base_dir` must be a directory (e.g., `.../onnx` or `.../tensorrt`) + 2. Require `components_cfg[component][file_key]` to be set + - must be a relative path resolved under `base_dir` + 3. The resolved path must exist and be a file + + This function intentionally does NOT: + - scan directories for matching extensions + - fall back to default filenames + - accept `base_dir` as a file path + - accept absolute paths in `components` (enforces fully config-driven, workspace-relative artifacts) + + Examples: + # Single-component model (YOLOX) + resolve_artifact_path( + base_dir="work_dirs/yolox/onnx", + components_cfg={"model": {"onnx_file": "yolox.onnx"}}, + component="model", + file_key="onnx_file", + ) + + # Multi-component model (CenterPoint) + resolve_artifact_path( + base_dir="work_dirs/centerpoint/tensorrt", + components_cfg={"voxel_encoder": {"engine_file": "voxel.engine"}}, + component="voxel_encoder", + file_key="engine_file", + ) + """ + if not os.path.isdir(base_dir): + raise ValueError( + "Artifact resolution requires `base_dir` to be a directory. " + f"Got: {base_dir}. " + "Set evaluation.backends..{model_dir|engine_dir} to the artifact directory, " + "and set the artifact filename in deploy config under components.*.{onnx_file|engine_file}." 
+ ) + + # Require filename from components config + filename = _get_filename_from_config(components_cfg, component, file_key) + if not filename: + raise KeyError( + "Missing artifact filename in deploy config. " + f"Expected components['{component}']['{file_key}'] to be set." + ) + + if osp.isabs(filename): + raise ValueError( + "Absolute artifact paths are not allowed. " + f"Set components['{component}']['{file_key}'] to a relative filename under base_dir instead. " + f"(got: {filename})" + ) + + base_abs = osp.abspath(base_dir) + path = osp.abspath(osp.join(base_abs, filename)) + # Prevent escaping base_dir via '../' + if osp.commonpath([base_abs, path]) != base_abs: + raise ValueError( + "Artifact path must stay within base_dir. " + f"Got components['{component}']['{file_key}']={filename} which resolves to {path} outside {base_abs}." + ) + if not os.path.isfile(path): + raise FileNotFoundError( + f"Configured artifact file not found: {path}. " + f"(base_dir={base_dir}, component={component}, file_key={file_key})" + ) + return path + + +def _get_filename_from_config( + components_cfg: Optional[Mapping[str, Any]], + component: str, + file_key: str, +) -> Optional[str]: + """Extract filename from components config.""" + if not components_cfg: + return None + + comp_cfg = components_cfg.get(component, {}) + if not isinstance(comp_cfg, Mapping): + return None + + filename = comp_cfg.get(file_key) + if isinstance(filename, str) and filename: + return filename + return None + + +def get_component_files( + components_cfg: Mapping[str, Any], + file_key: str, +) -> Dict[str, str]: + """Get all component filenames for a given file type. + + Useful for multi-component models to enumerate all artifacts. + + Args: + components_cfg: The unified `components` dict from deploy_config + file_key: Key to look up ('onnx_file' or 'engine_file') + + Returns: + Dict mapping component name to filename + + Example: + >>> components = {"voxel_encoder": {"onnx_file": "voxel.onnx"}, + ... 
"backbone_head": {"onnx_file": "head.onnx"}} + >>> get_component_files(components, "onnx_file") + {"voxel_encoder": "voxel.onnx", "backbone_head": "head.onnx"} + """ + result = {} + for comp_name, comp_cfg in components_cfg.items(): + if isinstance(comp_cfg, Mapping) and file_key in comp_cfg: + result[comp_name] = comp_cfg[file_key] + return result + + +# Convenience aliases for common use cases +def resolve_onnx_path( + base_dir: str, + components_cfg: Optional[Mapping[str, Any]] = None, + component: str = "model", +) -> str: + """Convenience function for resolving ONNX paths.""" + return resolve_artifact_path( + base_dir=base_dir, + components_cfg=components_cfg, + component=component, + file_key="onnx_file", + ) + + +def resolve_engine_path( + base_dir: str, + components_cfg: Optional[Mapping[str, Any]] = None, + component: str = "model", +) -> str: + """Convenience function for resolving TensorRT engine paths.""" + return resolve_artifact_path( + base_dir=base_dir, + components_cfg=components_cfg, + component=component, + file_key="engine_file", + ) diff --git a/deployment/core/config/__init__.py b/deployment/core/config/__init__.py index 3197eb73d..7859ffe7e 100644 --- a/deployment/core/config/__init__.py +++ b/deployment/core/config/__init__.py @@ -1,13 +1,13 @@ """Configuration subpackage for deployment core.""" from deployment.core.config.base_config import ( - BackendConfig, BaseDeploymentConfig, EvaluationConfig, ExportConfig, ExportMode, PrecisionPolicy, RuntimeConfig, + TensorRTConfig, VerificationConfig, VerificationScenario, parse_base_args, @@ -16,7 +16,7 @@ from deployment.core.evaluation.base_evaluator import EVALUATION_DEFAULTS, EvaluationDefaults __all__ = [ - "BackendConfig", + "TensorRTConfig", "BaseDeploymentConfig", "EvaluationConfig", "ExportConfig", diff --git a/deployment/core/config/base_config.py b/deployment/core/config/base_config.py index a9f00e573..65fcf776b 100644 --- a/deployment/core/config/base_config.py +++ b/deployment/core/config/base_config.py @@ -12,7 +12,7 @@ from dataclasses import dataclass, field from enum import Enum from types import MappingProxyType -from typing import Any, Dict, Iterable, Mapping, Optional, Tuple, Union +from typing import Any, Dict, Mapping, Optional, Tuple, Union import torch from mmengine.config import Config @@ -22,6 +22,7 @@ ONNXExportConfig, TensorRTExportConfig, TensorRTModelInputConfig, + TensorRTProfileConfig, ) # Constants @@ -91,12 +92,14 @@ def from_dict(cls, config_dict: Mapping[str, Any]) -> ExportConfig: onnx_path=config_dict.get("onnx_path"), ) + @property def should_export_onnx(self) -> bool: - """Check if ONNX export is requested.""" + """Whether ONNX export is requested.""" return self.mode in (ExportMode.ONNX, ExportMode.BOTH) + @property def should_export_tensorrt(self) -> bool: - """Check if TensorRT export is requested.""" + """Whether TensorRT export is requested.""" return self.mode in (ExportMode.TRT, ExportMode.BOTH) @@ -149,7 +152,8 @@ def _normalize_cuda(device: Optional[str]) -> Optional[str]: raise ValueError("CUDA device index must be non-negative") return f"cuda:{device_id}" - def get_cuda_device_index(self) -> Optional[int]: + @property + def cuda_device_index(self) -> Optional[int]: """Return CUDA device index as integer (if configured).""" if self.cuda is None: return None @@ -173,36 +177,41 @@ def from_dict(cls, config_dict: Mapping[str, Any]) -> RuntimeConfig: @dataclass(frozen=True) -class BackendConfig: - """Configuration for backend-specific settings.""" +class TensorRTConfig: + """ + 
Configuration for TensorRT backend-specific settings. + + Uses config structure: + tensorrt_config = dict(precision_policy="auto", max_workspace_size=1<<30) + + TensorRT profiles are defined in components.*.tensorrt_profile. - common_config: Mapping[str, Any] = field(default_factory=_empty_mapping) - model_inputs: Tuple[TensorRTModelInputConfig, ...] = field(default_factory=tuple) + Note: + The deploy config key for this section is **`tensorrt_config`**. + """ + + precision_policy: str = PrecisionPolicy.AUTO.value + max_workspace_size: int = DEFAULT_WORKSPACE_SIZE + + def __post_init__(self) -> None: + """Validate TensorRT precision policy at construction time.""" + if self.precision_policy not in PRECISION_POLICIES: + raise ValueError( + f"Invalid precision_policy '{self.precision_policy}'. " + f"Must be one of {list(PRECISION_POLICIES.keys())}" + ) @classmethod - def from_dict(cls, config_dict: Mapping[str, Any]) -> BackendConfig: - common_config = dict(config_dict.get("common_config", {})) - model_inputs_raw: Iterable[Mapping[str, Any]] = config_dict.get("model_inputs", []) or [] - model_inputs: Tuple[TensorRTModelInputConfig, ...] = tuple( - TensorRTModelInputConfig.from_dict(item) for item in model_inputs_raw - ) + def from_dict(cls, config_dict: Mapping[str, Any]) -> TensorRTConfig: return cls( - common_config=MappingProxyType(common_config), - model_inputs=model_inputs, + precision_policy=config_dict.get("precision_policy", PrecisionPolicy.AUTO.value), + max_workspace_size=config_dict.get("max_workspace_size", DEFAULT_WORKSPACE_SIZE), ) - def get_precision_policy(self) -> str: - """Get precision policy name.""" - return self.common_config.get("precision_policy", PrecisionPolicy.AUTO.value) - - def get_precision_flags(self) -> Mapping[str, bool]: - """Get TensorRT precision flags for the configured policy.""" - policy = self.get_precision_policy() - return PRECISION_POLICIES.get(policy, {}) - - def get_max_workspace_size(self) -> int: - """Get maximum workspace size for TensorRT.""" - return self.common_config.get("max_workspace_size", DEFAULT_WORKSPACE_SIZE) + @property + def precision_flags(self) -> Mapping[str, bool]: + """TensorRT precision flags for the configured policy.""" + return PRECISION_POLICIES[self.precision_policy] @dataclass(frozen=True) @@ -218,16 +227,32 @@ class EvaluationConfig: @classmethod def from_dict(cls, config_dict: Mapping[str, Any]) -> EvaluationConfig: - backends_raw = config_dict.get("backends", {}) or {} + backends_raw = config_dict.get("backends", None) + if backends_raw is None: + backends_raw = {} + if not isinstance(backends_raw, Mapping): + raise TypeError(f"evaluation.backends must be a mapping, got {type(backends_raw).__name__}") backends_frozen = {key: MappingProxyType(dict(value)) for key, value in backends_raw.items()} + models_raw = config_dict.get("models", None) + if models_raw is None: + models_raw = {} + if not isinstance(models_raw, Mapping): + raise TypeError(f"evaluation.models must be a mapping, got {type(models_raw).__name__}") + + devices_raw = config_dict.get("devices", None) + if devices_raw is None: + devices_raw = {} + if not isinstance(devices_raw, Mapping): + raise TypeError(f"evaluation.devices must be a mapping, got {type(devices_raw).__name__}") + return cls( enabled=config_dict.get("enabled", False), num_samples=config_dict.get("num_samples", 10), verbose=config_dict.get("verbose", False), backends=MappingProxyType(backends_frozen), - models=MappingProxyType(dict(config_dict.get("models", {}))), - 
devices=MappingProxyType(dict(config_dict.get("devices", {}))), + models=MappingProxyType(dict(models_raw)), + devices=MappingProxyType(dict(devices_raw)), ) @@ -243,18 +268,35 @@ class VerificationConfig: @classmethod def from_dict(cls, config_dict: Mapping[str, Any]) -> VerificationConfig: - scenarios_raw = config_dict.get("scenarios", {}) or {} + scenarios_raw = config_dict.get("scenarios") + if scenarios_raw is None: + scenarios_raw = {} + if not isinstance(scenarios_raw, Mapping): + raise TypeError(f"verification.scenarios must be a mapping, got {type(scenarios_raw).__name__}") + scenario_map: Dict[ExportMode, Tuple[VerificationScenario, ...]] = {} for mode_key, scenario_list in scenarios_raw.items(): mode = ExportMode.from_value(mode_key) - scenario_entries = tuple(VerificationScenario.from_dict(entry) for entry in (scenario_list or [])) + if scenario_list is None: + scenario_list = [] + elif not isinstance(scenario_list, (list, tuple)): + raise TypeError( + f"verification.scenarios.{mode_key} must be a list or tuple, got {type(scenario_list).__name__}" + ) + scenario_entries = tuple(VerificationScenario.from_dict(entry) for entry in scenario_list) scenario_map[mode] = scenario_entries + devices_raw = config_dict.get("devices") + if devices_raw is None: + devices_raw = {} + if not isinstance(devices_raw, Mapping): + raise TypeError(f"verification.devices must be a mapping, got {type(devices_raw).__name__}") + return cls( enabled=config_dict.get("enabled", True), num_verify_samples=config_dict.get("num_verify_samples", 3), tolerance=config_dict.get("tolerance", 0.1), - devices=MappingProxyType(dict(config_dict.get("devices", {}))), + devices=MappingProxyType(dict(devices_raw)), scenarios=MappingProxyType(scenario_map), ) @@ -310,12 +352,12 @@ def __init__(self, deploy_cfg: Config): self._validate_config() self._checkpoint_path: Optional[str] = deploy_cfg.get("checkpoint_path") - self._device_config = DeviceConfig.from_dict(deploy_cfg.get("devices", {}) or {}) + self._device_config = DeviceConfig.from_dict(deploy_cfg.get("devices", {})) # Initialize config sections self.export_config = ExportConfig.from_dict(deploy_cfg.get("export", {})) self.runtime_config = RuntimeConfig.from_dict(deploy_cfg.get("runtime_io", {})) - self.backend_config = BackendConfig.from_dict(deploy_cfg.get("backend_config", {})) + self.tensorrt_config = TensorRTConfig.from_dict(deploy_cfg.get("tensorrt_config", {})) self._evaluation_config = EvaluationConfig.from_dict(deploy_cfg.get("evaluation", {})) self._verification_config = VerificationConfig.from_dict(deploy_cfg.get("verification", {})) @@ -336,9 +378,12 @@ def _validate_config(self) -> None: raise ValueError(str(exc)) from exc # Validate precision policy if present - backend_cfg = self.deploy_cfg.get("backend_config", {}) - common_cfg = backend_cfg.get("common_config", {}) - precision_policy = common_cfg.get("precision_policy", PrecisionPolicy.AUTO.value) + tensorrt_config = self.deploy_cfg.get("tensorrt_config") + if tensorrt_config is None: + tensorrt_config = {} + if not isinstance(tensorrt_config, Mapping): + raise TypeError(f"tensorrt_config must be a mapping, got {type(tensorrt_config).__name__}") + precision_policy = tensorrt_config.get("precision_policy", PrecisionPolicy.AUTO.value) if precision_policy not in PRECISION_POLICIES: raise ValueError( f"Invalid precision_policy '{precision_policy}'. 
" f"Must be one of {list(PRECISION_POLICIES.keys())}" @@ -350,7 +395,7 @@ def _validate_cuda_device(self) -> None: return cuda_device = self.devices.cuda - device_idx = self.devices.get_cuda_device_index() + device_idx = self.devices.cuda_device_index if cuda_device is None or device_idx is None: raise RuntimeError( @@ -372,7 +417,7 @@ def _validate_cuda_device(self) -> None: def _needs_cuda_device(self) -> bool: """Determine if current deployment config requires a CUDA device.""" - if self.export_config.should_export_tensorrt(): + if self.export_config.should_export_tensorrt: return True evaluation_cfg = self.evaluation_config @@ -425,7 +470,8 @@ def devices(self) -> DeviceConfig: """Get normalized device settings.""" return self._device_config - def get_evaluation_backends(self) -> Mapping[Any, Mapping[str, Any]]: + @property + def evaluation_backends(self) -> Mapping[Any, Mapping[str, Any]]: """ Get evaluation backends configuration. @@ -453,72 +499,124 @@ def task_type(self) -> Optional[str]: def get_onnx_settings(self) -> ONNXExportConfig: """ - Get ONNX export settings. + Get ONNX export settings from unified components configuration. + + Reads I/O from components.model.io.{inputs, outputs, dynamic_axes} Returns: ONNXExportConfig instance containing ONNX export parameters """ onnx_config = self.onnx_config - model_io = self.deploy_cfg.get("model_io", {}) - - # Get batch size and dynamic axes from model_io - batch_size = model_io.get("batch_size", None) - dynamic_axes = model_io.get("dynamic_axes", None) + components_io = self._get_model_io_from_components() - # If batch_size is set to a number, disable dynamic_axes - if batch_size is not None and isinstance(batch_size, int): - dynamic_axes = None + # Get input/output names from components + input_names = [inp.get("name", "input") for inp in components_io.get("inputs", [])] + output_names = [out.get("name", "output") for out in components_io.get("outputs", [])] - # Handle multiple inputs and outputs - input_names = [model_io.get("input_name", "input")] - output_names = [model_io.get("output_name", "output")] - - # Add additional inputs if specified - additional_inputs = model_io.get("additional_inputs", []) - for additional_input in additional_inputs: - if isinstance(additional_input, dict): - input_names.append(additional_input.get("name", "input")) - - # Add additional outputs if specified - additional_outputs = model_io.get("additional_outputs", []) - for additional_output in additional_outputs: - if isinstance(additional_output, str): - output_names.append(additional_output) + # Fallback to defaults if components not configured + if not input_names: + input_names = ["input"] + if not output_names: + output_names = ["output"] settings_dict = { "opset_version": onnx_config.get("opset_version", 16), "do_constant_folding": onnx_config.get("do_constant_folding", True), "input_names": tuple(input_names), "output_names": tuple(output_names), - "dynamic_axes": dynamic_axes, + "dynamic_axes": components_io.get("dynamic_axes"), "export_params": onnx_config.get("export_params", True), "keep_initializers_as_inputs": onnx_config.get("keep_initializers_as_inputs", False), "verbose": onnx_config.get("verbose", False), - "save_file": onnx_config.get("save_file", "model.onnx"), - "batch_size": batch_size, + "save_file": components_io.get("onnx_file") or onnx_config.get("save_file", "model.onnx"), + "batch_size": None, } - # Note: simplify is typically True by default, but can be overridden if "simplify" in onnx_config: settings_dict["simplify"] = 
onnx_config["simplify"] return ONNXExportConfig.from_mapping(settings_dict) + def _get_model_io_from_components(self) -> Dict[str, Any]: + """ + Extract model I/O configuration from components. + + For end-to-end models (single component), returns the io config + from components.model. + + Returns: + Dictionary with inputs, outputs, dynamic_axes, and onnx_file. + """ + components = self.deploy_cfg.get("components", {}) + if not components: + return {} + + # For single-component models, look for 'model' component + if "model" in components: + comp_cfg = components["model"] + io_cfg = comp_cfg.get("io", {}) + return { + "inputs": io_cfg.get("inputs", None), + "outputs": io_cfg.get("outputs", None), + "dynamic_axes": io_cfg.get("dynamic_axes"), + "onnx_file": comp_cfg.get("onnx_file"), + } + + return {} + def get_tensorrt_settings(self) -> TensorRTExportConfig: """ - Get TensorRT export settings with precision policy support. + Get TensorRT export settings from unified components configuration. + + TensorRT profiles are read from components.model.tensorrt_profile. Returns: TensorRTExportConfig instance containing TensorRT export parameters """ + model_inputs = self._build_model_inputs() + settings_dict = { - "max_workspace_size": self.backend_config.get_max_workspace_size(), - "precision_policy": self.backend_config.get_precision_policy(), - "policy_flags": self.backend_config.get_precision_flags(), - "model_inputs": self.backend_config.model_inputs, + "max_workspace_size": self.tensorrt_config.max_workspace_size, + "precision_policy": self.tensorrt_config.precision_policy, + "policy_flags": self.tensorrt_config.precision_flags, + "model_inputs": model_inputs, } return TensorRTExportConfig.from_mapping(settings_dict) + def _build_model_inputs(self) -> Optional[Tuple[TensorRTModelInputConfig, ...]]: + """ + Build model_inputs from components configuration. + + For end-to-end models (single component), extracts tensorrt_profile + from components.model and converts to TensorRTModelInputConfig format. + + Returns: + Tuple of TensorRTModelInputConfig, or None if not configured. + """ + components = self.deploy_cfg.get("components", {}) + if not components or "model" not in components: + return None + + comp_cfg = components["model"] + tensorrt_profile = comp_cfg.get("tensorrt_profile", {}) + + if not tensorrt_profile: + return None + + input_shapes = {} + for input_name, shape_cfg in tensorrt_profile.items(): + if isinstance(shape_cfg, Mapping): + input_shapes[input_name] = TensorRTProfileConfig( + min_shape=shape_cfg.get("min_shape", None), + opt_shape=shape_cfg.get("opt_shape", None), + max_shape=shape_cfg.get("max_shape", None), + ) + + if input_shapes: + return (TensorRTModelInputConfig(input_shapes=MappingProxyType(input_shapes)),) + + return None + def setup_logging(level: str = "INFO") -> logging.Logger: """ diff --git a/deployment/core/contexts.py b/deployment/core/contexts.py index 486caa1e5..8df2f8a23 100644 --- a/deployment/core/contexts.py +++ b/deployment/core/contexts.py @@ -1,18 +1,6 @@ """ Typed context objects for deployment workflows. -This module defines typed dataclasses that replace **kwargs with explicit, -type-checked parameters. This improves: -- Type safety: Catches mismatches at type-check time -- Discoverability: IDE autocomplete shows available parameters -- Refactoring safety: Renamed fields are caught by type checkers - -Design Principles: - 1. Base contexts define common parameters across all projects - 2. 
Project-specific contexts extend base with additional fields - 3. Optional fields have sensible defaults - 4. Contexts are immutable (frozen=True) for safety - Usage: # Create context for export ctx = ExportContext(sample_idx=0) diff --git a/deployment/core/evaluation/base_evaluator.py b/deployment/core/evaluation/base_evaluator.py index 23d251ae9..e15accf41 100644 --- a/deployment/core/evaluation/base_evaluator.py +++ b/deployment/core/evaluation/base_evaluator.py @@ -23,6 +23,7 @@ from deployment.core.backend import Backend from deployment.core.evaluation.evaluator_types import ( EvalResultDict, + InferenceInput, InferenceResult, LatencyBreakdown, LatencyStats, @@ -38,6 +39,7 @@ "EvalResultDict", "VerifyResultDict", "ModelSpec", + "InferenceInput", "InferenceResult", "LatencyStats", "LatencyBreakdown", @@ -160,8 +162,14 @@ def _prepare_input( sample: Mapping[str, Any], data_loader: BaseDataLoader, device: str, - ) -> Tuple[Any, Dict[str, Any]]: - """Prepare model input from a sample. Returns (input_data, inference_kwargs).""" + ) -> InferenceInput: + """Prepare model input from a sample. + + Returns: + InferenceInput containing: + - data: The actual input data (e.g., points tensor) + - metadata: Sample metadata forwarded to postprocess() + """ raise NotImplementedError @abstractmethod @@ -211,7 +219,7 @@ def _get_verification_input( sample_idx: int, data_loader: BaseDataLoader, device: str, - ) -> Tuple[Any, Dict[str, Any]]: + ) -> InferenceInput: """Get verification input.""" sample = data_loader.load_sample(sample_idx) return self._prepare_input(sample, data_loader, device) @@ -247,19 +255,21 @@ def evaluate( latencies = [] latency_breakdowns = [] - actual_samples = min(num_samples, data_loader.get_num_samples()) + actual_samples = min(num_samples, data_loader.num_samples) for idx in range(actual_samples): if verbose and idx % EVALUATION_DEFAULTS.LOG_INTERVAL == 0: logger.info(f"Processing sample {idx + 1}/{actual_samples}") sample = data_loader.load_sample(idx) - input_data, infer_kwargs = self._prepare_input(sample, data_loader, model.device) + inference_input = self._prepare_input(sample, data_loader, model.device) - gt_data = data_loader.get_ground_truth(idx) + if "ground_truth" not in sample: + raise KeyError("DataLoader.load_sample() must return 'ground_truth' for evaluation.") + gt_data = sample.get("ground_truth") ground_truths = self._parse_ground_truths(gt_data) - infer_result = pipeline.infer(input_data, **infer_kwargs) + infer_result = pipeline.infer(inference_input.data, metadata=inference_input.metadata) latencies.append(infer_result.latency_ms) if infer_result.breakdown: latency_breakdowns.append(infer_result.breakdown) diff --git a/deployment/core/evaluation/evaluator_types.py b/deployment/core/evaluation/evaluator_types.py index de800656f..10eed0d1f 100644 --- a/deployment/core/evaluation/evaluator_types.py +++ b/deployment/core/evaluation/evaluator_types.py @@ -7,8 +7,8 @@ from __future__ import annotations -from dataclasses import asdict, dataclass -from typing import Any, Dict, Optional, TypedDict +from dataclasses import asdict, dataclass, field +from typing import Any, Dict, Mapping, Optional, TypedDict from deployment.core.artifacts import Artifact from deployment.core.backend import Backend @@ -63,7 +63,7 @@ class LatencyStats: median_ms: float @classmethod - def empty(cls) -> "LatencyStats": + def empty(cls) -> LatencyStats: """Return a zero-initialized stats object.""" return cls(0.0, 0.0, 0.0, 0.0, 0.0) @@ -84,7 +84,7 @@ class LatencyBreakdown: stages: 
Dict[str, LatencyStats] @classmethod - def empty(cls) -> "LatencyBreakdown": + def empty(cls) -> LatencyBreakdown: """Return an empty breakdown.""" return cls(stages={}) @@ -93,6 +93,19 @@ def to_dict(self) -> Dict[str, Dict[str, float]]: return {stage: stats.to_dict() for stage, stats in self.stages.items()} +@dataclass(frozen=True) +class InferenceInput: + """Prepared input for pipeline inference. + + Attributes: + data: The actual input data (e.g., points tensor, image tensor). + metadata: Sample metadata forwarded to postprocess(). + """ + + data: Any + metadata: Mapping[str, Any] = field(default_factory=dict) + + @dataclass(frozen=True) class InferenceResult: """Standard inference return payload.""" @@ -102,7 +115,7 @@ class InferenceResult: breakdown: Optional[Dict[str, float]] = None @classmethod - def empty(cls) -> "InferenceResult": + def empty(cls) -> InferenceResult: """Return an empty inference result.""" return cls(output=None, latency_ms=0.0, breakdown={}) diff --git a/deployment/core/evaluation/verification_mixin.py b/deployment/core/evaluation/verification_mixin.py index 9b44c2110..e7e866e04 100644 --- a/deployment/core/evaluation/verification_mixin.py +++ b/deployment/core/evaluation/verification_mixin.py @@ -16,7 +16,7 @@ import torch from deployment.core.backend import Backend -from deployment.core.evaluation.evaluator_types import ModelSpec, VerifyResultDict +from deployment.core.evaluation.evaluator_types import InferenceInput, ModelSpec, VerifyResultDict from deployment.core.io.base_data_loader import BaseDataLoader @@ -71,8 +71,14 @@ def _get_verification_input( sample_idx: int, data_loader: BaseDataLoader, device: str, - ) -> Tuple[Any, Dict[str, Any]]: - """Get input data for verification.""" + ) -> InferenceInput: + """Get input data for verification. 
+ + Returns: + InferenceInput containing: + - data: The actual input data (e.g., points tensor) + - metadata: Sample metadata forwarded to postprocess() + """ raise NotImplementedError def _get_output_names(self) -> Optional[List[str]]: @@ -363,7 +369,7 @@ def verify( logger.info(f"\nInitializing {test.backend.value} test pipeline...") test_pipeline = self._create_pipeline_for_verification(test, test_device, logger) - actual_samples = min(num_samples, data_loader.get_num_samples()) + actual_samples = min(num_samples, data_loader.num_samples) for i in range(actual_samples): logger.info(f"\n{'='*60}") logger.info(f"Verifying sample {i}") @@ -417,17 +423,25 @@ def _verify_single_sample( logger: logging.Logger, ) -> bool: """Verify a single sample.""" - input_data, metadata = self._get_verification_input(sample_idx, data_loader, ref_device) + inference_input = self._get_verification_input(sample_idx, data_loader, ref_device) ref_name = f"{ref_backend.value} ({ref_device})" logger.info(f"\nRunning {ref_name} reference...") - ref_result = ref_pipeline.infer(input_data, metadata, return_raw_outputs=True) + ref_result = ref_pipeline.infer( + inference_input.data, + metadata=inference_input.metadata, + return_raw_outputs=True, + ) logger.info(f" {ref_name} latency: {ref_result.latency_ms:.2f} ms") - test_input = self._move_input_to_device(input_data, test_device) + test_input = self._move_input_to_device(inference_input.data, test_device) test_name = f"{test_backend.value} ({test_device})" logger.info(f"\nRunning {test_name} test...") - test_result = test_pipeline.infer(test_input, metadata, return_raw_outputs=True) + test_result = test_pipeline.infer( + test_input, + metadata=inference_input.metadata, + return_raw_outputs=True, + ) logger.info(f" {test_name} latency: {test_result.latency_ms:.2f} ms") passed, _ = self._compare_backend_outputs(ref_result.output, test_result.output, tolerance, test_name, logger) diff --git a/deployment/core/io/base_data_loader.py b/deployment/core/io/base_data_loader.py index bdc94c066..ee3aaf4bf 100644 --- a/deployment/core/io/base_data_loader.py +++ b/deployment/core/io/base_data_loader.py @@ -68,7 +68,7 @@ def load_sample(self, index: int) -> SampleData: raise NotImplementedError @abstractmethod - def preprocess(self, sample: SampleData) -> torch.Tensor: + def preprocess(self, sample: SampleData) -> Any: """ Preprocess raw sample data into model input format. @@ -76,16 +76,17 @@ def preprocess(self, sample: SampleData) -> torch.Tensor: sample: Raw sample data returned by load_sample() Returns: - Preprocessed tensor ready for model inference. - Shape and format depend on the specific task. + Preprocessed model input ready for inference. Type/shape is task-specific. + (e.g., torch.Tensor, Dict[str, torch.Tensor], tuple, etc.) Raises: ValueError: If sample format is invalid """ raise NotImplementedError + @property @abstractmethod - def get_num_samples(self) -> int: + def num_samples(self) -> int: """ Get total number of samples in the dataset. @@ -94,20 +95,6 @@ def get_num_samples(self) -> int: """ raise NotImplementedError - @abstractmethod - def get_ground_truth(self, index: int) -> Mapping[str, Any]: - """ - Get ground truth annotations for a specific sample. - - Args: - index: Sample index whose annotations should be returned - - Returns: - Dictionary containing task-specific ground truth data. - Implementations should raise IndexError if the index is invalid. 
- """ - raise NotImplementedError - def get_shape_sample(self, index: int = 0) -> Any: """ Return a representative sample used for export shape configuration. diff --git a/deployment/core/metrics/base_metrics_interface.py b/deployment/core/metrics/base_metrics_interface.py index 37feb8be4..b1ee8bf8a 100644 --- a/deployment/core/metrics/base_metrics_interface.py +++ b/deployment/core/metrics/base_metrics_interface.py @@ -59,24 +59,27 @@ def to_dict(self) -> Dict[str, Any]: @dataclass(frozen=True) class DetectionSummary: - """Structured summary for detection metrics (2D/3D).""" + """Structured summary for detection metrics (2D/3D). - mAP: float = 0.0 - per_class_ap: Dict[str, float] = field(default_factory=dict) + All matching modes computed by autoware_perception_evaluation are included. + The `mAP_by_mode` and `mAPH_by_mode` dicts contain results for each matching mode. + """ + + mAP_by_mode: Dict[str, float] = field(default_factory=dict) + mAPH_by_mode: Dict[str, float] = field(default_factory=dict) + per_class_ap_by_mode: Dict[str, Dict[str, float]] = field(default_factory=dict) num_frames: int = 0 detailed_metrics: Dict[str, float] = field(default_factory=dict) - mAPH: Optional[float] = None def to_dict(self) -> Dict[str, Any]: - data = { - "mAP": self.mAP, - "per_class_ap": dict(self.per_class_ap), + """Convert to dict.""" + return { + "mAP_by_mode": dict(self.mAP_by_mode), + "mAPH_by_mode": dict(self.mAPH_by_mode), + "per_class_ap_by_mode": {k: dict(v) for k, v in self.per_class_ap_by_mode.items()}, "num_frames": self.num_frames, "detailed_metrics": dict(self.detailed_metrics), } - if self.mAPH is not None: - data["mAPH"] = self.mAPH - return data class BaseMetricsInterface(ABC): @@ -125,7 +128,7 @@ def reset(self) -> None: pass @abstractmethod - def add_frame(self, *args, **kwargs) -> None: + def add_frame(self, *args) -> None: """ Add a frame of predictions and ground truths for evaluation. @@ -146,8 +149,9 @@ def compute_metrics(self) -> Dict[str, float]: """ pass + @property @abstractmethod - def get_summary(self) -> Any: + def summary(self) -> Any: """ Get a summary of the evaluation including primary metrics. @@ -160,3 +164,14 @@ def get_summary(self) -> Any: def frame_count(self) -> int: """Return the number of frames added so far.""" return self._frame_count + + def format_metrics_report(self) -> Optional[str]: + """Format the metrics report as a human-readable string. + + This is an optional method that can be overridden by subclasses to provide + task-specific formatting. By default, returns None. + + Returns: + Formatted metrics report string. None if not implemented. + """ + return None diff --git a/deployment/core/metrics/classification_metrics.py b/deployment/core/metrics/classification_metrics.py index 07852243c..e18edbd73 100644 --- a/deployment/core/metrics/classification_metrics.py +++ b/deployment/core/metrics/classification_metrics.py @@ -314,7 +314,8 @@ def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float return metric_dict # TODO(vividf): Remove after autoware_perception_evaluation supports confusion matrix. - def get_confusion_matrix(self) -> np.ndarray: + @property + def confusion_matrix(self) -> np.ndarray: """Get the confusion matrix. Returns: @@ -352,7 +353,8 @@ def get_confusion_matrix(self) -> np.ndarray: return confusion_matrix - def get_summary(self) -> ClassificationSummary: + @property + def summary(self) -> ClassificationSummary: """Get a summary of the evaluation. 
Returns: @@ -373,7 +375,7 @@ def get_summary(self) -> ClassificationSummary: recall=metrics.get("recall", 0.0), f1score=metrics.get("f1score", 0.0), per_class_accuracy=per_class_accuracy, - confusion_matrix=self.get_confusion_matrix().tolist(), + confusion_matrix=self.confusion_matrix.tolist(), num_samples=self._frame_count, detailed_metrics=metrics, ) diff --git a/deployment/core/metrics/detection_2d_metrics.py b/deployment/core/metrics/detection_2d_metrics.py index fb9e73e5c..c462d8256 100644 --- a/deployment/core/metrics/detection_2d_metrics.py +++ b/deployment/core/metrics/detection_2d_metrics.py @@ -451,28 +451,51 @@ def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float return metric_dict - def get_summary(self) -> DetectionSummary: - """Get a summary of the evaluation including mAP and per-class metrics.""" + @property + def summary(self) -> DetectionSummary: + """Get a summary of the evaluation including mAP and per-class metrics for all matching modes.""" metrics = self.compute_metrics() - # Extract primary metrics (first mAP value found) - primary_map = None - per_class_ap = {} - - for key, value in metrics.items(): - if key.startswith("mAP_") and primary_map is None: - primary_map = value - elif "_AP_" in key and not key.startswith("mAP"): - # Extract class name from key - parts = key.split("_AP_") - if len(parts) == 2: - class_name = parts[0] - if class_name not in per_class_ap: - per_class_ap[class_name] = value + # Extract matching modes from metrics + modes = [] + for k in metrics.keys(): + if k.startswith("mAP_") and k != "mAP": + modes.append(k[len("mAP_") :]) + modes = list(dict.fromkeys(modes)) # Remove duplicates while preserving order + + if not modes: + return DetectionSummary( + mAP_by_mode={}, + mAPH_by_mode={}, + per_class_ap_by_mode={}, + num_frames=self._frame_count, + detailed_metrics=metrics, + ) + + # Collect mAP and per-class AP for each matching mode + mAP_by_mode: Dict[str, float] = {} + per_class_ap_by_mode: Dict[str, Dict[str, float]] = {} + + for mode in modes: + map_value = metrics.get(f"mAP_{mode}", 0.0) + mAP_by_mode[mode] = float(map_value) + + # Collect AP values per class for this mode + per_class_ap_values: Dict[str, List[float]] = {} + ap_key_infix = f"_AP_{mode}_" + for key, value in metrics.items(): + if ap_key_infix not in key or key.startswith("mAP"): + continue + class_name = key.split("_AP_", 1)[0] + per_class_ap_values.setdefault(class_name, []).append(float(value)) + + if per_class_ap_values: + per_class_ap_by_mode[mode] = {k: float(np.mean(v)) for k, v in per_class_ap_values.items() if v} return DetectionSummary( - mAP=primary_map or 0.0, - per_class_ap=per_class_ap, + mAP_by_mode=mAP_by_mode, + mAPH_by_mode={}, # 2D detection doesn't have mAPH + per_class_ap_by_mode=per_class_ap_by_mode, num_frames=self._frame_count, detailed_metrics=metrics, ) diff --git a/deployment/core/metrics/detection_3d_metrics.py b/deployment/core/metrics/detection_3d_metrics.py index 235ab795b..4cbf5ac95 100644 --- a/deployment/core/metrics/detection_3d_metrics.py +++ b/deployment/core/metrics/detection_3d_metrics.py @@ -25,9 +25,10 @@ """ import logging +import re import time -from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional +from dataclasses import dataclass +from typing import Any, Dict, List, Mapping, Optional import numpy as np from perception_eval.common.dataset import FrameGroundTruth @@ -130,30 +131,13 @@ def __post_init__(self): class Detection3DMetricsInterface(BaseMetricsInterface): + # 
TODO(vividf): refactor this class after refactoring T4MetricV2 """ Interface for computing 3D detection metrics using autoware_perception_evaluation. This interface provides a simplified interface for the deployment framework to compute mAP, mAPH, and other detection metrics that are consistent with the T4MetricV2 used during training. - - Example usage: - config = Detection3DMetricsConfig( - class_names=["car", "truck", "bus", "bicycle", "pedestrian"], - frame_id="base_link", - ) - interface = Detection3DMetricsInterface(config) - - # Add frames - for pred, gt in zip(predictions_list, ground_truths_list): - interface.add_frame( - predictions=pred, # List[Dict] with bbox_3d, label, score - ground_truths=gt, # List[Dict] with bbox_3d, label - ) - - # Compute metrics - metrics = interface.compute_metrics() - # Returns: {"mAP_center_distance_bev_0.5": 0.7, ...} """ _UNKNOWN = "unknown" @@ -176,38 +160,108 @@ def __init__( self.data_root = data_root self.result_root_directory = result_root_directory - # Create perception evaluation config - self.perception_eval_config = PerceptionEvaluationConfig( - dataset_paths=data_root, - frame_id=config.frame_id, - result_root_directory=result_root_directory, - evaluation_config_dict=config.evaluation_config_dict, - load_raw_data=False, - ) + cfg_dict = config.evaluation_config_dict + if cfg_dict is None: + cfg_dict = {} + if not isinstance(cfg_dict, Mapping): + raise TypeError(f"evaluation_config_dict must be a mapping, got {type(cfg_dict).__name__}") + self._evaluation_cfg_dict: Dict[str, Any] = dict(cfg_dict) + + # Create multiple evaluators for different distance ranges (like T4MetricV2) + min_distance = cfg_dict.get("min_distance") + max_distance = cfg_dict.get("max_distance") + + if isinstance(min_distance, (int, float)) and isinstance(max_distance, (int, float)): + min_distance = [float(min_distance)] + max_distance = [float(max_distance)] + elif not isinstance(min_distance, list) or not isinstance(max_distance, list): + raise ValueError( + "min_distance and max_distance must be either scalars (int/float) or lists for multi-evaluator mode. " + f"Got min_distance={type(min_distance)}, max_distance={type(max_distance)}" + ) - # Create critical object filter config - self.critical_object_filter_config = CriticalObjectFilterConfig( - evaluator_config=self.perception_eval_config, - **config.critical_object_filter_config, - ) + if len(min_distance) != len(max_distance): + raise ValueError( + f"min_distance and max_distance must have the same length. 
" + f"Got len(min_distance)={len(min_distance)}, len(max_distance)={len(max_distance)}" + ) - # Create frame pass fail config - self.frame_pass_fail_config = PerceptionPassFailConfig( - evaluator_config=self.perception_eval_config, - **config.frame_pass_fail_config, - ) + if not min_distance or not max_distance: + raise ValueError("min_distance and max_distance lists cannot be empty") + + # Create distance ranges and evaluators + self._bev_distance_ranges = list(zip(min_distance, max_distance)) + self.evaluators: Dict[str, Dict[str, Any]] = {} + self._create_evaluators(config) + + self.gt_count_total: int = 0 + self.pred_count_total: int = 0 + self.gt_count_by_label: Dict[str, int] = {} + self.pred_count_by_label: Dict[str, int] = {} + self._last_metrics_by_eval_name: Dict[str, MetricsScore] = {} + + def _create_evaluators(self, config: Detection3DMetricsConfig) -> None: + """Create multiple evaluators for different distance ranges (like T4MetricV2).""" + range_filter_name = "bev_center" + + for min_dist, max_dist in self._bev_distance_ranges: + # Create a copy of evaluation_config_dict with single distance values + eval_config_dict_raw = config.evaluation_config_dict + if eval_config_dict_raw is None: + eval_config_dict_raw = {} + if not isinstance(eval_config_dict_raw, Mapping): + raise TypeError(f"evaluation_config_dict must be a mapping, got {type(eval_config_dict_raw).__name__}") + eval_config_dict = dict(eval_config_dict_raw) + eval_config_dict["min_distance"] = min_dist + eval_config_dict["max_distance"] = max_dist + + # Create perception evaluation config for this range + evaluator_config = PerceptionEvaluationConfig( + dataset_paths=self.data_root, + frame_id=config.frame_id, + result_root_directory=self.result_root_directory, + evaluation_config_dict=eval_config_dict, + load_raw_data=False, + ) - # Initialize evaluation manager (will be created on first use or reset) - self.evaluator: Optional[PerceptionEvaluationManager] = None + # Create critical object filter config + critical_object_filter_config = CriticalObjectFilterConfig( + evaluator_config=evaluator_config, + **config.critical_object_filter_config, + ) + + # Create frame pass fail config + frame_pass_fail_config = PerceptionPassFailConfig( + evaluator_config=evaluator_config, + **config.frame_pass_fail_config, + ) + + evaluator_name = f"{range_filter_name}_{min_dist}-{max_dist}" + + self.evaluators[evaluator_name] = { + "evaluator": None, # Will be created on reset + "evaluator_config": evaluator_config, + "critical_object_filter_config": critical_object_filter_config, + "frame_pass_fail_config": frame_pass_fail_config, + "bev_distance_range": (min_dist, max_dist), + } def reset(self) -> None: """Reset the interface for a new evaluation session.""" - self.evaluator = PerceptionEvaluationManager( - evaluation_config=self.perception_eval_config, - load_ground_truth=False, - metric_output_dir=None, - ) + # Reset all evaluators + for eval_name, eval_data in self.evaluators.items(): + eval_data["evaluator"] = PerceptionEvaluationManager( + evaluation_config=eval_data["evaluator_config"], + load_ground_truth=False, + metric_output_dir=None, + ) + self._frame_count = 0 + self.gt_count_total = 0 + self.pred_count_total = 0 + self.gt_count_by_label = {} + self.pred_count_by_label = {} + self._last_metrics_by_eval_name = {} def _convert_index_to_label(self, label_index: int) -> Label: """Convert a label index to a Label object. 
@@ -374,28 +428,57 @@ def add_frame( - num_lidar_pts: int (optional) frame_name: Optional name for the frame. """ - if self.evaluator is None: + needs_reset = any(eval_data["evaluator"] is None for eval_data in self.evaluators.values()) + if needs_reset: self.reset() unix_time = time.time() if frame_name is None: frame_name = str(self._frame_count) + self.pred_count_total += len(predictions) + self.gt_count_total += len(ground_truths) + + for p in predictions: + try: + label = int(p.get("label", -1)) + except Exception: + label = -1 + if 0 <= label < len(self.class_names): + name = self.class_names[label] + self.pred_count_by_label[name] = self.pred_count_by_label.get(name, 0) + 1 + + for g in ground_truths: + try: + label = int(g.get("label", -1)) + except Exception: + label = -1 + if 0 <= label < len(self.class_names): + name = self.class_names[label] + self.gt_count_by_label[name] = self.gt_count_by_label.get(name, 0) + 1 + # Convert predictions to DynamicObject estimated_objects = self._predictions_to_dynamic_objects(predictions, unix_time) # Convert ground truths to FrameGroundTruth frame_ground_truth = self._ground_truths_to_frame_ground_truth(ground_truths, unix_time, frame_name) - # Add frame result to evaluator + # Add frame result to all evaluators try: - self.evaluator.add_frame_result( - unix_time=unix_time, - ground_truth_now_frame=frame_ground_truth, - estimated_objects=estimated_objects, - critical_object_filter_config=self.critical_object_filter_config, - frame_pass_fail_config=self.frame_pass_fail_config, - ) + for eval_name, eval_data in self.evaluators.items(): + if eval_data["evaluator"] is None: + eval_data["evaluator"] = PerceptionEvaluationManager( + evaluation_config=eval_data["evaluator_config"], + load_ground_truth=False, + metric_output_dir=None, + ) + eval_data["evaluator"].add_frame_result( + unix_time=unix_time, + ground_truth_now_frame=frame_ground_truth, + estimated_objects=estimated_objects, + critical_object_filter_config=eval_data["critical_object_filter_config"], + frame_pass_fail_config=eval_data["frame_pass_fail_config"], + ) self._frame_count += 1 except Exception as e: logger.warning(f"Failed to add frame {frame_name}: {e}") @@ -405,22 +488,47 @@ def compute_metrics(self) -> Dict[str, float]: Returns: Dictionary of metrics with keys like: - - mAP_center_distance_bev_0.5 - - mAP_center_distance_bev_1.0 - - mAPH_center_distance_bev_0.5 - - car_AP_center_distance_bev_0.5 + - mAP_center_distance_bev (mean AP across all classes, no threshold) + - mAPH_center_distance_bev (mean APH across all classes, no threshold) + - car_AP_center_distance_bev_0.5 (per-class AP with threshold) + - car_AP_center_distance_bev_1.0 (per-class AP with threshold) + - car_APH_center_distance_bev_0.5 (per-class APH with threshold) + - etc. + For multi-evaluator mode, metrics are prefixed with evaluator name: + - bev_center_0.0-50.0_mAP_center_distance_bev + - bev_center_0.0-50.0_car_AP_center_distance_bev_0.5 + - bev_center_50.0-90.0_mAP_center_distance_bev - etc. + Note: mAP/mAPH keys do not include threshold; only per-class AP/APH keys do. 
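+
+        Example return value (illustrative; numbers are placeholders):
+            {
+                "bev_center_0.0-50.0_mAP_center_distance_bev": 0.62,
+                "bev_center_0.0-50.0_car_AP_center_distance_bev_0.5": 0.71,
+                "bev_center_50.0-90.0_mAP_center_distance_bev": 0.48,
+            }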
""" - if self.evaluator is None or self._frame_count == 0: + if self._frame_count == 0: logger.warning("No frames to evaluate") return {} try: - # Get scene result (aggregated metrics) - metrics_score: MetricsScore = self.evaluator.get_scene_result() - - # Process metrics into a flat dictionary - return self._process_metrics_score(metrics_score) + # Cache scene results to avoid recomputing + scene_results = {} + for eval_name, eval_data in self.evaluators.items(): + evaluator = eval_data["evaluator"] + if evaluator is None: + continue + + try: + metrics_score = evaluator.get_scene_result() + scene_results[eval_name] = metrics_score + except Exception as e: + logger.warning(f"Error computing metrics for {eval_name}: {e}") + + # Process cached metrics with evaluator name prefix + all_metrics = {} + for eval_name, metrics_score in scene_results.items(): + eval_metrics = self._process_metrics_score(metrics_score, prefix=eval_name) + all_metrics.update(eval_metrics) + + # Cache results for reuse by format_metrics_report() and summary property + self._last_metrics_by_eval_name = scene_results + + return all_metrics except Exception as e: logger.error(f"Error computing metrics: {e}") @@ -429,16 +537,42 @@ def compute_metrics(self) -> Dict[str, float]: traceback.print_exc() return {} - def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float]: + def format_metrics_report(self) -> str: + """Format the metrics report as a human-readable string. + + For multi-evaluator mode, returns reports for all evaluators with distance range labels. + Uses cached results from compute_metrics() if available to avoid recomputation. + """ + # Use cached results if available, otherwise compute them + if not self._last_metrics_by_eval_name: + # Cache not available, compute now + self.compute_metrics() + + # Format reports for all evaluators using cached results + reports = [] + for eval_name, metrics_score in self._last_metrics_by_eval_name.items(): + try: + # Extract distance range from evaluator name (e.g., "bev_center_0.0-50.0" -> "0.0-50.0") + distance_range = eval_name.replace("bev_center_", "") + report = f"\n{'='*80}\nDistance Range: {distance_range} m\n{'='*80}\n{str(metrics_score)}" + reports.append(report) + except Exception as e: + logger.warning(f"Error formatting report for {eval_name}: {e}") + + return "\n".join(reports) if reports else "" + + def _process_metrics_score(self, metrics_score: MetricsScore, prefix: Optional[str] = None) -> Dict[str, float]: """Process MetricsScore into a flat dictionary. Args: metrics_score: MetricsScore instance from evaluator. + prefix: Optional prefix to add to metric keys (for multi-evaluator mode). Returns: Flat dictionary of metrics. 
""" metric_dict = {} + key_prefix = f"{prefix}_" if prefix else "" for map_instance in metrics_score.mean_ap_values: matching_mode = map_instance.matching_mode.value.lower().replace(" ", "_") @@ -452,43 +586,127 @@ def _process_metrics_score(self, metrics_score: MetricsScore) -> Dict[str, float ap_value = ap.ap # Create the metric key - key = f"{label_name}_AP_{matching_mode}_{threshold}" + key = f"{key_prefix}{label_name}_AP_{matching_mode}_{threshold}" metric_dict[key] = ap_value + # Process individual APH values + label_to_aphs = getattr(map_instance, "label_to_aphs", None) + if label_to_aphs: + for label, aphs in label_to_aphs.items(): + label_name = label.value + for aph in aphs: + threshold = aph.matching_threshold + aph_value = getattr(aph, "aph", None) + if aph_value is None: + aph_value = getattr(aph, "ap", None) + if aph_value is None: + continue + key = f"{key_prefix}{label_name}_APH_{matching_mode}_{threshold}" + metric_dict[key] = aph_value + # Add mAP and mAPH values - map_key = f"mAP_{matching_mode}" - maph_key = f"mAPH_{matching_mode}" + map_key = f"{key_prefix}mAP_{matching_mode}" + maph_key = f"{key_prefix}mAPH_{matching_mode}" metric_dict[map_key] = map_instance.map metric_dict[maph_key] = map_instance.maph return metric_dict - def get_summary(self) -> DetectionSummary: - """Get a summary of the evaluation including mAP and per-class metrics.""" + @staticmethod + def _extract_matching_modes(metrics: Mapping[str, float]) -> List[str]: + """Extract matching modes from metrics dict keys (e.g., 'mAP_center_distance_bev' -> 'center_distance_bev'). + + Supports both prefixed and non-prefixed formats: + - Non-prefixed: "mAP_center_distance_bev" + - Prefixed: "bev_center_0.0-50.0_mAP_center_distance_bev" + """ + # Matches either "mAP_" or "_mAP_" + pat = re.compile(r"(?:^|_)mAP_(.+)$") + modes: List[str] = [] + for k in metrics.keys(): + m = pat.search(k) + if m: + modes.append(m.group(1)) + # Remove duplicates while preserving order + return list(dict.fromkeys(modes)) + + @property + def summary(self) -> DetectionSummary: + """Get a summary of the evaluation including mAP and per-class metrics for all matching modes. + + Only uses metrics from the last distance bucket. 
+ """ metrics = self.compute_metrics() - # Extract primary metrics (first mAP value found) - primary_map = None - primary_maph = None - per_class_ap = {} - - for key, value in metrics.items(): - if key.startswith("mAP_") and primary_map is None: - primary_map = value - elif key.startswith("mAPH_") and primary_maph is None: - primary_maph = value - elif "_AP_" in key and not key.startswith("mAP"): - # Extract class name from key - parts = key.split("_AP_") - if len(parts) == 2: - class_name = parts[0] - if class_name not in per_class_ap: - per_class_ap[class_name] = value + if not self._bev_distance_ranges: + return DetectionSummary( + mAP_by_mode={}, + mAPH_by_mode={}, + per_class_ap_by_mode={}, + num_frames=self._frame_count, + detailed_metrics=metrics, + ) + + # Use the last distance bucket (should be the full range) + last_min_dist, last_max_dist = self._bev_distance_ranges[-1] + last_evaluator_name = f"bev_center_{last_min_dist}-{last_max_dist}" + + last_metrics_score = self._last_metrics_by_eval_name.get(last_evaluator_name) + if last_metrics_score is None: + return DetectionSummary( + mAP_by_mode={}, + mAPH_by_mode={}, + per_class_ap_by_mode={}, + num_frames=self._frame_count, + detailed_metrics=metrics, + ) + + last_bucket_metrics = self._process_metrics_score(last_metrics_score, prefix=None) + + modes = self._extract_matching_modes(last_bucket_metrics) + if not modes: + return DetectionSummary( + mAP_by_mode={}, + mAPH_by_mode={}, + per_class_ap_by_mode={}, + num_frames=self._frame_count, + detailed_metrics=metrics, + ) + + mAP_by_mode: Dict[str, float] = {} + mAPH_by_mode: Dict[str, float] = {} + per_class_ap_by_mode: Dict[str, Dict[str, float]] = {} + + for mode in modes: + # Get mAP and mAPH directly from last bucket metrics + map_key = f"mAP_{mode}" + maph_key = f"mAPH_{mode}" + + mAP_by_mode[mode] = last_bucket_metrics.get(map_key, 0.0) + mAPH_by_mode[mode] = last_bucket_metrics.get(maph_key, 0.0) + + # Collect AP values per class for this mode from the last bucket + per_class_ap_values: Dict[str, List[float]] = {} + ap_key_separator = f"_AP_{mode}_" + + for key, value in last_bucket_metrics.items(): + idx = key.find(ap_key_separator) + if idx < 0: + continue + + # Label is the token right before "_AP_{mode}_" + prefix_part = key[:idx] + class_name = prefix_part.split("_")[-1] if prefix_part else "" + if class_name: + per_class_ap_values.setdefault(class_name, []).append(float(value)) + + if per_class_ap_values: + per_class_ap_by_mode[mode] = {k: float(np.mean(v)) for k, v in per_class_ap_values.items() if v} return DetectionSummary( - mAP=primary_map or 0.0, - mAPH=primary_maph or 0.0, - per_class_ap=per_class_ap, + mAP_by_mode=mAP_by_mode, + mAPH_by_mode=mAPH_by_mode, + per_class_ap_by_mode=per_class_ap_by_mode, num_frames=self._frame_count, detailed_metrics=metrics, ) diff --git a/deployment/docs/configuration.md b/deployment/docs/configuration.md index 06813a335..9b4ba654c 100644 --- a/deployment/docs/configuration.md +++ b/deployment/docs/configuration.md @@ -4,6 +4,10 @@ Configurations remain dictionary-driven for flexibility, with typed dataclasses ## Structure +### Single-Model Export (Simple Models) + +For simple models with a single ONNX/TensorRT output: + ```python # Task type task_type = "detection3d" # or "detection2d", "classification" @@ -40,16 +44,95 @@ onnx_config = dict( opset_version=16, do_constant_folding=True, save_file="model.onnx", - multi_file=False, ) -backend_config = dict( - common_config=dict( - precision_policy="auto", - max_workspace_size=1 << 
30, +tensorrt_config = dict( + precision_policy="auto", + max_workspace_size=1 << 30, +) +``` + +### Multi-File Export (Complex Models like CenterPoint) + +For models that export to multiple ONNX/TensorRT files, use the `components` config: + +```python +task_type = "detection3d" +checkpoint_path = "work_dirs/centerpoint/best_checkpoint.pth" + +devices = dict( + cpu="cpu", + cuda="cuda:0", +) + +export = dict( + mode="both", + work_dir="work_dirs/centerpoint_deployment", +) + +# Unified component configuration (single source of truth) +# Each component defines: name, file paths, IO spec, and TensorRT profile +components = dict( + voxel_encoder=dict( + name="pts_voxel_encoder", + onnx_file="pts_voxel_encoder.onnx", + engine_file="pts_voxel_encoder.engine", + io=dict( + inputs=[dict(name="input_features", dtype="float32")], + outputs=[dict(name="pillar_features", dtype="float32")], + dynamic_axes={ + "input_features": {0: "num_voxels", 1: "num_max_points"}, + "pillar_features": {0: "num_voxels"}, + }, + ), + tensorrt_profile=dict( + input_features=dict( + min_shape=[1000, 32, 11], + opt_shape=[20000, 32, 11], + max_shape=[64000, 32, 11], + ), + ), + ), + backbone_head=dict( + name="pts_backbone_neck_head", + onnx_file="pts_backbone_neck_head.onnx", + engine_file="pts_backbone_neck_head.engine", + io=dict( + inputs=[dict(name="spatial_features", dtype="float32")], + outputs=[ + dict(name="heatmap", dtype="float32"), + dict(name="reg", dtype="float32"), + # ... more outputs + ], + dynamic_axes={...}, + ), + tensorrt_profile=dict( + spatial_features=dict( + min_shape=[1, 32, 760, 760], + opt_shape=[1, 32, 760, 760], + max_shape=[1, 32, 760, 760], + ), + ), ), ) +# Shared ONNX settings (applied to all components) +onnx_config = dict( + opset_version=16, + do_constant_folding=True, + simplify=False, +) + +# Shared TensorRT settings (applied to all components) +tensorrt_config = dict( + precision_policy="auto", + max_workspace_size=2 << 30, +) +``` + +### Verification and Evaluation + +```python verification = dict( enabled=True, num_verify_samples=3, @@ -138,4 +221,4 @@ Use `from_mapping()` / `from_dict()` helpers to instantiate typed configs from e ## Example Config Paths -- `deployment/projects/centerpoint/config/deploy_config.py` +- `deployment/projects/centerpoint/config/deploy_config.py` - Multi-file export example diff --git a/deployment/docs/export_pipeline.md b/deployment/docs/export_pipeline.md index e6a9ed963..2fe5ffe7a 100644 --- a/deployment/docs/export_pipeline.md +++ b/deployment/docs/export_pipeline.md @@ -19,16 +19,55 @@ ## Multi-File Export (CenterPoint) -CenterPoint splits the model into multiple ONNX/TensorRT artifacts: +CenterPoint splits the model into multiple ONNX/TensorRT artifacts using a unified `components` configuration: -- `voxel_encoder.onnx` -- `backbone_head.onnx` +```python +components = dict( + voxel_encoder=dict( + name="pts_voxel_encoder", + onnx_file="pts_voxel_encoder.onnx", # ONNX output filename + engine_file="pts_voxel_encoder.engine", # TensorRT output filename + io=dict( + inputs=[dict(name="input_features", dtype="float32")], + outputs=[dict(name="pillar_features", dtype="float32")], + dynamic_axes={...}, + ), + tensorrt_profile=dict( + input_features=dict(min_shape=[...], opt_shape=[...], max_shape=[...]), + ), + ), + backbone_head=dict( + name="pts_backbone_neck_head", + onnx_file="pts_backbone_neck_head.onnx", + engine_file="pts_backbone_neck_head.engine", + io=dict(...), + tensorrt_profile=dict(...), + ), +) +``` + +### Configuration Structure + 
+Each component in `deploy_cfg.components` defines: + +- `name`: Component identifier used during export +- `onnx_file`: Output ONNX filename +- `engine_file`: Output TensorRT engine filename +- `io`: Input/output specification (names, dtypes, dynamic_axes) +- `tensorrt_profile`: TensorRT optimization profile (min/opt/max shapes) + +### Export Pipeline Orchestration Export pipelines orchestrate: -- Sequential export of each component. -- Input/output wiring between stages. -- Directory structure management. +- Sequential export of each component +- Input/output wiring between stages +- Directory structure management + +CenterPoint uses a project-specific `ModelComponentExtractor` implementation that provides: + +- `extract_features(model, data_loader, sample_idx)`: project-specific feature extraction for tracing +- `extract_components(model, sample_data)`: splitting into ONNX-exportable submodules and per-component config overrides ## Verification-Oriented Exports @@ -48,3 +87,18 @@ runner = CenterPointDeploymentRunner( ``` Simple projects can skip export pipelines entirely and rely on the base exporters provided by `ExporterFactory`. + +## Runtime Pipeline Usage + +Runtime pipelines receive the `components_cfg` through constructor injection: + +```python +pipeline = CenterPointONNXPipeline( + pytorch_model=model, + onnx_dir="/path/to/onnx", + device="cuda:0", + components_cfg=deploy_cfg["components"], # Pass component config +) +``` + +This allows pipelines to resolve artifact paths from the unified config. diff --git a/deployment/exporters/common/base_exporter.py b/deployment/exporters/common/base_exporter.py index 057ef9712..e2105694b 100644 --- a/deployment/exporters/common/base_exporter.py +++ b/deployment/exporters/common/base_exporter.py @@ -66,7 +66,7 @@ def prepare_model(self, model: torch.nn.Module) -> torch.nn.Module: return self._model_wrapper(model) @abstractmethod - def export(self, model: torch.nn.Module, sample_input: Any, output_path: str, **kwargs) -> None: + def export(self, model: torch.nn.Module, sample_input: Any, output_path: str) -> None: """ Export model to target format. 
@@ -74,7 +74,6 @@ def export(self, model: torch.nn.Module, sample_input: Any, output_path: str, ** model: PyTorch model to export sample_input: Example model input(s) for tracing/shape inference output_path: Path to save exported model - **kwargs: Additional format-specific arguments Raises: RuntimeError: If export fails diff --git a/deployment/exporters/common/configs.py b/deployment/exporters/common/configs.py index 76d6bc4b1..3872eec7a 100644 --- a/deployment/exporters/common/configs.py +++ b/deployment/exporters/common/configs.py @@ -34,7 +34,9 @@ def _normalize_shape(shape: Optional[Iterable[int]]) -> Tuple[int, ...]: return tuple() return tuple(int(dim) for dim in shape) + @property def has_complete_profile(self) -> bool: + """Whether all three shape profiles (min, opt, max) are configured.""" return bool(self.min_shape and self.opt_shape and self.max_shape) @@ -46,10 +48,20 @@ class TensorRTModelInputConfig: @classmethod def from_dict(cls, data: Mapping[str, Any]) -> TensorRTModelInputConfig: - input_shapes_raw = data.get("input_shapes", {}) or {} - profile_map = { - name: TensorRTProfileConfig.from_dict(shape_dict or {}) for name, shape_dict in input_shapes_raw.items() - } + input_shapes_raw = data.get("input_shapes") + if input_shapes_raw is None: + input_shapes_raw = {} + if not isinstance(input_shapes_raw, Mapping): + raise TypeError(f"input_shapes must be a mapping, got {type(input_shapes_raw).__name__}") + + profile_map = {} + for name, shape_dict in input_shapes_raw.items(): + if shape_dict is None: + shape_dict = {} + elif not isinstance(shape_dict, Mapping): + raise TypeError(f"input_shapes.{name} must be a mapping, got {type(shape_dict).__name__}") + profile_map[name] = TensorRTProfileConfig.from_dict(shape_dict) + return cls(input_shapes=MappingProxyType(profile_map)) @@ -140,7 +152,7 @@ def from_mapping(cls, data: Mapping[str, Any]) -> TensorRTExportConfig: ) return cls( precision_policy=str(data.get("precision_policy", cls.precision_policy)), - policy_flags=MappingProxyType(dict(data.get("policy_flags", {}))), + policy_flags=MappingProxyType(data.get("policy_flags", {})), max_workspace_size=int(data.get("max_workspace_size", cls.max_workspace_size)), model_inputs=parsed_inputs, ) diff --git a/deployment/exporters/common/factory.py b/deployment/exporters/common/factory.py index 9533f2d12..c58192890 100644 --- a/deployment/exporters/common/factory.py +++ b/deployment/exporters/common/factory.py @@ -5,9 +5,10 @@ from __future__ import annotations import logging -from typing import Type +from typing import Optional, Type from deployment.core import BaseDeploymentConfig +from deployment.exporters.common.configs import TensorRTExportConfig from deployment.exporters.common.model_wrappers import BaseModelWrapper from deployment.exporters.common.onnx_exporter import ONNXExporter from deployment.exporters.common.tensorrt_exporter import TensorRTExporter @@ -38,12 +39,21 @@ def create_onnx_exporter( def create_tensorrt_exporter( config: BaseDeploymentConfig, logger: logging.Logger, + config_override: Optional[TensorRTExportConfig] = None, ) -> TensorRTExporter: """ Build a TensorRT exporter using the deployment config settings. + + Args: + config: Deployment configuration + logger: Logger instance + config_override: Optional TensorRT config to use instead of the one + derived from the deployment config. Useful for + per-component configurations in multi-file exports. 
""" + trt_config = config_override if config_override is not None else config.get_tensorrt_settings() return TensorRTExporter( - config=config.get_tensorrt_settings(), + config=trt_config, logger=logger, ) diff --git a/deployment/exporters/common/model_wrappers.py b/deployment/exporters/common/model_wrappers.py index 24b798ba3..7f40efa07 100644 --- a/deployment/exporters/common/model_wrappers.py +++ b/deployment/exporters/common/model_wrappers.py @@ -10,7 +10,6 @@ """ from abc import ABC, abstractmethod -from typing import Any, Dict import torch import torch.nn as nn @@ -27,20 +26,18 @@ class BaseModelWrapper(nn.Module, ABC): base class if special output format conversion is needed. """ - def __init__(self, model: nn.Module, **kwargs): + def __init__(self, model: nn.Module): """ Initialize wrapper. Args: model: PyTorch model to wrap - **kwargs: Wrapper-specific arguments """ super().__init__() self.model = model - self._wrapper_config = kwargs @abstractmethod - def forward(self, *args, **kwargs): + def forward(self, *args): """ Forward pass for ONNX export. @@ -48,10 +45,6 @@ def forward(self, *args, **kwargs): """ raise NotImplementedError - def get_config(self) -> Dict[str, Any]: - """Get wrapper configuration.""" - return self._wrapper_config - class IdentityWrapper(BaseModelWrapper): """ @@ -61,9 +54,9 @@ class IdentityWrapper(BaseModelWrapper): This is the default wrapper for most models. """ - def __init__(self, model: nn.Module, **kwargs): - super().__init__(model, **kwargs) + def __init__(self, model: nn.Module): + super().__init__(model) - def forward(self, *args, **kwargs): + def forward(self, *args): """Forward pass without modification.""" - return self.model(*args, **kwargs) + return self.model(*args) diff --git a/deployment/exporters/common/tensorrt_exporter.py b/deployment/exporters/common/tensorrt_exporter.py index 0ecede689..6abc4026f 100644 --- a/deployment/exporters/common/tensorrt_exporter.py +++ b/deployment/exporters/common/tensorrt_exporter.py @@ -312,7 +312,7 @@ def _configure_input_shapes( f" - sample_input: {sample_input}\n" "\n" "Example config:\n" - " backend_config = dict(\n" + " tensorrt_config = dict(\n" " model_inputs=[\n" " dict(\n" " input_shapes={\n" @@ -358,7 +358,12 @@ def _extract_input_shapes(self, entry: Any) -> Mapping[str, Any]: if isinstance(entry, TensorRTModelInputConfig): return entry.input_shapes if isinstance(entry, Mapping): - return entry.get("input_shapes", {}) or {} + input_shapes = entry.get("input_shapes") + if input_shapes is None: + input_shapes = {} + if not isinstance(input_shapes, Mapping): + raise TypeError(f"input_shapes must be a mapping, got {type(input_shapes).__name__}") + return input_shapes raise TypeError(f"Unsupported TensorRT model input entry: {type(entry)}") def _resolve_profile_shapes( diff --git a/deployment/exporters/export_pipelines/base.py b/deployment/exporters/export_pipelines/base.py index 1b0ff7d0b..438fc45c6 100644 --- a/deployment/exporters/export_pipelines/base.py +++ b/deployment/exporters/export_pipelines/base.py @@ -55,7 +55,6 @@ def export( output_dir: str, config: BaseDeploymentConfig, device: str, - data_loader: BaseDataLoader, ) -> Artifact: """ Execute the TensorRT export pipeline and return the produced artifact. 
@@ -65,7 +64,6 @@ def export( output_dir: Directory for output files config: Deployment configuration device: CUDA device string - data_loader: Data loader for samples Returns: Artifact describing the exported TensorRT output diff --git a/deployment/exporters/export_pipelines/interfaces.py b/deployment/exporters/export_pipelines/interfaces.py index a2ebd7ee7..09524326d 100644 --- a/deployment/exporters/export_pipelines/interfaces.py +++ b/deployment/exporters/export_pipelines/interfaces.py @@ -7,7 +7,7 @@ from abc import ABC, abstractmethod from dataclasses import dataclass -from typing import Any, List, Optional, Tuple +from typing import Any, List, Optional import torch @@ -63,4 +63,29 @@ def extract_components(self, model: torch.nn.Module, sample_data: Any) -> List[E Returns: List of ExportableComponent instances ready for ONNX export """ - pass + ... + + @abstractmethod + def extract_features( + self, + model: torch.nn.Module, + data_loader: Any, + sample_idx: int, + ) -> Any: + """ + Extract model-specific intermediate features required for multi-component export. + + Some models require running a portion of the network to generate the input + tensor(s) for later components. This method encapsulates that model-specific + logic and returns a standardized tuple used by `extract_components`. + + Args: + model: PyTorch model used for feature extraction + data_loader: Data loader used to access the sample + sample_idx: Sample index used for tracing/feature extraction + + Returns: + A tuple of (input_features, voxel_dict) or other model-specific payload + that `extract_components` expects. + """ + ... diff --git a/deployment/pipelines/base_factory.py b/deployment/pipelines/base_factory.py index 4aa9f5c71..0576777c5 100644 --- a/deployment/pipelines/base_factory.py +++ b/deployment/pipelines/base_factory.py @@ -36,7 +36,7 @@ def create_pipeline( model_spec: ModelSpec, pytorch_model: Any, device: Optional[str] = None, - **kwargs, + components_cfg: Optional[Any] = None, ) -> BaseDeploymentPipeline: """Build and return a pipeline instance for the given model spec. @@ -49,7 +49,7 @@ def create_pipeline( model_spec: Describes the model path/device/backend and any metadata. pytorch_model: A loaded PyTorch model (used for PYTORCH backends). device: Optional device override (defaults to `model_spec.device`). - **kwargs: Project-specific options passed from evaluator/CLI. + components_cfg: Project-specific component configuration (e.g., file paths, IO specs). """ raise NotImplementedError diff --git a/deployment/pipelines/base_pipeline.py b/deployment/pipelines/base_pipeline.py index efa369523..47ee46120 100644 --- a/deployment/pipelines/base_pipeline.py +++ b/deployment/pipelines/base_pipeline.py @@ -7,7 +7,7 @@ import logging import time from abc import ABC, abstractmethod -from typing import Any, Dict, Optional, Tuple, Union +from typing import Any, Dict, Mapping, Optional, Tuple, Union import torch @@ -44,7 +44,7 @@ def __init__(self, model: Any, device: str = "cpu", task_type: str = "unknown", logger.info(f"Initialized {self.__class__.__name__} on device: {self.device}") @abstractmethod - def preprocess(self, input_data: Any, **kwargs) -> Any: + def preprocess(self, input_data: Any) -> Any: """Convert raw input into model-ready tensors/arrays. 
Implementations may optionally return a tuple `(model_input, metadata_dict)` @@ -63,12 +63,12 @@ def run_model(self, preprocessed_input: Any) -> Union[Any, Tuple[Any, Dict[str, raise NotImplementedError @abstractmethod - def postprocess(self, model_output: Any, metadata: Dict = None) -> Any: + def postprocess(self, model_output: Any, metadata: Optional[Mapping[str, Any]] = None) -> Any: """Convert raw model outputs into final predictions/results.""" raise NotImplementedError def infer( - self, input_data: Any, metadata: Optional[Dict] = None, return_raw_outputs: bool = False, **kwargs + self, input_data: Any, metadata: Optional[Mapping[str, Any]] = None, return_raw_outputs: bool = False ) -> InferenceResult: """Run end-to-end inference with latency breakdown. @@ -81,7 +81,6 @@ def infer( input_data: Raw input sample(s) in a project-defined format. metadata: Optional auxiliary context merged with preprocess metadata. return_raw_outputs: If True, skip `postprocess` and return raw model output. - **kwargs: Forwarded to `preprocess` for project-specific options. Returns: InferenceResult with `output`, total latency, and per-stage breakdown. @@ -94,7 +93,7 @@ def infer( try: start_time = time.perf_counter() - preprocessed = self.preprocess(input_data, **kwargs) + preprocessed = self.preprocess(input_data) preprocess_metadata = {} model_input = preprocessed @@ -105,8 +104,10 @@ def infer( latency_breakdown["preprocessing_ms"] = (preprocess_time - start_time) * 1000 merged_metadata = {} - merged_metadata.update(metadata or {}) - merged_metadata.update(preprocess_metadata) + if metadata is not None: + merged_metadata.update(metadata) + if preprocess_metadata is not None: + merged_metadata.update(preprocess_metadata) model_start = time.perf_counter() model_result = self.run_model(model_input) diff --git a/deployment/pipelines/factory.py b/deployment/pipelines/factory.py index b7ef2290f..0dcce4cef 100644 --- a/deployment/pipelines/factory.py +++ b/deployment/pipelines/factory.py @@ -51,7 +51,7 @@ def create( model_spec: ModelSpec, pytorch_model: Any, device: Optional[str] = None, - **kwargs, + components_cfg: Optional[Any] = None, ) -> BaseDeploymentPipeline: """ Create a pipeline for the specified project. @@ -61,7 +61,7 @@ def create( model_spec: Model specification (backend/device/path) pytorch_model: PyTorch model instance device: Override device (uses model_spec.device if None) - **kwargs: Project-specific arguments + components_cfg: Project-specific component configuration Returns: Pipeline instance @@ -75,6 +75,7 @@ def create( ... "centerpoint", ... model_spec, ... pytorch_model, + ... components_cfg=components_cfg, ... 
) """ return pipeline_registry.create_pipeline( @@ -82,7 +83,7 @@ def create( model_spec=model_spec, pytorch_model=pytorch_model, device=device, - **kwargs, + components_cfg=components_cfg, ) @staticmethod diff --git a/deployment/pipelines/gpu_resource_mixin.py b/deployment/pipelines/gpu_resource_mixin.py index d71082a8b..3f4db2048 100644 --- a/deployment/pipelines/gpu_resource_mixin.py +++ b/deployment/pipelines/gpu_resource_mixin.py @@ -80,7 +80,8 @@ def allocate(self, nbytes: int) -> Any: self._allocations.append(allocation) return allocation - def get_stream(self) -> Any: + @property + def stream(self) -> Any: """Return a lazily-created CUDA stream shared by the manager.""" if self._stream is None: self._stream = cuda.Stream() diff --git a/deployment/pipelines/registry.py b/deployment/pipelines/registry.py index 03e9f27b8..585da3c09 100644 --- a/deployment/pipelines/registry.py +++ b/deployment/pipelines/registry.py @@ -73,7 +73,7 @@ def create_pipeline( model_spec: ModelSpec, pytorch_model: Any, device: Optional[str] = None, - **kwargs, + components_cfg: Optional[Any] = None, ) -> BaseDeploymentPipeline: """Create a project-specific pipeline instance using the registered factory. @@ -85,7 +85,7 @@ def create_pipeline( model_spec=model_spec, pytorch_model=pytorch_model, device=device, - **kwargs, + components_cfg=components_cfg, ) def list_projects(self) -> list: diff --git a/deployment/projects/centerpoint/config/deploy_config.py b/deployment/projects/centerpoint/config/deploy_config.py new file mode 100644 index 000000000..227811a43 --- /dev/null +++ b/deployment/projects/centerpoint/config/deploy_config.py @@ -0,0 +1,183 @@ +""" +CenterPoint Deployment Configuration +""" + +# ============================================================================ +# Task type for pipeline building +# Options: 'detection2d', 'detection3d', 'classification', 'segmentation' +# ============================================================================ +task_type = "detection3d" + +# ============================================================================ +# Checkpoint Path - Single source of truth for PyTorch model +# ============================================================================ +checkpoint_path = "work_dirs/centerpoint/best_checkpoint.pth" + +# ============================================================================ +# Device settings (shared by export, evaluation, verification) +# ============================================================================ +devices = dict( + cpu="cpu", + cuda="cuda:0", +) + +# ============================================================================ +# Export Configuration +# ============================================================================ +export = dict( + mode="both", + work_dir="work_dirs/centerpoint_deployment", + onnx_path=None, +) + +# Derived artifact directories +_WORK_DIR = str(export["work_dir"]).rstrip("/") +_ONNX_DIR = f"{_WORK_DIR}/onnx" +_TENSORRT_DIR = f"{_WORK_DIR}/tensorrt" + +# ============================================================================ +# Unified Component Configuration (Single Source of Truth) +# +# Each component defines: +# - name: Component identifier used in export +# - onnx_file: Output ONNX filename +# - engine_file: Output TensorRT engine filename +# - io: Input/output specification for ONNX export +# - tensorrt_profile: TensorRT optimization profile (min/opt/max shapes) +# ============================================================================ +components = dict( + 
voxel_encoder=dict( + name="pts_voxel_encoder", + onnx_file="pts_voxel_encoder.onnx", + engine_file="pts_voxel_encoder.engine", + io=dict( + inputs=[ + dict(name="input_features", dtype="float32"), + ], + outputs=[ + dict(name="pillar_features", dtype="float32"), + ], + dynamic_axes={ + "input_features": {0: "num_voxels", 1: "num_max_points"}, + "pillar_features": {0: "num_voxels"}, + }, + ), + tensorrt_profile=dict( + input_features=dict( + min_shape=[1000, 32, 11], + opt_shape=[20000, 32, 11], + max_shape=[64000, 32, 11], + ), + ), + ), + backbone_head=dict( + name="pts_backbone_neck_head", + onnx_file="pts_backbone_neck_head.onnx", + engine_file="pts_backbone_neck_head.engine", + io=dict( + inputs=[ + dict(name="spatial_features", dtype="float32"), + ], + outputs=[ + dict(name="heatmap", dtype="float32"), + dict(name="reg", dtype="float32"), + dict(name="height", dtype="float32"), + dict(name="dim", dtype="float32"), + dict(name="rot", dtype="float32"), + dict(name="vel", dtype="float32"), + ], + dynamic_axes={ + "spatial_features": {0: "batch_size", 2: "height", 3: "width"}, + "heatmap": {0: "batch_size", 2: "height", 3: "width"}, + "reg": {0: "batch_size", 2: "height", 3: "width"}, + "height": {0: "batch_size", 2: "height", 3: "width"}, + "dim": {0: "batch_size", 2: "height", 3: "width"}, + "rot": {0: "batch_size", 2: "height", 3: "width"}, + "vel": {0: "batch_size", 2: "height", 3: "width"}, + }, + ), + tensorrt_profile=dict( + spatial_features=dict( + min_shape=[1, 32, 1020, 1020], + opt_shape=[1, 32, 1020, 1020], + max_shape=[1, 32, 1020, 1020], + ), + ), + ), +) + +# ============================================================================ +# Runtime I/O settings +# ============================================================================ +runtime_io = dict( + # This should be a path relative to `data_root` in the model config. 
+ info_file="info/t4dataset_j6gen2_infos_val.pkl", + sample_idx=1, +) + +# ============================================================================ +# ONNX Export Settings (shared across all components) +# ============================================================================ +onnx_config = dict( + opset_version=16, + do_constant_folding=True, + export_params=True, + keep_initializers_as_inputs=False, + simplify=False, +) + +# ============================================================================ +# TensorRT Build Settings (shared across all components) +# ============================================================================ +tensorrt_config = dict( + precision_policy="auto", + max_workspace_size=2 << 30, +) + +# ============================================================================ +# Evaluation Configuration +# ============================================================================ +evaluation = dict( + enabled=True, + num_samples=1, + verbose=True, + backends=dict( + pytorch=dict( + enabled=True, + device=devices["cuda"], + ), + onnx=dict( + enabled=True, + device=devices["cuda"], + model_dir=_ONNX_DIR, + ), + tensorrt=dict( + enabled=True, + device=devices["cuda"], + engine_dir=_TENSORRT_DIR, + ), + ), +) + +# ============================================================================ +# Verification Configuration +# ============================================================================ +verification = dict( + enabled=False, + tolerance=1e-1, + num_verify_samples=1, + devices=devices, + scenarios=dict( + both=[ + dict(ref_backend="pytorch", ref_device="cpu", test_backend="onnx", test_device="cpu"), + dict(ref_backend="onnx", ref_device="cuda", test_backend="tensorrt", test_device="cuda"), + ], + onnx=[ + dict(ref_backend="pytorch", ref_device="cpu", test_backend="onnx", test_device="cpu"), + ], + trt=[ + dict(ref_backend="onnx", ref_device="cuda", test_backend="tensorrt", test_device="cuda"), + ], + none=[], + ), +) diff --git a/deployment/projects/registry.py b/deployment/projects/registry.py index a64bc73a7..c5932323c 100644 --- a/deployment/projects/registry.py +++ b/deployment/projects/registry.py @@ -48,7 +48,7 @@ def get(self, name: str) -> ProjectAdapter: raise KeyError(f"Unknown project '{name}'. 
Available: [{available}]") return self._adapters[key] - def list(self) -> list[str]: + def list_projects(self) -> list[str]: return sorted(self._adapters.keys()) diff --git a/deployment/runtime/artifact_manager.py b/deployment/runtime/artifact_manager.py index 6c81910a2..2996e4c15 100644 --- a/deployment/runtime/artifact_manager.py +++ b/deployment/runtime/artifact_manager.py @@ -74,13 +74,13 @@ def resolve_artifact(self, backend: Backend) -> Tuple[Optional[Artifact], bool]: """ artifact = self.artifacts.get(backend.value) if artifact: - return artifact, artifact.exists() + return artifact, artifact.exists config_path = self._get_config_path(backend) if config_path: is_dir = osp.isdir(config_path) if osp.exists(config_path) else False artifact = Artifact(path=config_path, multi_file=is_dir) - return artifact, artifact.exists() + return artifact, artifact.exists return None, False diff --git a/deployment/runtime/evaluation_orchestrator.py b/deployment/runtime/evaluation_orchestrator.py index 44fe299ef..1759535da 100644 --- a/deployment/runtime/evaluation_orchestrator.py +++ b/deployment/runtime/evaluation_orchestrator.py @@ -76,7 +76,7 @@ def run(self, artifact_manager: ArtifactManager) -> Dict[str, Any]: num_samples = eval_config.num_samples if num_samples == -1: - num_samples = self.data_loader.get_num_samples() + num_samples = self.data_loader.num_samples verbose_mode = eval_config.verbose all_results: Dict[str, Any] = {} @@ -119,7 +119,7 @@ def _get_models_to_evaluate(self, artifact_manager: ArtifactManager) -> List[Mod Returns: List of model specifications """ - backends = self.config.get_evaluation_backends() + backends = self.config.evaluation_backends models_to_evaluate: List[ModelSpec] = [] for backend_key, backend_cfg in backends.items(): @@ -198,14 +198,20 @@ def _print_cross_backend_comparison(self, all_results: Mapping[str, Any]) -> Non if results and "error" not in results: if "accuracy" in results: self.logger.info(f" Accuracy: {results.get('accuracy', 0):.4f}") - if "mAP" in results: - self.logger.info(f" mAP: {results.get('mAP', 0):.4f}") - - if "latency_stats" in results: - stats = results["latency_stats"] - self.logger.info(f" Latency: {stats['mean_ms']:.2f} ± {stats['std_ms']:.2f} ms") - elif "latency" in results: + if "mAP_by_mode" in results: + mAP_by_mode = results.get("mAP_by_mode", {}) + if mAP_by_mode: + for mode, map_value in mAP_by_mode.items(): + self.logger.info(f" mAP ({mode}): {map_value:.4f}") + + if "mAPH_by_mode" in results: + mAPH_by_mode = results.get("mAPH_by_mode", {}) + if mAPH_by_mode: + for mode, maph_value in mAPH_by_mode.items(): + self.logger.info(f" mAPH ({mode}): {maph_value:.4f}") + + if "latency" in results: latency = results["latency"] - self.logger.info(f" Latency: {latency['mean_ms']:.2f} ± {latency['std_ms']:.2f} ms") + self.logger.info(f" Latency: {latency.mean_ms:.2f} ± {latency.std_ms:.2f} ms") else: self.logger.info(" No results available") diff --git a/deployment/runtime/export_orchestrator.py b/deployment/runtime/export_orchestrator.py index ea540ec39..633f2d80a 100644 --- a/deployment/runtime/export_orchestrator.py +++ b/deployment/runtime/export_orchestrator.py @@ -124,8 +124,8 @@ def run(self, context: Optional[ExportContext] = None) -> ExportResult: result = ExportResult() - should_export_onnx = self.config.export_config.should_export_onnx() - should_export_trt = self.config.export_config.should_export_tensorrt() + should_export_onnx = self.config.export_config.should_export_onnx + should_export_trt = 
self.config.export_config.should_export_tensorrt checkpoint_path = self.config.checkpoint_path external_onnx_path = self.config.export_config.onnx_path @@ -158,7 +158,7 @@ def _determine_pytorch_requirements(self) -> bool: Returns: True if PyTorch model is needed, False otherwise """ - if self.config.export_config.should_export_onnx(): + if self.config.export_config.should_export_onnx: return True eval_config = self.config.evaluation_config @@ -317,7 +317,7 @@ def _export_onnx(self, pytorch_model: Any, context: ExportContext) -> Optional[A Returns: Artifact representing the exported ONNX model or None if export is not configured """ - if not self.config.export_config.should_export_onnx(): + if not self.config.export_config.should_export_onnx: return None if self._onnx_pipeline is None and self._onnx_wrapper_cls is None: @@ -386,7 +386,7 @@ def _export_tensorrt(self, onnx_path: str, context: ExportContext) -> Optional[A Returns: Artifact representing the exported TensorRT engine or None if export is not configured """ - if not self.config.export_config.should_export_tensorrt(): + if not self.config.export_config.should_export_tensorrt: return None if not onnx_path: @@ -406,7 +406,7 @@ def _export_tensorrt(self, onnx_path: str, context: ExportContext) -> Optional[A output_path = self._get_tensorrt_output_path(onnx_path, tensorrt_dir) cuda_device = self.config.devices.cuda - device_id = self.config.devices.get_cuda_device_index() + device_id = self.config.devices.cuda_device_index if cuda_device is None or device_id is None: raise RuntimeError("TensorRT export requires a CUDA device. Set deploy_cfg.devices['cuda'].") torch.cuda.set_device(device_id) @@ -421,7 +421,6 @@ def _export_tensorrt(self, onnx_path: str, context: ExportContext) -> Optional[A output_dir=tensorrt_dir, config=self.config, device=cuda_device, - data_loader=self.data_loader, ) self.artifact_manager.register_artifact(Backend.TENSORRT, artifact) self.logger.info(f"TensorRT export successful: {artifact.path}") diff --git a/deployment/runtime/verification_orchestrator.py b/deployment/runtime/verification_orchestrator.py index 8f23d2a4d..1151e0fc7 100644 --- a/deployment/runtime/verification_orchestrator.py +++ b/deployment/runtime/verification_orchestrator.py @@ -84,7 +84,12 @@ def run(self, artifact_manager: ArtifactManager) -> Dict[str, Any]: num_verify_samples = verification_cfg.num_verify_samples tolerance = verification_cfg.tolerance - devices_map = dict(verification_cfg.devices or {}) + devices_raw = verification_cfg.devices + if devices_raw is None: + devices_raw = {} + if not isinstance(devices_raw, Mapping): + raise TypeError(f"verification.devices must be a mapping, got {type(devices_raw).__name__}") + devices_map = dict(devices_raw) devices_map.setdefault("cpu", self.config.devices.cpu or "cpu") if self.config.devices.cuda: devices_map.setdefault("cuda", self.config.devices.cuda)
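
To make the device handling at the end of this patch concrete, here is a small standalone sketch (not the orchestrator's actual code; function and variable names are illustrative) of how a `verification` block like the one in `deploy_config.py` resolves its logical device keys ("cpu"/"cuda") through a normalized devices map:

```python
from typing import Any, Dict, List, Mapping

def normalize_devices(devices_cfg: Any, cpu: str = "cpu", cuda: str = "cuda:0") -> Dict[str, str]:
    """Accept None or a mapping, reject anything else, and fall back to the
    globally configured cpu/cuda devices (mirrors the idea in the last hunk)."""
    if devices_cfg is None:
        devices_cfg = {}
    if not isinstance(devices_cfg, Mapping):
        raise TypeError(f"verification.devices must be a mapping, got {type(devices_cfg).__name__}")
    devices = dict(devices_cfg)
    devices.setdefault("cpu", cpu)
    devices.setdefault("cuda", cuda)
    return devices

# Scenario entries as in deploy_config.py's verification.scenarios["both"].
scenarios: List[Dict[str, str]] = [
    dict(ref_backend="pytorch", ref_device="cpu", test_backend="onnx", test_device="cpu"),
    dict(ref_backend="onnx", ref_device="cuda", test_backend="tensorrt", test_device="cuda"),
]

devices = normalize_devices(None)  # e.g. verification.devices omitted in the deploy config
for s in scenarios:
    ref = f'{s["ref_backend"]}@{devices[s["ref_device"]]}'
    test = f'{s["test_backend"]}@{devices[s["test_device"]]}'
    print(f"verify {test} against {ref}")
```

Under these assumptions the scenario's `ref_device`/`test_device` keys are always indirections into the devices map, so switching the CUDA device for every verification pair only requires changing `devices["cuda"]` in one place.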