diff --git a/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py b/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py index df57112be..01f1c18f3 100644 --- a/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py +++ b/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py @@ -8,7 +8,7 @@ ) # dataset type setting -dataset_type = "T4Dataset" +dataset_type = "T4CalibrationClassificationDataset" info_train_file_name = "t4dataset_gen2_base_infos_train.pkl" info_val_file_name = "t4dataset_gen2_base_infos_val.pkl" info_test_file_name = "t4dataset_gen2_base_infos_test.pkl" diff --git a/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_lidarseg.py b/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_lidarseg.py new file mode 100644 index 000000000..8cffc3dad --- /dev/null +++ b/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_lidarseg.py @@ -0,0 +1,28 @@ +custom_imports = dict( + imports=[ + "autoware_ml.calibration_classification.datasets.t4_calibration_classification_dataset", + "autoware_ml.calibration_classification.datasets.transforms.calibration_classification_transform", + "autoware_ml.calibration_classification.hooks.result_visualization_hook", + ], + allow_failed_imports=False, +) + +# dataset type setting +dataset_type = "T4CalibrationClassificationDataset" +info_train_file_name = "t4dataset_gen2_lidarseg_infos_train.pkl" +info_val_file_name = "t4dataset_gen2_lidarseg_infos_val.pkl" +info_test_file_name = "t4dataset_gen2_lidarseg_infos_test.pkl" + + +# dataset scene setting +dataset_version_config_root = "autoware_ml/configs/t4dataset" +dataset_version_list = [ + "db_j6gen2_semaseg_v1", +] + + +transform_config = dict( + crop_ratio=0.6, # Ratio for image cropping (0.0-1.0, where 1.0 means no crop) + max_distortion=0.001, # Maximum affine distortion as fraction of image dimensions (0.0-1.0) + rgb_weight=0.3, # Weight for RGB component in overlay visualization (0.0-1.0, where 1.0 is full RGB) +) diff --git a/tools/calibration_classification/README.md b/tools/calibration_classification/README.md index f18b97d64..ffa962b9d 100644 --- a/tools/calibration_classification/README.md +++ b/tools/calibration_classification/README.md @@ -40,6 +40,7 @@ python tools/calibration_classification/create_data_t4dataset.py --config /works - `-o, --out_dir` (required): Output directory for info files - `--lidar_channel` (optional): Lidar channel name (default: LIDAR_CONCAT) - `--target_cameras` (optional): List of target cameras to process (default: all cameras) +- `--filter` (optional): Filter out likely black/corrupted images based on file size **Examples:** @@ -49,8 +50,14 @@ python tools/calibration_classification/create_data_t4dataset.py --config /works # Process only specific cameras python tools/calibration_classification/create_data_t4dataset.py --config /workspace/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py --version gen2_base --root_path ./data/t4dataset -o ./data/t4dataset/calibration_info/ --target_cameras CAM_FRONT CAM_LEFT CAM_RIGHT + +# Filter out black/corrupted images (recommended for datasets with potential black images) +python tools/calibration_classification/create_data_t4dataset.py --config /workspace/autoware_ml/configs/calibration_classification/dataset/t4dataset/gen2_base.py --version gen2_base --root_path ./data/t4dataset -o ./data/t4dataset/calibration_info/ --filter ``` +**Note on `--filter` 
flag:** +The `--filter` option detects and excludes likely black/corrupted images by analyzing file sizes. It samples images from each scene to calculate an average size, then filters out images smaller than 5% of that average (with a minimum threshold of 10KB). + **Output files:** The script generates three pickle files for train/val/test splits: - `t4dataset_{version}_infos_train.pkl` @@ -60,6 +67,7 @@ The script generates three pickle files for train/val/test splits: Each file contains calibration information including: - Camera and LiDAR data paths - Transformation matrices (lidar2cam, cam2ego, etc.) +- LiDAR sources info (all lidar sensors and their calibrated extrinsics per scene, if available) - Timestamps and metadata **Example data structure:** @@ -103,6 +111,15 @@ Each file contains calibration information including: 'sample_data_token': 'lidar_token_456', 'timestamp': 1234567890 }, + 'lidar_sources': { # optional, present if lidar sensors exist in the scene + 'LIDAR_CONCAT': { + 'sensor_token': 'lidar_sensor_token_789', + 'translation': [0.0, 0.0, 1.8], + 'rotation': [[1.0, 0.0, 0.0], + [0.0, 1.0, 0.0], + [0.0, 0.0, 1.0]], + } + }, 'sample_idx': 3, 'scene_id': 'scene_001' } @@ -113,6 +130,7 @@ Each file contains calibration information including: - `frame_idx`: Frame index in the sequence - `image`: Camera-specific data including intrinsic/extrinsic parameters - `lidar_points`: LiDAR data with transformation matrices +- `lidar_sources` (optional): All lidar sensors in the scene with their calibrated extrinsics. Keys are channel names (e.g. LIDAR_CONCAT). Each value has: sensor_token, translation [x,y,z] in meters, and rotation as a 3x3 matrix (sensor-to-base transform). Present if lidar sensors exist in the scene - `sample_idx`: Sample index in the dataset - `scene_id`: Scene identifier diff --git a/tools/calibration_classification/create_data_t4dataset.py b/tools/calibration_classification/create_data_t4dataset.py index e34ee6e20..57dade984 100644 --- a/tools/calibration_classification/create_data_t4dataset.py +++ b/tools/calibration_classification/create_data_t4dataset.py @@ -1,9 +1,7 @@ import argparse -import json import os import os.path as osp import re -from collections import defaultdict from typing import Any, Dict, List, Optional, Tuple import mmengine @@ -12,55 +10,114 @@ from mmengine.config import Config from mmengine.logging import MMLogger from pyquaternion import Quaternion +from t4_devkit import Tier4 +from t4_devkit.schema import Sample # Configuration constants DEFAULT_LIDAR_CHANNEL = "LIDAR_CONCAT" SUPPORTED_SPLITS = ["train", "val", "test"] -REQUIRED_JSON_FILES = ["sample_data.json", "ego_pose.json", "calibrated_sensor.json"] # Configure logging logger = MMLogger.get_instance(name="create_data_t4dataset") -def load_json(path: str) -> Dict[str, Any]: - """Load a JSON file from the given path. +def is_image_likely_black(img_path: str, size_threshold_bytes: int) -> bool: + """Check if an image is likely black based on file size. + + Black images compress extremely well (nearly all zeros), resulting in very small files. + This is much faster than reading and analyzing pixel values. Args: - path (str): Path to the JSON file. + img_path: Path to the image file. + size_threshold_bytes: File size threshold in bytes. Images smaller than this + are considered likely black. Returns: - Dict[str, Any]: Parsed JSON data. - - Raises: - FileNotFoundError: If the file doesn't exist. - json.JSONDecodeError: If the file contains invalid JSON. 
- PermissionError: If the file cannot be read due to permissions. + True if the image file is smaller than threshold (likely black), False otherwise. """ - try: - with open(path, "r") as f: - return json.load(f) - except FileNotFoundError: - raise FileNotFoundError(f"JSON file not found: {path}") - except json.JSONDecodeError as e: - raise json.JSONDecodeError(f"Invalid JSON in file {path}: {e}", e.doc, e.pos) - except PermissionError: - raise PermissionError(f"Cannot read file due to permissions: {path}") - - -def convert_quaternion_to_matrix(rotation: List[float], translation: List[float]) -> List[List[float]]: - """Convert quaternion rotation and translation to a 4x4 transformation matrix. + if not osp.isfile(img_path): + logger.warning(f"Image file not found: {img_path}") + return True + + file_size = osp.getsize(img_path) + return file_size < size_threshold_bytes + + +def calculate_size_threshold( + t4: Tier4, + root_path: str, + scene_root: str, + target_cameras: List[str], + num_samples: int = 20, + threshold_ratio: float = 0.05, +) -> int: + """Calculate file size threshold for filtering likely black images. + + Samples a number of images to determine average file size, then sets threshold + as a fraction of that average. Black images are typically <5% of normal image size. + Args: - rotation (list): Quaternion [w, x, y, z]. - translation (list): Translation [x, y, z]. + t4: Tier4 dataset instance. + root_path: Absolute root path of the dataset. + scene_root: Relative scene root path. + target_cameras: List of camera channels to sample from. + num_samples: Number of images to sample for calculating average size. + threshold_ratio: Ratio of average size to use as threshold (default 0.05 = 5%). + Returns: - list: 4x4 transformation matrix as nested lists. + Size threshold in bytes. Images smaller than this are likely black. """ + image_sizes: List[int] = [] + + for sd in t4.sample_data: + if len(image_sizes) >= num_samples: + break + + filename = sd.filename + for cam in target_cameras: + if filename.startswith(f"data/{cam}/"): + img_rel_path = osp.join(scene_root, filename) + img_abs_path = osp.join(root_path, img_rel_path) + if osp.isfile(img_abs_path): + file_size = osp.getsize(img_abs_path) + # Only include non-tiny files in the average calculation + # (to avoid black images skewing the average down) + if file_size > 10000: # > 10KB + image_sizes.append(file_size) + break + + if not image_sizes: + # Fallback to a reasonable default (50KB) if no valid samples found + logger.warning("Could not sample images for threshold calculation, using default 50KB threshold") + return 50000 - rot = Quaternion(rotation).rotation_matrix - trans = np.array(translation).reshape(3, 1) + avg_size = sum(image_sizes) / len(image_sizes) + threshold = int(avg_size * threshold_ratio) + + # Ensure minimum threshold of 10KB to avoid false positives + threshold = max(threshold, 10000) + + logger.info( + f"Calculated size threshold: {threshold / 1000:.1f}KB " + f"(sampled {len(image_sizes)} images, avg size: {avg_size / 1000:.1f}KB)" + ) + + return threshold + + +def build_transform_matrix(rotation: Quaternion, translation: np.ndarray) -> List[List[float]]: + """Build a 4x4 transformation matrix from rotation quaternion and translation vector. + + Args: + rotation: Rotation as a pyquaternion Quaternion. + translation: Translation as a numpy array [x, y, z]. + + Returns: + 4x4 transformation matrix as nested lists. 
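+
+    Example (illustrative; identity quaternion plus a pure translation,
+    so the first row is the x-axis basis vector plus the x offset):
+        >>> import numpy as np
+        >>> from pyquaternion import Quaternion
+        >>> build_transform_matrix(Quaternion(), np.array([1.0, 2.0, 3.0]))[0]
+        [1.0, 0.0, 0.0, 1.0]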
+ """ mat = np.eye(4) - mat[:3, :3] = rot - mat[:3, 3:4] = trans + mat[:3, :3] = rotation.rotation_matrix + mat[:3, 3] = np.array(translation) return mat.tolist() @@ -121,54 +178,18 @@ def calculate_lidar2cam_matrix( raise ValueError(error_msg) -def get_pose_dict(ego_pose_json: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: - """Create a dict mapping pose token to pose entry from ego_pose_json. - Args: - ego_pose_json (list): List of ego pose dictionaries from the annotation file. - Returns: - dict: Dictionary mapping pose tokens to their corresponding pose entries. - """ - return {ego_pose["token"]: ego_pose for ego_pose in ego_pose_json} - +def extract_frame_index(filename: str) -> str: + """Extract the frame index (all digits before the first dot in the basename). -def get_calib_dict(calib_json: List[Dict[str, Any]]) -> Dict[str, Dict[str, Any]]: - """Create a dict mapping calibration token to calibration entry from calib_json. Args: - calib_json (list): List of calibrated sensor dictionaries from the annotation file. - Returns: - dict: Dictionary mapping calibration tokens to their corresponding calibration entries. - """ - return {calib["token"]: calib for calib in calib_json} + filename: The filename to extract frame index from. - -def get_all_channels(sample_data_json: List[Dict[str, Any]]) -> List[str]: - """Return all unique camera channels by parsing filename in sample_data_json. - Args: - sample_data_json (list): List of sample data dictionaries from the annotation file. Returns: - list: Sorted list of unique camera channel names (e.g., ['CAM_FRONT', 'CAM_LEFT', ...]). - """ - return sorted( - set( - sample_data["filename"].split("/")[1] - for sample_data in sample_data_json - if sample_data["filename"].startswith("data/CAM_") - ) - ) - + The extracted frame index as a string. -def extract_frame_index(filename: str) -> str: - """ - Extract the frame index (all digits before the first dot in the basename). - E.g. 00001.jpg -> 00001, 123.pcd.bin -> 123, 5.jpg -> 5 - Args: - filename (str): The filename to extract frame index from. - Returns: - str: The extracted frame index as a string. Raises: ValueError: If no valid frame index can be extracted. """ - if not filename: raise ValueError("Empty filename provided") @@ -190,174 +211,174 @@ def extract_frame_index(filename: str) -> str: raise ValueError(f"Could not extract valid frame index from filename: {filename}") -def generate_calib_info( - annotation_dir: str, - lidar_channel: str = DEFAULT_LIDAR_CHANNEL, - scene_root: Optional[str] = None, - scene_id: Optional[str] = None, - start_sample_idx: int = 0, - target_cameras: Optional[List[str]] = None, -) -> Tuple[List[Dict[str, Any]], int]: +def _get_modality_value(sensor_record: Any) -> Optional[str]: + """Safely extract modality string value from a sensor record. + + Handles both enum (SensorModality) and plain string modality fields. + + Args: + sensor_record: A sensor record with a modality attribute. + + Returns: + Modality as a lowercase string, or None if not available. """ - Generate calibration info for each frame (grouped by filename index). - Each info contains all sensor sample_data (lidar, cameras, etc) belonging to the same frame. - Invalid samples (is_valid = false) are automatically filtered out. 
+ modality = getattr(sensor_record, "modality", None) + if modality is None: + return None + return modality.value if hasattr(modality, "value") else modality + + +def get_available_cameras(t4: Tier4, target_cameras: Optional[List[str]] = None) -> List[str]: + """Get available camera channels from the dataset. + Args: - annotation_dir (str): Path to the annotation directory containing JSON files. - lidar_channel (str, optional): Name of the lidar channel. Defaults to "LIDAR_CONCAT". - scene_root (str, optional): Root path of the scene data. Defaults to None. - scene_id (str, optional): ID of the scene. Defaults to None. - start_sample_idx (int, optional): Starting index for sample numbering. Defaults to 0. - target_cameras (list, optional): List of target camera channels to process. - If None, processes all available cameras. Defaults to None. + t4: Tier4 dataset instance. + target_cameras: Optional list of desired camera channels. + If provided, only matching cameras are returned. + If None, all camera channels in the dataset are returned. + Returns: - tuple: (infos, next_sample_idx) where infos is a list of calibration info dictionaries - and next_sample_idx is the next available sample index. + Sorted list of camera channel names. """ - # Validate required files exist - for required_file in REQUIRED_JSON_FILES: - file_path = osp.join(annotation_dir, required_file) - if not osp.isfile(file_path): - raise FileNotFoundError(f"Required file not found: {file_path}") - - sample_data_json = load_json(osp.join(annotation_dir, "sample_data.json")) - ego_pose_json = load_json(osp.join(annotation_dir, "ego_pose.json")) - calib_json = load_json(osp.join(annotation_dir, "calibrated_sensor.json")) - calib_dict = get_calib_dict(calib_json) - ego_pose_dict = get_pose_dict(ego_pose_json) - - # Get all available camera channels if target_cameras is not specified - if target_cameras is None: - target_cameras = get_all_channels(sample_data_json) - logger.info(f"Using all available cameras: {target_cameras}") + all_cameras = sorted(sensor.channel for sensor in t4.sensor if _get_modality_value(sensor) == "camera") + if target_cameras is not None: + return [cam for cam in target_cameras if cam in all_cameras] + return all_cameras - # Group all sample_data by frame index, filtering out invalid samples - frame_groups: Dict[str, List[Dict[str, Any]]] = defaultdict(list) - invalid_count = 0 - for sd in sample_data_json: - if "filename" not in sd or not sd["filename"]: - continue - # Skip invalid sample data (is_valid = false) - if sd.get("is_valid") is False: - invalid_count += 1 - continue - frame_idx: str = extract_frame_index(sd["filename"]) - frame_groups[frame_idx].append(sd) - if invalid_count > 0: - logger.warning( - f"Found {len(frame_groups)} frames in scene {scene_id} (filtered out {invalid_count} invalid samples)" - ) - else: - logger.info(f"Found {len(frame_groups)} frames in scene {scene_id}") +def get_lidar_sources_info(t4: Tier4) -> Dict[str, Dict[str, Any]]: + """Collect all lidar sensors and their calibrated extrinsics. - infos: List[Dict[str, Any]] = [] - sample_idx: int = start_sample_idx - for _, (frame_idx, frame_sample_data) in enumerate(sorted(frame_groups.items())): + Args: + t4: Tier4 dataset instance. + + Returns: + Dictionary mapping channel name (e.g. LIDAR_CONCAT) to sensor info: + - sensor_token: Sensor token from the dataset. + - translation: [x, y, z] in meters (sensor-to-base). + - rotation: 3x3 rotation matrix as nested list (sensor-to-base). 
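+
+    Example of the returned mapping (illustrative values):
+        {"LIDAR_CONCAT": {"sensor_token": "abc123",
+                          "translation": [0.0, 0.0, 1.8],
+                          "rotation": [[1.0, 0.0, 0.0],
+                                       [0.0, 1.0, 0.0],
+                                       [0.0, 0.0, 1.0]]}}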
+ """ + lidar_sources: Dict[str, Dict[str, Any]] = {} + for cs_rec in getattr(t4, "calibrated_sensor", []): try: - frame_infos = build_frame_info( - frame_idx, - frame_sample_data, - calib_dict, - ego_pose_dict, - scene_root, - target_cameras, - lidar_channel, - scene_id, - sample_idx, + sensor_rec = t4.get("sensor", cs_rec.sensor_token) + except KeyError: + continue + + modality_value = _get_modality_value(sensor_rec) + if modality_value != "lidar": + continue + + channel = sensor_rec.channel + if channel not in lidar_sources: + rot_matrix = cs_rec.rotation.rotation_matrix + lidar_sources[channel] = dict( + sensor_token=sensor_rec.token, + translation=np.array(cs_rec.translation).tolist(), + rotation=rot_matrix.tolist(), ) - if frame_infos: - infos.extend(frame_infos) - sample_idx += len(frame_infos) - except ValueError as e: - logger.error(f"Failed to process frame {frame_idx} in scene {scene_id}: {e}") - raise - return infos, sample_idx + + return lidar_sources -def build_frame_info( - frame_idx: str, - frame_sample_data: List[Dict[str, Any]], - calib_dict: Dict[str, Dict[str, Any]], - ego_pose_dict: Dict[str, Dict[str, Any]], +def build_sample_infos( + t4: Tier4, + sample: Sample, + sample_idx: int, scene_root: str, target_cameras: List[str], lidar_channel: str, - scene_id: Optional[str] = None, - sample_idx: int = 0, + scene_id: str, + lidar_sources: Dict[str, Any], + root_path: Optional[str] = None, + size_threshold: Optional[int] = None, ) -> List[Dict[str, Any]]: - """Build frame info for each target camera. If target_cameras is None, build for all cameras. + """Build calibration info dicts for a single sample, one per camera. + Args: - frame_idx (str): Frame index identifier. - frame_sample_data (list): List of sample data dictionaries for this frame. - calib_dict (dict): Dictionary mapping calibration tokens to calibration entries. - ego_pose_dict (dict): Dictionary mapping pose tokens to pose entries. - scene_root (str): Root path of the scene data. - target_cameras (list): List of target camera channels to process. - lidar_channel (str): Name of the lidar channel. - scene_id (str, optional): ID of the scene. Defaults to None. - sample_idx (int, optional): Starting index for sample numbering. Defaults to 0. + t4: Tier4 dataset instance. + sample: Sample (keyframe) record to process. + sample_idx: Starting index for sample numbering. + scene_root: Relative scene root path (from root_path). + target_cameras: List of camera channels to process. + lidar_channel: Name of the lidar channel. + scene_id: ID of the scene. + lidar_sources: Dict mapping lidar channel names to sensor info (sensor_token, translation [x,y,z], rotation 3x3 matrix). Computed once per scene. + root_path: Absolute root path of the dataset for image validation. + size_threshold: File size threshold in bytes for filtering likely black images. + Returns: - list: List of frame info dictionaries, one for each available camera in the frame. - Each info contains camera data, lidar data, and transformation matrices. - Raises: - ValueError: If no lidar data is found for the given frame. + List of info dictionaries, one per available camera in this sample. 
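+
+    Example of a single returned info dict (illustrative values; see
+    tools/calibration_classification/README.md for the full field layout):
+        {"frame_idx": "00003", "frame_id": "CAM_FRONT",
+         "image": {...}, "lidar_points": {...}, "lidar_sources": {...},
+         "sample_idx": 3, "scene_id": "scene_001"}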
""" + # Get lidar sample_data token + lidar_token = sample.data.get(lidar_channel) + if lidar_token is None: + logger.warning(f"No lidar channel '{lidar_channel}' in sample {sample.token}, scene {scene_id}, skipping.") + return [] - # Find lidar data for this frame - lidar_data: Optional[Dict[str, Any]] = None - for sd in frame_sample_data: - filename = sd["filename"] - if filename.startswith(f"data/{lidar_channel}/"): - lidar_calib = calib_dict[sd["calibrated_sensor_token"]] - lidar_pose = ego_pose_dict[sd["ego_pose_token"]] - lidar_data = { - "lidar_path": osp.join(scene_root, filename), - "lidar_pose": convert_quaternion_to_matrix(lidar_pose["rotation"], lidar_pose["translation"]), - "lidar2ego": convert_quaternion_to_matrix(lidar_calib["rotation"], lidar_calib["translation"]), - "timestamp": sd["timestamp"], - "sample_data_token": sd["token"], - } - break - - if lidar_data is None: - logger.warning(f"No lidar data found for frame {frame_idx} in scene {scene_id}, skipping frame.") + sd_lidar = t4.get("sample_data", lidar_token) + if not sd_lidar.is_valid: + logger.warning(f"Lidar sample_data is invalid for sample {sample.token}, scene {scene_id}, skipping.") return [] - # Find camera data for this frame - camera_data: Dict[str, Dict[str, Any]] = {} - for sd in frame_sample_data: - filename = sd["filename"] - for cam in target_cameras: - if filename.startswith(f"data/{cam}/"): - cam_calib = calib_dict[sd["calibrated_sensor_token"]] - cam_pose = ego_pose_dict[sd["ego_pose_token"]] - camera_data[cam] = { - "img_path": osp.join(scene_root, filename), - "cam2img": cam_calib["camera_intrinsic"], - "cam2ego": convert_quaternion_to_matrix(cam_calib["rotation"], cam_calib["translation"]), - "cam_pose": convert_quaternion_to_matrix(cam_pose["rotation"], cam_pose["translation"]), - "distortion_coefficients": cam_calib["camera_distortion"], - "sample_data_token": sd["token"], - "timestamp": sd["timestamp"], - "height": sd["height"], - "width": sd["width"], - } - break + cs_lidar = t4.get("calibrated_sensor", sd_lidar.calibrated_sensor_token) + ep_lidar = t4.get("ego_pose", sd_lidar.ego_pose_token) - # Filter to only include cameras that exist in the data - available_cameras: List[str] = [cam for cam in target_cameras if cam in camera_data] + lidar2ego = build_transform_matrix(cs_lidar.rotation, cs_lidar.translation) + lidar_pose = build_transform_matrix(ep_lidar.rotation, ep_lidar.translation) + frame_idx = extract_frame_index(sd_lidar.filename) + + lidar_data: Dict[str, Any] = { + "lidar_path": osp.join(scene_root, sd_lidar.filename), + "lidar_pose": lidar_pose, + "lidar2ego": lidar2ego, + "timestamp": sd_lidar.timestamp, + "sample_data_token": sd_lidar.token, + } infos: List[Dict[str, Any]] = [] - for i, cam in enumerate(available_cameras): - cam_info = camera_data[cam] + valid_count = 0 - # Calculate lidar2cam transformation matrix - lidar2ego = lidar_data["lidar2ego"] - lidar_pose = lidar_data["lidar_pose"] - cam_pose = cam_info["cam_pose"] - cam2ego = cam_info["cam2ego"] + for cam in target_cameras: + cam_token = sample.data.get(cam) + if cam_token is None: + continue + + sd_cam = t4.get("sample_data", cam_token) + if not sd_cam.is_valid: + logger.debug(f"Camera {cam} sample_data is invalid in frame {frame_idx}, scene {scene_id}, skipping.") + continue + + cs_cam = t4.get("calibrated_sensor", sd_cam.calibrated_sensor_token) + ep_cam = t4.get("ego_pose", sd_cam.ego_pose_token) + + cam2ego = build_transform_matrix(cs_cam.rotation, cs_cam.translation) + cam_pose = 
build_transform_matrix(ep_cam.rotation, ep_cam.translation) + + cam_info: Dict[str, Any] = { + "img_path": osp.join(scene_root, sd_cam.filename), + "cam2img": np.array(cs_cam.camera_intrinsic).tolist(), + "cam2ego": cam2ego, + "cam_pose": cam_pose, + "distortion_coefficients": np.array(cs_cam.camera_distortion).tolist(), + "sample_data_token": sd_cam.token, + "timestamp": sd_cam.timestamp, + "height": sd_cam.height, + "width": sd_cam.width, + } + # Check if image is likely black based on file size (when filtering is enabled) + if size_threshold is not None and root_path is not None: + img_abs_path = osp.join(root_path, cam_info["img_path"]) + if is_image_likely_black(img_abs_path, size_threshold): + file_size = osp.getsize(img_abs_path) if osp.isfile(img_abs_path) else 0 + logger.warning( + f"Skipping likely black image for camera {cam} in frame {frame_idx}, " + f"scene {scene_id} (size: {file_size / 1000:.1f}KB < threshold {size_threshold / 1000:.1f}KB)" + ) + continue + + # Calculate lidar2cam transformation matrix try: cam_info["lidar2cam"] = calculate_lidar2cam_matrix( lidar2ego, lidar_pose, cam_pose, cam2ego, camera_name=cam @@ -366,30 +387,98 @@ def build_frame_info( logger.error(f"Failed to process frame {frame_idx} for camera {cam} in scene {scene_id}: {e}") raise - # Create info for this camera - info = { + info: Dict[str, Any] = { "frame_idx": frame_idx, "frame_id": cam, "image": cam_info, "lidar_points": lidar_data, - "sample_idx": sample_idx + i, + "lidar_sources": lidar_sources, + "sample_idx": sample_idx + valid_count, "scene_id": scene_id, } infos.append(info) + valid_count += 1 return infos +def generate_scene_calib_info( + t4: Tier4, + scene_root: str, + scene_id: str, + start_sample_idx: int = 0, + target_cameras: Optional[List[str]] = None, + lidar_channel: str = DEFAULT_LIDAR_CHANNEL, + root_path: Optional[str] = None, + filter_black_images: bool = False, +) -> Tuple[List[Dict[str, Any]], int]: + """Generate calibration info for all keyframe samples in a scene. + + Each info contains camera and lidar data with their respective transformation matrices, + plus lidar sources info for the scene. + + Args: + t4: Tier4 dataset instance for the scene. + scene_root: Relative scene root path (from root_path). + scene_id: ID of the scene. + start_sample_idx: Starting index for sample numbering. + target_cameras: List of target camera channels to process. + If None, processes all available cameras. + lidar_channel: Name of the lidar channel. + root_path: Absolute root path of the dataset for image validation. + filter_black_images: Whether to filter out black images. + + Returns: + Tuple of (infos, next_sample_idx) where infos is a list of calibration + info dictionaries and next_sample_idx is the next available sample index. 
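+
+    Example (illustrative; the scene path and ids are hypothetical):
+        >>> t4 = Tier4(data_root="./data/t4dataset/db_v1/scene_uuid/0", verbose=False)
+        >>> infos, next_idx = generate_scene_calib_info(
+        ...     t4, "db_v1/scene_uuid/0", "scene_uuid/0", target_cameras=["CAM_FRONT"]
+        ... )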
+ """ + # Resolve available cameras + if target_cameras is None: + target_cameras = get_available_cameras(t4) + logger.info(f"Using all available cameras: {target_cameras}") + + # Calculate size threshold for black image filtering (once per scene) + size_threshold: Optional[int] = None + if filter_black_images and root_path is not None: + size_threshold = calculate_size_threshold(t4, root_path, scene_root, target_cameras) + + # Compute lidar sources info (once per scene) + lidar_sources = get_lidar_sources_info(t4) + + logger.info(f"Processing {len(t4.sample)} samples in scene {scene_id}") + + infos: List[Dict[str, Any]] = [] + sample_idx = start_sample_idx + + for sample in t4.sample: + try: + sample_infos = build_sample_infos( + t4, + sample, + sample_idx, + scene_root, + target_cameras, + lidar_channel, + scene_id, + lidar_sources, + root_path, + size_threshold, + ) + if sample_infos: + infos.extend(sample_infos) + sample_idx += len(sample_infos) + except ValueError as e: + logger.error(f"Failed to process sample {sample.token} in scene {scene_id}: {e}") + raise + + return infos, sample_idx + + def parse_args() -> argparse.Namespace: """Parse command line arguments for the T4dataset calibration info creation script. + Returns: - argparse.Namespace: Parsed command line arguments containing: - - config: Path to the T4dataset configuration file - - root_path: Root path of the dataset - - version: Product version - - out_dir: Output directory for info files - - lidar_channel: Lidar channel name (default: LIDAR_CONCAT) - - target_cameras: List of target cameras to process (default: all cameras) + Parsed command line arguments. """ parser = argparse.ArgumentParser(description="Create calibration info for T4dataset (classification version)") parser.add_argument("--config", type=str, required=True, help="config for T4dataset") @@ -402,42 +491,42 @@ def parse_args() -> argparse.Namespace: parser.add_argument( "--target_cameras", nargs="*", default=None, help="Target cameras to generate info for (default: all cameras)" ) + parser.add_argument("--filter", action="store_true", help="Filter out black images (all pixels are zero)") return parser.parse_args() def get_scene_root_dir_path(root_path: str, dataset_version: str, scene_id: str) -> str: """Get the scene root directory path, handling version subdirectories. + Args: - root_path (str): Root path of the dataset. - dataset_version (str): Version of the dataset. - scene_id (str): ID of the scene (may include version number like "uuid version"). + root_path: Root path of the dataset. + dataset_version: Version of the dataset. + scene_id: ID of the scene (format: "uuid/version", e.g., "c4edef0a-1d00-4f0d-9e08-65ba8b0692ff/0"). + Returns: - str: Path to the scene root directory. If scene_id contains a version number, - returns the path with the version. Otherwise, looks for version subdirectories - and returns the path to the highest version number subdirectory. + Path to the scene root directory. 
""" - # Check if scene_id already contains a version number (e.g., "uuid version") - scene_id_parts = scene_id.strip().split() + scene_id = scene_id.strip() + scene_id_parts = scene_id.split("/") + if len(scene_id_parts) == 2 and scene_id_parts[1].isdigit(): - # scene_id contains version number, use it directly base_scene_id = scene_id_parts[0] version_id = scene_id_parts[1] scene_root_dir_path = osp.join(root_path, dataset_version, base_scene_id) return osp.join(scene_root_dir_path, version_id) - else: - raise ValueError(f"Invalid scene_id format: {scene_id}") + + raise ValueError(f"Invalid scene_id format: {scene_id}. Expected 'uuid/version'.") def main() -> None: """Main function to create calibration info for T4dataset. + This function: 1. Parses command line arguments 2. Loads configuration files 3. Iterates through dataset versions and scenes - 4. Generates calibration info for each scene + 4. Instantiates Tier4 for each scene and generates calibration info 5. Saves the results to pickle files for each split (train/val/test) - The script processes T4dataset annotations to create calibration information - that includes camera and lidar data with their respective transformation matrices. """ args = parse_args() cfg = Config.fromfile(args.config) @@ -448,42 +537,54 @@ def main() -> None: logger.info(f"Lidar channel: {args.lidar_channel}") if args.target_cameras: logger.info(f"Target cameras: {args.target_cameras}") + if args.filter: + logger.info("Black image filtering is enabled") abs_root_path = osp.abspath(args.root_path) split_infos: Dict[str, List[Dict[str, Any]]] = {split: [] for split in SUPPORTED_SPLITS} split_sample_idx: Dict[str, int] = {split: 0 for split in SUPPORTED_SPLITS} + logger.info(f"Processing dataset versions: {cfg.dataset_version_list}") for dataset_version in cfg.dataset_version_list: dataset_list = osp.join(cfg.dataset_version_config_root, dataset_version + ".yaml") with open(dataset_list, "r") as f: dataset_list_dict: Dict[str, Any] = yaml.safe_load(f) + for split in SUPPORTED_SPLITS: for scene_id in dataset_list_dict.get(split, []): - scene_root = get_scene_root_dir_path(args.root_path, dataset_version, scene_id) - annotation_dir = osp.join(scene_root, "annotation") + scene_root_dir_path = get_scene_root_dir_path(args.root_path, dataset_version, scene_id) + annotation_dir = osp.join(scene_root_dir_path, "annotation") + logger.debug( - f"split={split}, scene_id={scene_id}, annotation_dir={annotation_dir}, exists={osp.isdir(annotation_dir)}" + f"split={split}, scene_id={scene_id}, " + f"annotation_dir={annotation_dir}, exists={osp.isdir(annotation_dir)}" ) + if not osp.isdir(annotation_dir): logger.warning(f"Annotation dir not found: {annotation_dir}, skip.") continue + logger.info(f"Generating calibration info for {scene_id} ({split}) ...") - rel_scene_root = osp.relpath(scene_root, abs_root_path) - scene_infos: List[Dict[str, Any]] + rel_scene_root = osp.relpath(scene_root_dir_path, abs_root_path) + try: - scene_infos, split_sample_idx[split] = generate_calib_info( - annotation_dir, - args.lidar_channel, + t4 = Tier4(data_root=scene_root_dir_path, verbose=False) + scene_infos, split_sample_idx[split] = generate_scene_calib_info( + t4, rel_scene_root, scene_id, split_sample_idx[split], args.target_cameras, + args.lidar_channel, + abs_root_path, + args.filter, ) split_infos[split].extend(scene_infos) except ValueError as e: logger.error(f"Failed to process scene {scene_id} ({split}): {e}") raise + # Save per split metainfo: Dict[str, str] = 
dict(version=args.version) logger.info("Saving processed data to pickle files...")