From 90b0e4bd02673624b1aab7161a38bf7837f8a30a Mon Sep 17 00:00:00 2001 From: AnikethCheluva Date: Sun, 25 Jan 2026 13:51:47 -0500 Subject: [PATCH] make config changes --- egomimic/hydra_configs/data/mecka_all.yaml | 34 ++ .../hydra_configs/data/mecka_cotrain.yaml | 50 ++ .../data/mecka_human_fold_clothes.yaml | 36 ++ egomimic/hydra_configs/data/mecka_test.yaml | 27 +- egomimic/hydra_configs/data/obj_cont_eva.yaml | 44 ++ .../hydra/launcher/submitit_pace.yaml | 6 +- .../hydra/launcher/submitit_skynet.yaml | 23 +- egomimic/hydra_configs/logger/wandb.yaml | 2 +- .../model/hpt_bc_flow_mecka_1B.yaml | 122 +++++ .../model/hpt_bc_flow_mecka_300M.yaml | 123 +++++ .../model/hpt_bc_flow_mecka_600M.yaml | 123 +++++ egomimic/hydra_configs/train.yaml | 176 ++++--- egomimic/hydra_configs/trainer/default.yaml | 6 +- egomimic/pl_utils/pl_sampler.py | 40 ++ egomimic/rldb/utils.py | 467 +++++++++--------- egomimic/scripts/change_mecka.py | 136 +++++ egomimic/utils/aws/sql_tutorial.ipynb | 18 +- .../common/datasets/lerobot_dataset.py | 121 +++-- external/openpi | 2 +- 19 files changed, 1167 insertions(+), 389 deletions(-) create mode 100644 egomimic/hydra_configs/data/mecka_all.yaml create mode 100644 egomimic/hydra_configs/data/mecka_cotrain.yaml create mode 100644 egomimic/hydra_configs/data/mecka_human_fold_clothes.yaml create mode 100644 egomimic/hydra_configs/data/obj_cont_eva.yaml create mode 100644 egomimic/hydra_configs/model/hpt_bc_flow_mecka_1B.yaml create mode 100644 egomimic/hydra_configs/model/hpt_bc_flow_mecka_300M.yaml create mode 100644 egomimic/hydra_configs/model/hpt_bc_flow_mecka_600M.yaml create mode 100644 egomimic/pl_utils/pl_sampler.py create mode 100644 egomimic/scripts/change_mecka.py diff --git a/egomimic/hydra_configs/data/mecka_all.yaml b/egomimic/hydra_configs/data/mecka_all.yaml new file mode 100644 index 00000000..88b81f0e --- /dev/null +++ b/egomimic/hydra_configs/data/mecka_all.yaml @@ -0,0 +1,34 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper + + +train_datasets: + dataset1: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + mode: train + local_mode: true + sample_percent: 0.1 + filters: + lab: "mecka" + embodiment: "mecka_bimanual" + +valid_datasets: + dataset1: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + mode: valid + local_mode: true + sample_percent: 0.1 + filters: + lab: "mecka" + embodiment: "mecka_bimanual" + +train_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 8 + +valid_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 8 \ No newline at end of file diff --git a/egomimic/hydra_configs/data/mecka_cotrain.yaml b/egomimic/hydra_configs/data/mecka_cotrain.yaml new file mode 100644 index 00000000..e6ce1ab1 --- /dev/null +++ b/egomimic/hydra_configs/data/mecka_cotrain.yaml @@ -0,0 +1,50 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper + +train_datasets: + # dataset1: + # _target_: egomimic.rldb.utils.S3RLDBDataset + # bucket_name: "rldb" + # embodiment: "mecka_bimanual" + # mode: train + # filters: + # task: "potting_soil" + # local_files_only: True + dataset2: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + embodiment: "aria_right_arm" + mode: train + filters: + task: "object in container" + lab: "song" + local_files_only: True + +valid_datasets: + # dataset1: + # _target_: egomimic.rldb.utils.S3RLDBDataset + # bucket_name: "rldb" + # embodiment: "mecka_bimanual" + # mode: valid + # filters: + # task: "potting_soil" + # local_files_only: True + dataset2: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + embodiment: "aria_right_arm" + mode: valid + filters: + task: "object in container" + lab: "song" + local_files_only: True + + +train_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 10 + +valid_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 10 \ No newline at end of file diff --git a/egomimic/hydra_configs/data/mecka_human_fold_clothes.yaml b/egomimic/hydra_configs/data/mecka_human_fold_clothes.yaml new file mode 100644 index 00000000..9bbbd929 --- /dev/null +++ b/egomimic/hydra_configs/data/mecka_human_fold_clothes.yaml @@ -0,0 +1,36 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper + +train_datasets: + dataset1: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + main_prefix: "mecka" + mode: train + embodiment: "mecka_bimanual" + temp_root: "/coc/flash7/scratch/egoverseS3Dataset" + filters: + lab: "mecka" + task: "fold_clothes" + local_files_only: True + +valid_datasets: + dataset1: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + mode: valid + embodiment: "mecka_bimanual" + temp_root: "/coc/flash7/scratch/egoverseS3Dataset" + filters: + lab: "mecka" + task: "fold_clothes" + local_files_only: True + +train_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 10 + +valid_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 10 \ No newline at end of file diff --git a/egomimic/hydra_configs/data/mecka_test.yaml b/egomimic/hydra_configs/data/mecka_test.yaml index 7f190339..03bb217b 100644 --- a/egomimic/hydra_configs/data/mecka_test.yaml +++ b/egomimic/hydra_configs/data/mecka_test.yaml @@ -2,28 +2,41 @@ _target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper train_datasets: dataset1: - _target_: rldb.utils.RLDBDataset - repo_id: "mecka_test" + _target_: rldb.utils.S3RLDBDataset + bucket_name: "rldb" mode: train + local_mode: True embodiment: "mecka_bimanual" - root: "/coc/flash7/acheluva3/EgoVerse/mecka_demo" + filters: + task: "fold_clothes" local_files_only: True + valid_datasets: dataset1: - _target_: rldb.utils.RLDBDataset - repo_id: "mecka_test" + _target_: rldb.utils.S3RLDBDataset + bucket_name: "rldb" mode: valid + local_mode: True embodiment: "mecka_bimanual" - root: "/coc/flash7/acheluva3/EgoVerse/mecka_demo" + filters: + task: "fold_clothes" local_files_only: True + + train_dataloader_params: dataset1: batch_size: 32 num_workers: 10 + # dataset2: + # batch_size: 32 + # num_workers: 10 valid_dataloader_params: dataset1: batch_size: 32 - num_workers: 10 \ No newline at end of file + num_workers: 10 + # dataset2: + # batch_size: 32 + # num_workers: 10 \ No newline at end of file diff --git a/egomimic/hydra_configs/data/obj_cont_eva.yaml b/egomimic/hydra_configs/data/obj_cont_eva.yaml new file mode 100644 index 00000000..354d1185 --- /dev/null +++ b/egomimic/hydra_configs/data/obj_cont_eva.yaml @@ -0,0 +1,44 @@ +_target_: egomimic.pl_utils.pl_data_utils.MultiDataModuleWrapper +use_tokenizer: false + +train_datasets: + dataset1: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + temp_root: "/coc/cedarp-dxu345-0/datasets/egoverse" + mode: train + embodiment: "eva_right_arm" + filters: + lab: "rl2" + task: "object in container" + robot_name: "eva_right_arm" + # objects: + # - "green bowl" + # - "caprisun" + local_files_only: True + +valid_datasets: + dataset1: + _target_: egomimic.rldb.utils.S3RLDBDataset + bucket_name: "rldb" + temp_root: "/coc/cedarp-dxu345-0/datasets/egoverse" + mode: valid + embodiment: "eva_right_arm" + filters: + lab: "rl2" + task: "object in container" + robot_name: "eva_right_arm" + # objects: + # - "green bowl" + # - "caprisun" + local_files_only: True + +train_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 8 + +valid_dataloader_params: + dataset1: + batch_size: 32 + num_workers: 8 \ No newline at end of file diff --git a/egomimic/hydra_configs/hydra/launcher/submitit_pace.yaml b/egomimic/hydra_configs/hydra/launcher/submitit_pace.yaml index 2d9cd957..cdfb0374 100644 --- a/egomimic/hydra_configs/hydra/launcher/submitit_pace.yaml +++ b/egomimic/hydra_configs/hydra/launcher/submitit_pace.yaml @@ -5,13 +5,13 @@ _target_: hydra_plugins.hydra_submitit_launcher.submitit_launcher.SlurmLauncher # Slurm configuration name: ${hydra.job.name} # Default job name -partition: "gpu-h200" # Slurm partition +# partition: "gpu-h200" # Slurm partition account: "gts-dxu345-rl2" # Slurm account -cpus_per_task: 8 # Number of CPUs per task (max 4:1 CPU:GPU ratio) +cpus_per_task: 4 # Number of CPUs per task (max 4:1 CPU:GPU ratio) nodes: ${launch_params.nodes} # Number of nodes tasks_per_node: ${launch_params.gpus_per_node} # Use variable for tasks per node gres: "gpu:h200:${eval:'${launch_params.gpus_per_node} * ${launch_params.nodes}'}" # GPU type and count (h100 for H100 GPUs) -qos: "short" # Slurm QoS +qos: "inferno" # Slurm QoS mem_per_gpu: 250G timeout_min: 2880 # Timeout in minutes (48 hours) # exclude: "protocol, puma" # Nodes to exclude diff --git a/egomimic/hydra_configs/hydra/launcher/submitit_skynet.yaml b/egomimic/hydra_configs/hydra/launcher/submitit_skynet.yaml index f50c3334..370e0fb7 100644 --- a/egomimic/hydra_configs/hydra/launcher/submitit_skynet.yaml +++ b/egomimic/hydra_configs/hydra/launcher/submitit_skynet.yaml @@ -4,16 +4,23 @@ defaults: _target_: hydra_plugins.hydra_submitit_launcher.submitit_launcher.SlurmLauncher name: ${hydra.job.name} -partition: "rl2-lab" -account: "rl2-lab" +partition: "hoffman-lab" +account: "hoffman-lab" # Override nodes via: hydra.launcher.nodes= -nodes: 1 cpus_per_task: 15 -# Override via: hydra.launcher.tasks_per_node= -tasks_per_node: 4 + +nodes: ${launch_params.nodes} +tasks_per_node: ${launch_params.gpus_per_node} +gres: "gpu:a40:${eval:'${launch_params.gpus_per_node} * ${launch_params.nodes}'}" +mem_per_gpu: 250G + qos: "short" -timeout_min: 2880 +timeout_min: 2880 # 48 hours + +# env: +# NCCL_TIMEOUT: 7200 +# NCCL_DEBUG: INFO +# NCCL_ASYNC_ERROR_HANDLING: "1" additional_parameters: - requeue: true - gpus-per-node: "a40:1" \ No newline at end of file + requeue: true \ No newline at end of file diff --git a/egomimic/hydra_configs/logger/wandb.yaml b/egomimic/hydra_configs/logger/wandb.yaml index 3013fcee..f7c30e11 100644 --- a/egomimic/hydra_configs/logger/wandb.yaml +++ b/egomimic/hydra_configs/logger/wandb.yaml @@ -7,7 +7,7 @@ wandb: offline: False id: "${name}_${description}_${now:%Y-%m-%d_%H-%M-%S}" # pass correct id to resume experiment! anonymous: null # enable anonymous logging - project: "everse_flagship" + project: "everse_mecka_hpt_ossification" log_model: False # upload lightning ckpts prefix: "" # a string to put at the beginning of metric keys entity: "rl2-group" # set to name of your wandb team diff --git a/egomimic/hydra_configs/model/hpt_bc_flow_mecka_1B.yaml b/egomimic/hydra_configs/model/hpt_bc_flow_mecka_1B.yaml new file mode 100644 index 00000000..ba11520a --- /dev/null +++ b/egomimic/hydra_configs/model/hpt_bc_flow_mecka_1B.yaml @@ -0,0 +1,122 @@ +_target_: egomimic.pl_utils.pl_model.ModelWrapper +robomimic_model: + _target_: egomimic.algo.hpt.HPT + data_schematic: _${data.dataset.data_schematic} + camera_transforms: + mecka_bimanual: + _target_: egomimic.utils.egomimicUtils.CameraTransforms + intrinsics_key: "mecka" # change to base_half if using half res + extrinsics_key: "mecka" + + diffusion: true + 6dof: true + + ac_keys: + mecka_bimanual: "actions_cartesian" + + trunk: + embed_dim: 1428 #94 + num_blocks: 26 + num_heads: 14 + token_postprocessing: "action_token" + observation_horizon: 1 + action_horizon: 64 + no_trunk: false + use_domain_embedding: true + drop_path: 0.1 + weight_init_style: "pytorch" + + multitask: false + pretrained: false + pretrained_checkpoint: "" # TODO + reverse_kl_samples: 8 + + domains: ["mecka_bimanual"] + shared_obs_keys: ["front_img_1"] + + shared_stem_specs: + front_img_1: + _target_: egomimic.models.hpt_nets.MLPPolicyStem + input_dim: 1428 + output_dim: 1428 + widths: [1428] + specs: + random_horizon_masking: false + cross_attn: + crossattn_latent: 22 + crossattn_heads: 16 + crossattn_dim_head: 416 + crossattn_modality_dropout: 0.1 + modality_embed_dim: 1428 + + stem_specs: + mecka_bimanual: + state_ee_pose: + _target_: egomimic.models.hpt_nets.MLPPolicyStem + input_dim: 12 + output_dim: 1428 + widths: [1428] + specs: + random_horizon_masking: false + cross_attn: + crossattn_latent: 22 + crossattn_heads: 16 + crossattn_dim_head: 416 + crossattn_modality_dropout: 0.1 + modality_embed_dim: 1428 + + head_specs: + mecka_bimanual: + _target_: egomimic.models.fm_policy.FMPolicy + action_horizon: 100 + num_inference_steps: 50 + pooling: null + time_dist: "beta" + infer_ac_dims: + mecka_bimanual: 12 + model: + _target_: egomimic.models.denoising_nets.CrossTransformer + nblocks: 6 + cond_dim: 1428 + hidden_dim: 408 + act_dim: 12 + act_seq: 100 + n_heads: 6 + dropout: 0.1 + mlp_layers: 7 + mlp_ratio: 7 + + encoder_specs: + front_img_1: + _target_: egomimic.models.hpt_nets.ResNet + output_dim: 1428 + + train_image_augs: + _target_: torchvision.transforms.Compose + transforms: + - _target_: torchvision.transforms.ColorJitter + brightness: 0.1 + contrast: 0.1 + saturation: 0.1 + hue: 0.05 + - _target_: torchvision.transforms.Normalize + mean: [0.485, 0.456, 0.406] + std: [0.229, 0.224, 0.225] + eval_image_augs: + _target_: torchvision.transforms.Compose + transforms: + - _target_: torchvision.transforms.Normalize + mean: [0.485, 0.456, 0.406] + std: [0.229, 0.224, 0.225] + +optimizer: + _target_: torch.optim.AdamW + _partial_: true + lr: 3e-4 + weight_decay: 0.0001 + +scheduler: + _target_: torch.optim.lr_scheduler.CosineAnnealingLR + _partial_: true + T_max: 5500 + eta_min: 1e-5 diff --git a/egomimic/hydra_configs/model/hpt_bc_flow_mecka_300M.yaml b/egomimic/hydra_configs/model/hpt_bc_flow_mecka_300M.yaml new file mode 100644 index 00000000..1d63314f --- /dev/null +++ b/egomimic/hydra_configs/model/hpt_bc_flow_mecka_300M.yaml @@ -0,0 +1,123 @@ +_target_: egomimic.pl_utils.pl_model.ModelWrapper +robomimic_model: + _target_: egomimic.algo.hpt.HPT + data_schematic: _${data.dataset.data_schematic} + camera_transforms: + mecka_bimanual: + _target_: egomimic.utils.egomimicUtils.CameraTransforms + intrinsics_key: "mecka" # change to base_half if using half res + extrinsics_key: "mecka" + + diffusion: true + 6dof: true + + ac_keys: + mecka_bimanual: "actions_cartesian" + + trunk: + embed_dim: 840 # changed from 256 #84 + num_blocks: 24 # changed from 16 + num_heads: 10 # changed from 8 + token_postprocessing: "action_token" + observation_horizon: 1 + action_horizon: 64 + no_trunk: false + use_domain_embedding: true + drop_path: 0.1 + weight_init_style: "pytorch" + + multitask: false + pretrained: false + pretrained_checkpoint: "" # TODO + reverse_kl_samples: 8 + + domains: ["mecka_bimanual"] + shared_obs_keys: ["front_img_1"] + + shared_stem_specs: + front_img_1: + _target_: egomimic.models.hpt_nets.MLPPolicyStem + input_dim: 840 # changed from 512 + output_dim: 840 #changed + widths: [840] # changed from 840 + specs: + random_horizon_masking: false + cross_attn: + crossattn_latent: 18 + crossattn_heads: 10 + crossattn_dim_head: 140 # changed from 256 + crossattn_modality_dropout: 0.1 + modality_embed_dim: 840 # changed from 840 + + stem_specs: + mecka_bimanual: + state_ee_pose: + _target_: egomimic.models.hpt_nets.MLPPolicyStem + input_dim: 12 + output_dim: 840 # changed from 840 + widths: [840] # changed from 840 + specs: + random_horizon_masking: false + cross_attn: + crossattn_latent: 18 + crossattn_heads: 10 + crossattn_dim_head: 140 # changed from 256 changed from 1024 + crossattn_modality_dropout: 0.1 + modality_embed_dim: 840 # changed from 840 changed from 1536 + + head_specs: + mecka_bimanual: + _target_: egomimic.models.fm_policy.FMPolicy + action_horizon: 100 + num_inference_steps: 50 + pooling: null + time_dist: "beta" + infer_ac_dims: + mecka_bimanual: 12 + model: + _target_: egomimic.models.denoising_nets.CrossTransformer + nblocks: 6 + cond_dim: 840 # changed from 256 changed from 1536 + hidden_dim: 320 #changed from 128 + act_dim: 12 + act_seq: 100 + n_heads: 5 # changed from 4 changed from 16 + dropout: 0.1 + mlp_layers: 5 # edit num of mlp layers + mlp_ratio: 5 + + encoder_specs: + front_img_1: + _target_: egomimic.models.hpt_nets.ResNet + output_dim: 840 # changed from 512 changed from 1536 + + + train_image_augs: + _target_: torchvision.transforms.Compose + transforms: + - _target_: torchvision.transforms.ColorJitter + brightness: 0.1 + contrast: 0.1 + saturation: 0.1 + hue: 0.05 + - _target_: torchvision.transforms.Normalize + mean: [0.485, 0.456, 0.406] + std: [0.229, 0.224, 0.225] + eval_image_augs: + _target_: torchvision.transforms.Compose + transforms: + - _target_: torchvision.transforms.Normalize + mean: [0.485, 0.456, 0.406] + std: [0.229, 0.224, 0.225] + +optimizer: + _target_: torch.optim.AdamW + _partial_: true + lr: 1e-4 + weight_decay: 0.0001 + +scheduler: + _target_: torch.optim.lr_scheduler.CosineAnnealingLR + _partial_: true + T_max: 5500 + eta_min: 1e-5 diff --git a/egomimic/hydra_configs/model/hpt_bc_flow_mecka_600M.yaml b/egomimic/hydra_configs/model/hpt_bc_flow_mecka_600M.yaml new file mode 100644 index 00000000..6f49b662 --- /dev/null +++ b/egomimic/hydra_configs/model/hpt_bc_flow_mecka_600M.yaml @@ -0,0 +1,123 @@ +_target_: egomimic.pl_utils.pl_model.ModelWrapper +robomimic_model: + _target_: egomimic.algo.hpt.HPT + data_schematic: _${data.dataset.data_schematic} + camera_transforms: + mecka_bimanual: + _target_: egomimic.utils.egomimicUtils.CameraTransforms + intrinsics_key: "mecka" # change to base_half if using half res + extrinsics_key: "mecka" + + diffusion: true + 6dof: true + + ac_keys: + mecka_bimanual: "actions_cartesian" + + trunk: + embed_dim: 1176 # changed from 256 #96 + num_blocks: 20 # changed from 16 + num_heads: 12 # changed from 8 + token_postprocessing: "action_token" + observation_horizon: 1 + action_horizon: 64 + no_trunk: false + use_domain_embedding: true + drop_path: 0.1 + weight_init_style: "pytorch" + + multitask: false + pretrained: false + pretrained_checkpoint: "" # TODO + reverse_kl_samples: 8 + + domains: ["mecka_bimanual"] + shared_obs_keys: ["front_img_1"] + + shared_stem_specs: + front_img_1: + _target_: egomimic.models.hpt_nets.MLPPolicyStem + input_dim: 1176 # changed from 512 + output_dim: 1176 #changed + widths: [1176] # changed from 768 + specs: + random_horizon_masking: false + cross_attn: + crossattn_latent: 20 + crossattn_heads: 14 + crossattn_dim_head: 308 # changed from 256 + crossattn_modality_dropout: 0.1 + modality_embed_dim: 1176 # changed from 768 + + stem_specs: + mecka_bimanual: + state_ee_pose: + _target_: egomimic.models.hpt_nets.MLPPolicyStem + input_dim: 12 + output_dim: 1176 # changed from 768 + widths: [1176] # changed from 768 + specs: + random_horizon_masking: false + cross_attn: + crossattn_latent: 20 + crossattn_heads: 14 + crossattn_dim_head: 308 # changed from 256 changed from 1176 + crossattn_modality_dropout: 0.1 + modality_embed_dim: 1176 # changed from 768 changed from 1536 + + head_specs: + mecka_bimanual: + _target_: egomimic.models.fm_policy.FMPolicy + action_horizon: 100 + num_inference_steps: 50 + pooling: null + time_dist: "beta" + infer_ac_dims: + mecka_bimanual: 12 + model: + _target_: egomimic.models.denoising_nets.CrossTransformer + nblocks: 6 + cond_dim: 1176 # changed from 256 changed from 1536 + hidden_dim: 372 #changed from 128 + act_dim: 12 + act_seq: 100 + n_heads: 6 # changed from 4 changed from 16 + dropout: 0.1 + mlp_layers: 6 # edit num of mlp layers + mlp_ratio: 6 + + encoder_specs: + front_img_1: + _target_: egomimic.models.hpt_nets.ResNet + output_dim: 1176 # changed from 512 changed from 1536 + + + train_image_augs: + _target_: torchvision.transforms.Compose + transforms: + - _target_: torchvision.transforms.ColorJitter + brightness: 0.1 + contrast: 0.1 + saturation: 0.1 + hue: 0.05 + - _target_: torchvision.transforms.Normalize + mean: [0.485, 0.456, 0.406] + std: [0.229, 0.224, 0.225] + eval_image_augs: + _target_: torchvision.transforms.Compose + transforms: + - _target_: torchvision.transforms.Normalize + mean: [0.485, 0.456, 0.406] + std: [0.229, 0.224, 0.225] + +optimizer: + _target_: torch.optim.AdamW + _partial_: true + lr: 1e-4 + weight_decay: 0.0001 + +scheduler: + _target_: torch.optim.lr_scheduler.CosineAnnealingLR + _partial_: true + T_max: 5500 + eta_min: 1e-5 diff --git a/egomimic/hydra_configs/train.yaml b/egomimic/hydra_configs/train.yaml index 828a21b6..931b780c 100644 --- a/egomimic/hydra_configs/train.yaml +++ b/egomimic/hydra_configs/train.yaml @@ -1,102 +1,96 @@ -defaults: - - model: hpt_bc_flow_eva + defaults: + - model: hpt_bc_flow_mecka_1B - paths: default - trainer: ddp - debug: null - logger: wandb - - data: eva_bc_s3 + - data: mecka_all - callbacks: checkpoints - - override hydra/launcher: submitit + - override hydra/launcher: submitit_slurm - _self_ -name: test -description: test -ckpt_path: null -train: true -eval: false + name: test + description: test + ckpt_path: null + train: true + eval: false -eval_class: - _target_ : egomimic.scripts.evaluation.Eve - mode: real - arm: both - eval_path: "./logs/eval/${name}_${now:%Y-%m-%d_%H-%M-%S}" + eval_class: + _target_ : egomimic.scripts.evaluation.Eve + mode: real + arm: both + eval_path: "./logs/eval/${name}_${now:%Y-%m-%d_%H-%M-%S}" -hydra: - run: - # Dir should be experiment_name/description_{timestamp} - dir: ./logs/${name}/${description}_${now:%Y-%m-%d_%H-%M-%S} - sweep: - dir: ./logs/${name}/${description}_${now:%Y-%m-%d_%H-%M-%S} + hydra: + run: + # Dir should be experiment_name/description_{timestamp} + dir: ./logs/${name}/${description}_${now:%Y-%m-%d_%H-%M-%S} + sweep: + dir: ./logs/${name}/${description}_${now:%Y-%m-%d_%H-%M-%S} -launch_params: - gpus_per_node: 1 - nodes: 1 + launch_params: + gpus_per_node: 2 + nodes: 1 -data_schematic: # Dynamically fill in these shapes from the dataset - _target_: egomimic.rldb.utils.DataSchematic - norm_mode: quantile - schematic_dict: - eva_bimanual: - front_img_1: #batch key - key_type: camera_keys # key type - lerobot_key: observations.images.front_img_1 # dataset key - right_wrist_img: - key_type: camera_keys - lerobot_key: observations.images.right_wrist_img - left_wrist_img: - key_type: camera_keys - lerobot_key: observations.images.left_wrist_img - ee_pose: - key_type: proprio_keys - lerobot_key: observations.state.ee_pose - joint_positions: - key_type: proprio_keys - lerobot_key: observations.state.joint_positions - actions_joints: - key_type: action_keys - lerobot_key: actions_joints - actions_cartesian: - key_type: action_keys - lerobot_key: actions_cartesian - embodiment: - key_type: metadata_keys - lerobot_key: metadata.embodiment - aria_bimanual: - front_img_1: - key_type: camera_keys - lerobot_key: observations.images.front_img_1 - ee_pose: - key_type: proprio_keys - lerobot_key: observations.state.ee_pose - actions_cartesian: - key_type: action_keys - lerobot_key: actions_cartesian - embodiment: - key_type: metadata_keys - lerobot_key: metadata.embodiment - mecka_bimanual: - front_img_1: - key_type: camera_keys - lerobot_key: observations.images.front_img_1 - ee_pose: - key_type: proprio_keys - lerobot_key: observations.state.ee_pose_cam - actions_cartesian: - key_type: action_keys - lerobot_key: actions_ee_cartesian_cam - actions_keypoints: - key_type: action_keys - lerobot_key: actions_ee_keypoints_world - actions_head_cartesian: - key_type: action_keys - lerobot_key: actions_head_cartesian_world - embodiment: - key_type: metadata_keys - lerobot_key: metadata.embodiment - viz_img_key: - eva_bimanual: - front_img_1 - aria_bimanual: - front_img_1 - mecka_bimanual: - front_img_1 + data_schematic: # Dynamically fill in these shapes from the dataset + _target_: egomimic.rldb.utils.DataSchematic + norm_mode: quantile + schematic_dict: + eva_bimanual: + front_img_1: #batch key + key_type: camera_keys # key type + lerobot_key: observations.images.front_img_1 # dataset key + right_wrist_img: + key_type: camera_keys + lerobot_key: observations.images.right_wrist_img + left_wrist_img: + key_type: camera_keys + lerobot_key: observations.images.left_wrist_img + ee_pose: + key_type: proprio_keys + lerobot_key: observations.state.ee_pose + joint_positions: + key_type: proprio_keys + lerobot_key: observations.state.joint_positions + actions_joints: + key_type: action_keys + lerobot_key: actions_joints + actions_cartesian: + key_type: action_keys + lerobot_key: actions_cartesian + embodiment: + key_type: metadata_keys + lerobot_key: metadata.embodiment + aria_bimanual: + front_img_1: + key_type: camera_keys + lerobot_key: observations.images.front_img_1 + ee_pose: + key_type: proprio_keys + lerobot_key: observations.state.ee_pose + actions_cartesian: + key_type: action_keys + lerobot_key: actions_cartesian + embodiment: + key_type: metadata_keys + lerobot_key: metadata.embodiment + mecka_bimanual: + front_img_1: + key_type: camera_keys + lerobot_key: observations.images.front_img_1 + ee_pose: + key_type: proprio_keys + lerobot_key: observations.state.ee_pose_cam + actions_cartesian: + key_type: action_keys + lerobot_key: actions_ee_cartesian_cam + embodiment: + key_type: metadata_keys + lerobot_key: metadata.embodiment + viz_img_key: + eva_bimanual: + front_img_1 + aria_bimanual: + front_img_1 + mecka_bimanual: + front_img_1 diff --git a/egomimic/hydra_configs/trainer/default.yaml b/egomimic/hydra_configs/trainer/default.yaml index a6b47e35..72e3e992 100644 --- a/egomimic/hydra_configs/trainer/default.yaml +++ b/egomimic/hydra_configs/trainer/default.yaml @@ -2,7 +2,7 @@ _target_: lightning.pytorch.trainer.Trainer default_root_dir: ${paths.output_dir} -max_epochs: 2000 +max_epochs: 8000 min_epochs: 2000 accelerator: cpu @@ -11,9 +11,9 @@ devices: 1 # mixed precision for extra speed-up precision: bf16 limit_train_batches: 100 -limit_val_batches: 300 +limit_val_batches: 200 # perform a validation loop every N training epochs -check_val_every_n_epoch: 200 +check_val_every_n_epoch: 500 # set True to to ensure deterministic results # makes training slower but gives more reproducibility than just setting seeds diff --git a/egomimic/pl_utils/pl_sampler.py b/egomimic/pl_utils/pl_sampler.py new file mode 100644 index 00000000..ab176640 --- /dev/null +++ b/egomimic/pl_utils/pl_sampler.py @@ -0,0 +1,40 @@ +from torch.utils.data import Sampler +from torch.utils.data import DataLoader, random_split, default_collate +from lightning.pytorch.utilities.combined_loader import CombinedLoader +from lightning import LightningDataModule +from transformers import AutoTokenizer +from egomimic.utils.egomimicUtils import nds +import json +import os +import logging +from egomimic.rldb.utils import RLDBDataset +from termcolor import cprint +import torch + +class EpisodeValBatchSampler(Sampler[list[int]]): + """ + Validation: each step corresponds to one full episode (all its frames). + Each rank gets a disjoint subset of episodes. + """ + def __init__(self, dataset, rank=0, world_size=1): + self.dataset = dataset + self.rank = rank + self.world_size = world_size + + episode_to_frames = {} + for i in range(len(dataset)): + ep = dataset[i]["episode_index"] + episode_to_frames.setdefault(ep, []).append(i) + + self.episode_to_frames = episode_to_frames + self.episodes = sorted(episode_to_frames.keys()) + self.num_episodes = len(self.episodes) + + self.episodes_rank = [ep for ep in self.episodes if (ep % world_size) == rank] + + def __iter__(self): + for ep in self.episodes_rank: + yield self.episode_to_frames[ep] + + def __len__(self): + return len(self.episodes_rank) diff --git a/egomimic/rldb/utils.py b/egomimic/rldb/utils.py index 385c0174..37d4f5f9 100644 --- a/egomimic/rldb/utils.py +++ b/egomimic/rldb/utils.py @@ -26,7 +26,10 @@ import datasets.config as ds_cfg from datasets import DatasetDict, concatenate_datasets from datasets.utils.logging import disable_progress_bar -from lerobot.common.datasets.lerobot_dataset import LeRobotDataset, LeRobotDatasetMetadata +from lerobot.common.datasets.lerobot_dataset import ( + LeRobotDataset, + LeRobotDatasetMetadata, +) # Local EgoMimic Imports from egomimic.utils.aws.aws_sql import ( @@ -64,7 +67,6 @@ class EMBODIMENT(Enum): SEED = 42 - EMBODIMENT_ID_TO_KEY = { member.value: key for key, member in EMBODIMENT.__members__.items() } @@ -156,10 +158,10 @@ def __init__( percent=0.1, mode="train", valid_ratio: float = 0.2, + use_annotations=False, **kwargs, - ): - - + ): + logger.info(f"Loading RLDB dataset from {root}") dataset_meta = LeRobotDatasetMetadata( repo_id=repo_id, root=root, local_files_only=local_files_only ) @@ -233,7 +235,7 @@ def __init__( ) annotation_path = Path(root) / "annotations" - if annotation_path.is_dir(): + if annotation_path.is_dir() and use_annotations: self.annotations = AnnotationLoader(root=root) self.annotation_df = self.annotations.df else: @@ -320,15 +322,14 @@ def __getitem__(self, idx): frame_item = self.hf_dataset[frame_idx] frame_time = float(frame_item["timestamp"]) - frame_item["annotations"] = self._get_frame_annotation( - episode_idx=ep_idx, - frame_time=frame_time, - ) + if self._get_frame_annotation is not None: + frame_item["annotations"] = self._get_frame_annotation( + episode_idx=ep_idx, + frame_time=frame_time, + ) return frame_item - - def _get_frame_annotation( self, episode_idx: int, @@ -369,7 +370,6 @@ def _get_frame_annotation( return "" - def _slow_down_sequence(self, seq, rot_spec=None): """ Slow down a sequence of shape (S, D) along the time dimension S. @@ -526,8 +526,12 @@ def _merge_hf_datasets(self): hf_dataset = hf_dataset.rename_columns(key_map) dataset_list.append(hf_dataset) - - merged_dataset = concatenate_datasets(dataset_list) + + try: + merged_dataset = concatenate_datasets(dataset_list) + except Exception as e: + logger.error(f"Failed to merge datasets: {e}") + return None return merged_dataset @@ -612,7 +616,10 @@ def __init__( logger.warning(f"Skipped {len(skipped)} datasets: {skipped}") -def _fmt_bytes(n: int) -> str: +_PROCESS = psutil.Process() + + +def _fmt_bytes(n: float) -> str: for unit in ("B", "KB", "MB", "GB", "TB"): if n < 1024: return f"{n:.2f}{unit}" @@ -621,146 +628,153 @@ def _fmt_bytes(n: int) -> str: def _log_mem(tag: str): - """ - Logs CPU + (optional) CUDA memory. - No side effects, safe in DDP / threads. - """ + """Logs CPU + (optional) CUDA memory safely.""" try: mi = _PROCESS.memory_info() - msg = ( - f"[MEM] {tag} | " - f"RSS={_fmt_bytes(mi.rss)} | " - f"VMS={_fmt_bytes(mi.vms)}" - ) + msg = f"[MEM] {tag} | RSS={_fmt_bytes(mi.rss)} | VMS={_fmt_bytes(mi.vms)}" if torch.cuda.is_available(): msg += ( f" | CUDA_alloc={_fmt_bytes(torch.cuda.memory_allocated())}" f" | CUDA_reserved={_fmt_bytes(torch.cuda.memory_reserved())}" ) - logger.info(msg) except Exception as e: logger.warning(f"[MEM] failed at {tag}: {e}") class S3RLDBDataset(MultiRLDBDataset): - """ - A dataset class that downloads datasets from AWS S3 and instantiates them as RLDBDataset objects - """ - def __init__( self, embodiment, mode, + local_mode=False, + use_annotations=False, bucket_name="rldb", main_prefix="processed_v2", - percent=0.1, + sample_percent=1.0, # Samples % of total available collection paths + percent=1.0, # Samples % of frames within each loaded RLDBDataset local_files_only=True, key_map=None, valid_ratio=0.2, - temp_root="/coc/flash7/scratch/egoverseS3Dataset", - cache_root="/coc/flash7/scratch/.cache", - filters={}, + temp_root="/storage/project/r-dxu345-0/shared/egoverse_datasets/S3_rldb_data/S3_rldb_data", # "/coc/flash7/scratch/rldb_temp" + cache_root="/storage/project/r-dxu345-0/shared/.cache", + filters=None, debug=False, **kwargs, ): _log_mem("init:start") - - temp_root += "/S3_rldb_data" + logger.info(f"Summary Dataset and S3RLDBDataset instantiation: {filters} {embodiment} {mode} {local_mode} {use_annotations} {bucket_name} {main_prefix} {sample_percent} {percent} {local_files_only} {key_map} {valid_ratio} {temp_root} {cache_root} {debug} {kwargs}") + filters = filters or {} filters["robot_name"] = embodiment filters["is_deleted"] = False + # Environment & Cache Setup os.environ["HF_HOME"] = cache_root os.environ["HF_DATASETS_CACHE"] = f"{cache_root}/datasets" - ds_cfg.HF_DATASETS_CACHE = os.environ["HF_DATASETS_CACHE"] huggingface_hub.constants.HF_HOME = os.environ["HF_HOME"] - if temp_root[0] != "/": - temp_root = "/" + temp_root - temp_root = Path(temp_root) + temp_root_path = Path(temp_root) + temp_root_path.mkdir(parents=True, exist_ok=True) - if temp_root.is_dir(): - logger.info(f"Using existing temp_root directory: {temp_root}") - else: - temp_root.mkdir() + # ------------------------------------------------------------ + # 1. Query Metadata & Subsample FIRST (No S3 Sync yet) + # ------------------------------------------------------------ + _log_mem("init:query_metadata") + all_filtered_paths = self._get_processed_path(filters) + + if not all_filtered_paths: + raise ValueError(f"No episodes matched filters: {filters}") + + if local_mode: + local_paths = [] + for ep in all_filtered_paths: + if self._episode_already_present(temp_root_path, ep[1]): + local_paths.append(ep) + all_filtered_paths = local_paths + # Deterministic shuffle for disjoint train/valid splits + random.Random(6).shuffle(all_filtered_paths) + + # Apply sample_percent to the list of paths + total_to_sample = ( + max(1, int(len(all_filtered_paths) * sample_percent)) + if sample_percent > 0 + else 0 + ) + sampled_paths = all_filtered_paths[:total_to_sample] + logger.info(f"Sampled {len(sampled_paths)} paths from {len(all_filtered_paths)} total paths.") - logger.info(f"Filters: {filters}") + # Split into disjoint subsets + num_valid = int(len(sampled_paths) * valid_ratio) + valid_paths_subset = sampled_paths[:num_valid] + train_paths_subset = sampled_paths[num_valid:] - _log_mem("init:before_sync") + if len(valid_paths_subset) == 0 or len(train_paths_subset) == 0: + raise ValueError(f"Not enough paths to split for mode: {mode}. Valid paths: {valid_paths_subset}, Train paths: {train_paths_subset}") - filtered_paths = self.sync_from_filters( - bucket_name=bucket_name, - filters=filters, - local_dir=temp_root, - ) - - _log_mem("init:after_sync") + # Select which subset we actually need to download and load + if mode == "train": + paths_to_process = train_paths_subset + elif mode == "valid": + paths_to_process = valid_paths_subset + elif mode in ["total", "percent"]: + paths_to_process = sampled_paths + else: + raise ValueError(f"Unknown mode: {mode}") - search_path = temp_root + # ------------------------------------------------------------ + # 2. Sync ONLY the required paths from S3 + # ------------------------------------------------------------ + _log_mem("init:before_sync") + logger.info( + f"Syncing {len(paths_to_process)} sampled episodes for mode '{mode}' to {temp_root_path}" + ) - valid_collection_names = {h for _, h in filtered_paths} + if mode == "local": + pass + else: + self._sync_s3_to_local( + bucket_name=bucket_name, + s3_paths=paths_to_process, + local_dir=temp_root_path, + ) + _log_mem("init:after_sync") + # ------------------------------------------------------------ + # 3. Parallel Load + # ------------------------------------------------------------ + valid_collection_names = {h for _, h in paths_to_process} max_workers = int(os.environ.get("RLDB_LOAD_WORKERS", "10")) _log_mem("init:before_parallel_load") - - if mode in ["train", "valid", "percent"]: - mode = "total" - - datasets = {} datasets, skipped = self._load_rldb_datasets_parallel( - search_path=search_path, + search_path=temp_root_path, embodiment=embodiment, valid_collection_names=valid_collection_names, local_files_only=local_files_only, - percent=percent, + percent=percent, # Frame-level sampling valid_ratio=valid_ratio, max_workers=max_workers, debug=debug, + use_annotations=use_annotations, kwargs=kwargs, ) - _log_mem("init:after_parallel_load") - assert datasets, "No valid RLDB datasets found! Check your S3 path and filters." + if datasets is None: + raise ValueError("No datasets loaded during parallel load.") - self.train_collections, self.valid_collections = split_dataset_names( - datasets.keys(), valid_ratio=valid_ratio, seed=SEED - ) - - if mode == "train": - chosen = self.train_collections - elif mode == "valid": - chosen = self.valid_collections - elif mode == "total": - chosen = set(datasets.keys()) - elif mode == "percent": - all_names = sorted(datasets.keys()) - rng = random.Random(SEED) - rng.shuffle(all_names) - n_keep = max(1, int(len(all_names) * percent)) if percent > 0 else 0 - chosen = set(all_names[:n_keep]) - else: - raise ValueError(f"Unknown mode: {mode}") - - datasets = {rid: ds for rid, ds in datasets.items() if rid in chosen} - assert datasets, "No datasets left after applying mode split." - - key_map_per_dataset = ( - {repo_id: key_map for repo_id in datasets} if key_map else None - ) + key_map = {repo_id: key_map for repo_id in datasets} if key_map else None super().__init__( datasets=datasets, embodiment=embodiment, - key_map=key_map_per_dataset, + key_map=key_map, ) if skipped: - logger.warning(f"Skipped {len(skipped)}") - + logger.warning(f"Skipped {len(skipped)} datasets during parallel load.") _log_mem("init:done") @classmethod @@ -773,6 +787,7 @@ def _load_rldb_dataset_one( percent: float, valid_ratio: float, kwargs: dict, + use_annotations: bool, ): repo_id = collection_path.name @@ -787,17 +802,29 @@ def _load_rldb_dataset_one( mode="total", percent=percent, valid_ratio=valid_ratio, + use_annotations=use_annotations, **kwargs, ) expected = get_embodiment_id(embodiment) if ds_obj.embodiment != expected: - return repo_id, None, f"embodiment_mismatch {ds_obj.embodiment} != {expected}", None + return ( + repo_id, + None, + "embodiment_mismatch", + f"{ds_obj.embodiment} != {expected}", + ) return repo_id, ds_obj, None, None except Exception as e: - return repo_id, None, "exception", f"{e}\n{traceback.format_exc()}" + return ( + repo_id, + None, + "exception", + f"{type(e).__name__}: {e}\n{traceback.format_exc()}", + ) + @classmethod def _load_rldb_datasets_parallel( @@ -811,123 +838,109 @@ def _load_rldb_datasets_parallel( valid_ratio: float, max_workers: int, debug: bool = False, + use_annotations: bool, kwargs: dict, batch_size: int = 10, ): _log_mem("parallel_load:start") - max_workers = max(1, int(max_workers)) - process = psutil.Process() - - datasets: dict[str, RLDBDataset] = {} - skipped: list[str] = [] if debug: logger.info("Debug mode: limiting to 10 datasets.") valid_collection_names = set(list(valid_collection_names)[:10]) - def _submit_arg(p: Path): - return dict( - collection_path=p, - embodiment=embodiment, - local_files_only=local_files_only, - percent=percent, - valid_ratio=valid_ratio, - kwargs=kwargs, - ) - - def get_memory(): - mi = process.memory_info() - return mi.rss, mi.vms - - # ------------------------------------------------------------ - # Pre-filter valid dataset paths - # ------------------------------------------------------------ valid_paths = [ search_path / name for name in valid_collection_names if (search_path / name).is_dir() ] + total = len(valid_paths) + datasets: dict[str, RLDBDataset] = {} + skipped: list[str] = [] logger.info( - f"Starting parallel RLDB load: " - f"{total} datasets | workers={max_workers} | batch_size={batch_size}" + f"Starting parallel RLDB load: {total} datasets | workers={max_workers}" ) - # Hard safety cap (prevents unrecoverable thread failures) - vm_total = psutil.virtual_memory().total - VMS_ABORT_FRAC = 0.90 - - with tqdm(total=total, desc="Loading RLDBDataset") as dataset_bar, \ + with ( + tqdm(total=total, desc="Loading RLDBDataset") as dataset_bar, tqdm( - total=1, - bar_format="RSS Mem: {bar} {n:.1f}MB", - position=1, - leave=True, - ) as rss_bar, \ + total=1, bar_format="RSS Mem: {bar} {n:.1f}MB", position=1, leave=True + ) as rss_bar, tqdm( - total=1, - bar_format="VMS Mem: {bar} {n:.1f}MB", - position=2, - leave=True, - ) as vms_bar: - - # ------------------------------------------------------------ - # SINGLE executor reused for entire load - # ------------------------------------------------------------ + total=1, bar_format="VMS Mem: {bar} {n:.1f}MB", position=2, leave=True + ) as vms_bar, + ): with ThreadPoolExecutor(max_workers=max_workers) as executor: in_flight = set() path_iter = iter(valid_paths) - def submit_one(p: Path): + def _submit(p: Path): return executor.submit( cls._load_rldb_dataset_one, - **_submit_arg(p), + collection_path=p, + embodiment=embodiment, + local_files_only=local_files_only, + percent=percent, + valid_ratio=valid_ratio, + use_annotations=use_annotations, + kwargs=kwargs, ) - # Prime the pipeline - try: - for _ in range(min(batch_size, total)): - in_flight.add(submit_one(next(path_iter))) - except StopIteration: - pass + # Prime pipeline + for _ in range(min(batch_size, total)): + try: + in_flight.add(_submit(next(path_iter))) + except StopIteration: + break while in_flight: done, in_flight = concurrent.futures.wait( - in_flight, - return_when=concurrent.futures.FIRST_COMPLETED, + in_flight, return_when=concurrent.futures.FIRST_COMPLETED ) for fut in done: - repo_id, ds_obj, reason, err = fut.result() - - if ds_obj is not None: + try: + repo_id, ds_obj, reason, err = fut.result() + except Exception as e: + msg = ( + "[FUTURE FAILURE]\n" + f"{type(e).__name__}: {e}\n{traceback.format_exc()}" + ) + tqdm.write(msg) + logger.exception(msg) + dataset_bar.update(1) + continue + + if ds_obj: datasets[repo_id] = ds_obj else: + msg = ( + "\n[DATASET SKIPPED]\n" + f"repo_id: {repo_id}\n" + f"reason: {reason}\n" + f"error:\n{err if err else 'None'}\n" + ) + tqdm.write(msg) + logger.error(msg) + if reason != "not_a_dir": skipped.append(repo_id) - if err: - logger.debug(f"[SKIP] {repo_id}: {err}") dataset_bar.update(1) - # Submit next task if available try: - in_flight.add(submit_one(next(path_iter))) + in_flight.add(_submit(next(path_iter))) except StopIteration: pass - # ---------------------------------------------------- - # Memory monitoring & safety check - # ---------------------------------------------------- - rss, vms = get_memory() - rss_bar.n = rss / 1e6 - vms_bar.n = vms / 1e6 + # Memory monitoring + mi = _PROCESS.memory_info() + rss_bar.n, vms_bar.n = mi.rss / 1e6, mi.vms / 1e6 rss_bar.refresh() vms_bar.refresh() - _log_mem("parallel_load:end") return datasets, skipped @@ -938,104 +951,94 @@ def _get_processed_path(filters): df = episode_table_to_df(engine) series = pd.Series(filters) - output = df.loc[ - (df[list(filters)] == series).all(axis=1), - ["processed_path", "episode_hash"], - ] - - skipped = df[df["processed_path"].isnull()]["episode_hash"].tolist() - # logger.info(f"Skipped {len(skipped)} episodes with null processed_path: {skipped}") - - output = output[~output["episode_hash"].isin(skipped)] - paths = list(output.itertuples(index=False, name=None)) - - # logger.info(f"Paths: {paths}") - return paths + mask = (df[list(filters)] == series).all(axis=1) + output = df.loc[mask, ["processed_path", "episode_hash"]].dropna( + subset=["processed_path"] + ) + return list(output.itertuples(index=False, name=None)) @classmethod def _sync_s3_to_local(cls, bucket_name, s3_paths, local_dir: Path): _log_mem("s3_sync:start") - - if not s3_paths: - return - - to_sync = [] - for processed_path, episode_hash in s3_paths: - if not cls._episode_already_present(local_dir, episode_hash): - to_sync.append((processed_path, episode_hash)) - + to_sync = [ + p for p in s3_paths if not cls._episode_already_present(local_dir, p[1]) + ] if not to_sync: logger.info("Nothing to sync from S3.") return local_dir.mkdir(parents=True, exist_ok=True) - with tempfile.NamedTemporaryFile(prefix="_s5cmd_sync_", suffix=".txt", delete=False) as f: + # Create the full batch file as before for maximum s5cmd efficiency + with tempfile.NamedTemporaryFile( + prefix="_s5cmd_", suffix=".txt", delete=False + ) as f: + lines = [] + for src_path, episode_hash in to_sync: + src = ( + src_path + if src_path.startswith("s3://") + else f"s3://{bucket_name}/{src_path.lstrip('/')}" + ) + lines.append( + f'sync "{src.rstrip("/")}/*" "{local_dir / episode_hash}/"' + ) + f.write("\n".join(lines).encode()) batch_path = Path(f.name) - lines = [] - for processed_path, episode_hash in to_sync: - if processed_path.startswith("s3://"): - src = processed_path.rstrip("/") + "/*" - else: - src = f"s3://{bucket_name}/{processed_path.lstrip('/').rstrip('/')}" + "/*" - - dst = local_dir / episode_hash - lines.append(f'sync "{src}" "{dst}/"') - - batch_path.write_text("\n".join(lines) + "\n") + logger.info(f"Syncing {len(to_sync)} episodes via s5cmd...") + + # Execute s5cmd and stream output to tqdm + # We use a simple counter for files synced since we don't know the exact file count per episode easily + with tqdm(total=len(to_sync), desc="S3 Sync Progress", unit="ep") as pbar: + process = subprocess.Popen( + ["s5cmd", "run", str(batch_path)], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) - _log_mem("s3_sync:before_s5cmd") - subprocess.run(["s5cmd", "run", str(batch_path)], check=True) + # We look for the completion of a folder/sync operation in the logs + # s5cmd logs individual files, but we'll increment the bar based on unique episode hashes seen + completed_hashes = set() + for line in process.stdout: + # s5cmd output usually contains the destination path + # We check which episode hash is mentioned in the current log line + for _, episode_hash in to_sync: + if episode_hash in line and episode_hash not in completed_hashes: + completed_hashes.add(episode_hash) + pbar.update(1) + break + + process.wait() + if process.returncode != 0: + raise subprocess.CalledProcessError(process.returncode, process.args) + + batch_path.unlink(missing_ok=True) _log_mem("s3_sync:after_s5cmd") - try: - batch_path.unlink(missing_ok=True) - except Exception: - pass - @classmethod def _episode_already_present(cls, local_dir: Path, episode_hash: str) -> bool: ep = local_dir / episode_hash - meta = ep / "meta" - chunk0 = ep / "data" / "chunk-000" - - if not meta.is_dir() or not chunk0.is_dir(): - return False - + meta, chunk0 = ep / "meta", ep / "data" / "chunk-000" try: - if not any(meta.iterdir()): - return False - if not any(chunk0.iterdir()): - return False - except FileNotFoundError: + return all(d.is_dir() and any(d.iterdir()) for d in [meta, chunk0]) + except (FileNotFoundError, StopIteration): return False - return True - @classmethod - def sync_from_filters( - cls, - *, - bucket_name: str, - filters: dict, - local_dir: Path, - ): + def sync_from_filters(cls, *, bucket_name: str, filters: dict, local_dir: Path): _log_mem("sync_from_filters:start") - filtered_paths = cls._get_processed_path(filters) if not filtered_paths: logger.warning("No episodes matched filters.") return [] logger.info(f"Syncing S3 datasets with filters {filters} to {local_dir}") - cls._sync_s3_to_local( - bucket_name=bucket_name, - s3_paths=filtered_paths, - local_dir=local_dir, + bucket_name=bucket_name, s3_paths=filtered_paths, local_dir=local_dir ) - _log_mem("sync_from_filters:end") return filtered_paths @@ -1191,6 +1194,9 @@ def infer_norm_from_dataset(self, dataset): memory_usage = psutil.Process().memory_info().rss / (1024**2) logger.info(f"Memory usage before column processing: {memory_usage:.2f} MB") + memory_usage = psutil.Process().memory_info().rss / (1024**2) + logger.info(f"Memory usage before column processing: {memory_usage:.2f} MB") + column_name = self.keyname_to_lerobot_key(column, embodiment) logger.info(f"[NormStats] Processing column={column_name}") @@ -1202,6 +1208,9 @@ def infer_norm_from_dataset(self, dataset): memory_usage = psutil.Process().memory_info().rss / (1024**2) logger.info(f"Memory usage before mean calculation: {memory_usage:.2f} MB") + memory_usage = psutil.Process().memory_info().rss / (1024**2) + logger.info(f"Memory usage before mean calculation: {memory_usage:.2f} MB") + if column_data.ndim not in (2, 3): raise ValueError( f"Column {column} has shape {column_data.shape}, " @@ -1416,5 +1425,3 @@ def unnormalize_data(self, data, embodiment): denorm_data[key] = tensor return denorm_data - - diff --git a/egomimic/scripts/change_mecka.py b/egomimic/scripts/change_mecka.py new file mode 100644 index 00000000..937eefd8 --- /dev/null +++ b/egomimic/scripts/change_mecka.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python3 +""" +Fix processed_path in the SQL episode table. + +Goal: +- For rows where processed_path contains "mecka" (case-insensitive), +- ensure processed_path starts with: s3://rldb/ + +This script only updates the SQL table (no S3 file edits). +""" + +import argparse +from typing import Optional + +import pandas as pd + +# Add parent directory to path to import egomimic modules (match your repo layout) +import sys +from pathlib import Path +sys.path.insert(0, str(Path(__file__).parent.parent.parent.parent)) + +from egomimic.utils.aws.aws_sql import ( + create_default_engine, + episode_table_to_df, + episode_hash_to_table_row, + update_episode, +) + + +TARGET_PREFIX = "s3://rldb/" + + +def normalize_to_rldb_s3_uri(p: Optional[str]) -> Optional[str]: + if p is None: + return None + if isinstance(p, float) and pd.isna(p): + return p + + p = str(p).strip() + + # Already a proper s3 uri + if p.startswith("s3://"): + return p + + # Special case: your data sometimes stores "rldb:/..." (or "rldb:...") + # Desired: "s3://rldb:/..." + if p.startswith("rldb:"): + return "s3://" + p + + # Otherwise treat as path/key and force into s3://rldb/ + key = p.lstrip("/") + return f"s3://rldb/{key}" + + +def main(): + parser = argparse.ArgumentParser(description="Fix processed_path to start with s3://rldb/ for mecka rows") + parser.add_argument("--dry-run", action="store_true", help="Print changes without writing to DB") + parser.add_argument("--contains", type=str, default="mecka", help="Substring filter for processed_path (default: mecka)") + parser.add_argument("--case-sensitive", action="store_true", help="Make substring match case-sensitive") + parser.add_argument("--limit", type=int, default=None, help="Limit number of rows updated (for testing)") + args = parser.parse_args() + + engine = create_default_engine() + + df = episode_table_to_df(engine) + + if "processed_path" not in df.columns: + raise RuntimeError("processed_path column not found in episode table dataframe") + + # Filter rows where processed_path contains substring (default 'mecka') + # str.contains supports na=False to treat NaNs as False during filtering [web:49] + mask = df["processed_path"].astype("string").str.contains( + args.contains, + case=args.case_sensitive, + na=False, + regex=False, + ) + df = df[mask].copy() + + if args.limit is not None: + df = df.head(args.limit) + + if df.empty: + print("No rows matched; nothing to do.") + return 0 + + total = len(df) + print(f"Matched {total} rows where processed_path contains '{args.contains}'") + + changed = 0 + skipped = 0 + errors = 0 + + for i, row in enumerate(df.itertuples(index=False), 1): + episode_hash = getattr(row, "episode_hash") + old_path = getattr(row, "processed_path") + + new_path = normalize_to_rldb_s3_uri(old_path) + + if old_path == new_path: + skipped += 1 + continue + + if args.dry_run: + print(f"[{i}/{total}] DRY RUN {episode_hash}:") + print(f" old: {old_path}") + print(f" new: {new_path}") + changed += 1 + continue + + try: + table_row = episode_hash_to_table_row(engine, episode_hash) + if table_row is None: + print(f"[{i}/{total}] WARNING: episode_hash not found in SQL table: {episode_hash}") + errors += 1 + continue + + table_row.processed_path = new_path + update_episode(engine, table_row) + + print(f"[{i}/{total}] UPDATED {episode_hash}") + changed += 1 + except Exception as e: + print(f"[{i}/{total}] ERROR updating {episode_hash}: {e}") + errors += 1 + + print("\nDone.") + print(f" Changed: {changed}") + print(f" Skipped (already ok): {skipped}") + print(f" Errors: {errors}") + + return 0 if errors == 0 else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/egomimic/utils/aws/sql_tutorial.ipynb b/egomimic/utils/aws/sql_tutorial.ipynb index 78f6edc8..4bfd4c8a 100644 --- a/egomimic/utils/aws/sql_tutorial.ipynb +++ b/egomimic/utils/aws/sql_tutorial.ipynb @@ -160,21 +160,31 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 2, "id": "cbb7d2a8", "metadata": {}, "outputs": [], "source": [ - "def delete_episodes_by_task(task_name: str):\n", + "def delete_episodes_by_task(hash: str):\n", " episodes_tbl = Table(\"episodes\", MetaData(), autoload_with=engine, schema=\"app\")\n", " stmt = (\n", " update(episodes_tbl)\n", - " .where(episodes_tbl.c.task == task_name)\n", + " .where(episodes_tbl.c.episode_hash == hash)\n", " .values(is_deleted=True)\n", " )\n", " with engine.begin() as conn:\n", " conn.execute(stmt)" ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "33681321", + "metadata": {}, + "outputs": [], + "source": [ + "delete_episodes_by_task(\"692bb2f4eb589f6e17676d3b\")" + ] } ], "metadata": { @@ -193,7 +203,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.14" + "version": "3.11.13" } }, "nbformat": 4, diff --git a/external/lerobot/lerobot/common/datasets/lerobot_dataset.py b/external/lerobot/lerobot/common/datasets/lerobot_dataset.py index 5234abf5..4463c23e 100644 --- a/external/lerobot/lerobot/common/datasets/lerobot_dataset.py +++ b/external/lerobot/lerobot/common/datasets/lerobot_dataset.py @@ -76,7 +76,8 @@ # For maintainers, see lerobot/common/datasets/push_dataset_to_hub/CODEBASE_VERSION.md CODEBASE_VERSION = "v2.0" -LEROBOT_HOME = Path(os.environ.get("HF_HOME", "~/.cache/huggingface")).expanduser() / "lerobot" +LEROBOT_HOME = Path(os.environ.get( + "HF_HOME", "~/.cache/huggingface")).expanduser() / "lerobot" SEED = 42 @@ -125,12 +126,14 @@ def _version(self) -> str: def get_data_file_path(self, ep_index: int) -> Path: ep_chunk = self.get_episode_chunk(ep_index) - fpath = self.data_path.format(episode_chunk=ep_chunk, episode_index=ep_index) + fpath = self.data_path.format( + episode_chunk=ep_chunk, episode_index=ep_index) return Path(fpath) def get_video_file_path(self, ep_index: int, vid_key: str) -> Path: ep_chunk = self.get_episode_chunk(ep_index) - fpath = self.video_path.format(episode_chunk=ep_chunk, video_key=vid_key, episode_index=ep_index) + fpath = self.video_path.format( + episode_chunk=ep_chunk, video_key=vid_key, episode_index=ep_index) return Path(fpath) def get_episode_chunk(self, ep_index: int) -> int: @@ -281,10 +284,12 @@ def _update_splits(self, seed: int = SEED, valid_ratio: float = 0.2) -> None: random.seed(seed) random.shuffle(all_indices) - valid_size = max(1, min(int(valid_ratio * total_episodes), total_episodes - 1)) + valid_size = max( + 1, min(int(valid_ratio * total_episodes), total_episodes - 1)) # Assign indices to train and valid splits - self.info["splits"] = {"train": all_indices[valid_size:], "valid": all_indices[:valid_size]} + self.info["splits"] = { + "train": all_indices[valid_size:], "valid": all_indices[:valid_size]} def write_video_info(self) -> None: """ @@ -293,7 +298,8 @@ def write_video_info(self) -> None: """ for key in self.video_keys: if not self.features[key].get("info", None): - video_path = self.root / self.get_video_file_path(ep_index=0, vid_key=key) + video_path = self.root / \ + self.get_video_file_path(ep_index=0, vid_key=key) self.info["features"][key]["info"] = get_video_info(video_path) write_json(self.info, self.root / INFO_PATH) @@ -344,7 +350,8 @@ def create( features = {**features, **DEFAULT_FEATURES} obj.tasks, obj.stats, obj.episodes = {}, {}, [] - obj.info = create_empty_dataset_info(CODEBASE_VERSION, fps, robot_type, features, use_videos) + obj.info = create_empty_dataset_info( + CODEBASE_VERSION, fps, robot_type, features, use_videos) if len(obj.video_keys) > 0 and not use_videos: raise ValueError() write_json(obj.info, obj.root / INFO_PATH) @@ -489,23 +496,29 @@ def __init__( self.root.mkdir(exist_ok=True, parents=True) # Load metadata - self.meta = LeRobotDatasetMetadata(self.repo_id, self.root, self.local_files_only) + self.meta = LeRobotDatasetMetadata( + self.repo_id, self.root, self.local_files_only) # Check version - check_version_compatibility(self.repo_id, self.meta._version, CODEBASE_VERSION) + check_version_compatibility( + self.repo_id, self.meta._version, CODEBASE_VERSION) # Load actual data self.download_episodes(download_videos) self.hf_dataset = self.load_hf_dataset() - self.episode_data_index = get_episode_data_index(self.meta.episodes, self.episodes) + self.episode_data_index = get_episode_data_index( + self.meta.episodes, self.episodes) # Check timestamps - check_timestamps_sync(self.hf_dataset, self.episode_data_index, self.fps, self.tolerance_s) + check_timestamps_sync( + self.hf_dataset, self.episode_data_index, self.fps, self.tolerance_s) # Setup delta_indices if self.delta_timestamps is not None: - check_delta_timestamps(self.delta_timestamps, self.fps, self.tolerance_s) - self.delta_indices = get_delta_indices(self.delta_timestamps, self.fps) + check_delta_timestamps(self.delta_timestamps, + self.fps, self.tolerance_s) + self.delta_indices = get_delta_indices( + self.delta_timestamps, self.fps) # Available stats implies all videos have been encoded and dataset is iterable self.consolidated = self.meta.stats is not None @@ -546,7 +559,8 @@ def push_to_hub( tags=tags, dataset_info=self.meta.info, license=license, **card_kwargs ) card.push_to_hub(repo_id=self.repo_id, repo_type="dataset") - create_branch(repo_id=self.repo_id, branch=CODEBASE_VERSION, repo_type="dataset") + create_branch(repo_id=self.repo_id, + branch=CODEBASE_VERSION, repo_type="dataset") def pull_from_repo( self, @@ -574,7 +588,8 @@ def download_episodes(self, download_videos: bool = True) -> None: files = None ignore_patterns = None if download_videos else "videos/" if self.episodes is not None: - files = [str(self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] + files = [str(self.meta.get_data_file_path(ep_idx)) + for ep_idx in self.episodes] if len(self.meta.video_keys) > 0 and download_videos: video_files = [ str(self.meta.get_video_file_path(ep_idx, vid_key)) @@ -583,7 +598,8 @@ def download_episodes(self, download_videos: bool = True) -> None: ] files += video_files - self.pull_from_repo(allow_patterns=files, ignore_patterns=ignore_patterns) + self.pull_from_repo(allow_patterns=files, + ignore_patterns=ignore_patterns) def load_hf_dataset(self) -> datasets.Dataset: """hf_dataset contains all the observations, states, actions, rewards, etc.""" @@ -591,8 +607,10 @@ def load_hf_dataset(self) -> datasets.Dataset: path = str(self.root / "data") hf_dataset = load_dataset("parquet", data_dir=path, split="train") else: - files = [str(self.root / self.meta.get_data_file_path(ep_idx)) for ep_idx in self.episodes] - hf_dataset = load_dataset("parquet", data_files=files, split="train") + files = [str(self.root / self.meta.get_data_file_path(ep_idx)) + for ep_idx in self.episodes] + hf_dataset = load_dataset( + "parquet", data_files=files, split="train") # TODO(aliberts): hf_dataset.set_format("torch") hf_dataset = hf_dataset.with_format("arrow") @@ -632,12 +650,14 @@ def _get_query_indices(self, idx: int, ep_idx: int) -> tuple[dict[str, list[int ep_start = self.episode_data_index["from"][ep_idx] ep_end = self.episode_data_index["to"][ep_idx] query_indices = { - key: [max(ep_start.item(), min(ep_end.item() - 1, idx + delta)) for delta in delta_idx] + key: [max(ep_start.item(), min(ep_end.item() - 1, idx + delta)) + for delta in delta_idx] for key, delta_idx in self.delta_indices.items() } padding = { # Pad values outside of current episode range f"{key}_is_pad": torch.BoolTensor( - [(idx + delta < ep_start.item()) | (idx + delta >= ep_end.item()) for delta in delta_idx] + [(idx + delta < ep_start.item()) | (idx + delta >= ep_end.item()) + for delta in delta_idx] ) for key, delta_idx in self.delta_indices.items() } @@ -651,7 +671,8 @@ def _get_query_timestamps( query_timestamps = {} for key in self.meta.video_keys: if query_indices is not None and key in query_indices: - timestamps = self.hf_dataset.select(query_indices[key])["timestamp"] + timestamps = self.hf_dataset.select( + query_indices[key])["timestamp"] query_timestamps[key] = torch.stack(timestamps).tolist() else: query_timestamps[key] = [current_ts] @@ -673,7 +694,8 @@ def _query_videos(self, query_timestamps: dict[str, list[float]], ep_idx: int) - """ item = {} for vid_key, query_ts in query_timestamps.items(): - video_path = self.root / self.meta.get_video_file_path(ep_idx, vid_key) + video_path = self.root / \ + self.meta.get_video_file_path(ep_idx, vid_key) frames = decode_video_frames_torchvision( video_path, query_ts, self.tolerance_s, self.video_backend ) @@ -695,8 +717,10 @@ def __getitem__(self, idx) -> dict: query_indices = None if self.delta_indices is not None: - current_ep_idx = self.episodes.index(ep_idx) if self.episodes is not None else ep_idx - query_indices, padding = self._get_query_indices(idx, current_ep_idx) + current_ep_idx = self.episodes.index( + ep_idx) if self.episodes is not None else ep_idx + query_indices, padding = self._get_query_indices( + idx, current_ep_idx) query_result = self._query_hf_dataset(query_indices) item = {**item, **padding} for key, val in query_result.items(): @@ -704,7 +728,8 @@ def __getitem__(self, idx) -> dict: if len(self.meta.video_keys) > 0: current_ts = item["timestamp"].item() - query_timestamps = self._get_query_timestamps(current_ts, query_indices) + query_timestamps = self._get_query_timestamps( + current_ts, query_indices) video_frames = self._query_videos(query_timestamps, ep_idx) item = {**video_frames, **item} @@ -760,7 +785,8 @@ def add_frame(self, frame: dict) -> None: self.episode_buffer = self.create_episode_buffer() frame_index = self.episode_buffer["size"] - timestamp = frame.pop("timestamp") if "timestamp" in frame else frame_index / self.fps + timestamp = frame.pop( + "timestamp") if "timestamp" in frame else frame_index / self.fps self.episode_buffer["frame_index"].append(frame_index) self.episode_buffer["timestamp"].append(timestamp) @@ -769,7 +795,8 @@ def add_frame(self, frame: dict) -> None: raise ValueError(key) if self.features[key]["dtype"] not in ["image", "video"]: - item = frame[key].numpy() if isinstance(frame[key], torch.Tensor) else frame[key] + item = frame[key].numpy() if isinstance( + frame[key], torch.Tensor) else frame[key] self.episode_buffer[key].append(item) elif self.features[key]["dtype"] in ["image", "video"]: img_path = self._get_image_file_path( @@ -830,14 +857,15 @@ def save_episode( episode_buffer[key] = np.full((episode_length,), episode_index) elif key == "task_index": episode_buffer[key] = np.full((episode_length,), task_index) - ## ryanthecreator : added prestacked key to deal with non singular shape actions / observations - ## ryanthecreator : added meta str to fix metadata keys + # ryanthecreator : added prestacked key to deal with non singular shape actions / observations + # ryanthecreator : added meta str to fix metadata keys elif "prestacked" in ft["dtype"]: continue elif ft["dtype"] in ["image", "video"]: continue elif len(ft["shape"]) == 1 and ft["shape"][0] == 1: - episode_buffer[key] = np.array(episode_buffer[key], dtype=ft["dtype"]) + episode_buffer[key] = np.array( + episode_buffer[key], dtype=ft["dtype"]) elif len(ft["shape"]) == 1 and ft["shape"][0] > 1: episode_buffer[key] = np.stack(episode_buffer[key]) else: @@ -846,7 +874,8 @@ def save_episode( self._wait_image_writer() self._save_episode_table(episode_buffer, episode_index) - self.meta.save_episode(episode_index, episode_length, task, task_index, valid_ratio=valid_ratio) + self.meta.save_episode(episode_index, episode_length, + task, task_index, valid_ratio=valid_ratio) if encode_videos and len(self.meta.video_keys) > 0: video_paths = self.encode_episode_videos(episode_index) @@ -860,8 +889,10 @@ def save_episode( def _save_episode_table(self, episode_buffer: dict, episode_index: int) -> None: episode_dict = {key: episode_buffer[key] for key in self.hf_features} - ep_dataset = datasets.Dataset.from_dict(episode_dict, features=self.hf_features, split="train") - ep_data_path = self.root / self.meta.get_data_file_path(ep_index=episode_index) + ep_dataset = datasets.Dataset.from_dict( + episode_dict, features=self.hf_features, split="train") + ep_data_path = self.root / \ + self.meta.get_data_file_path(ep_index=episode_index) ep_data_path.parent.mkdir(parents=True, exist_ok=True) write_parquet(ep_dataset, ep_data_path) @@ -920,7 +951,8 @@ def encode_episode_videos(self, episode_index: int) -> dict: """ video_paths = {} for key in self.meta.video_keys: - video_path = self.root / self.meta.get_video_file_path(episode_index, key) + video_path = self.root / \ + self.meta.get_video_file_path(episode_index, key) video_paths[key] = str(video_path) if video_path.is_file(): # Skip if video is already encoded. Could be the case when resuming data recording. @@ -934,8 +966,10 @@ def encode_episode_videos(self, episode_index: int) -> dict: def consolidate(self, run_compute_stats: bool = True, keep_image_files: bool = False) -> None: self.hf_dataset = self.load_hf_dataset() - self.episode_data_index = get_episode_data_index(self.meta.episodes, self.episodes) - check_timestamps_sync(self.hf_dataset, self.episode_data_index, self.fps, self.tolerance_s) + self.episode_data_index = get_episode_data_index( + self.meta.episodes, self.episodes) + check_timestamps_sync( + self.hf_dataset, self.episode_data_index, self.fps, self.tolerance_s) if len(self.meta.video_keys) > 0: self.encode_videos() @@ -947,7 +981,8 @@ def consolidate(self, run_compute_stats: bool = True, keep_image_files: bool = F shutil.rmtree(self.root / "images") video_files = list(self.root.rglob("*.mp4")) - assert len(video_files) == self.num_episodes * len(self.meta.video_keys) + assert len(video_files) == self.num_episodes * \ + len(self.meta.video_keys) parquet_files = list(self.root.rglob("*.parquet")) assert len(parquet_files) == self.num_episodes @@ -997,7 +1032,8 @@ def create( obj.image_writer = None if image_writer_processes or image_writer_threads: - obj.start_image_writer(image_writer_processes, image_writer_threads) + obj.start_image_writer( + image_writer_processes, image_writer_threads) # TODO(aliberts, rcadene, alexander-soare): Merge this with OnlineBuffer/DataBuffer obj.episode_buffer = obj.create_episode_buffer() @@ -1041,7 +1077,8 @@ def __init__( super().__init__() self.repo_ids = repo_ids self.root = Path(root) if root else LEROBOT_HOME - self.tolerances_s = tolerances_s if tolerances_s else {repo_id: 1e-4 for repo_id in repo_ids} + self.tolerances_s = tolerances_s if tolerances_s else { + repo_id: 1e-4 for repo_id in repo_ids} # Construct the underlying datasets passing everything but `transform` and `delta_timestamps` which # are handled by this class. self._datasets = [ @@ -1122,7 +1159,8 @@ def video(self) -> bool: def features(self) -> datasets.Features: features = {} for dataset in self._datasets: - features.update({k: v for k, v in dataset.hf_features.items() if k not in self.disabled_features}) + features.update( + {k: v for k, v in dataset.hf_features.items() if k not in self.disabled_features}) return features @property @@ -1184,7 +1222,8 @@ def __getitem__(self, idx: int) -> dict[str, torch.Tensor]: continue break else: - raise AssertionError("We expect the loop to break out as long as the index is within bounds.") + raise AssertionError( + "We expect the loop to break out as long as the index is within bounds.") item = self._datasets[dataset_idx][idx - start_idx] item["dataset_index"] = torch.tensor(dataset_idx) for data_key in self.disabled_features: diff --git a/external/openpi b/external/openpi index 981483dc..5bff19b0 160000 --- a/external/openpi +++ b/external/openpi @@ -1 +1 @@ -Subproject commit 981483dca0fd9acba698fea00aa6e52d56a66c58 +Subproject commit 5bff19b0c0c447c7a7eaaaccf03f36d50998ec9d