diff --git a/deployment/projects/centerpoint/__init__.py b/deployment/projects/centerpoint/__init__.py new file mode 100644 index 000000000..e7cae0e0c --- /dev/null +++ b/deployment/projects/centerpoint/__init__.py @@ -0,0 +1,22 @@ +"""CenterPoint deployment bundle. + +This package owns all CenterPoint deployment-specific code (runner/evaluator/loader/pipelines/export). +It registers a ProjectAdapter into the global `project_registry` so the unified CLI can invoke it. +""" + +from __future__ import annotations + +from deployment.projects.centerpoint.cli import add_args +from deployment.projects.centerpoint.entrypoint import run + +# Trigger pipeline factory registration for this project. +from deployment.projects.centerpoint.pipelines.factory import CenterPointPipelineFactory # noqa: F401 +from deployment.projects.registry import ProjectAdapter, project_registry + +project_registry.register( + ProjectAdapter( + name="centerpoint", + add_args=add_args, + run=run, + ) +) diff --git a/deployment/projects/centerpoint/cli.py b/deployment/projects/centerpoint/cli.py new file mode 100644 index 000000000..cc040e0d9 --- /dev/null +++ b/deployment/projects/centerpoint/cli.py @@ -0,0 +1,14 @@ +"""CenterPoint CLI extensions.""" + +from __future__ import annotations + +import argparse + + +def add_args(parser: argparse.ArgumentParser) -> None: + """Register CenterPoint-specific CLI flags onto a project subparser.""" + parser.add_argument( + "--rot-y-axis-reference", + action="store_true", + help="Convert rotation to y-axis clockwise reference (CenterPoint ONNX-compatible format)", + ) diff --git a/deployment/projects/centerpoint/data_loader.py b/deployment/projects/centerpoint/data_loader.py new file mode 100644 index 000000000..52057f69c --- /dev/null +++ b/deployment/projects/centerpoint/data_loader.py @@ -0,0 +1,160 @@ +""" +CenterPoint DataLoader for deployment. + +Wraps MMDet3D Dataset to ensure GT is identical to tools/detection3d/test.py. +Pipeline is run once per sample in load_sample(), avoiding redundant computation. +""" + +import copy +from typing import Any, Dict, List, Optional, Union + +import mmdet3d.datasets.transforms # noqa: F401 - registers transforms +import numpy as np +import torch +from mmengine.config import Config +from mmengine.registry import DATASETS, init_default_scope + +from deployment.core import BaseDataLoader + + +class CenterPointDataLoader(BaseDataLoader): + """Deployment dataloader for CenterPoint using MMDet3D Dataset. + + This wraps the same Dataset used by tools/detection3d/test.py, ensuring: + - GT is identical + - Pipeline processing is identical + - Pipeline runs once per sample (no cache needed) + + Design: + load_sample() runs the full pipeline and returns all data (input + GT). + preprocess() extracts model inputs from the loaded sample. 
+ """ + + def __init__( + self, + info_file: str, + model_cfg: Config, + device: str = "cpu", + task_type: Optional[str] = None, + ): + super().__init__( + config={ + "info_file": info_file, + "device": device, + } + ) + + self.model_cfg = model_cfg + self.device = device + self.info_file = info_file + self.dataset = self._build_dataset(model_cfg, info_file) + + def _build_dataset(self, model_cfg: Config, info_file: str) -> Any: + """Build MMDet3D Dataset from config, overriding ann_file.""" + # Set default scope to mmdet3d so transforms are found in the registry + init_default_scope("mmdet3d") + if not hasattr(model_cfg, "test_dataloader"): + raise ValueError("model_cfg must have 'test_dataloader' with dataset config") + + dataset_cfg = copy.deepcopy(model_cfg.test_dataloader.dataset) + + dataset_cfg["ann_file"] = info_file + dataset_cfg["test_mode"] = True + + # Build dataset + dataset = DATASETS.build(dataset_cfg) + return dataset + + def _to_tensor( + self, + data: Union[torch.Tensor, np.ndarray, List[Union[torch.Tensor, np.ndarray]]], + name: str = "data", + ) -> torch.Tensor: + if isinstance(data, torch.Tensor): + return data.to(self.device) + + if isinstance(data, np.ndarray): + return torch.from_numpy(data).to(self.device) + + if isinstance(data, list): + if len(data) == 0: + raise ValueError(f"Empty list for '{name}' in pipeline output.") + + first_item = data[0] + if isinstance(first_item, torch.Tensor): + return first_item.to(self.device) + if isinstance(first_item, np.ndarray): + return torch.from_numpy(first_item).to(self.device) + + raise ValueError( + f"Unexpected type for {name}[0]: {type(first_item)}. Expected torch.Tensor or np.ndarray." + ) + + raise ValueError( + f"Unexpected type for '{name}': {type(data)}. Expected torch.Tensor, np.ndarray, or list of tensors/arrays." + ) + + def load_sample(self, index: int) -> Dict[str, Any]: + """Load sample by running the full pipeline once. + + Returns a dict containing all data needed for inference and evaluation: + - points: Points tensor (ready for inference) + - metainfo: Sample metadata + - ground_truth: Raw eval_ann_info from MMDet3D (kept unconverted) + """ + if index >= len(self.dataset): + raise IndexError(f"Sample index {index} out of range (0-{len(self.dataset)-1})") + + # Run pipeline once + data = self.dataset[index] + + # Extract inputs + pipeline_inputs = data.get("inputs", {}) + if "points" not in pipeline_inputs: + raise ValueError(f"Expected 'points' in inputs. Got keys: {list(pipeline_inputs.keys())}") + + points_tensor = self._to_tensor(pipeline_inputs["points"], name="points") + if points_tensor.ndim != 2: + raise ValueError(f"Expected points tensor with shape [N, features], got {points_tensor.shape}") + + # Extract metainfo and eval_ann_info from data_samples + data_samples = data.get("data_samples") + metainfo: Dict[str, Any] = {} + ground_truth: Dict[str, Any] = {} + + if data_samples is not None: + metainfo = getattr(data_samples, "metainfo", {}) or {} + eval_ann_info = getattr(data_samples, "eval_ann_info", {}) or {} + # Keep raw eval_ann_info here; evaluator will convert to the metrics format. + ground_truth = dict(eval_ann_info) + + return { + "points": points_tensor, + "metainfo": metainfo, + "ground_truth": ground_truth, + } + + def preprocess(self, sample: Dict[str, Any]) -> Dict[str, Any]: + """Extract points and metainfo from loaded sample. + + This is a lightweight operation - pipeline already ran in load_sample(). 
+ """ + return { + "points": sample["points"], + "metainfo": sample["metainfo"], + } + + @property + def num_samples(self) -> int: + return len(self.dataset) + + @property + def class_names(self) -> List[str]: + # Get from dataset's metainfo or model_cfg + if hasattr(self.dataset, "metainfo") and "classes" in self.dataset.metainfo: + return list(self.dataset.metainfo["classes"]) + + if hasattr(self.model_cfg, "class_names"): + return list(self.model_cfg.class_names) + + raise ValueError("class_names not found in dataset.metainfo or model_cfg") diff --git a/deployment/projects/centerpoint/entrypoint.py b/deployment/projects/centerpoint/entrypoint.py new file mode 100644 index 000000000..6d030492c --- /dev/null +++ b/deployment/projects/centerpoint/entrypoint.py @@ -0,0 +1,70 @@ +"""CenterPoint deployment entrypoint invoked by the unified CLI.""" + +from __future__ import annotations + +import argparse +import logging +from typing import Any, Mapping + +from mmengine.config import Config + +from deployment.core.config.base_config import BaseDeploymentConfig, setup_logging +from deployment.core.contexts import CenterPointExportContext +from deployment.projects.centerpoint.data_loader import CenterPointDataLoader +from deployment.projects.centerpoint.evaluator import CenterPointEvaluator +from deployment.projects.centerpoint.metrics_utils import extract_t4metric_v2_config +from deployment.projects.centerpoint.runner import CenterPointDeploymentRunner + + +def run(args: argparse.Namespace) -> int: + """Run the CenterPoint deployment workflow for the unified CLI. + + This wires together the CenterPoint bundle components (data loader, evaluator, + runner) and executes export/verification/evaluation according to `deploy_cfg`. + + Args: + args: Parsed command-line arguments containing deploy_cfg and model_cfg paths. + + Returns: + Exit code (0 for success). + """ + logger = setup_logging(args.log_level) + + deploy_cfg = Config.fromfile(args.deploy_cfg) + model_cfg = Config.fromfile(args.model_cfg) + config = BaseDeploymentConfig(deploy_cfg) + + # Extract components config for dependency injection + components_cfg: Mapping[str, Any] = deploy_cfg.get("components", {}) + + logger.info("=" * 80) + logger.info("CenterPoint Deployment Pipeline (Unified CLI)") + logger.info("=" * 80) + + data_loader = CenterPointDataLoader( + info_file=config.runtime_config.info_file, + model_cfg=model_cfg, + device="cpu", + task_type=config.task_type, + ) + logger.info(f"Loaded {data_loader.num_samples} samples") + + metrics_config = extract_t4metric_v2_config(model_cfg, logger=logger) + + evaluator = CenterPointEvaluator( + model_cfg=model_cfg, + metrics_config=metrics_config, + components_cfg=components_cfg, + ) + + runner = CenterPointDeploymentRunner( + data_loader=data_loader, + evaluator=evaluator, + config=config, + model_cfg=model_cfg, + logger=logger, + ) + + context = CenterPointExportContext(rot_y_axis_reference=bool(getattr(args, "rot_y_axis_reference", False))) + runner.run(context=context) + return 0 diff --git a/deployment/projects/centerpoint/evaluator.py b/deployment/projects/centerpoint/evaluator.py new file mode 100644 index 000000000..d7dd12eac --- /dev/null +++ b/deployment/projects/centerpoint/evaluator.py @@ -0,0 +1,416 @@ +""" +CenterPoint Evaluator for deployment. 
+""" + +import logging +from typing import Any, Dict, List, Mapping, Optional + +import numpy as np +from mmengine.config import Config + +from deployment.core import ( + BaseEvaluator, + Detection3DMetricsConfig, + Detection3DMetricsInterface, + EvalResultDict, + InferenceInput, + ModelSpec, + TaskProfile, +) +from deployment.core.io.base_data_loader import BaseDataLoader +from deployment.pipelines.factory import PipelineFactory + +logger = logging.getLogger(__name__) + + +class CenterPointEvaluator(BaseEvaluator): + """Evaluator implementation for CenterPoint 3D detection. + + This builds a task profile (class names, display name) and uses the configured + `Detection3DMetricsInterface` to compute metrics from pipeline outputs. + + Args: + model_cfg: Model configuration with class_names + metrics_config: Configuration for 3D detection metrics + components_cfg: Optional unified components configuration dict. + Used to get output names from components.backbone_head.io.outputs + """ + + def __init__( + self, + model_cfg: Config, + metrics_config: Detection3DMetricsConfig, + components_cfg: Optional[Mapping[str, Any]] = None, + ): + if hasattr(model_cfg, "class_names"): + class_names = model_cfg.class_names + else: + raise ValueError("class_names must be provided via model_cfg.class_names.") + + if components_cfg is None: + components_cfg = {} + if not isinstance(components_cfg, Mapping): + raise TypeError(f"components_cfg must be a mapping, got {type(components_cfg).__name__}") + self._components_cfg = components_cfg + + task_profile = TaskProfile( + task_name="centerpoint_3d_detection", + display_name="CenterPoint 3D Object Detection", + class_names=tuple(class_names), + num_classes=len(class_names), + ) + + metrics_interface = Detection3DMetricsInterface(metrics_config) + + super().__init__( + metrics_interface=metrics_interface, + task_profile=task_profile, + model_cfg=model_cfg, + ) + + def set_onnx_config(self, model_cfg: Config) -> None: + self.model_cfg = model_cfg + + def _get_output_names(self) -> List[str]: + """Get head output names from components config.""" + backbone_head_cfg = self._components_cfg.get("backbone_head", {}) + io_cfg = backbone_head_cfg.get("io", {}) + outputs = io_cfg.get("outputs", []) + + if not outputs: + raise ValueError( + "Output names must be provided via components_cfg.backbone_head.io.outputs. " + "No fallback values are allowed in deployment framework." + ) + + output_names = [out.get("name") for out in outputs if out.get("name")] + if not output_names: + raise ValueError( + "Output names must be provided via components_cfg.backbone_head.io.outputs. " + "Each output must have a 'name' field." + ) + + return output_names + + def _create_pipeline(self, model_spec: ModelSpec, device: str) -> Any: + return PipelineFactory.create( + project_name="centerpoint", + model_spec=model_spec, + pytorch_model=self.pytorch_model, + device=device, + components_cfg=self._components_cfg, + ) + + def _prepare_input( + self, + sample: Dict[str, Any], + data_loader: BaseDataLoader, + device: str, + ) -> InferenceInput: + if "points" in sample: + points = sample["points"] + metadata = sample.get("metainfo", {}) + else: + raise ValueError(f"Expected 'points' in sample. 
Got keys: {list(sample.keys())}") + return InferenceInput(data=points, metadata=metadata) + + def _parse_predictions(self, pipeline_output: Any) -> List[Dict]: + return pipeline_output if isinstance(pipeline_output, list) else [] + + def _parse_ground_truths(self, gt_data: Dict[str, Any]) -> List[Dict]: + ground_truths = [] + + if "gt_bboxes_3d" in gt_data and "gt_labels_3d" in gt_data: + gt_bboxes_3d = gt_data["gt_bboxes_3d"] + gt_labels_3d = gt_data["gt_labels_3d"] + + gt_bboxes_3d = np.asarray(gt_bboxes_3d, dtype=np.float32).reshape( + -1, np.asarray(gt_bboxes_3d).shape[-1] if np.asarray(gt_bboxes_3d).ndim > 1 else 7 + ) + gt_labels_3d = np.asarray(gt_labels_3d, dtype=np.int64).reshape(-1) + + for i in range(len(gt_bboxes_3d)): + ground_truths.append({"bbox_3d": gt_bboxes_3d[i].tolist(), "label": int(gt_labels_3d[i])}) + + return ground_truths + + def _add_to_interface(self, predictions: List[Dict], ground_truths: List[Dict]) -> None: + self.metrics_interface.add_frame(predictions, ground_truths) + + def _build_results( + self, + latencies: List[float], + latency_breakdowns: List[Dict[str, float]], + num_samples: int, + ) -> EvalResultDict: + latency_stats = self.compute_latency_stats(latencies) + + map_results = self.metrics_interface.compute_metrics() + summary = self.metrics_interface.summary + summary_dict = summary.to_dict() if hasattr(summary, "to_dict") else summary + + result: EvalResultDict = { + "mAP_by_mode": summary_dict.get("mAP_by_mode", {}), + "mAPH_by_mode": summary_dict.get("mAPH_by_mode", {}), + "per_class_ap_by_mode": summary_dict.get("per_class_ap_by_mode", {}), + "detailed_metrics": map_results, + "latency": latency_stats, # Store LatencyStats directly + "num_samples": num_samples, + } + + if latency_breakdowns: + result["latency_breakdown"] = self._compute_latency_breakdown(latency_breakdowns) + + return result + + def print_results(self, results: EvalResultDict) -> None: + """Print evaluation results including metrics, latency, and breakdown.""" + # Print metrics report + metrics_report = self.metrics_interface.format_metrics_report() + if not metrics_report: + raise ValueError( + "Metrics report is empty. Ensure that frames have been added and metrics have been computed." + ) + print(metrics_report) + + # Print latency statistics + if "latency" not in results: + raise ValueError( + "Latency statistics not found in results. Ensure that evaluation has been run with latency tracking." 
+ ) + latency_stats = results["latency"] + latency_dict = latency_stats.to_dict() + print("\nLatency Statistics:") + print(f" Mean: {latency_dict['mean_ms']:.2f} ms") + print(f" Std: {latency_dict['std_ms']:.2f} ms") + print(f" Min: {latency_dict['min_ms']:.2f} ms") + print(f" Max: {latency_dict['max_ms']:.2f} ms") + print(f" Median: {latency_dict['median_ms']:.2f} ms") + + # Print stage-wise latency breakdown + if "latency_breakdown" in results: + breakdown = results["latency_breakdown"] + breakdown_dict = breakdown.to_dict() if hasattr(breakdown, "to_dict") else breakdown + + if breakdown_dict: + print("\nStage-wise Latency Breakdown:") + top_level_stages = {"preprocessing_ms", "model_ms", "postprocessing_ms"} + for stage, stats in breakdown_dict.items(): + stats_dict = stats.to_dict() if hasattr(stats, "to_dict") else stats + stage_name = stage.replace("_ms", "").replace("_", " ").title() + + if stage in top_level_stages: + print(f" {stage_name:18s}: {stats_dict['mean_ms']:.2f} ± {stats_dict['std_ms']:.2f} ms") + else: + print(f" {stage_name:16s}: {stats_dict['mean_ms']:.2f} ± {stats_dict['std_ms']:.2f} ms") + + print(f"\nTotal Samples: {results.get('num_samples', 0)}")
diff --git a/deployment/projects/centerpoint/export/component_extractor.py b/deployment/projects/centerpoint/export/component_extractor.py new file mode 100644 index 000000000..f14408f26 --- /dev/null +++ b/deployment/projects/centerpoint/export/component_extractor.py @@ -0,0 +1,234 @@ +""" +CenterPoint-specific component extractor. + +Extracts exportable submodules from CenterPoint using the unified component config. +""" + +import logging +from typing import Any, Dict, List, Mapping, Tuple + +import torch + +from deployment.core import BaseDeploymentConfig +from deployment.exporters.common.configs import ONNXExportConfig +from deployment.exporters.export_pipelines.interfaces import ExportableComponent, ModelComponentExtractor +from deployment.projects.centerpoint.onnx_models.centerpoint_onnx import CenterPointHeadONNX + +logger = logging.getLogger(__name__) + + +class CenterPointComponentExtractor(ModelComponentExtractor): + """Extract exportable CenterPoint submodules for multi-file ONNX export. + + For CenterPoint we export two components: + - `voxel_encoder` (pts_voxel_encoder) + - `backbone_neck_head` (pts_backbone + pts_neck + pts_bbox_head) + + Uses the unified `components` config structure where each component defines + its own IO specification, filenames, and TensorRT profiles.
+ """ + + def __init__(self, config: BaseDeploymentConfig, logger: logging.Logger = None): + self.config = config + self.logger = logger or logging.getLogger(__name__) + self._validate_config() + + @property + def _components_cfg(self) -> Dict[str, Any]: + """Get unified components configuration.""" + return dict(self.config.deploy_cfg.get("components", {})) + + @property + def _onnx_config(self) -> Dict[str, Any]: + """Get shared ONNX export settings.""" + onnx_config_raw = self.config.onnx_config + if onnx_config_raw is None: + onnx_config_raw = {} + if not isinstance(onnx_config_raw, Mapping): + raise TypeError(f"onnx_config must be a mapping, got {type(onnx_config_raw).__name__}") + return dict(onnx_config_raw) + + def extract_components(self, model: torch.nn.Module, sample_data: Any) -> List[ExportableComponent]: + input_features, voxel_dict = self._unpack_sample(sample_data) + self.logger.info("Extracting CenterPoint components for export...") + + voxel_component = self._create_voxel_encoder_component(model, input_features) + backbone_component = self._create_backbone_component(model, input_features, voxel_dict) + + self.logger.info("Extracted 2 components: voxel_encoder, backbone_neck_head") + return [voxel_component, backbone_component] + + def _validate_config(self) -> None: + """Validate component configuration.""" + components = self._components_cfg + + missing = [] + for required_key in ("voxel_encoder", "backbone_head"): + if required_key not in components: + missing.append(required_key) + if missing: + raise KeyError( + "Missing required `components` entries for CenterPoint export: " + f"{missing}. Please set them in deploy config." + ) + + def _require_fields(comp_key: str, fields: Tuple[str, ...]) -> None: + comp = dict(components.get(comp_key, {})) + missing_fields = [f for f in fields if not comp.get(f)] + if missing_fields: + raise KeyError( + f"Missing required fields in components['{comp_key}']: {missing_fields}. " + "Expected at least: " + ", ".join(fields) + ) + + _require_fields("voxel_encoder", ("name", "onnx_file")) + _require_fields("backbone_head", ("name", "onnx_file")) + + def _unpack_sample(self, sample_data: Any) -> Tuple[torch.Tensor, dict]: + """ + Unpack extractor output into `(input_features, voxel_dict)`. + + We intentionally keep this contract simple to avoid extra project-specific types. + """ + if not (isinstance(sample_data, (list, tuple)) and len(sample_data) == 2): + raise TypeError( + "Invalid sample_data for CenterPoint export. Expected a 2-tuple " + "`(input_features: torch.Tensor, voxel_dict: dict)`." 
+ ) + input_features, voxel_dict = sample_data + if not isinstance(input_features, torch.Tensor): + raise TypeError(f"input_features must be a torch.Tensor, got: {type(input_features)}") + if not isinstance(voxel_dict, dict): + raise TypeError(f"voxel_dict must be a dict, got: {type(voxel_dict)}") + if "coors" not in voxel_dict: + raise KeyError("voxel_dict must contain key 'coors' for CenterPoint export") + return input_features, voxel_dict + + def _get_component_io(self, component: str) -> Mapping[str, Any]: + """Get IO specification for a component.""" + comp_cfg = self._components_cfg.get(component, {}) + return comp_cfg.get("io", {}) + + def _build_onnx_config_for_component( + self, + component: str, + input_names: Tuple[str, ...], + output_names: Tuple[str, ...], + dynamic_axes: Dict[str, Dict[int, str]] | None = None, + ) -> ONNXExportConfig: + """Build ONNX export config for a component using unified config.""" + comp_cfg = self._components_cfg.get(component, {}) + comp_io = comp_cfg.get("io", {}) + onnx_settings = self._onnx_config + + # Use dynamic_axes from component IO config if not explicitly provided + if dynamic_axes is None: + dynamic_axes = comp_io.get("dynamic_axes", {}) + + return ONNXExportConfig( + input_names=input_names, + output_names=output_names, + dynamic_axes=dynamic_axes, + opset_version=onnx_settings.get("opset_version", 16), + do_constant_folding=onnx_settings.get("do_constant_folding", True), + simplify=bool(onnx_settings.get("simplify", True)), + save_file=comp_cfg.get("onnx_file", f"{component}.onnx"), + ) + + def _create_voxel_encoder_component( + self, model: torch.nn.Module, input_features: torch.Tensor + ) -> ExportableComponent: + """Create exportable component for voxel encoder.""" + comp_cfg = self._components_cfg["voxel_encoder"] + comp_io = comp_cfg.get("io", {}) + + # Get input/output names from IO config + inputs = comp_io.get("inputs", []) + outputs = comp_io.get("outputs", []) + input_names = tuple(inp.get("name", "input_features") for inp in inputs) or ("input_features",) + output_names = tuple(out.get("name", "pillar_features") for out in outputs) or ("pillar_features",) + + return ExportableComponent( + name=comp_cfg["name"], + module=model.pts_voxel_encoder, + sample_input=input_features, + config_override=self._build_onnx_config_for_component( + component="voxel_encoder", + input_names=input_names, + output_names=output_names, + ), + ) + + def _create_backbone_component( + self, model: torch.nn.Module, input_features: torch.Tensor, voxel_dict: dict + ) -> ExportableComponent: + """Create exportable component for backbone + neck + head.""" + backbone_input = self._prepare_backbone_input(model, input_features, voxel_dict) + backbone_module = self._create_backbone_module(model) + + comp_cfg = self._components_cfg["backbone_head"] + comp_io = comp_cfg.get("io", {}) + + # Get input/output names from IO config + inputs = comp_io.get("inputs", []) + outputs = comp_io.get("outputs", []) + input_names = tuple(inp.get("name", "spatial_features") for inp in inputs) or ("spatial_features",) + output_names = self._get_output_names(model, outputs) + + return ExportableComponent( + name=comp_cfg["name"], + module=backbone_module, + sample_input=backbone_input, + config_override=self._build_onnx_config_for_component( + component="backbone_head", + input_names=input_names, + output_names=output_names, + ), + ) + + def _prepare_backbone_input( + self, model: torch.nn.Module, input_features: torch.Tensor, voxel_dict: dict + ) -> torch.Tensor: + with 
torch.no_grad(): + voxel_features = model.pts_voxel_encoder(input_features).squeeze(1) + coors = voxel_dict["coors"] + batch_size = int(coors[-1, 0].item()) + 1 if len(coors) > 0 else 1 + spatial_features = model.pts_middle_encoder(voxel_features, coors, batch_size) + return spatial_features + + def _create_backbone_module(self, model: torch.nn.Module) -> torch.nn.Module: + return CenterPointHeadONNX(model.pts_backbone, model.pts_neck, model.pts_bbox_head) + + def _get_output_names(self, model: torch.nn.Module, io_outputs: List[Dict[str, Any]]) -> Tuple[str, ...]: + """Get output names from config or model. + + Priority: + 1. Component IO config outputs + 2. model.pts_bbox_head.output_names + 3. Raise error if neither available + """ + # Try from component IO config first + if io_outputs: + return tuple(out.get("name") for out in io_outputs if out.get("name")) + + # Try from model + if hasattr(model, "pts_bbox_head") and hasattr(model.pts_bbox_head, "output_names"): + output_names = model.pts_bbox_head.output_names + if isinstance(output_names, (list, tuple)): + return tuple(output_names) + return (output_names,) + + raise KeyError( + "Missing head output names for CenterPoint export. " + "Set `components.backbone_head.io.outputs` in the deployment config, " + "or define `model.pts_bbox_head.output_names`." + ) + + def extract_features(self, model: torch.nn.Module, data_loader: Any, sample_idx: int) -> Tuple[torch.Tensor, dict]: + if hasattr(model, "_extract_features"): + raw = model._extract_features(data_loader, sample_idx) + return self._unpack_sample(raw) + raise AttributeError( + "CenterPoint model must have _extract_features method for ONNX export. " + "Please ensure the model is built with ONNX compatibility." + ) diff --git a/deployment/projects/centerpoint/export/onnx_export_pipeline.py b/deployment/projects/centerpoint/export/onnx_export_pipeline.py new file mode 100644 index 000000000..5b0b06947 --- /dev/null +++ b/deployment/projects/centerpoint/export/onnx_export_pipeline.py @@ -0,0 +1,122 @@ +""" +CenterPoint ONNX export pipeline using composition. +""" + +from __future__ import annotations + +import logging +import os +from pathlib import Path +from typing import Any, Iterable, Optional, Tuple + +import torch + +from deployment.core import Artifact, BaseDataLoader, BaseDeploymentConfig +from deployment.exporters.common.factory import ExporterFactory +from deployment.exporters.common.model_wrappers import IdentityWrapper +from deployment.exporters.export_pipelines.base import OnnxExportPipeline +from deployment.exporters.export_pipelines.interfaces import ExportableComponent, ModelComponentExtractor + + +class CenterPointONNXExportPipeline(OnnxExportPipeline): + """ONNX export pipeline for CenterPoint (multi-file export). + + Uses a `ModelComponentExtractor` to split the model into exportable components + and exports each with the configured ONNX exporter. 
+ """ + + def __init__( + self, + exporter_factory: type[ExporterFactory], + component_extractor: ModelComponentExtractor, + logger: Optional[logging.Logger] = None, + ): + self.exporter_factory = exporter_factory + self.component_extractor = component_extractor + self.logger = logger or logging.getLogger(__name__) + + def export( + self, + *, + model: torch.nn.Module, + data_loader: BaseDataLoader, + output_dir: str, + config: BaseDeploymentConfig, + sample_idx: int = 0, + ) -> Artifact: + output_dir_path = Path(output_dir) + output_dir_path.mkdir(parents=True, exist_ok=True) + + self._log_header(output_dir_path, sample_idx) + sample_data = self._extract_sample_data(model, data_loader, sample_idx) + components = self.component_extractor.extract_components(model, sample_data) + + exported_paths = self._export_components(components, output_dir_path, config) + self._log_summary(exported_paths) + + return Artifact(path=str(output_dir_path), multi_file=True) + + def _log_header(self, output_dir: Path, sample_idx: int) -> None: + self.logger.info("=" * 80) + self.logger.info("Exporting CenterPoint to ONNX (multi-file)") + self.logger.info("=" * 80) + self.logger.info(f"Output directory: {output_dir}") + self.logger.info(f"Using sample index: {sample_idx}") + + def _extract_sample_data( + self, + model: torch.nn.Module, + data_loader: BaseDataLoader, + sample_idx: int, + ) -> Any: + self.logger.info("Extracting features from sample data...") + try: + return self.component_extractor.extract_features(model, data_loader, sample_idx) + except Exception as exc: + self.logger.error("Failed to extract features", exc_info=exc) + raise RuntimeError("Feature extraction failed") from exc + + def _export_components( + self, + components: Iterable[ExportableComponent], + output_dir: Path, + config: BaseDeploymentConfig, + ) -> Tuple[str, ...]: + exported_paths: list[str] = [] + component_list = list(components) + total = len(component_list) + exporter = self._build_onnx_exporter(config) + + for index, component in enumerate(component_list, start=1): + self.logger.info(f"\n[{index}/{total}] Exporting {component.name}...") + output_path = output_dir / f"{component.name}.onnx" + + try: + exporter.export( + model=component.module, + sample_input=component.sample_input, + output_path=str(output_path), + config_override=component.config_override, + ) + except Exception as exc: + self.logger.error(f"Failed to export {component.name}", exc_info=exc) + raise RuntimeError(f"{component.name} export failed") from exc + + exported_paths.append(str(output_path)) + self.logger.info(f"Exported {component.name}: {output_path}") + + return tuple(exported_paths) + + def _build_onnx_exporter(self, config: BaseDeploymentConfig): + return self.exporter_factory.create_onnx_exporter( + config=config, + wrapper_cls=IdentityWrapper, + logger=self.logger, + ) + + def _log_summary(self, exported_paths: Tuple[str, ...]) -> None: + self.logger.info("\n" + "=" * 80) + self.logger.info("CenterPoint ONNX export successful") + self.logger.info("=" * 80) + for path in exported_paths: + self.logger.info(f" • {os.path.basename(path)}") diff --git a/deployment/projects/centerpoint/export/tensorrt_export_pipeline.py b/deployment/projects/centerpoint/export/tensorrt_export_pipeline.py new file mode 100644 index 000000000..3af4181fa --- /dev/null +++ b/deployment/projects/centerpoint/export/tensorrt_export_pipeline.py @@ -0,0 +1,187 @@ +""" +CenterPoint TensorRT export pipeline using composition. 
+""" + +from __future__ import annotations + +import logging +import re +from pathlib import Path +from typing import Any, Dict, List, Mapping, Optional + +import torch + +from deployment.core import Artifact, BaseDeploymentConfig +from deployment.exporters.common.configs import TensorRTExportConfig, TensorRTModelInputConfig, TensorRTProfileConfig +from deployment.exporters.common.factory import ExporterFactory +from deployment.exporters.export_pipelines.base import TensorRTExportPipeline + + +class CenterPointTensorRTExportPipeline(TensorRTExportPipeline): + """TensorRT export pipeline for CenterPoint. + + Consumes a directory of ONNX files (multi-file export) and builds a TensorRT + engine per component into `output_dir`. + + """ + + _CUDA_DEVICE_PATTERN = re.compile(r"^cuda:\d+$") + + def __init__( + self, + exporter_factory: type[ExporterFactory], + logger: Optional[logging.Logger] = None, + ): + self.exporter_factory = exporter_factory + self.logger = logger or logging.getLogger(__name__) + + def _validate_cuda_device(self, device: str) -> int: + if not self._CUDA_DEVICE_PATTERN.match(device): + raise ValueError( + f"Invalid CUDA device format: '{device}'. Expected format: 'cuda:N' (e.g., 'cuda:0', 'cuda:1')" + ) + return int(device.split(":")[1]) + + def export( + self, + *, + onnx_path: str, + output_dir: str, + config: BaseDeploymentConfig, + device: str, + ) -> Artifact: + if device is None: + raise ValueError("CUDA device must be provided for TensorRT export") + if onnx_path is None: + raise ValueError("onnx_path must be provided for CenterPoint TensorRT export") + + onnx_dir_path = Path(onnx_path) + if not onnx_dir_path.is_dir(): + raise ValueError(f"onnx_path must be a directory for multi-file export, got: {onnx_path}") + + device_id = self._validate_cuda_device(device) + torch.cuda.set_device(device_id) + self.logger.info(f"Using CUDA device: {device}") + + output_dir_path = Path(output_dir) + output_dir_path.mkdir(parents=True, exist_ok=True) + + onnx_files = self._discover_onnx_files(onnx_dir_path) + if not onnx_files: + raise FileNotFoundError(f"No ONNX files found in {onnx_dir_path}") + + components_cfg = self._get_components_cfg(config) + engine_file_map = self._build_engine_file_map(components_cfg) + onnx_stem_to_component = self._build_onnx_stem_to_component_map(components_cfg) + + num_files = len(onnx_files) + for i, onnx_file in enumerate(onnx_files, 1): + onnx_stem = onnx_file.stem + engine_file = engine_file_map.get(onnx_stem, f"{onnx_stem}.engine") + trt_path = output_dir_path / engine_file + trt_path.parent.mkdir(parents=True, exist_ok=True) + + self.logger.info(f"\n[{i}/{num_files}] Converting {onnx_file.name} → {trt_path.name}...") + + # Get component-specific TensorRT profile + component_name = onnx_stem_to_component.get(onnx_stem) + exporter = self._build_tensorrt_exporter_for_component(config, components_cfg, component_name) + + artifact = exporter.export( + model=None, + sample_input=None, + output_path=str(trt_path), + onnx_path=str(onnx_file), + ) + self.logger.info(f"TensorRT engine saved: {artifact.path}") + + self.logger.info(f"\nAll TensorRT engines exported successfully to {output_dir_path}") + return Artifact(path=str(output_dir_path), multi_file=True) + + def _discover_onnx_files(self, onnx_dir: Path) -> List[Path]: + return sorted( + (path for path in onnx_dir.iterdir() if path.is_file() and path.suffix.lower() == ".onnx"), + key=lambda p: p.name, + ) + + def _build_engine_file_map(self, components_cfg: Mapping[str, Any]) -> Dict[str, str]: + 
"""Build mapping from ONNX stem -> engine_file.""" + mapping: Dict[str, str] = {} + for comp in components_cfg.values(): + if not isinstance(comp, Mapping): + continue + onnx_file = comp.get("onnx_file") + engine_file = comp.get("engine_file") + if not onnx_file or not engine_file: + continue + mapping[Path(onnx_file).stem] = str(engine_file) + return mapping + + def _build_onnx_stem_to_component_map(self, components_cfg: Mapping[str, Any]) -> Dict[str, str]: + """Build mapping from ONNX stem -> component name.""" + mapping: Dict[str, str] = {} + for comp_name, comp_cfg in components_cfg.items(): + if not isinstance(comp_cfg, Mapping): + continue + onnx_file = comp_cfg.get("onnx_file") + if onnx_file: + mapping[Path(onnx_file).stem] = comp_name + return mapping + + def _get_components_cfg(self, config: BaseDeploymentConfig) -> Mapping[str, Any]: + """Get unified components configuration from deploy config.""" + return dict(config.deploy_cfg.get("components", {})) + + def _build_tensorrt_exporter_for_component( + self, + config: BaseDeploymentConfig, + components_cfg: Mapping[str, Any], + component_name: Optional[str], + ): + """Build TensorRT exporter with component-specific profile. + + Converts the unified `tensorrt_profile` from the component config + into the `model_inputs` format expected by TensorRTExporter. + """ + # Get base TensorRT settings from config + trt_cfg = config.deploy_cfg.get("tensorrt_config", {}) + precision_policy = trt_cfg.get("precision_policy", "auto") + max_workspace_size = trt_cfg.get("max_workspace_size", 1 << 30) + + # Build model_inputs from component's tensorrt_profile + model_inputs = () + if component_name and component_name in components_cfg: + comp_cfg = components_cfg[component_name] + tensorrt_profile = comp_cfg.get("tensorrt_profile", {}) + + if tensorrt_profile: + # Convert tensorrt_profile to TensorRTModelInputConfig format + input_shapes = {} + for input_name, shape_cfg in tensorrt_profile.items(): + if isinstance(shape_cfg, Mapping): + input_shapes[input_name] = TensorRTProfileConfig( + min_shape=tuple(shape_cfg.get("min_shape", [])), + opt_shape=tuple(shape_cfg.get("opt_shape", [])), + max_shape=tuple(shape_cfg.get("max_shape", [])), + ) + + if input_shapes: + from types import MappingProxyType + + model_inputs = (TensorRTModelInputConfig(input_shapes=MappingProxyType(input_shapes)),) + self.logger.info( + f"Using TensorRT profile for component '{component_name}': {list(input_shapes.keys())}" + ) + + # Create TensorRT export config with component-specific model_inputs + trt_export_config = TensorRTExportConfig( + precision_policy=precision_policy, + max_workspace_size=max_workspace_size, + model_inputs=model_inputs, + ) + + return self.exporter_factory.create_tensorrt_exporter( + config=config, + logger=self.logger, + config_override=trt_export_config, + ) diff --git a/deployment/projects/centerpoint/metrics_utils.py b/deployment/projects/centerpoint/metrics_utils.py new file mode 100644 index 000000000..ad412fe82 --- /dev/null +++ b/deployment/projects/centerpoint/metrics_utils.py @@ -0,0 +1,85 @@ +""" +CenterPoint metrics utilities. + +This module extracts metrics configuration from MMEngine model configs. 
+""" + +import logging +from typing import List, Optional + +from mmengine.config import Config + +from deployment.core.metrics.detection_3d_metrics import Detection3DMetricsConfig + + +def extract_t4metric_v2_config( + model_cfg: Config, + class_names: Optional[List[str]] = None, + logger: Optional[logging.Logger] = None, +) -> Detection3DMetricsConfig: + """Extract `Detection3DMetricsConfig` from an MMEngine model config. + + Expects the config to contain a `T4MetricV2` evaluator (val or test). + + Args: + model_cfg: MMEngine model configuration. + class_names: Optional list of class names. If not provided, + extracted from model_cfg.class_names. + logger: Optional logger instance. + + Returns: + Detection3DMetricsConfig instance with extracted settings. + + Raises: + ValueError: If class_names not provided and not found in model_cfg, + or if evaluator config is missing or not T4MetricV2 type. + """ + if logger is None: + logger = logging.getLogger(__name__) + + if class_names is None: + if hasattr(model_cfg, "class_names"): + class_names = model_cfg.class_names + else: + raise ValueError("class_names must be provided or defined in model_cfg.class_names") + + evaluator_cfg = None + if hasattr(model_cfg, "val_evaluator"): + evaluator_cfg = model_cfg.val_evaluator + elif hasattr(model_cfg, "test_evaluator"): + evaluator_cfg = model_cfg.test_evaluator + else: + raise ValueError("No val_evaluator or test_evaluator found in model_cfg") + + def get_cfg_value(cfg, key, default=None): + if cfg is None: + return default + if isinstance(cfg, dict): + return cfg.get(key, default) + return getattr(cfg, key, default) + + evaluator_type = get_cfg_value(evaluator_cfg, "type") + if evaluator_type != "T4MetricV2": + raise ValueError(f"Evaluator type is '{evaluator_type}', not 'T4MetricV2'") + + perception_configs = get_cfg_value(evaluator_cfg, "perception_evaluator_configs", {}) + evaluation_config_dict = get_cfg_value(perception_configs, "evaluation_config_dict") + frame_id = get_cfg_value(perception_configs, "frame_id", "base_link") + + critical_object_filter_config = get_cfg_value(evaluator_cfg, "critical_object_filter_config") + frame_pass_fail_config = get_cfg_value(evaluator_cfg, "frame_pass_fail_config") + + if evaluation_config_dict and hasattr(evaluation_config_dict, "to_dict"): + evaluation_config_dict = dict(evaluation_config_dict) + if critical_object_filter_config and hasattr(critical_object_filter_config, "to_dict"): + critical_object_filter_config = dict(critical_object_filter_config) + if frame_pass_fail_config and hasattr(frame_pass_fail_config, "to_dict"): + frame_pass_fail_config = dict(frame_pass_fail_config) + + return Detection3DMetricsConfig( + class_names=class_names, + frame_id=frame_id, + evaluation_config_dict=evaluation_config_dict, + critical_object_filter_config=critical_object_filter_config, + frame_pass_fail_config=frame_pass_fail_config, + ) diff --git a/deployment/projects/centerpoint/model_loader.py b/deployment/projects/centerpoint/model_loader.py new file mode 100644 index 000000000..372338c23 --- /dev/null +++ b/deployment/projects/centerpoint/model_loader.py @@ -0,0 +1,117 @@ +""" +CenterPoint model loading utilities. + +This module provides ONNX-compatible model building from MMEngine configs. 
+""" + +from __future__ import annotations + +import copy +from typing import Tuple + +import torch +from mmengine.config import Config +from mmengine.registry import MODELS, init_default_scope +from mmengine.runner import load_checkpoint + +from deployment.projects.centerpoint.onnx_models import register_models + + +def create_onnx_model_cfg( + model_cfg: Config, + device: str, + rot_y_axis_reference: bool = False, +) -> Config: + """Create a model config that swaps modules to ONNX-friendly variants. + + This mutates the `model_cfg.model` subtree to reference classes registered by + `deployment.projects.centerpoint.onnx_models` (e.g., `CenterPointONNX`). + + Args: + model_cfg: Original MMEngine model configuration. + device: Target device string (e.g., 'cpu', 'cuda:0'). + rot_y_axis_reference: Whether to use y-axis rotation reference. + + Returns: + Modified config with ONNX-compatible model types. + """ + onnx_cfg = model_cfg.copy() + model_config = copy.deepcopy(onnx_cfg.model) + + model_config.type = "CenterPointONNX" + model_config.point_channels = model_config.pts_voxel_encoder.in_channels + model_config.device = device + + if model_config.pts_voxel_encoder.type == "PillarFeatureNet": + model_config.pts_voxel_encoder.type = "PillarFeatureNetONNX" + elif model_config.pts_voxel_encoder.type == "BackwardPillarFeatureNet": + model_config.pts_voxel_encoder.type = "BackwardPillarFeatureNetONNX" + + model_config.pts_bbox_head.type = "CenterHeadONNX" + model_config.pts_bbox_head.separate_head.type = "SeparateHeadONNX" + model_config.pts_bbox_head.rot_y_axis_reference = rot_y_axis_reference + + if ( + getattr(model_config, "pts_backbone", None) + and getattr(model_config.pts_backbone, "type", None) == "ConvNeXt_PC" + ): + model_config.pts_backbone.with_cp = False + + onnx_cfg.model = model_config + return onnx_cfg + + +def build_model_from_cfg( + model_cfg: Config, + checkpoint_path: str, + device: str, +) -> torch.nn.Module: + """Build a model from MMEngine config and load checkpoint weights. + + Args: + model_cfg: MMEngine model configuration. + checkpoint_path: Path to the checkpoint file. + device: Target device string. + + Returns: + Loaded and initialized PyTorch model in eval mode. + """ + # Ensure CenterPoint ONNX variants are registered into MODELS before building. + register_models() + init_default_scope("mmdet3d") + + model_config = copy.deepcopy(model_cfg.model) + model = MODELS.build(model_config) + model.to(device) + load_checkpoint(model, checkpoint_path, map_location=device) + model.eval() + model.cfg = model_cfg + return model + + +def build_centerpoint_onnx_model( + base_model_cfg: Config, + checkpoint_path: str, + device: str, + rot_y_axis_reference: bool = False, +) -> Tuple[torch.nn.Module, Config]: + """Build an ONNX-compatible CenterPoint model. + + Convenience wrapper that creates ONNX config and builds the model. + + Args: + base_model_cfg: Base MMEngine model configuration. + checkpoint_path: Path to the checkpoint file. + device: Target device string. + rot_y_axis_reference: Whether to use y-axis rotation reference. + + Returns: + Tuple of (model, onnx_compatible_config). 
+ """ + onnx_cfg = create_onnx_model_cfg( + base_model_cfg, + device=device, + rot_y_axis_reference=rot_y_axis_reference, + ) + model = build_model_from_cfg(onnx_cfg, checkpoint_path, device=device) + return model, onnx_cfg diff --git a/deployment/projects/centerpoint/onnx_models/__init__.py b/deployment/projects/centerpoint/onnx_models/__init__.py new file mode 100644 index 000000000..40f39ffb6 --- /dev/null +++ b/deployment/projects/centerpoint/onnx_models/__init__.py @@ -0,0 +1,42 @@ +"""CenterPoint ONNX-compatible model definitions. + +This module contains model variants that support ONNX export: + +- ``CenterPointONNX``: ONNX-compatible detector with feature extraction helpers. +- ``CenterHeadONNX``: ONNX-compatible detection head with stable output ordering. +- ``SeparateHeadONNX``: ONNX-compatible separate head with deterministic ordering. +- ``PillarFeatureNetONNX``: ONNX-compatible pillar feature network. +- ``BackwardPillarFeatureNetONNX``: Backward-compatible pillar feature network. + +**Note**: These are model *definitions* for ONNX export, not exported model artifacts. +They are registered into MMEngine's ``MODELS`` registry via ``@MODELS.register_module()``. + +Usage: + Call ``register_models()`` before building models that reference types like + "CenterPointONNX", "CenterHeadONNX", etc. + +Example: + >>> from deployment.projects.centerpoint.onnx_models import register_models + >>> register_models() # Register ONNX model variants + >>> # Now you can build models with type="CenterPointONNX" in config +""" + +from __future__ import annotations + + +def register_models() -> None: + """Register CenterPoint ONNX model variants into MMEngine's MODELS registry. + + Importing the submodules triggers ``@MODELS.register_module()`` decorators, + which registers the types referenced by config strings (e.g., "CenterPointONNX"). + + This function should be called before ``MODELS.build()`` for configs that + use ONNX model variants. + """ + # Import triggers @MODELS.register_module() registrations + from deployment.projects.centerpoint.onnx_models import centerpoint_head_onnx as _head # noqa: F401 + from deployment.projects.centerpoint.onnx_models import centerpoint_onnx as _model # noqa: F401 + from deployment.projects.centerpoint.onnx_models import pillar_encoder_onnx as _encoder # noqa: F401 + + +__all__ = ["register_models"] diff --git a/deployment/projects/centerpoint/onnx_models/centerpoint_head_onnx.py b/deployment/projects/centerpoint/onnx_models/centerpoint_head_onnx.py new file mode 100644 index 000000000..2d183264c --- /dev/null +++ b/deployment/projects/centerpoint/onnx_models/centerpoint_head_onnx.py @@ -0,0 +1,146 @@ +"""CenterPoint deploy-only ONNX head variants. + +This module provides ONNX-friendly implementations of the CenterPoint heads: + +- ``SeparateHeadONNX``: a variant of + :class:`mmdet3d.models.dense_heads.centerpoint_head.SeparateHead` that + redefines the ``heads`` ordering (e.g., ``heatmap``, ``reg``, ``height``, + ``dim``, rotation-related heads, ``vel``) to produce a stable, deterministic + output layout for export. +- ``CenterHeadONNX``: a variant of + :class:`projects.CenterPoint.models.dense_heads.centerpoint_head.CenterHead` + that wraps a single-task ``SeparateHeadONNX`` and exposes an ONNX-oriented + ``forward`` interface, optionally changing the rotation representation to be + relative to the y-axis. 
+ +In this context, *deploy-only* means these classes are intended for model +export and downstream inference (e.g., ONNXRuntime, TensorRT) rather than for +training: they focus on deterministic tensor ordering and export-compatible +forward behavior, and do not add or modify any training-time loss computation. +""" + +from typing import Dict, List, Tuple + +import torch +from mmdet3d.models.dense_heads.centerpoint_head import SeparateHead +from mmdet3d.registry import MODELS +from mmengine.logging import MMLogger + +from projects.CenterPoint.models.dense_heads.centerpoint_head import CenterHead + + +@MODELS.register_module() +class SeparateHeadONNX(SeparateHead): + """onnx support impl of mmdet3d.models.dense_heads.centerpoint_head.SeparateHead""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._logger = MMLogger.get_current_instance() + self._logger.info("Running SeparateHeadONNX!") + + # Note: to fix the output order + rot_heads = {k: None for k in self.heads.keys() if "rot" in k} + + self.heads: Dict[str, None] = { + "heatmap": None, + "reg": None, + "height": None, + "dim": None, + **rot_heads, + "vel": None, + } + + +@MODELS.register_module() +class CenterHeadONNX(CenterHead): + """onnx support impl of mmdet3d.models.dense_heads.centerpoint_head.CenterHead""" + + def __init__(self, rot_y_axis_reference: bool = False, **kwargs): + """ + :param switch_width_length: Set True to switch the order of width and length. + :param rot_y_axis_reference: Set True to output rotation of sin(y), cos(x) relative to the + y-axis. + """ + super().__init__(**kwargs) + + assert len(self.task_heads) == 1, "CenterPoint must use a single-task head" + self.task_heads: List[SeparateHeadONNX] + self.output_names: List[str] = list(self.task_heads[0].heads.keys()) + self._logger = MMLogger.get_current_instance() + self._rot_y_axis_reference = rot_y_axis_reference + self._logger.info(f"Running CenterHeadONNX! Output rotations in y-axis: {self._rot_y_axis_reference}") + + def _export_forward_rot_y_axis_reference(self, head_tensors: Dict[str, torch.Tensor]) -> Tuple[torch.Tensor]: + """ + TODO(KokSeang): This is a dirty and quick fix, we need to add the same operation to all + outputs to prevent reordering from ONNX export. However, we probably should use onnx_graphsurgeon + to modify them manually. 
+ """ + # Heatmap + heatmap_tensors = head_tensors["heatmap"][:, torch.tensor([0, 1, 2, 3, 4], dtype=torch.int), :, :] + heatmap_scale_factors = torch.tensor([1.0, 1.0, 1.0, 1.0, 1.0]).to(device=heatmap_tensors.device) + heatmap_scale_factors = heatmap_scale_factors.view([1, -1, 1, 1]) + scale_heatmap_tensors = torch.mul(heatmap_tensors, heatmap_scale_factors) + # Reg + reg_tensors = head_tensors["reg"][:, torch.tensor([0, 1], dtype=torch.int), :, :] + reg_scale_factors = torch.tensor([1.0, 1.0]).to(device=reg_tensors.device) + reg_scale_factors = reg_scale_factors.view([1, -1, 1, 1]) + scale_reg_tensors = torch.mul(reg_tensors, reg_scale_factors) + + # Height + height_tensors = head_tensors["height"][:, torch.tensor([0], dtype=torch.int), :, :] + height_scale_factors = torch.tensor([1.0]).to(device=height_tensors.device) + height_scale_factors = height_scale_factors.view([1, -1, 1, 1]) + scale_height_tensors = torch.mul(height_tensors, height_scale_factors) + + # Dim + # Swap length, width, height to width, length, height + flip_dim_tensors = head_tensors["dim"][:, torch.tensor([1, 0, 2], dtype=torch.int), :, :] + dim_scale_factors = torch.tensor([1.0, 1.0, 1.0]).to(device=flip_dim_tensors.device) + dim_scale_factors = dim_scale_factors.view([1, -1, 1, 1]) + scale_flip_dim_tensors = torch.mul(flip_dim_tensors, dim_scale_factors) + + # Rot + # Swap sin(y), cos(x) to cos(x), sin(y) + flip_rot_tensors = head_tensors["rot"][:, torch.tensor([1, 0], dtype=torch.int), :, :] + # Negate -cos(x) and -sin(y) to change direction + rot_scale_factors = torch.tensor([-1.0, -1.0]).to(device=flip_rot_tensors.device) + rot_scale_factors = rot_scale_factors.view([1, -1, 1, 1]) + scale_flip_rot_tensors = torch.mul(flip_rot_tensors, rot_scale_factors) + + # Vel + vel_tensors = head_tensors["vel"][:, torch.tensor([0, 1], dtype=torch.int), :, :] + vel_scale_factors = torch.tensor([1.0, 1.0]).to(device=vel_tensors.device) + vel_scale_factors = vel_scale_factors.view([1, -1, 1, 1]) + scale_vel_tensors = torch.mul(vel_tensors, vel_scale_factors) + + return ( + scale_heatmap_tensors, + scale_reg_tensors, + scale_height_tensors, + scale_flip_dim_tensors, + scale_flip_rot_tensors, + scale_vel_tensors, + ) + + def _export_forward_single(self, head_tensors: Dict[str, torch.Tensor]) -> Tuple[torch.Tensor]: + """ + Forward normally that uses the x-axis reference. + """ + ret_list: List[torch.Tensor] = [head_tensors[head_name] for head_name in self.output_names] + return tuple(ret_list) + + def forward(self, x: List[torch.Tensor]) -> Tuple[torch.Tensor]: + """Forward pass. + Args: + x (List[torch.Tensor]): multi-level features + Returns: + pred (Tuple[torch.Tensor]): Output results for tasks. + """ + assert len(x) == 1, "The input of CenterHeadONNX must be a single-level feature" + x = self.shared_conv(x[0]) + head_tensors: Dict[str, torch.Tensor] = self.task_heads[0](x) + if self._rot_y_axis_reference: + return self._export_forward_rot_y_axis_reference(head_tensors=head_tensors) + else: + return self._export_forward_single(head_tensors=head_tensors) diff --git a/deployment/projects/centerpoint/onnx_models/centerpoint_onnx.py b/deployment/projects/centerpoint/onnx_models/centerpoint_onnx.py new file mode 100644 index 000000000..8332ac818 --- /dev/null +++ b/deployment/projects/centerpoint/onnx_models/centerpoint_onnx.py @@ -0,0 +1,144 @@ +"""CenterPoint deploy-only ONNX model variants. + +These modules provide ONNX-friendly model wrappers and detector variants used by +the deployment/export pipeline (not training). 
+""" + +import os +from typing import Dict, List, Tuple + +import numpy as np +import torch +from mmdet3d.models.detectors.centerpoint import CenterPoint +from mmdet3d.registry import MODELS +from mmengine.logging import MMLogger +from torch import nn + + +class CenterPointHeadONNX(nn.Module): + """Head module for centerpoint with BACKBONE, NECK and BBOX_HEAD""" + + def __init__(self, backbone: nn.Module, neck: nn.Module, bbox_head: nn.Module): + super(CenterPointHeadONNX, self).__init__() + self.backbone: nn.Module = backbone + self.neck: nn.Module = neck + self.bbox_head: nn.Module = bbox_head + self._logger = MMLogger.get_current_instance() + self._logger.info("Running CenterPointHeadONNX!") + + def forward(self, x: torch.Tensor) -> Tuple[List[Dict[str, torch.Tensor]]]: + """ + Note: + torch.onnx.export() doesn't support triple-nested output + + Args: + x (torch.Tensor): (B, C, H, W) + Returns: + tuple[list[dict[str, any]]]: + (num_classes x [num_detect x {'reg', 'height', 'dim', 'rot', 'vel', 'heatmap'}]) + """ + x = self.backbone(x) + if self.neck is not None: + x = self.neck(x) + x = self.bbox_head(x) + + return x + + +@MODELS.register_module() +class CenterPointONNX(CenterPoint): + """onnx support impl of mmdet3d.models.detectors.CenterPoint""" + + def __init__(self, point_channels: int = 5, device: str = "cpu", **kwargs): + super().__init__(**kwargs) + self._point_channels = point_channels + self._device = device + # Handle both "cuda:0" and "gpu" device strings + if self._device.startswith("cuda") or self._device == "gpu": + self._torch_device = torch.device(self._device if self._device.startswith("cuda") else "cuda:0") + else: + self._torch_device = torch.device("cpu") + self._logger = MMLogger.get_current_instance() + self._logger.info("Running CenterPointONNX!") + + def _get_inputs(self, data_loader, sample_idx=0): + """ + Generate inputs from the provided data loader. + + Args: + data_loader: Loader that implements ``load_sample``. + sample_idx: Index of the sample to fetch. + """ + if data_loader is None: + raise ValueError("data_loader is required for CenterPoint ONNX export") + + if not hasattr(data_loader, "load_sample"): + raise AttributeError("data_loader must implement 'load_sample(sample_idx)'") + + sample = data_loader.load_sample(sample_idx) + + if "points" not in sample: + raise KeyError(f"Sample must contain 'points' (processed tensor). Got keys: {list(sample.keys())}") + + points = sample["points"] + if not isinstance(points, torch.Tensor): + raise TypeError(f"Expected points to be torch.Tensor, got {type(points)}") + + # Ensure points are on the correct device + points = points.to(self._torch_device) + points = [points] + return {"points": points, "data_samples": None} + + def _load_point_cloud(self, lidar_path: str) -> np.ndarray: + """ + Load point cloud from file. 
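+
+        Only ``.bin`` files with five float32 features per point
+        (x, y, z, intensity, ring_id) are currently supported; ``.pcd`` input
+        raises ``NotImplementedError``.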
+ + Args: + lidar_path: Path to point cloud file (.bin or .pcd) + + Returns: + Point cloud array (N, 5) where 5 = (x, y, z, intensity, ring_id) + """ + if lidar_path.endswith(".bin"): + # Load binary point cloud (KITTI/nuScenes format) + # T4 dataset has 5 features: x, y, z, intensity, ring_id + points = np.fromfile(lidar_path, dtype=np.float32).reshape(-1, 5) + + # Don't pad here - let the voxelization process handle feature expansion + # The voxelization process will add cluster_center (+3) and voxel_center (+3) features + # So 5 + 3 + 3 = 11 features total + + elif lidar_path.endswith(".pcd"): + # Load PCD format (placeholder - would need pypcd or similar) + raise NotImplementedError("PCD format loading not implemented yet") + else: + raise ValueError(f"Unsupported point cloud format: {lidar_path}") + + return points + + def _extract_features(self, data_loader, sample_idx=0): + """ + Extract features using samples from the provided data loader. + """ + if data_loader is None: + raise ValueError("data_loader is required to extract features") + + assert self.data_preprocessor is not None and hasattr(self.data_preprocessor, "voxelize") + + # Ensure data preprocessor is on the correct device + if hasattr(self.data_preprocessor, "to"): + self.data_preprocessor.to(self._torch_device) + + inputs = self._get_inputs(data_loader, sample_idx) + voxel_dict = self.data_preprocessor.voxelize(points=inputs["points"], data_samples=inputs["data_samples"]) + + # Ensure all voxel tensors are on the correct device + for key in ["voxels", "num_points", "coors"]: + if key in voxel_dict and isinstance(voxel_dict[key], torch.Tensor): + voxel_dict[key] = voxel_dict[key].to(self._torch_device) + + assert self.pts_voxel_encoder is not None and hasattr(self.pts_voxel_encoder, "get_input_features") + input_features = self.pts_voxel_encoder.get_input_features( + voxel_dict["voxels"], voxel_dict["num_points"], voxel_dict["coors"] + ) + return input_features, voxel_dict diff --git a/deployment/projects/centerpoint/onnx_models/pillar_encoder_onnx.py b/deployment/projects/centerpoint/onnx_models/pillar_encoder_onnx.py new file mode 100644 index 000000000..16ef0eb1f --- /dev/null +++ b/deployment/projects/centerpoint/onnx_models/pillar_encoder_onnx.py @@ -0,0 +1,223 @@ +"""CenterPoint deploy-only ONNX voxel encoder variants. + +This module defines ONNX-compatible wrappers around CenterPoint pillar voxel encoders +that are intended for deployment / inference only (that is, for ONNX export and +runtime execution, not for training). The classes here expose helper APIs and forward +signatures that are easier to trace and integrate into componentized inference +pipelines. + +Provided encoder variants include: + +- ``PillarFeatureNetONNX``: ONNX-support implementation of + :class:`mmdet3d.models.voxel_encoders.pillar_encoder.PillarFeatureNet`, keeping + the original behavior but with an ONNX-friendly interface. +- ``BackwardPillarFeatureNetONNX``: backward-compatible pillar feature network based + on :class:`projects.CenterPoint.models.voxel_encoders.pillar_encoder.BackwardPillarFeatureNet` + that prepares pillar features and runs PFN layers without Z-distance features for + use with exported CenterPoint models. 
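+
+Both variants split pillar processing into two steps: ``get_input_features``
+computes the decorated pillar features (cluster-center and voxel-center
+offsets plus the padding mask) outside the exported graph, while ``forward``
+only runs the PFN layers, so only the PFN layers need to be exported.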
+""" + +from typing import Optional + +import torch +from mmdet3d.models.voxel_encoders.pillar_encoder import PillarFeatureNet +from mmdet3d.models.voxel_encoders.utils import get_paddings_indicator +from mmdet3d.registry import MODELS +from mmengine.logging import MMLogger +from torch import Tensor + +from projects.CenterPoint.models.voxel_encoders.pillar_encoder import BackwardPillarFeatureNet + + +@MODELS.register_module() +class PillarFeatureNetONNX(PillarFeatureNet): + """onnx support impl of mmdet3d.models.voxel_encoders.pillar_encoder.PillarFeatureNet""" + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self._logger = MMLogger.get_current_instance() + self._logger.info("Running PillarFeatureNetONNX!") + + def get_input_features( + self, + features: Tensor, + num_points: Tensor, + coors: Tensor, + *args, + **kwargs, + ) -> Tensor: + """Forward function. + + Args: + features (torch.Tensor): Point features or raw points in shape + (N, M, C). + num_points (torch.Tensor): Number of points in each pillar. + coors (torch.Tensor): Coordinates of each voxel. + + Returns: + torch.Tensor: Features of pillars. + """ + features_ls = [features] + # Find distance of x, y, and z from cluster center + if self._with_cluster_center: + points_mean = features[:, :, :3].sum(dim=1, keepdim=True) / num_points.type_as(features).view(-1, 1, 1) + f_cluster = features[:, :, :3] - points_mean + features_ls.append(f_cluster) + + # Find distance of x, y, and z from pillar center + dtype = features.dtype + if self._with_voxel_center: + if not self.legacy: + f_center = torch.zeros_like(features[:, :, :3]) + f_center[:, :, 0] = features[:, :, 0] - (coors[:, 3].to(dtype).unsqueeze(1) * self.vx + self.x_offset) + f_center[:, :, 1] = features[:, :, 1] - (coors[:, 2].to(dtype).unsqueeze(1) * self.vy + self.y_offset) + f_center[:, :, 2] = features[:, :, 2] - (coors[:, 1].to(dtype).unsqueeze(1) * self.vz + self.z_offset) + else: + f_center = features[:, :, :3] + f_center[:, :, 0] = f_center[:, :, 0] - ( + coors[:, 3].type_as(features).unsqueeze(1) * self.vx + self.x_offset + ) + f_center[:, :, 1] = f_center[:, :, 1] - ( + coors[:, 2].type_as(features).unsqueeze(1) * self.vy + self.y_offset + ) + f_center[:, :, 2] = f_center[:, :, 2] - ( + coors[:, 1].type_as(features).unsqueeze(1) * self.vz + self.z_offset + ) + features_ls.append(f_center) + + if self._with_distance: + points_dist = torch.norm(features[:, :, :3], 2, 2, keepdim=True) + features_ls.append(points_dist) + + # Combine together feature decorations + features = torch.cat(features_ls, dim=-1) + # The feature decorations were calculated without regard to whether + # pillar was empty. Need to ensure that + # empty pillars remain set to zeros. + voxel_count = features.shape[1] + mask = get_paddings_indicator(num_points, voxel_count, axis=0) + mask = torch.unsqueeze(mask, -1).type_as(features) + features *= mask + return features + + def forward( + self, + features: torch.Tensor, + ) -> torch.Tensor: + """Forward function. + Args: + features (torch.Tensor): Point features in shape (N, M, C). + num_points (torch.Tensor): Number of points in each pillar. + coors (torch.Tensor): + Returns: + torch.Tensor: Features of pillars. + """ + + for pfn in self.pfn_layers: + features = pfn(features) + + return features + + +@MODELS.register_module() +class BackwardPillarFeatureNetONNX(BackwardPillarFeatureNet): + """Pillar Feature Net. 
+ + The backward-compatible network prepares the pillar features and performs forward pass + through PFNLayers without features from Z-distance. Use this to load models trained + from older mmdet versions. + + Args: + in_channels (int, optional): Number of input features, + either x, y, z or x, y, z, r. Defaults to 4. + feat_channels (tuple, optional): Number of features in each of the + N PFNLayers. Defaults to (64, ). + with_distance (bool, optional): Whether to include Euclidean distance + to points. Defaults to False. + with_cluster_center (bool, optional): [description]. Defaults to True. + with_voxel_center (bool, optional): [description]. Defaults to True. + voxel_size (tuple[float], optional): Size of voxels, only utilize x + and y size. Defaults to (0.2, 0.2, 4). + point_cloud_range (tuple[float], optional): Point cloud range, only + utilizes x and y min. Defaults to (0, -40, -3, 70.4, 40, 1). + norm_cfg ([type], optional): [description]. + Defaults to dict(type='BN1d', eps=1e-3, momentum=0.01). + mode (str, optional): The mode to gather point features. Options are + 'max' or 'avg'. Defaults to 'max'. + legacy (bool, optional): Whether to use the new behavior or + the original behavior. Defaults to True. + """ + + def __init__(self, **kwargs): + super(BackwardPillarFeatureNetONNX, self).__init__(**kwargs) + + def get_input_features(self, features: Tensor, num_points: Tensor, coors: Tensor, *args, **kwargs) -> Tensor: + """Forward function. + + Args: + features (torch.Tensor): Point features or raw points in shape + (N, M, C). + num_points (torch.Tensor): Number of points in each pillar. + coors (torch.Tensor): Coordinates of each voxel. + + Returns: + torch.Tensor: Features of pillars. + """ + features_ls = [features] + # Find distance of x, y, and z from cluster center + if self._with_cluster_center: + points_mean = features[:, :, :3].sum(dim=1, keepdim=True) / num_points.type_as(features).view(-1, 1, 1) + f_cluster = features[:, :, :3] - points_mean + features_ls.append(f_cluster) + + # Find distance of x, y, and z from pillar center + dtype = features.dtype + if self._with_voxel_center: + if not self.legacy: + f_center = torch.zeros_like(features[:, :, :2]) + f_center[:, :, 0] = features[:, :, 0] - (coors[:, 3].to(dtype).unsqueeze(1) * self.vx + self.x_offset) + f_center[:, :, 1] = features[:, :, 1] - (coors[:, 2].to(dtype).unsqueeze(1) * self.vy + self.y_offset) + else: + f_center = features[:, :, :2] + f_center[:, :, 0] = f_center[:, :, 0] - ( + coors[:, 3].type_as(features).unsqueeze(1) * self.vx + self.x_offset + ) + f_center[:, :, 1] = f_center[:, :, 1] - ( + coors[:, 2].type_as(features).unsqueeze(1) * self.vy + self.y_offset + ) + features_ls.append(f_center) + + if self._with_distance: + points_dist = torch.norm(features[:, :, :3], 2, 2, keepdim=True) + features_ls.append(points_dist) + + # Combine together feature decorations + features = torch.cat(features_ls, dim=-1) + # The feature decorations were calculated without regard to whether + # pillar was empty. Need to ensure that + # empty pillars remain set to zeros. + voxel_count = features.shape[1] + mask = get_paddings_indicator(num_points, voxel_count, axis=0) + mask = torch.unsqueeze(mask, -1).type_as(features) + features *= mask + return features + + def forward( + self, + features: torch.Tensor, + ) -> torch.Tensor: + """Forward function. + + Args: + features (torch.Tensor): Point features or raw points in shape + (N, M, C). + num_points (torch.Tensor): Number of points in each pillar. 
+ coors (torch.Tensor): Coordinates of each voxel. + + Returns: + torch.Tensor: Features of pillars. + """ + for pfn in self.pfn_layers: + features = pfn(features) + + return features diff --git a/deployment/projects/centerpoint/pipelines/centerpoint_pipeline.py b/deployment/projects/centerpoint/pipelines/centerpoint_pipeline.py new file mode 100644 index 000000000..54259a936 --- /dev/null +++ b/deployment/projects/centerpoint/pipelines/centerpoint_pipeline.py @@ -0,0 +1,297 @@ +""" +CenterPoint Deployment Pipeline Base Class. + +Provides common preprocessing, postprocessing, and inference logic +shared by PyTorch, ONNX, and TensorRT backend implementations. +""" + +from __future__ import annotations + +import logging +import time +from abc import abstractmethod +from typing import Any, Dict, List, Optional, Tuple, Union + +import numpy as np +import torch +from mmdet3d.structures import Det3DDataSample, LiDARInstance3DBoxes + +from deployment.pipelines.base_pipeline import BaseDeploymentPipeline + +logger = logging.getLogger(__name__) + + +class CenterPointDeploymentPipeline(BaseDeploymentPipeline): + """Base pipeline for CenterPoint staged inference. + + This normalizes preprocessing/postprocessing for CenterPoint and provides + common helpers (e.g., middle encoder processing) used by PyTorch/ONNX/TensorRT + backend-specific pipelines. + + Attributes: + pytorch_model: Reference PyTorch model for preprocessing/postprocessing. + num_classes: Number of detection classes. + class_names: List of class names. + point_cloud_range: Point cloud range [x_min, y_min, z_min, x_max, y_max, z_max]. + voxel_size: Voxel size [vx, vy, vz]. + """ + + def __init__( + self, + pytorch_model: torch.nn.Module, + device: str = "cuda", + backend_type: str = "unknown", + ) -> None: + """Initialize CenterPoint pipeline. + + Args: + pytorch_model: PyTorch model for preprocessing/postprocessing. + device: Target device ('cpu' or 'cuda:N'). + backend_type: Backend identifier ('pytorch', 'onnx', 'tensorrt'). + + Raises: + ValueError: If class_names not found in pytorch_model.cfg. + """ + cfg = getattr(pytorch_model, "cfg", None) + + class_names = getattr(cfg, "class_names", None) + if class_names is None: + raise ValueError("class_names not found in pytorch_model.cfg") + + point_cloud_range = getattr(cfg, "point_cloud_range", None) + voxel_size = getattr(cfg, "voxel_size", None) + + super().__init__( + model=pytorch_model, + device=device, + task_type="detection3d", + backend_type=backend_type, + ) + + self.num_classes: int = len(class_names) + self.class_names: List[str] = class_names + self.point_cloud_range: Optional[List[float]] = point_cloud_range + self.voxel_size: Optional[List[float]] = voxel_size + self.pytorch_model: torch.nn.Module = pytorch_model + self._stage_latencies: Dict[str, float] = {} + + def to_device_tensor(self, data: Union[torch.Tensor, np.ndarray]) -> torch.Tensor: + """Convert data to tensor on the pipeline's device. + + Args: + data: Input data (torch.Tensor or np.ndarray). + + Returns: + Tensor on self.device. + """ + if isinstance(data, np.ndarray): + data = torch.from_numpy(data) + return data.to(self.device) + + def to_numpy(self, data: torch.Tensor, dtype: np.dtype = np.float32) -> np.ndarray: + """Convert tensor to contiguous numpy array. + + Args: + data: Input tensor. + dtype: Target numpy dtype. + + Returns: + Contiguous numpy array. 
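+
+        Note:
+            Used to hand tensors to the ONNX Runtime and TensorRT backends,
+            which expect C-contiguous host buffers.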
+ """ + arr = data.cpu().numpy().astype(dtype) + if not arr.flags["C_CONTIGUOUS"]: + arr = np.ascontiguousarray(arr) + return arr + + def preprocess( + self, + points: torch.Tensor, + ) -> Tuple[Dict[str, torch.Tensor], Dict[str, Any]]: + """Preprocess point cloud data for inference. + + Performs voxelization and feature extraction using the data_preprocessor + and pts_voxel_encoder from the PyTorch model. + + Args: + points: Point cloud tensor of shape [N, point_features]. + + Returns: + Tuple of (preprocessed_dict, metadata_dict). + preprocessed_dict contains: input_features, voxels, num_points, coors. + """ + points_tensor = self.to_device_tensor(points) + + data_samples = [Det3DDataSample()] + with torch.no_grad(): + batch_inputs = self.pytorch_model.data_preprocessor( + {"inputs": {"points": [points_tensor]}, "data_samples": data_samples} + ) + + voxel_dict = batch_inputs["inputs"]["voxels"] + voxels = voxel_dict["voxels"] + num_points = voxel_dict["num_points"] + coors = voxel_dict["coors"] + + input_features: Optional[torch.Tensor] = None + with torch.no_grad(): + if hasattr(self.pytorch_model.pts_voxel_encoder, "get_input_features"): + input_features = self.pytorch_model.pts_voxel_encoder.get_input_features(voxels, num_points, coors) + + preprocessed_dict = { + "input_features": input_features, + "voxels": voxels, + "num_points": num_points, + "coors": coors, + } + + return preprocessed_dict, {} + + def process_middle_encoder( + self, + voxel_features: torch.Tensor, + coors: torch.Tensor, + ) -> torch.Tensor: + """Process voxel features through middle encoder (scatter to BEV). + + This step runs on PyTorch regardless of backend because it involves + sparse-to-dense conversion that's not easily exportable to ONNX. + + Args: + voxel_features: Encoded voxel features [N, feature_dim]. + coors: Voxel coordinates [N, 4] (batch_idx, z, y, x). + + Returns: + Spatial features tensor [B, C, H, W]. + """ + voxel_features = self.to_device_tensor(voxel_features) + coors = self.to_device_tensor(coors) + + batch_size = int(coors[-1, 0].item()) + 1 if len(coors) > 0 else 1 + + with torch.no_grad(): + spatial_features = self.pytorch_model.pts_middle_encoder(voxel_features, coors, batch_size) + + return spatial_features + + def postprocess( + self, + head_outputs: List[torch.Tensor], + sample_meta: Dict[str, Any], + ) -> List[Dict[str, Any]]: + """Postprocess head outputs to detection results. + + Args: + head_outputs: List of 6 tensors [heatmap, reg, height, dim, rot, vel]. + sample_meta: Sample metadata dict. + + Returns: + List of detection dicts with keys: bbox_3d, score, label. + + Raises: + ValueError: If head_outputs doesn't contain exactly 6 tensors. 
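+
+        Note:
+            If the head was exported with ``rot_y_axis_reference`` enabled, the
+            ``dim`` and ``rot`` outputs are converted back to the original
+            x-axis convention before decoding with ``predict_by_feat``.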
+ """ + head_outputs = [self.to_device_tensor(out) for out in head_outputs] + + if len(head_outputs) != 6: + raise ValueError(f"Expected 6 head outputs, got {len(head_outputs)}") + + heatmap, reg, height, dim, rot, vel = head_outputs + + # Apply rotation axis correction if configured + if hasattr(self.pytorch_model, "pts_bbox_head"): + rot_y_axis_reference = getattr(self.pytorch_model.pts_bbox_head, "_rot_y_axis_reference", False) + if rot_y_axis_reference: + dim = dim[:, [1, 0, 2], :, :] + rot = rot * (-1.0) + rot = rot[:, [1, 0], :, :] + + preds_dict = { + "heatmap": heatmap, + "reg": reg, + "height": height, + "dim": dim, + "rot": rot, + "vel": vel, + } + preds_dicts = ([preds_dict],) + + if "box_type_3d" not in sample_meta: + sample_meta["box_type_3d"] = LiDARInstance3DBoxes + batch_input_metas = [sample_meta] + + with torch.no_grad(): + predictions_list = self.pytorch_model.pts_bbox_head.predict_by_feat( + preds_dicts=preds_dicts, batch_input_metas=batch_input_metas + ) + + results: List[Dict[str, Any]] = [] + for pred_instances in predictions_list: + bboxes_3d = pred_instances.bboxes_3d.tensor.cpu().numpy() + scores_3d = pred_instances.scores_3d.cpu().numpy() + labels_3d = pred_instances.labels_3d.cpu().numpy() + + for i in range(len(bboxes_3d)): + results.append( + { + "bbox_3d": bboxes_3d[i][:7].tolist(), + "score": float(scores_3d[i]), + "label": int(labels_3d[i]), + } + ) + + return results + + @abstractmethod + def run_voxel_encoder(self, input_features: torch.Tensor) -> torch.Tensor: + """Run voxel encoder inference. + + Args: + input_features: Input features [N, max_points, C]. + + Returns: + Voxel features [N, feature_dim]. + """ + raise NotImplementedError + + @abstractmethod + def run_backbone_head(self, spatial_features: torch.Tensor) -> List[torch.Tensor]: + """Run backbone and head inference. + + Args: + spatial_features: Spatial features [B, C, H, W]. + + Returns: + List of 6 head output tensors. + """ + raise NotImplementedError + + def run_model( + self, + preprocessed_input: Dict[str, torch.Tensor], + ) -> Tuple[List[torch.Tensor], Dict[str, float]]: + """Run the full model pipeline with latency tracking. + + Args: + preprocessed_input: Dict with keys: input_features, coors. + + Returns: + Tuple of (head_outputs, stage_latencies). + """ + stage_latencies: Dict[str, float] = {} + + start = time.perf_counter() + voxel_features = self.run_voxel_encoder(preprocessed_input["input_features"]) + stage_latencies["voxel_encoder_ms"] = (time.perf_counter() - start) * 1000 + + start = time.perf_counter() + spatial_features = self.process_middle_encoder(voxel_features, preprocessed_input["coors"]) + stage_latencies["middle_encoder_ms"] = (time.perf_counter() - start) * 1000 + + start = time.perf_counter() + head_outputs = self.run_backbone_head(spatial_features) + stage_latencies["backbone_head_ms"] = (time.perf_counter() - start) * 1000 + + return head_outputs, stage_latencies + + def __repr__(self) -> str: + return f"{self.__class__.__name__}(device={self.device}, backend={self.backend_type})" diff --git a/deployment/projects/centerpoint/pipelines/factory.py b/deployment/projects/centerpoint/pipelines/factory.py new file mode 100644 index 000000000..8a513b790 --- /dev/null +++ b/deployment/projects/centerpoint/pipelines/factory.py @@ -0,0 +1,82 @@ +""" +CenterPoint Pipeline Factory. + +Registers CenterPoint pipelines into the global pipeline_registry so evaluators can create pipelines +via `deployment.pipelines.factory.PipelineFactory`. 
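+
+Example (sketch, assuming ``model_spec``, ``pytorch_model``, and
+``components_cfg`` have already been built by the deployment runner):
+
+    from deployment.projects.centerpoint.pipelines.factory import CenterPointPipelineFactory
+
+    pipeline = CenterPointPipelineFactory.create_pipeline(
+        model_spec=model_spec,          # backend / device / artifact path
+        pytorch_model=pytorch_model,    # used for pre- and postprocessing
+        components_cfg=components_cfg,  # component file paths and IO specs
+    )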
+""" + +import logging +from typing import Any, Mapping, Optional + +from deployment.core.backend import Backend +from deployment.core.evaluation.evaluator_types import ModelSpec +from deployment.pipelines.base_factory import BasePipelineFactory +from deployment.pipelines.base_pipeline import BaseDeploymentPipeline +from deployment.pipelines.registry import pipeline_registry +from deployment.projects.centerpoint.pipelines.onnx import CenterPointONNXPipeline +from deployment.projects.centerpoint.pipelines.pytorch import CenterPointPyTorchPipeline +from deployment.projects.centerpoint.pipelines.tensorrt import CenterPointTensorRTPipeline + +logger = logging.getLogger(__name__) + + +@pipeline_registry.register +class CenterPointPipelineFactory(BasePipelineFactory): + """Pipeline factory for CenterPoint across supported backends. + + Supports passing `components_cfg` to configure component file paths + and IO specifications. + """ + + @classmethod + def get_project_name(cls) -> str: + return "centerpoint" + + @classmethod + def create_pipeline( + cls, + model_spec: ModelSpec, + pytorch_model: Any, + device: Optional[str] = None, + components_cfg: Optional[Mapping[str, Any]] = None, + ) -> BaseDeploymentPipeline: + """Create a CenterPoint pipeline for the specified backend. + + Args: + model_spec: Model specification (backend/device/path) + pytorch_model: PyTorch model instance for preprocessing + device: Override device (uses model_spec.device if None) + components_cfg: Unified component configuration dict from deploy_config. + Used to configure component file paths. + + Returns: + Pipeline instance for the specified backend + """ + device = device or model_spec.device + backend = model_spec.backend + + cls._validate_backend(backend) + + if backend is Backend.PYTORCH: + logger.info(f"Creating CenterPoint PyTorch pipeline on {device}") + return CenterPointPyTorchPipeline(pytorch_model, device=device) + + if backend is Backend.ONNX: + logger.info(f"Creating CenterPoint ONNX pipeline from {model_spec.path} on {device}") + return CenterPointONNXPipeline( + pytorch_model, + onnx_dir=model_spec.path, + device=device, + components_cfg=components_cfg, + ) + + if backend is Backend.TENSORRT: + logger.info(f"Creating CenterPoint TensorRT pipeline from {model_spec.path} on {device}") + return CenterPointTensorRTPipeline( + pytorch_model, + tensorrt_dir=model_spec.path, + device=device, + components_cfg=components_cfg, + ) + + raise ValueError(f"Unsupported backend: {backend.value}") diff --git a/deployment/projects/centerpoint/pipelines/onnx.py b/deployment/projects/centerpoint/pipelines/onnx.py new file mode 100644 index 000000000..a69cf2b73 --- /dev/null +++ b/deployment/projects/centerpoint/pipelines/onnx.py @@ -0,0 +1,195 @@ +""" +CenterPoint ONNX Pipeline Implementation. +""" + +from __future__ import annotations + +import logging +import os.path as osp +from typing import Any, List, Mapping + +import numpy as np +import onnxruntime as ort +import torch + +from deployment.core.artifacts import resolve_artifact_path +from deployment.projects.centerpoint.pipelines.centerpoint_pipeline import CenterPointDeploymentPipeline + +logger = logging.getLogger(__name__) + + +class CenterPointONNXPipeline(CenterPointDeploymentPipeline): + """ONNXRuntime-based CenterPoint pipeline (componentized inference). + + Loads separate ONNX models for voxel_encoder and backbone_head components + and runs inference using ONNXRuntime. + + Attributes: + onnx_dir: Directory containing ONNX model files. 
+ voxel_encoder_session: ONNXRuntime session for voxel encoder. + backbone_head_session: ONNXRuntime session for backbone + head. + """ + + def __init__( + self, + pytorch_model: torch.nn.Module, + onnx_dir: str, + device: str = "cpu", + components_cfg: Mapping[str, Any] | None = None, + ) -> None: + """Initialize ONNX pipeline. + + Args: + pytorch_model: Reference PyTorch model for preprocessing. + onnx_dir: Directory containing ONNX model files. + device: Target device ('cpu' or 'cuda:N'). + components_cfg: Component configuration dict from deploy_config. + If None, uses default component names. + """ + super().__init__(pytorch_model, device, backend_type="onnx") + + self.onnx_dir = onnx_dir + if components_cfg is None: + components_cfg = {} + if not isinstance(components_cfg, Mapping): + raise TypeError(f"components_cfg must be a mapping, got {type(components_cfg).__name__}") + self._components_cfg = components_cfg + self._load_onnx_models(device) + logger.info(f"ONNX pipeline initialized with models from: {onnx_dir}") + + def _load_onnx_models(self, device: str) -> None: + """Load ONNX models for each component. + + Args: + device: Target device for execution provider selection. + + Raises: + FileNotFoundError: If ONNX model files are not found. + RuntimeError: If model loading fails. + """ + voxel_encoder_path = resolve_artifact_path( + base_dir=self.onnx_dir, + components_cfg=self._components_cfg, + component="voxel_encoder", + file_key="onnx_file", + ) + backbone_head_path = resolve_artifact_path( + base_dir=self.onnx_dir, + components_cfg=self._components_cfg, + component="backbone_head", + file_key="onnx_file", + ) + + if not osp.exists(voxel_encoder_path): + raise FileNotFoundError(f"Voxel encoder ONNX not found: {voxel_encoder_path}") + if not osp.exists(backbone_head_path): + raise FileNotFoundError(f"Backbone head ONNX not found: {backbone_head_path}") + + # Configure session options + so = ort.SessionOptions() + so.graph_optimization_level = ort.GraphOptimizationLevel.ORT_DISABLE_ALL + so.log_severity_level = 3 + + # Select execution providers based on device + if device.startswith("cuda"): + providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] + logger.info("Using CUDA execution provider for ONNX") + else: + providers = ["CPUExecutionProvider"] + logger.info("Using CPU execution provider for ONNX") + + try: + self.voxel_encoder_session = ort.InferenceSession(voxel_encoder_path, sess_options=so, providers=providers) + logger.info(f"Loaded voxel encoder: {voxel_encoder_path}") + except Exception as e: + raise RuntimeError(f"Failed to load voxel encoder ONNX: {e}") + + try: + self.backbone_head_session = ort.InferenceSession(backbone_head_path, sess_options=so, providers=providers) + logger.info(f"Loaded backbone+head: {backbone_head_path}") + except Exception as e: + raise RuntimeError(f"Failed to load backbone+head ONNX: {e}") + + def run_voxel_encoder(self, input_features: torch.Tensor) -> torch.Tensor: + """Run voxel encoder using ONNXRuntime. + + Args: + input_features: Input features [N, max_points, C]. + + Returns: + Voxel features [N, feature_dim]. 
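+
+        Note:
+            If the ONNX voxel encoder emits a [N, 1, feature_dim] tensor, the
+            singleton dimension is squeezed before returning.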
+ """ + input_array = self.to_numpy(input_features, dtype=np.float32) + input_name = self.voxel_encoder_session.get_inputs()[0].name + output_name = self.voxel_encoder_session.get_outputs()[0].name + + outputs = self.voxel_encoder_session.run([output_name], {input_name: input_array}) + + voxel_features = torch.from_numpy(outputs[0]).to(self.device) + if voxel_features.ndim == 3 and voxel_features.shape[1] == 1: + voxel_features = voxel_features.squeeze(1) + return voxel_features + + def run_backbone_head(self, spatial_features: torch.Tensor) -> List[torch.Tensor]: + """Run backbone and head using ONNXRuntime. + + Args: + spatial_features: Spatial features [B, C, H, W]. + + Returns: + List of 6 head output tensors. + + Raises: + ValueError: If output count is not 6. + """ + input_array = self.to_numpy(spatial_features, dtype=np.float32) + + input_name = self.backbone_head_session.get_inputs()[0].name + onnx_output_names = [output.name for output in self.backbone_head_session.get_outputs()] + + # Get expected output order from components_cfg + backbone_head_cfg = self._components_cfg.get("backbone_head", {}) + io_cfg = backbone_head_cfg.get("io", {}) + outputs = io_cfg.get("outputs", []) + + if not outputs: + raise ValueError( + "Output names must be provided via components_cfg.backbone_head.io.outputs. " + "No fallback values are allowed in deployment framework." + ) + + expected_output_names = [out.get("name") for out in outputs if out.get("name")] + if not expected_output_names: + raise ValueError( + "Output names must be provided via components_cfg.backbone_head.io.outputs. " + "Each output must have a 'name' field." + ) + + # Validate outputs: check for missing or extra outputs + onnx_output_set = set(onnx_output_names) + expected_output_set = set(expected_output_names) + + missing_outputs = expected_output_set - onnx_output_set + if missing_outputs: + raise ValueError( + f"Missing outputs in ONNX model: {sorted(missing_outputs)}. " + f"Expected: {sorted(expected_output_set)}, Got: {sorted(onnx_output_set)}" + ) + + extra_outputs = onnx_output_set - expected_output_set + if extra_outputs: + raise ValueError( + f"Extra outputs in ONNX model: {sorted(extra_outputs)}. " + f"Expected: {sorted(expected_output_set)}, Got: {sorted(onnx_output_set)}" + ) + + output_names = expected_output_names + + # Run inference with ordered output names (ONNX Runtime returns outputs in the same order) + outputs = self.backbone_head_session.run(output_names, {input_name: input_array}) + head_outputs = [torch.from_numpy(out).to(self.device) for out in outputs] + + if len(head_outputs) != 6: + raise ValueError(f"Expected 6 head outputs, got {len(head_outputs)}") + + return head_outputs diff --git a/deployment/projects/centerpoint/pipelines/pytorch.py b/deployment/projects/centerpoint/pipelines/pytorch.py new file mode 100644 index 000000000..35c9ab79d --- /dev/null +++ b/deployment/projects/centerpoint/pipelines/pytorch.py @@ -0,0 +1,116 @@ +""" +CenterPoint PyTorch Pipeline Implementation. +""" + +from __future__ import annotations + +import logging +from typing import List + +import torch + +from deployment.projects.centerpoint.pipelines.centerpoint_pipeline import CenterPointDeploymentPipeline + +logger = logging.getLogger(__name__) + + +class CenterPointPyTorchPipeline(CenterPointDeploymentPipeline): + """PyTorch-based CenterPoint pipeline (staged to match ONNX/TensorRT outputs). 
+ + This pipeline runs inference using the native PyTorch model, but structures + the execution to match the ONNX/TensorRT staged inference for consistency. + """ + + def __init__(self, pytorch_model: torch.nn.Module, device: str = "cuda") -> None: + """Initialize PyTorch pipeline. + + Args: + pytorch_model: PyTorch model for inference. + device: Target device ('cpu' or 'cuda:N'). + """ + super().__init__(pytorch_model, device, backend_type="pytorch") + logger.info("PyTorch pipeline initialized (ONNX-compatible staged inference)") + + def run_voxel_encoder(self, input_features: torch.Tensor) -> torch.Tensor: + """Run voxel encoder using PyTorch model. + + Args: + input_features: Input features [N, max_points, C]. + + Returns: + Voxel features [N, feature_dim]. + + Raises: + ValueError: If input_features is None. + RuntimeError: If output shape is unexpected. + """ + if input_features is None: + raise ValueError("input_features is None. This should not happen for ONNX models.") + + input_features = self.to_device_tensor(input_features) + + with torch.no_grad(): + voxel_features = self.pytorch_model.pts_voxel_encoder(input_features) + + # Handle various output shapes from different encoder variants + if voxel_features.ndim == 3: + if voxel_features.shape[1] == 1: + voxel_features = voxel_features.squeeze(1) + elif voxel_features.shape[2] == 1: + voxel_features = voxel_features.squeeze(2) + else: + raise RuntimeError( + f"Voxel encoder output has unexpected 3D shape: {voxel_features.shape}. " + f"Expected 2D output [N_voxels, feature_dim]. Input was: {input_features.shape}" + ) + elif voxel_features.ndim > 3: + raise RuntimeError( + f"Voxel encoder output has {voxel_features.ndim}D shape: {voxel_features.shape}. " + "Expected 2D output [N_voxels, feature_dim]." + ) + + return voxel_features + + def run_backbone_head(self, spatial_features: torch.Tensor) -> List[torch.Tensor]: + """Run backbone and head using PyTorch model. + + Args: + spatial_features: Spatial features [B, C, H, W]. + + Returns: + List of 6 head output tensors. + + Raises: + ValueError: If head output format is unexpected. + """ + spatial_features = self.to_device_tensor(spatial_features) + + with torch.no_grad(): + x = self.pytorch_model.pts_backbone(spatial_features) + + if hasattr(self.pytorch_model, "pts_neck") and self.pytorch_model.pts_neck is not None: + x = self.pytorch_model.pts_neck(x) + + head_outputs_tuple = self.pytorch_model.pts_bbox_head(x) + + if isinstance(head_outputs_tuple, tuple) and len(head_outputs_tuple) > 0: + first_element = head_outputs_tuple[0] + + if isinstance(first_element, torch.Tensor): + head_outputs = list(head_outputs_tuple) + elif isinstance(first_element, list) and len(first_element) > 0: + preds_dict = first_element[0] + head_outputs = [ + preds_dict["heatmap"], + preds_dict["reg"], + preds_dict["height"], + preds_dict["dim"], + preds_dict["rot"], + preds_dict["vel"], + ] + else: + raise ValueError(f"Unexpected task_outputs format: {type(first_element)}") + else: + raise ValueError(f"Unexpected head_outputs format: {type(head_outputs_tuple)}") + + return head_outputs diff --git a/deployment/projects/centerpoint/pipelines/tensorrt.py b/deployment/projects/centerpoint/pipelines/tensorrt.py new file mode 100644 index 000000000..63037a172 --- /dev/null +++ b/deployment/projects/centerpoint/pipelines/tensorrt.py @@ -0,0 +1,388 @@ +""" +CenterPoint TensorRT Pipeline Implementation. 
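+
+Requires a CUDA device and the ``tensorrt`` / ``pycuda`` runtime packages
+(installed, for example, via projects/CenterPoint/Dockerfile).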
+""" + +from __future__ import annotations + +import logging +import os.path as osp +import time +from typing import Any, Dict, List, Mapping, Tuple + +import numpy as np +import pycuda.autoinit # noqa: F401 +import pycuda.driver as cuda +import tensorrt as trt +import torch + +from deployment.core.artifacts import resolve_artifact_path +from deployment.pipelines.gpu_resource_mixin import ( + GPUResourceMixin, + TensorRTResourceManager, + release_tensorrt_resources, +) +from deployment.projects.centerpoint.pipelines.centerpoint_pipeline import CenterPointDeploymentPipeline + +logger = logging.getLogger(__name__) + + +class CenterPointTensorRTPipeline(GPUResourceMixin, CenterPointDeploymentPipeline): + """TensorRT-based CenterPoint pipeline (engine-per-component inference). + + Loads separate TensorRT engines for voxel_encoder and backbone_head components + and runs inference using TensorRT execution contexts. + + Attributes: + tensorrt_dir: Directory containing TensorRT engine files. + """ + + def __init__( + self, + pytorch_model: torch.nn.Module, + tensorrt_dir: str, + device: str = "cuda", + components_cfg: Mapping[str, Any] | None = None, + ) -> None: + """Initialize TensorRT pipeline. + + Args: + pytorch_model: Reference PyTorch model for preprocessing. + tensorrt_dir: Directory containing TensorRT engine files. + device: Target CUDA device ('cuda:N'). + components_cfg: Component configuration dict from deploy_config. + If None, uses default component names. + + Raises: + ValueError: If device is not a CUDA device. + """ + if not device.startswith("cuda"): + raise ValueError("TensorRT requires CUDA device") + + super().__init__(pytorch_model, device, backend_type="tensorrt") + + self.tensorrt_dir = tensorrt_dir + if components_cfg is None: + components_cfg = {} + if not isinstance(components_cfg, Mapping): + raise TypeError(f"components_cfg must be a mapping, got {type(components_cfg).__name__}") + self._components_cfg = components_cfg + self._engines: dict = {} + self._contexts: dict = {} + self._logger = trt.Logger(trt.Logger.WARNING) + self._cleanup_called = False + + # Create CUDA events for GPU timing measurements + self._backbone_start_event = cuda.Event() + self._backbone_end_event = cuda.Event() + self._voxel_encoder_start_event = cuda.Event() + self._voxel_encoder_end_event = cuda.Event() + + self._load_tensorrt_engines() + logger.info(f"TensorRT pipeline initialized with engines from: {tensorrt_dir}") + + def _load_tensorrt_engines(self) -> None: + """Load TensorRT engines for each component. + + Raises: + FileNotFoundError: If engine files are not found. + RuntimeError: If engine loading or context creation fails. 
+ """ + trt.init_libnvinfer_plugins(self._logger, "") + runtime = trt.Runtime(self._logger) + + engine_files = { + "voxel_encoder": resolve_artifact_path( + base_dir=self.tensorrt_dir, + components_cfg=self._components_cfg, + component="voxel_encoder", + file_key="engine_file", + ), + "backbone_head": resolve_artifact_path( + base_dir=self.tensorrt_dir, + components_cfg=self._components_cfg, + component="backbone_head", + file_key="engine_file", + ), + } + + for component, engine_path in engine_files.items(): + if not osp.exists(engine_path): + raise FileNotFoundError(f"TensorRT engine not found: {engine_path}") + + with open(engine_path, "rb") as f: + engine = runtime.deserialize_cuda_engine(f.read()) + if engine is None: + raise RuntimeError(f"Failed to deserialize engine: {engine_path}") + + context = engine.create_execution_context() + if context is None: + raise RuntimeError( + f"Failed to create execution context for {component}. " "This is likely due to GPU out-of-memory." + ) + + self._engines[component] = engine + self._contexts[component] = context + logger.info(f"Loaded TensorRT engine: {component}") + + def _get_io_names( + self, + engine: Any, + single_output: bool = False, + ) -> Tuple[str, Any]: + """Get input and output tensor names from engine. + + Args: + engine: TensorRT engine. + single_output: If True, return single output name instead of list. + + Returns: + Tuple of (input_name, output_name(s)). + + Raises: + RuntimeError: If input or output names cannot be found. + """ + input_name = None + output_names = [] + + for i in range(engine.num_io_tensors): + tensor_name = engine.get_tensor_name(i) + if engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.INPUT: + input_name = tensor_name + elif engine.get_tensor_mode(tensor_name) == trt.TensorIOMode.OUTPUT: + output_names.append(tensor_name) + + if input_name is None: + raise RuntimeError("Could not find input tensor name") + if not output_names: + raise RuntimeError("Could not find output tensor names") + + if single_output: + return input_name, output_names[0] + return input_name, output_names + + def run_voxel_encoder(self, input_features: torch.Tensor) -> torch.Tensor: + """Run voxel encoder using TensorRT. + + Args: + input_features: Input features [N, max_points, C]. + + Returns: + Voxel features [N, feature_dim]. + + Raises: + RuntimeError: If context is None (initialization failed). 
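+
+        Note:
+            Pure GPU execution time is recorded with CUDA events around
+            ``execute_async_v3``; host/device memory copies are excluded from
+            the reported latency.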
+ """ + engine = self._engines["voxel_encoder"] + context = self._contexts["voxel_encoder"] + if context is None: + raise RuntimeError("voxel_encoder context is None - likely failed to initialize due to GPU OOM") + + input_array = self.to_numpy(input_features, dtype=np.float32) + + input_name, output_name = self._get_io_names(engine, single_output=True) + context.set_input_shape(input_name, input_array.shape) + output_shape = context.get_tensor_shape(output_name) + output_array = np.empty(output_shape, dtype=np.float32) + if not output_array.flags["C_CONTIGUOUS"]: + output_array = np.ascontiguousarray(output_array) + + with TensorRTResourceManager() as manager: + d_input = manager.allocate(input_array.nbytes) + d_output = manager.allocate(output_array.nbytes) + stream = manager.stream + + context.set_tensor_address(input_name, int(d_input)) + context.set_tensor_address(output_name, int(d_output)) + + # Memory transfer: CPU -> GPU (not timed) + cuda.memcpy_htod_async(d_input, input_array, stream) + + # Record start event and execute inference (pure GPU time) + self._voxel_encoder_start_event.record(stream) + context.execute_async_v3(stream_handle=stream.handle) + self._voxel_encoder_end_event.record(stream) + + # Memory transfer: GPU -> CPU (not timed) + cuda.memcpy_dtoh_async(output_array, d_output, stream) + manager.synchronize() + + voxel_features = torch.from_numpy(output_array).to(self.device) + voxel_features = voxel_features.squeeze(1) + return voxel_features + + def run_backbone_head(self, spatial_features: torch.Tensor) -> List[torch.Tensor]: + """Run backbone and head using TensorRT. + + Args: + spatial_features: Spatial features [B, C, H, W]. + + Returns: + List of 6 head output tensors. + + Raises: + RuntimeError: If context is None (initialization failed). + ValueError: If output count is not 6. + """ + engine = self._engines["backbone_head"] + context = self._contexts["backbone_head"] + if context is None: + raise RuntimeError("backbone_head context is None - likely failed to initialize due to GPU OOM") + + input_array = self.to_numpy(spatial_features, dtype=np.float32) + + input_name, trt_output_names = self._get_io_names(engine, single_output=False) + context.set_input_shape(input_name, input_array.shape) + + # Get expected output order from components_cfg + backbone_head_cfg = self._components_cfg.get("backbone_head", {}) + io_cfg = backbone_head_cfg.get("io", {}) + outputs = io_cfg.get("outputs", []) + + if not outputs: + raise ValueError( + "Output names must be provided via components_cfg.backbone_head.io.outputs. " + "No fallback values are allowed in deployment framework." + ) + + expected_output_names = [out.get("name") for out in outputs if out.get("name")] + if not expected_output_names: + raise ValueError( + "Output names must be provided via components_cfg.backbone_head.io.outputs. " + "Each output must have a 'name' field." + ) + + # Validate outputs: check for missing or extra outputs + trt_output_set = set(trt_output_names) + expected_output_set = set(expected_output_names) + + missing_outputs = expected_output_set - trt_output_set + if missing_outputs: + raise ValueError( + f"Missing outputs in TensorRT engine: {sorted(missing_outputs)}. " + f"Expected: {sorted(expected_output_set)}, Got: {sorted(trt_output_set)}" + ) + + extra_outputs = trt_output_set - expected_output_set + if extra_outputs: + raise ValueError( + f"Extra outputs in TensorRT engine: {sorted(extra_outputs)}. 
" + f"Expected: {sorted(expected_output_set)}, Got: {sorted(trt_output_set)}" + ) + + # Use expected order to sort outputs (critical for CenterPoint head ordering) + # After validation, we know expected_output_names == trt_output_names (as sets) + # So we can use expected_output_names directly to maintain the correct order + output_names = expected_output_names + + output_arrays = {} + for output_name in output_names: + output_shape = context.get_tensor_shape(output_name) + output_array = np.empty(output_shape, dtype=np.float32) + if not output_array.flags["C_CONTIGUOUS"]: + output_array = np.ascontiguousarray(output_array) + output_arrays[output_name] = output_array + + with TensorRTResourceManager() as manager: + d_input = manager.allocate(input_array.nbytes) + d_outputs = {name: manager.allocate(arr.nbytes) for name, arr in output_arrays.items()} + stream = manager.stream + + context.set_tensor_address(input_name, int(d_input)) + for output_name in output_names: + context.set_tensor_address(output_name, int(d_outputs[output_name])) + + # Memory transfer: CPU -> GPU (not timed) + cuda.memcpy_htod_async(d_input, input_array, stream) + + # Record start event and execute inference (pure GPU time) + self._backbone_start_event.record(stream) + context.execute_async_v3(stream_handle=stream.handle) + self._backbone_end_event.record(stream) + + # Memory transfer: GPU -> CPU (not timed) + for output_name in output_names: + cuda.memcpy_dtoh_async(output_arrays[output_name], d_outputs[output_name], stream) + + manager.synchronize() + + head_outputs = [torch.from_numpy(output_arrays[name]).to(self.device) for name in output_names] + + if len(head_outputs) != 6: + raise ValueError(f"Expected 6 head outputs, got {len(head_outputs)}") + + return head_outputs + + def run_model( + self, + preprocessed_input: Dict[str, torch.Tensor], + ) -> Tuple[List[torch.Tensor], Dict[str, float]]: + """Run complete multi-stage model inference with GPU timing using CUDA events. + + This override uses CUDA events to measure pure GPU inference time for + TensorRT operations, matching the C++ implementation's timing methodology. 
+ + Args: + preprocessed_input: Dict from preprocess() containing: + - 'input_features': Input features for voxel encoder [N_voxels, max_points, 11] + - 'coors': Voxel coordinates [N_voxels, 4] + - 'voxels': Raw voxel data + - 'num_points': Number of points per voxel + + Returns: + Tuple of (head_outputs, stage_latencies): + - head_outputs: List of head outputs [heatmap, reg, height, dim, rot, vel] + - stage_latencies: Dict mapping stage names to latency in ms + - 'voxel_encoder_ms': Pure GPU inference time (CUDA events) + - 'middle_encoder_ms': Wall-clock time (PyTorch) + - 'backbone_head_ms': Pure GPU inference time (CUDA events) + """ + # Use local variable for thread safety + stage_latencies: Dict[str, float] = {} + + # Stage 1: Voxel Encoder + voxel_features = self.run_voxel_encoder(preprocessed_input["input_features"]) + self._voxel_encoder_end_event.synchronize() + voxel_encoder_gpu_time_ms = self._voxel_encoder_end_event.time_since(self._voxel_encoder_start_event) + stage_latencies["voxel_encoder_ms"] = voxel_encoder_gpu_time_ms + + # Stage 2: Middle Encoder + start = time.perf_counter() + spatial_features = self.process_middle_encoder(voxel_features, preprocessed_input["coors"]) + stage_latencies["middle_encoder_ms"] = (time.perf_counter() - start) * 1000 + + # Stage 3: Backbone + Head + head_outputs = self.run_backbone_head(spatial_features) + self._backbone_end_event.synchronize() + backbone_head_gpu_time_ms = self._backbone_end_event.time_since(self._backbone_start_event) + stage_latencies["backbone_head_ms"] = backbone_head_gpu_time_ms + + return head_outputs, stage_latencies + + def _release_gpu_resources(self) -> None: + """Release TensorRT resources (engines and contexts) and CUDA events.""" + # Destroy CUDA events + if hasattr(self, "_backbone_start_event"): + try: + del self._backbone_start_event + except Exception: + pass + if hasattr(self, "_backbone_end_event"): + try: + del self._backbone_end_event + except Exception: + pass + if hasattr(self, "_voxel_encoder_start_event"): + try: + del self._voxel_encoder_start_event + except Exception: + pass + if hasattr(self, "_voxel_encoder_end_event"): + try: + del self._voxel_encoder_end_event + except Exception: + pass + + release_tensorrt_resources( + engines=getattr(self, "_engines", None), + contexts=getattr(self, "_contexts", None), + ) diff --git a/deployment/projects/centerpoint/runner.py b/deployment/projects/centerpoint/runner.py new file mode 100644 index 000000000..217767070 --- /dev/null +++ b/deployment/projects/centerpoint/runner.py @@ -0,0 +1,147 @@ +""" +CenterPoint-specific deployment runner. 
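+
+Wires the CenterPoint data loader, evaluator, component extractor, and
+ONNX/TensorRT export pipelines into the project-agnostic
+``BaseDeploymentRunner`` orchestration.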
+""" + +from __future__ import annotations + +import logging +from typing import Any, Optional + +import torch +from mmengine.config import Config + +from deployment.core import BaseDeploymentConfig +from deployment.core.contexts import CenterPointExportContext, ExportContext +from deployment.core.io.base_data_loader import BaseDataLoader +from deployment.exporters.common.factory import ExporterFactory +from deployment.exporters.common.model_wrappers import IdentityWrapper +from deployment.exporters.export_pipelines.base import OnnxExportPipeline, TensorRTExportPipeline +from deployment.projects.centerpoint.evaluator import CenterPointEvaluator +from deployment.projects.centerpoint.export.component_extractor import CenterPointComponentExtractor +from deployment.projects.centerpoint.export.onnx_export_pipeline import CenterPointONNXExportPipeline +from deployment.projects.centerpoint.export.tensorrt_export_pipeline import CenterPointTensorRTExportPipeline +from deployment.projects.centerpoint.model_loader import build_centerpoint_onnx_model +from deployment.runtime.runner import BaseDeploymentRunner + + +class CenterPointDeploymentRunner(BaseDeploymentRunner): + """CenterPoint deployment runner. + + Implements project-specific model loading and wiring to export pipelines, + while reusing the project-agnostic orchestration in `BaseDeploymentRunner`. + + Attributes: + model_cfg: MMEngine model configuration. + evaluator: CenterPoint evaluator instance. + """ + + def __init__( + self, + data_loader: BaseDataLoader, + evaluator: CenterPointEvaluator, + config: BaseDeploymentConfig, + model_cfg: Config, + logger: logging.Logger, + onnx_pipeline: Optional[OnnxExportPipeline] = None, + tensorrt_pipeline: Optional[TensorRTExportPipeline] = None, + ) -> None: + """Initialize CenterPoint deployment runner. + + Args: + data_loader: Data loader for loading samples. + evaluator: Evaluator for computing metrics. + config: Deployment configuration. + model_cfg: MMEngine model configuration. + logger: Logger instance. + onnx_pipeline: Optional custom ONNX export pipeline. + tensorrt_pipeline: Optional custom TensorRT export pipeline. + """ + component_extractor = CenterPointComponentExtractor(config=config, logger=logger) + + super().__init__( + data_loader=data_loader, + evaluator=evaluator, + config=config, + model_cfg=model_cfg, + logger=logger, + onnx_wrapper_cls=IdentityWrapper, + onnx_pipeline=onnx_pipeline, + tensorrt_pipeline=tensorrt_pipeline, + ) + + if self._onnx_pipeline is None: + self._onnx_pipeline = CenterPointONNXExportPipeline( + exporter_factory=ExporterFactory, + component_extractor=component_extractor, + logger=self.logger, + ) + + if self._tensorrt_pipeline is None: + self._tensorrt_pipeline = CenterPointTensorRTExportPipeline( + exporter_factory=ExporterFactory, + logger=self.logger, + ) + + def load_pytorch_model(self, checkpoint_path: str, context: ExportContext) -> torch.nn.Module: + """Load PyTorch model for export. + + Args: + checkpoint_path: Path to the checkpoint file. + context: Export context with additional parameters. + + Returns: + Loaded PyTorch model. 
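+
+        Note:
+            As a side effect, ``self.model_cfg`` is replaced with the
+            ONNX-compatible config and the evaluator is updated with the loaded
+            model and config.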
+ """ + rot_y_axis_reference = self._extract_rot_y_axis_reference(context) + + model, onnx_cfg = build_centerpoint_onnx_model( + base_model_cfg=self.model_cfg, + checkpoint_path=checkpoint_path, + device="cpu", + rot_y_axis_reference=rot_y_axis_reference, + ) + + # Update model_cfg with ONNX-compatible version + self.model_cfg = onnx_cfg + + # Notify evaluator of model availability + self._setup_evaluator(model, onnx_cfg) + + return model + + def _extract_rot_y_axis_reference(self, context: ExportContext) -> bool: + """Extract rot_y_axis_reference from export context. + + Args: + context: Export context (typed or dict-like). + + Returns: + Boolean value for rot_y_axis_reference. + """ + if isinstance(context, CenterPointExportContext): + return context.rot_y_axis_reference + return context.get("rot_y_axis_reference", False) + + def _setup_evaluator(self, model: torch.nn.Module, onnx_cfg: Config) -> None: + """Setup evaluator with loaded model and config. + + This method updates the evaluator with the PyTorch model and + ONNX-compatible configuration needed for evaluation. + + Args: + model: Loaded PyTorch model. + onnx_cfg: ONNX-compatible model configuration. + """ + try: + self.evaluator.set_onnx_config(onnx_cfg) + self.logger.info("Updated evaluator with ONNX-compatible config") + except Exception as e: + self.logger.error(f"Failed to update evaluator config: {e}") + raise + + try: + self.evaluator.set_pytorch_model(model) + self.logger.info("Updated evaluator with PyTorch model") + except Exception as e: + self.logger.error(f"Failed to set PyTorch model on evaluator: {e}") + raise diff --git a/projects/CenterPoint/Dockerfile b/projects/CenterPoint/Dockerfile new file mode 100644 index 000000000..7b86c99eb --- /dev/null +++ b/projects/CenterPoint/Dockerfile @@ -0,0 +1,13 @@ +ARG AWML_BASE_IMAGE="autoware-ml:latest" +FROM ${AWML_BASE_IMAGE} +ARG TRT_VERSION=10.8.0.43 + +# Install pip dependencies +RUN python3 -m pip --no-cache-dir install \ + onnxruntime-gpu --extra-index-url https://aiinfra.pkgs.visualstudio.com/PublicPackages/_packaging/onnxruntime-cuda-12/pypi/simple/ \ + onnxsim \ + pycuda \ + tensorrt-cu12==${TRT_VERSION} + +WORKDIR /workspace +RUN pip install --no-cache-dir -e . diff --git a/projects/CenterPoint/README.md b/projects/CenterPoint/README.md index 6b265e890..839fbd498 100644 --- a/projects/CenterPoint/README.md +++ b/projects/CenterPoint/README.md @@ -41,6 +41,24 @@ docker run -it --rm --gpus all --shm-size=64g --name awml -p 6006:6006 -v $PWD/:/workspace -v $PWD/data:/workspace/data autoware-ml ``` +For ONNX and TensorRT evaluation + +- If you need to use deployment, ONNX runtime, or TensorRT evaluation, please build the docker image first: + +```sh +# Build the base autoware-ml image (if not already built) +DOCKER_BUILDKIT=1 docker build -t autoware-ml . + +# Build the centerpoint-deployment image +docker build -t centerpoint-deployment:latest -f projects/CenterPoint/Dockerfile . +``` + +- Run the docker container: + +```sh +docker run -it --rm --gpus all --shm-size=64g --name awml_deployment -p 6006:6006 -v $PWD/:/workspace -v $PWD/data:/workspace/data centerpoint-deployment:latest +``` + ### 2. Train #### 2.1 Environment set up @@ -110,12 +128,21 @@ where `frame-range` represents the range of frames to visualize. ### 5. Deploy -- Make an onnx file for a CenterPoint model. +- Run the deployment pipeline: + - Export ONNX/TensorRT artifacts. + - Verify the exported artifacts. + - (Optionally) run evaluation. 
+ - Update `deployment/projects/centerpoint/config/deploy_config.py` so that the following entries point to your experiment: + - `checkpoint_path` (e.g., `checkpoint_path="work_dirs/centerpoint/t4dataset/second_secfpn_2xb8_121m_base/epoch_50.pth"`). + - `runtime_io.info_file`. + - `export.work_dir`. ```sh -# Deploy for t4dataset -DIR="work_dirs/centerpoint/t4dataset/second_secfpn_2xb8_121m_base/" && -python projects/CenterPoint/scripts/deploy.py projects/CenterPoint/configs/t4dataset/second_secfpn_2xb8_121m_base.py $DIR/epoch_50.pth --replace_onnx_models --device gpu --rot_y_axis_reference +# Deploy for t4dataset (export + verification + evaluation) +python -m deployment.cli.main centerpoint \ + deployment/projects/centerpoint/config/deploy_config.py \ + projects/CenterPoint/configs/t4dataset/second_secfpn_2xb8_121m_base.py \ + --rot-y-axis-reference ``` where `rot_y_axis_reference` can be removed if we would like to use the original counterclockwise x-axis rotation system. diff --git a/projects/CenterPoint/models/__init__.py b/projects/CenterPoint/models/__init__.py index 13d1792cb..c713add79 100644 --- a/projects/CenterPoint/models/__init__.py +++ b/projects/CenterPoint/models/__init__.py @@ -1,13 +1,10 @@ from .backbones.second import SECOND from .dense_heads.centerpoint_head import CenterHead, CustomSeparateHead -from .dense_heads.centerpoint_head_onnx import CenterHeadONNX, SeparateHeadONNX from .detectors.centerpoint import CenterPoint -from .detectors.centerpoint_onnx import CenterPointONNX from .losses.amp_gaussian_focal_loss import AmpGaussianFocalLoss from .necks.second_fpn import SECONDFPN from .task_modules.coders.centerpoint_bbox_coders import CenterPointBBoxCoder from .voxel_encoders.pillar_encoder import BackwardPillarFeatureNet -from .voxel_encoders.pillar_encoder_onnx import BackwardPillarFeatureNetONNX, PillarFeatureNetONNX __all__ = [ "SECOND", @@ -16,11 +13,6 @@ "CenterHead", "CustomSeparateHead", "BackwardPillarFeatureNet", - "PillarFeatureNetONNX", - "BackwardPillarFeatureNetONNX", - "CenterPointONNX", - "CenterHeadONNX", - "SeparateHeadONNX", "CenterPointBBoxCoder", "AmpGaussianFocalLoss", ] diff --git a/projects/CenterPoint/models/detectors/centerpoint_onnx.py b/projects/CenterPoint/models/detectors/centerpoint_onnx.py deleted file mode 100644 index ff568a00d..000000000 --- a/projects/CenterPoint/models/detectors/centerpoint_onnx.py +++ /dev/null @@ -1,176 +0,0 @@ -import os -from typing import Callable, Dict, List, Tuple - -import torch -from mmdet3d.models.detectors.centerpoint import CenterPoint -from mmdet3d.registry import MODELS -from mmengine.logging import MMLogger, print_log -from torch import nn - - -class CenterPointHeadONNX(nn.Module): - """Head module for centerpoint with BACKBONE, NECK and BBOX_HEAD""" - - def __init__(self, backbone: nn.Module, neck: nn.Module, bbox_head: nn.Module): - super(CenterPointHeadONNX, self).__init__() - self.backbone: nn.Module = backbone - self.neck: nn.Module = neck - self.bbox_head: nn.Module = bbox_head - self._logger = MMLogger.get_current_instance() - self._logger.info("Running CenterPointHeadONNX!") - - def forward(self, x: torch.Tensor) -> Tuple[List[Dict[str, torch.Tensor]]]: - """ - Note: - torch.onnx.export() doesn't support triple-nested output - - Args: - x (torch.Tensor): (B, C, H, W) - Returns: - tuple[list[dict[str, any]]]: - (num_classes x [num_detect x {'reg', 'height', 'dim', 'rot', 'vel', 'heatmap'}]) - """ - x = self.backbone(x) - if self.neck is not None: - x = self.neck(x) - x = self.bbox_head(x) - - 
return x - - -@MODELS.register_module() -class CenterPointONNX(CenterPoint): - """onnx support impl of mmdet3d.models.detectors.CenterPoint""" - - def __init__(self, point_channels: int = 5, device: str = "cpu", **kwargs): - super().__init__(**kwargs) - self._point_channels = point_channels - self._device = device - self._torch_device = torch.device("cuda:0") if self._device == "gpu" else torch.device("cpu") - self._logger = MMLogger.get_current_instance() - self._logger.info("Running CenterPointONNX!") - - def _get_random_inputs(self): - """ - Generate random inputs and preprocess it to feed it to onnx. - """ - # Input channels - points = [ - torch.rand(1000, self._point_channels).to(self._torch_device), - # torch.rand(1000, self._point_channels).to(self._torch_device), - ] - # We only need lidar pointclouds for CenterPoint. - return {"points": points, "data_samples": None} - - def _extract_random_features(self): - assert self.data_preprocessor is not None and hasattr(self.data_preprocessor, "voxelize") - - # Get inputs - inputs = self._get_random_inputs() - voxel_dict = self.data_preprocessor.voxelize(points=inputs["points"], data_samples=inputs["data_samples"]) - assert self.pts_voxel_encoder is not None and hasattr(self.pts_voxel_encoder, "get_input_features") - input_features = self.pts_voxel_encoder.get_input_features( - voxel_dict["voxels"], voxel_dict["num_points"], voxel_dict["coors"] - ) - return input_features, voxel_dict - - def save_onnx( - self, - save_dir: str, - verbose=False, - onnx_opset_version=13, - ): - """Save onnx model - Args: - batch_dict (dict[str, any]) - save_dir (str): directory path to save onnx models - verbose (bool, optional) - onnx_opset_version (int, optional) - """ - print_log(f"Running onnx_opset_version: {onnx_opset_version}") - # Get features - input_features, voxel_dict = self._extract_random_features() - - # === pts_voxel_encoder === - pth_onnx_pve = os.path.join(save_dir, "pts_voxel_encoder.onnx") - torch.onnx.export( - self.pts_voxel_encoder, - (input_features,), - f=pth_onnx_pve, - input_names=("input_features",), - output_names=("pillar_features",), - dynamic_axes={ - "input_features": {0: "num_voxels", 1: "num_max_points"}, - "pillar_features": {0: "num_voxels"}, - }, - verbose=verbose, - opset_version=onnx_opset_version, - ) - print_log(f"Saved pts_voxel_encoder onnx model: {pth_onnx_pve}") - voxel_features = self.pts_voxel_encoder(input_features) - voxel_features = voxel_features.squeeze(1) - - # Note: pts_middle_encoder isn't exported - coors = voxel_dict["coors"] - batch_size = coors[-1, 0] + 1 - x = self.pts_middle_encoder(voxel_features, coors, batch_size) - # x (torch.tensor): (batch_size, num_pillar_features, W, H) - - # === pts_backbone === - assert self.pts_bbox_head is not None and hasattr(self.pts_bbox_head, "output_names") - pts_backbone_neck_head = CenterPointHeadONNX( - self.pts_backbone, - self.pts_neck, - self.pts_bbox_head, - ) - # pts_backbone_neck_head = torch.jit.script(pts_backbone_neck_head) - pth_onnx_backbone_neck_head = os.path.join(save_dir, "pts_backbone_neck_head.onnx") - torch.onnx.export( - pts_backbone_neck_head, - (x,), - f=pth_onnx_backbone_neck_head, - input_names=("spatial_features",), - output_names=tuple(self.pts_bbox_head.output_names), - dynamic_axes={ - name: {0: "batch_size", 2: "H", 3: "W"} - for name in ["spatial_features"] + self.pts_bbox_head.output_names - }, - verbose=verbose, - opset_version=onnx_opset_version, - ) - print_log(f"Saved pts_backbone_neck_head onnx model: 
{pth_onnx_backbone_neck_head}") - - def save_torchscript( - self, - save_dir: str, - verbose: bool = False, - ): - """Save torchscript model - Args: - batch_dict (dict[str, any]) - save_dir (str): directory path to save onnx models - verbose (bool, optional) - """ - # Get features - input_features, voxel_dict = self._extract_random_features() - - pth_pt_pve = os.path.join(save_dir, "pts_voxel_encoder.pt") - traced_pts_voxel_encoder = torch.jit.trace(self.pts_voxel_encoder, (input_features,)) - traced_pts_voxel_encoder.save(pth_pt_pve) - - voxel_features = traced_pts_voxel_encoder(input_features) - voxel_features = voxel_features.squeeze() - - # Note: pts_middle_encoder isn't exported - coors = voxel_dict["coors"] - batch_size = coors[-1, 0] + 1 - x = self.pts_middle_encoder(voxel_features, coors, batch_size) - - pts_backbone_neck_head = CenterPointHeadONNX( - self.pts_backbone, - self.pts_neck, - self.pts_bbox_head, - ) - pth_pt_head = os.path.join(save_dir, "pts_backbone_neck_head.pt") - traced_pts_backbone_neck_head = torch.jit.trace(pts_backbone_neck_head, (x)) - traced_pts_backbone_neck_head.save(pth_pt_head) diff --git a/projects/CenterPoint/runners/deployment_runner.py b/projects/CenterPoint/runners/deployment_runner.py deleted file mode 100644 index bbd703cbb..000000000 --- a/projects/CenterPoint/runners/deployment_runner.py +++ /dev/null @@ -1,103 +0,0 @@ -from pathlib import Path -from typing import Optional, Union - -from mmengine.registry import MODELS, init_default_scope -from torch import nn - -from autoware_ml.detection3d.runners.base_runner import BaseRunner - - -class DeploymentRunner(BaseRunner): - """Runner to run deploment of mmdet3D model to generate ONNX with random inputs.""" - - def __init__( - self, - model_cfg_path: str, - checkpoint_path: str, - work_dir: Path, - rot_y_axis_reference: bool = False, - device: str = "gpu", - replace_onnx_models: bool = False, - default_scope: str = "mmengine", - experiment_name: str = "", - log_level: Union[int, str] = "INFO", - log_file: Optional[str] = None, - onnx_opset_version: int = 13, - ) -> None: - """ - :param model_cfg_path: MMDet3D model config path. - :param checkpoint_path: Checkpoint path to load weights. - :param work_dir: Working directory to save outputs. - :param rot_y_axis_reference: Set True to convert rotation - from x-axis counterclockwiese to y-axis clockwise. - :param device: Working devices, only 'gpu' or 'cpu' supported. - :param replace_onnx_models: Set True to replace model with ONNX, - for example, CenterHead -> CenterHeadONNX. - :param default_scope: Default scope in mmdet3D. - :param experiment_name: Experiment name. - :param log_level: Logging and display log messages above this level. - :param log_file: Logger file. - :param oxx_opset_version: onnx opset version. - """ - super(DeploymentRunner, self).__init__( - model_cfg_path=model_cfg_path, - checkpoint_path=checkpoint_path, - work_dir=work_dir, - device=device, - default_scope=default_scope, - experiment_name=experiment_name, - log_level=log_level, - log_file=log_file, - ) - - # We need init deafault scope to mmdet3d to search registries in the mmdet3d scope - init_default_scope("mmdet3d") - - self._rot_y_axis_reference = rot_y_axis_reference - self._replace_onnx_models = replace_onnx_models - self._onnx_opset_version = onnx_opset_version - - def build_model(self) -> nn.Module: - """ - Build a model. Replace the model by ONNX model if replace_onnx_model is set. - :return torch.nn.Module. A torch module. 
- """ - self._logger.info("===== Building CenterPoint model ====") - model_cfg = self._cfg.get("model") - # Update Model type to ONNX - if self._replace_onnx_models: - self._logger.info("Replacing ONNX models!") - model_cfg.type = "CenterPointONNX" - model_cfg.point_channels = model_cfg.pts_voxel_encoder.in_channels - model_cfg.device = self._device - model_cfg.pts_voxel_encoder.type = ( - "PillarFeatureNetONNX" - if model_cfg.pts_voxel_encoder.type == "PillarFeatureNet" - else "BackwardPillarFeatureNetONNX" - ) - model_cfg.pts_bbox_head.type = "CenterHeadONNX" - model_cfg.pts_bbox_head.separate_head.type = "SeparateHeadONNX" - model_cfg.pts_bbox_head.rot_y_axis_reference = self._rot_y_axis_reference - - if model_cfg.pts_backbone.type == "ConvNeXt_PC": - # Always set with_cp (gradient checkpointing) to False for deployment - model_cfg.pts_backbone.with_cp = False - model = MODELS.build(model_cfg) - model.to(self._torch_device) - - self._logger.info(model) - self._logger.info("===== Built CenterPoint model ====") - return model - - def run(self) -> None: - """Start running the Runner.""" - # Building a model - model = self.build_model() - - # Loading checkpoint to the model - self.load_verify_checkpoint(model=model) - - assert hasattr(model, "save_onnx"), "The model must have the function: save_onnx()!" - - # Run and save onnx model! - model.save_onnx(save_dir=self._work_dir, onnx_opset_version=self._onnx_opset_version) diff --git a/projects/CenterPoint/scripts/deploy.py b/projects/CenterPoint/scripts/deploy.py deleted file mode 100644 index 3aea2ee29..000000000 --- a/projects/CenterPoint/scripts/deploy.py +++ /dev/null @@ -1,87 +0,0 @@ -""" -Script to export CenterPoint to onnx/torchscript -""" - -import argparse -import logging -import os -from pathlib import Path - -from projects.CenterPoint.runners.deployment_runner import DeploymentRunner - - -def parse_args(): - parser = argparse.ArgumentParser( - description="Export CenterPoint model to backends.", - ) - parser.add_argument( - "model_cfg_path", - help="model config path", - ) - parser.add_argument( - "checkpoint", - help="model checkpoint path", - ) - parser.add_argument( - "--work-dir", - default="", - help="the dir to save logs and models", - ) - parser.add_argument( - "--log-level", - help="set log level", - default="INFO", - choices=list(logging._nameToLevel.keys()), - ) - parser.add_argument("--onnx_opset_version", type=int, default=13, help="onnx opset version") - parser.add_argument( - "--device", - choices=["cpu", "gpu"], - default="gpu", - help="Set running device!", - ) - parser.add_argument( - "--replace_onnx_models", - action="store_true", - help="Set False to disable replacement of model by ONNX model, for example, CenterHead -> CenterHeadONNX", - ) - parser.add_argument( - "--rot_y_axis_reference", - action="store_true", - help="Set True to output rotation in y-axis clockwise in CenterHeadONNX", - ) - args = parser.parse_args() - return args - - -def build_deploy_runner(args) -> DeploymentRunner: - """Build a DeployRunner.""" - model_cfg_path = args.model_cfg_path - checkpoint_path = args.checkpoint - experiment_name = Path(model_cfg_path).stem - work_dir = ( - Path(os.getcwd()) / "work_dirs" / "deployment" / experiment_name if not args.work_dir else Path(args.work_dir) - ) - - deployment_runner = DeploymentRunner( - experiment_name=experiment_name, - model_cfg_path=model_cfg_path, - checkpoint_path=checkpoint_path, - work_dir=work_dir, - replace_onnx_models=args.replace_onnx_models, - device=args.device, - 
rot_y_axis_reference=args.rot_y_axis_reference, - onnx_opset_version=args.onnx_opset_version, - ) - return deployment_runner - - -if __name__ == "__main__": - """Launch a DeployRunner.""" - args = parse_args() - - # Build DeploymentRunner - deployment_runner = build_deploy_runner(args=args) - - # Start running DeploymentRunner - deployment_runner.run()
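For reference, the three `deploy_config.py` entries that the README hunk above asks you to point at your experiment (`checkpoint_path`, `runtime_io.info_file`, `export.work_dir`) can be set roughly as follows. This is a minimal sketch only: the key names come from the README, while the dict nesting and the example info-file and work-dir paths are illustrative assumptions, not the shipped config.

```python
# deployment/projects/centerpoint/config/deploy_config.py -- illustrative sketch only.
# Key names (checkpoint_path, runtime_io.info_file, export.work_dir) follow the README;
# the layout and the example paths below are assumptions, not the shipped config.

checkpoint_path = "work_dirs/centerpoint/t4dataset/second_secfpn_2xb8_121m_base/epoch_50.pth"

runtime_io = dict(
    # Annotation info file for the split used during verification/evaluation (hypothetical path).
    info_file="data/t4dataset/info/t4dataset_infos_test.pkl",
)

export = dict(
    # Directory where exported ONNX files, logs, and evaluation results are written (hypothetical path).
    work_dir="work_dirs/deployment/centerpoint/second_secfpn_2xb8_121m_base",
)
```

With these entries pointing at your experiment, the single `python -m deployment.cli.main centerpoint ...` command shown in the README replaces the deleted per-project `deploy.py` script.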