Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 16 additions & 15 deletions egomimic/scripts/aria_process/aria-cluster.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,6 @@ auth:
ssh_user: ubuntu
ssh_private_key: ~/.ssh/rldb-base-pem.pem

file_mounts:
"/home/ubuntu/EgoVerse": "/home/elmo/Documents/projects/EgoVerse"

rsync_exclude:
- "emimic/"
- ".git/"
Expand All @@ -22,7 +19,7 @@ rsync_exclude:
- "logs/"
- "*.parquet"

max_workers: 60
max_workers: 140
idle_timeout_minutes: 5

available_node_types:
Expand All @@ -31,10 +28,12 @@ available_node_types:
InstanceType: t3a.2xlarge
KeyName: rldb-base-pem
ImageId: ami-08c4d70c31f91a5ac
IamInstanceProfile:
Arn: arn:aws:iam::556885871428:instance-profile/ray-autoscaler-v1
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs: { VolumeSize: 50 }
resources: { CPU: 4 }
resources: { CPU: 8 }
min_workers: 0
max_workers: 0

Expand All @@ -43,24 +42,28 @@ available_node_types:
InstanceType: r6a.2xlarge
KeyName: rldb-base-pem
ImageId: ami-08c4d70c31f91a5ac
IamInstanceProfile:
Arn: arn:aws:iam::556885871428:instance-profile/ray-autoscaler-v1
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs: { VolumeSize: 50 }
resources: { CPU: 8, aria_small: 1 }
Ebs: { VolumeSize: 150 }
resources: { CPU: 2, aria_small: 1 }
min_workers: 0
max_workers: 50
max_workers: 120

worker_big:
node_config:
InstanceType: r6a.8xlarge
IamInstanceProfile:
Arn: arn:aws:iam::556885871428:instance-profile/ray-autoscaler-v1
KeyName: rldb-base-pem
ImageId: ami-08c4d70c31f91a5ac
BlockDeviceMappings:
- DeviceName: /dev/sda1
Ebs: { VolumeSize: 100 } # often worth bumping
resources: { CPU: 32, aria_big: 1 }
Ebs: { VolumeSize: 200 } # often worth bumping
resources: { CPU: 8, aria_big: 1 }
min_workers: 0
max_workers: 10
max_workers: 7

head_node_type: head_node

Expand Down Expand Up @@ -101,8 +104,6 @@ initialization_commands:
pip3 install --no-input -e .

setup_commands:
- sudo mkdir -p /mnt/raw /mnt/processed

- |
chmod +x ~/EgoVerse/egomimic/utils/aws/setup_secret.sh
R2_SECRET_NAME=r2/rldb/credentials DB_SECRET_NAME=rds/appdb/appuser REGION=us-east-2 \
Expand All @@ -116,10 +117,10 @@ head_setup_commands:
| grep -v ray_worker_gaurdrails.py \
| grep -v ray_worker_gaurdrails.lock ; \
echo 'CRON_TZ=America/New_York'; \
echo '0 20 * * * flock -n /tmp/run_aria_conversion.lock /bin/bash -lc "set -a; . /home/ubuntu/.egoverse_env; set +a; /usr/bin/python3 ~/EgoVerse/egomimic/scripts/aria_process/run_aria_conversion.py --skip-if-done" >> ~/aria_conversion.log 2>&1'; \
echo '0 20 * * * flock -n /tmp/run_aria_conversion.lock /bin/bash -lc "set -a; . /home/ubuntu/.egoverse_env; set +a; /usr/bin/python3 ~/EgoVerse/egomimic/scripts/aria_process/run_conversion.py --embodiment aria --skip-if-done --debug" >> ~/aria_conversion.log 2>&1'; \
echo '*/10 * * * * flock -n /tmp/ray_worker_gaurdrails.lock /usr/bin/python3 /home/ubuntu/EgoVerse/egomimic/utils/aws/budget_guardrails/ray_worker_gaurdrails.py >> /home/ubuntu/ray_worker_gaurdrails.log 2>&1') \
| crontab - || true

- crontab -l || true
# Ray startup commands - configure GCS to detect dead/stale nodes faster
# ray start --head --dashboard-host=0.0.0.0 --dashboard-port=8265 --include-dashboard=true
# ray start --head --dashboard-host=0.0.0.0 --dashboard-port=8265 --include-dashboard=true
260 changes: 260 additions & 0 deletions egomimic/scripts/aria_process/aria_to_zarr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
import argparse
import ctypes
import gc
import logging
import traceback
from pathlib import Path

import numpy as np

from egomimic.rldb.zarr.zarr_writer import ZarrWriter
from egomimic.scripts.aria_process.aria_utils import AriaVRSExtractor
from egomimic.utils.aws.aws_sql import timestamp_ms_to_episode_hash
from egomimic.utils.egomimicUtils import str2bool
from egomimic.utils.video_utils import save_preview_mp4

logger = logging.getLogger(__name__)


class DatasetConverter:
"""
A class to convert Aria VRS dataset to Zarr episodes.
Parameters
----------
raw_path : Path or str
The path to the raw dataset.
fps : int
Frames per second for the dataset.
arm : str, optional
The arm to process (e.g., 'left', 'right', or 'bimanual'), by default "".
save_mp4 : bool, optional
Whether to save a MP4 of the episode, by default False.
image_compressed : bool, optional
Whether the images are compressed, by default True.
Methods
-------
extract_episode(episode_path, task_name='', output_dir='.', dataset_name='', chunk_timesteps=100)
Extracts frames from a single episode and saves it with a description.
main(args)
Main function to convert the dataset.
argument_parse()
Parses the command-line arguments.
"""

def __init__(
self,
raw_path: Path | str,
fps: int,
arm: str = "",
save_mp4: bool = False,
image_compressed: bool = True,
debug: bool = False,
height: int = 480,
width: int = 640,
):
self.raw_path = raw_path if isinstance(raw_path, Path) else Path(raw_path)
self.fps = fps
self.arm = arm
self.image_compressed = image_compressed
self.save_mp4 = save_mp4
self.height = height
self.width = width

self.logger = logging.getLogger(self.__class__.__name__)
self.logger.setLevel(logging.INFO)

# Add console handler
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(asctime)s - [%(name)s] - %(message)s")
console_handler.setFormatter(formatter)
self.logger.addHandler(console_handler)

self.logger.info(f"{'-' * 10} Aria VRS -> Lerobot Converter {'-' * 10}")
self.logger.info(f"Processing Aria VRS dataset from {self.raw_path}")
self.logger.info(f"FPS: {self.fps}")
self.logger.info(f"Arm: {self.arm}")
self.logger.info(f"Image compressed: {self.image_compressed}")
self.logger.info(f"Save MP4: {self.save_mp4}")

self._mp4_path = None # set from main() if --save-mp4
self._mp4_writer = None # lazy-initialized in extract_episode()
self.episode_list = list(self.raw_path.glob("*.vrs"))

self.feats_to_zarr_keys = {}

if self.arm == "both":
self.embodiment = "aria_bimanual"
elif self.arm == "right":
self.embodiment = "aria_right_arm"
elif self.arm == "left":
self.embodiment = "aria_left_arm"

def extract_episode(
self,
episode_path,
task_name: str = "",
task_description: str = "",
output_dir: Path = Path("."),
dataset_name: str = "",
chunk_timesteps: int = 100,
):
"""
Extracts frames from an episode and saves them to the dataset.
Parameters
----------
episode_path : str
The path to the episode file.
task_description : str, optional
A description of the task associated with the episode (default is an empty string).
Returns
-------
None
"""
episode_name = dataset_name

episode_feats = AriaVRSExtractor.process_episode(
episode_path=episode_path,
arm=self.arm,
height=self.height,
width=self.width,
)
numeric_data = {}

image_data = {}
for key, value in episode_feats.items():
if "images" in key:
if key in self.feats_to_zarr_keys:
image_data[self.feats_to_zarr_keys[key]] = value
else:
image_data[key] = value
else:
if key in self.feats_to_zarr_keys:
numeric_data[self.feats_to_zarr_keys[key]] = value
else:
numeric_data[key] = value

zarr_path = ZarrWriter.create_and_write(
episode_path=output_dir / f"{episode_name}.zarr",
numeric_data=numeric_data if numeric_data else None,
image_data=image_data if image_data else None,
fps=self.fps,
embodiment=self.embodiment,
task_name=task_name,
task_description=task_description,
chunk_timesteps=chunk_timesteps,
)
if self.save_mp4:
mp4_path = output_dir / f"{episode_name}.mp4"
images_tchw = np.asarray(image_data["images.front_1"]).transpose(0, 3, 1, 2)
save_preview_mp4(images_tchw, mp4_path, self.fps, half_res=False)
else:
mp4_path = None
return zarr_path, mp4_path


def main(args) -> tuple:
    """Convert a single Aria VRS recording to a Zarr episode.

    Parameters
    ----------
    args : argparse.Namespace
        Parsed command-line arguments (see ``argument_parse``).

    Returns
    -------
    tuple
        ``(zarr_path, mp4_path)`` on success (``mp4_path`` is None unless
        ``--save-mp4`` was given); ``(None, None)`` if the conversion failed.
        Errors are logged, not raised.
    """

    try:
        # Episode name is derived from the recording's timestamp-based stem.
        episode_hash = timestamp_ms_to_episode_hash(Path(args.raw_path).stem)

        converter = DatasetConverter(
            raw_path=Path(args.raw_path),
            fps=args.fps,
            arm=args.arm,
            image_compressed=args.image_compressed,
            save_mp4=args.save_mp4,
            debug=args.debug,
        )

        # Return freed memory to the OS before the large extraction step.
        # malloc_trim is glibc/Linux-only, so guard it: previously this line
        # raised OSError on any non-Linux platform.
        gc.collect()
        try:
            ctypes.CDLL("libc.so.6").malloc_trim(0)
        except OSError:
            pass

        zarr_path, mp4_path = converter.extract_episode(
            episode_path=Path(args.raw_path),
            task_name=args.task_name,
            task_description=args.task_description,
            output_dir=Path(args.output_dir),
            dataset_name=episode_hash,
        )
        return zarr_path, mp4_path
    except Exception:
        logger.error(
            "Error converting %s:\n%s", Path(args.raw_path), traceback.format_exc()
        )
        # Return a 2-tuple so callers that unpack the result
        # (zarr_path, mp4_path = main(args)) do not crash on failure;
        # returning bare None previously caused a TypeError at the call site.
        return None, None


def argument_parse():
    """Parse command-line arguments for the Aria VRS -> Zarr conversion.

    Returns
    -------
    argparse.Namespace
        The parsed arguments consumed by ``main``.
    """
    parser = argparse.ArgumentParser(
        description="Convert Aria VRS dataset to LeRobot-Robomimic hybrid and push to Hugging Face hub."
    )

    # Required arguments
    parser.add_argument(
        "--raw-path",
        type=Path,
        required=True,
        help="Directory containing the vrs, vrs_json, and the processed mps folder.",
    )
    parser.add_argument(
        "--fps", type=int, required=True, help="Frames per second for the dataset."
    )
    # Optional arguments
    parser.add_argument(
        "--task-name",
        type=str,
        default="Aria recorded dataset.",
        help="Task name of the data.",
    )
    parser.add_argument(
        "--task-description",
        type=str,
        default="Aria recorded dataset.",
        help="Task description of the data.",
    )
    parser.add_argument(
        "--arm",
        type=str,
        choices=["left", "right", "both"],
        default="both",
        help="Specify the arm for processing.",
    )
    # NOTE(review): default here is False while DatasetConverter defaults
    # image_compressed=True — confirm which is intended.
    parser.add_argument(
        "--image-compressed",
        type=str2bool,
        default=False,
        help="Set to True if the images are compressed.",
    )

    # Default to the current directory: the previous default of None made
    # main() crash on Path(args.output_dir), and the advertised LEROBOT_HOME
    # fallback was never implemented.
    parser.add_argument(
        "--output-dir",
        type=Path,
        default=Path("."),
        help="Directory where the processed dataset will be stored. Defaults to the current directory.",
    )
    parser.add_argument(
        "--debug", action="store_true", help="Store only 2 episodes for debug purposes."
    )

    parser.add_argument(
        "--save-mp4",
        action="store_true",
        help="If enabled, save a single half-resolution MP4 with all frames across episodes.",
    )

    return parser.parse_args()


if __name__ == "__main__":
    args = argument_parse()
    result = main(args)
    # main() may return None on failure; guard the unpack so a failed
    # conversion prints "None None" instead of raising TypeError.
    zarr_path, mp4_path = result if result is not None else (None, None)
    print(zarr_path, mp4_path)
Loading