From f7a068d962054e89525aad15655d9716c26695ce Mon Sep 17 00:00:00 2001 From: ElmoPA Date: Thu, 5 Mar 2026 02:39:02 +0000 Subject: [PATCH] Refactor run_conversion to be embodiment agnostic --- egomimic/scripts/ray_helper.py | 626 +++++++++++++++++++++++++++++ egomimic/scripts/run_conversion.py | 459 +++++++++++++++++++++ egomimic/utils/aws/load_episode.sh | 45 +++ 3 files changed, 1130 insertions(+) create mode 100644 egomimic/scripts/ray_helper.py create mode 100644 egomimic/scripts/run_conversion.py create mode 100755 egomimic/utils/aws/load_episode.sh diff --git a/egomimic/scripts/ray_helper.py b/egomimic/scripts/ray_helper.py new file mode 100644 index 00000000..937a0aed --- /dev/null +++ b/egomimic/scripts/ray_helper.py @@ -0,0 +1,626 @@ +import contextlib +import json +import os +import shutil +import sys +import time +import traceback +import uuid +from abc import abstractmethod +from pathlib import Path +from types import SimpleNamespace +from typing import Iterator, Tuple + +from cloudpathlib import S3Path + +from egomimic.scripts.aria_process.aria_to_zarr import main as aria_main +from egomimic.scripts.eva_process.eva_to_zarr import main as eva_main +from egomimic.utils.aws.aws_data_utils import ( + get_boto3_s3_client, + get_cloudpathlib_s3_client, + s3_sync_to_local, + upload_dir_to_s3, +) + + +def ensure_path_ready(p: S3Path, retries: int = 30) -> bool: + if not isinstance(p, S3Path): + raise ValueError(f"Expected S3Path, got {type(p)}") + for _ in range(retries): + try: + if p.exists(): + return True + except Exception: + pass + time.sleep(1) + return False + + +def _parse_s3_uri(uri: str, *, default_bucket: str | None = None) -> tuple[str, str]: + """ + Parse s3 URI or key prefix. + - "s3://bucket/prefix" -> ("bucket", "prefix") + - "prefix" -> (default_bucket, "prefix") + """ + uri = (uri or "").strip() + if uri.startswith("s3://"): + rest = uri[len("s3://") :] + bucket, _, key_prefix = rest.partition("/") + return bucket, key_prefix.strip("/") + if default_bucket is None: + raise ValueError( + f"Expected s3://... but got '{uri}' and no default_bucket provided" + ) + return default_bucket, uri.strip("/") + + +class _Tee: + def __init__(self, *streams): + self._streams = streams + + def write(self, data: str) -> int: + for s in self._streams: + s.write(data) + s.flush() + return len(data) + + def flush(self) -> None: + for s in self._streams: + s.flush() + + def isatty(self) -> bool: + return False + + +class EmbodimentRay: + def __init__( + self, + processed_local_root: Path, + log_root: Path, + ): + self.processed_local_root = processed_local_root + self.log_root = log_root + self.num_cpus_small = 2 + self.num_cpus_big = 8 + + @abstractmethod + def iter_bundles(self, root_s3: str): + pass + + @abstractmethod + def convert_one_bundle(self, **kwargs): + pass + + +class AriaRay(EmbodimentRay): + def __init__( + self, + processed_local_root: Path, + log_root: Path, + ): + super().__init__( + processed_local_root, + log_root, + ) + self.raw_remote_prefix = os.environ.get( + "RAW_REMOTE_PREFIX", "s3://rldb/raw_v2/test_aria" + ).rstrip("/") + self.processed_remote_prefix = os.environ.get( + "PROCESSED_REMOTE_PREFIX", "s3://rldb/processed_v3/test_aria" + ).rstrip("/") + self.bucket = os.environ.get("BUCKET", "rldb") + self.resources_small = {"eva_small": 1} # TODO: change to aria_small + self.resources_big = {"eva_big": 1} # TODO: change to aria_big + self.num_cpus_small = 2 + self.num_cpus_big = 8 + self.log_root = log_root + + def iter_bundles(self): + """ + root_s3: like "s3://rldb/raw_v2/aria/" + Returns S3Path objects (cloudpathlib), not local filesystem paths. + + Uses a single `root.walk(...)` traversal and avoids per-path `.exists()` / `.is_dir()`. + """ + s3_client = get_cloudpathlib_s3_client() + root = S3Path(self.raw_remote_prefix, client=s3_client) + + vrs_by_name: dict[str, S3Path] = {} + has_json: set[str] = set() + has_hand: set[str] = set() + has_slam: set[str] = set() + + # Prefer topdown so we can prune recursion aggressively (don’t enumerate huge mps trees). + try: + walker = root.walk(topdown=True) # cloudpathlib often mirrors os.walk + can_prune = True + except TypeError: + walker = root.walk() + can_prune = False # can’t reliably prune, but still single API surface + + for dirpath, dirnames, filenames in walker: + # Figure out depth relative to `root` + try: + rel = dirpath.relative_to(root) + rel_str = rel.as_posix() + rel_parts = () if rel_str in (".", "") else rel.parts + except Exception: + rel_parts = () + + depth = len(rel_parts) + + if depth == 0: + # Root-level files: *.vrs and *.json + for fn in filenames: + if fn.endswith(".vrs"): + name = fn[:-4] + vrs_by_name[name] = dirpath / fn + elif fn.endswith(".json"): + has_json.add(fn[:-5]) + + # Only descend into potential mps dirs + if can_prune: + dirnames[:] = [ + d + for d in dirnames + if d.startswith("mps_") and d.endswith("_vrs") + ] + + elif depth == 1: + # We’re inside something like mps_{name}_vrs/ + d0 = rel_parts[0] + if d0.startswith("mps_") and d0.endswith("_vrs"): + name = d0[len("mps_") : -len("_vrs")] + # If these prefixes exist (i.e., have objects under them), they should appear as dirnames. + if "hand_tracking" in dirnames: + has_hand.add(name) + if "slam" in dirnames: + has_slam.add(name) + + # We don’t need to enumerate anything deeper. + if can_prune: + dirnames[:] = [] + + else: + if can_prune: + dirnames[:] = [] + + # Match original ordering: sort by vrs filename + for filename in sorted(vrs_by_name, key=lambda n: vrs_by_name[n].name): + if filename in has_json and filename in has_hand and filename in has_slam: + vrs = vrs_by_name[filename] + jsonf = root / f"{filename}.json" + mps_dir = root / f"mps_{filename}_vrs" + # arm, task_name, task_description are inferred from row in run_converion.py + args = { + "processed_local_root": str(self.processed_local_root), + "processed_remote_prefix": self.processed_remote_prefix, + "bucket": self.bucket, + "raw_remote_prefix": self.raw_remote_prefix, + "log_root": str(self.log_root), + "vrs": str(vrs), + "jsonf": str(jsonf), + "mps_dir": str(mps_dir), + "out_dir": str(self.processed_local_root), + "fps": 30, + "chunk_timesteps": 100, + "save_mp4": True, + } + name = vrs_by_name[filename].stem + yield name, args + + @staticmethod + def convert_one_bundle( + processed_local_root: str, + processed_remote_prefix: str, + bucket: str, + raw_remote_prefix: str, + log_root: str, + vrs: str, + jsonf: str, + mps_dir: str, + out_dir: str, + arm: str, + fps: int, + task_name: str, + task_description: str, + chunk_timesteps: int, + image_compressed: bool, + save_mp4: bool, + ) -> tuple[str, str, int]: + """ + Perform conversion for a single episode. + Returns (ds_path, mp4_path, total_frames). + • ds_path: dataset folder path + • mp4_path: per-episode MP4 ('' if not created) + • total_frames: -1 if unknown/failure + """ + processed_local_root = Path(processed_local_root) + processed_remote_prefix = processed_remote_prefix.rstrip("/") + bucket = bucket.rstrip("/") + log_root = Path(log_root) + raw_remote_prefix = raw_remote_prefix.rstrip("/") + s3_client = get_cloudpathlib_s3_client() + boto3_client = get_boto3_s3_client() + vrs = S3Path(vrs, client=s3_client) if isinstance(vrs, str) else vrs + jsonf = S3Path(jsonf, client=s3_client) if isinstance(jsonf, str) else jsonf + mps_dir = ( + S3Path(mps_dir, client=s3_client) if isinstance(mps_dir, str) else mps_dir + ) + + stem = vrs.stem + log_root.mkdir(parents=True, exist_ok=True) + log_path = log_root / f"{stem}-{uuid.uuid4().hex[:8]}.log" + + tmp_dir = Path.home() / "temp_mps_processing" / f"{stem}-{uuid.uuid4().hex[:6]}" + tmp_dir.mkdir(parents=True, exist_ok=True) + + with log_path.open("a", encoding="utf-8") as log_fh: + tee_out = _Tee(sys.stdout, log_fh) + tee_err = _Tee(sys.stderr, log_fh) + with ( + contextlib.redirect_stdout(tee_out), + contextlib.redirect_stderr(tee_err), + ): + print(f"[LOG] {stem}: {log_path}", flush=True) + targets = [ + vrs, + jsonf, + mps_dir, + ] + + raw_bucket, raw_prefix = _parse_s3_uri( + raw_remote_prefix, default_bucket=bucket + ) + raw_root = S3Path(raw_remote_prefix, client=s3_client) + + for t in targets: + if not ensure_path_ready(t): + print(f"[ERR] missing {t}", flush=True) + shutil.rmtree(tmp_dir, ignore_errors=True) + return "", "", -1 + link = tmp_dir / t.name + # `t` is an S3Path; compute relative key under RAW_REMOTE_PREFIX. + rel = t.relative_to(raw_root).as_posix() + t_key = f"{raw_prefix.rstrip('/')}/{rel}".strip("/") + + try: + if t.is_dir(): + s3_sync_to_local(raw_bucket, t_key, str(link)) + else: + boto3_client.download_file(raw_bucket, t_key, str(link)) + except Exception as e: + print(f"[ERR] aws copy failed for {t}: {e}", flush=True) + shutil.rmtree(tmp_dir, ignore_errors=True) + return "", "", -1 + + ds_parent = Path(out_dir) + ds_parent.mkdir(parents=True, exist_ok=True) + vrs_path = tmp_dir / vrs.name + + try: + zarr_path, mp4_path = AriaRay.zarr_job( + raw_path=str(vrs_path), + output_dir=str(ds_parent), + arm=arm, + fps=fps, + task_name=task_name, + task_description=task_description, + chunk_timesteps=chunk_timesteps, + image_compressed=image_compressed, + save_mp4=save_mp4, + ) + frames = -1 + zarr_store_path = zarr_path + info = zarr_store_path / "zarr.json" + print(f"[DEBUG] Zarr metadata path: {info}", flush=True) + if info.exists(): + try: + meta = json.loads(info.read_text()) + print( + f"[DEBUG] Zarr metadata keys: {list(meta.keys())}", + flush=True, + ) + frames = int( + meta.get("attributes", {}).get("total_frames", -1) + ) + except Exception as e: + print( + f"[ERR] Failed to parse zarr metadata {info}: {e}", + flush=True, + ) + frames = -1 + else: + print(f"[ERR] Zarr metadata not found: {info}", flush=True) + frames = -1 + + try: + out_bucket, out_prefix = _parse_s3_uri( + processed_remote_prefix, default_bucket=bucket + ) + zarr_filename = Path(zarr_path).stem + ds_s3_prefix = ( + f"{out_prefix.rstrip('/')}/{zarr_filename}.zarr".strip("/") + ) + upload_dir_to_s3( + str(zarr_store_path), out_bucket, prefix=ds_s3_prefix + ) + shutil.rmtree(str(zarr_store_path), ignore_errors=True) + print( + f"[CLEANUP] Removed local zarr store: {zarr_store_path}", + flush=True, + ) + if mp4_path: + mp4_s3_key = ( + f"{out_prefix.rstrip('/')}/{Path(mp4_path).name}" + ) + boto3_client.upload_file( + str(mp4_path), out_bucket, mp4_s3_key + ) + Path(mp4_path).unlink(missing_ok=True) + print( + f"[CLEANUP] Removed local mp4: {mp4_path}", flush=True + ) + except Exception as e: + print( + f"[ERR] Failed to upload {zarr_store_path} to S3: {e}", + flush=True, + ) + return "", "", -2 + + return str(zarr_path), str(mp4_path), frames + + except Exception as e: + err_msg = f"[FAIL] {stem}: {e}\n{traceback.format_exc()}" + print(err_msg, flush=True) + return "", "", -1 + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + @staticmethod + def zarr_job( + raw_path: str | Path, + output_dir: str | Path, + arm: str, + fps: int = 30, + task_name: str = "", + task_description: str = "", + chunk_timesteps: int = 100, + image_compressed: bool = False, + save_mp4: bool = True, + ) -> None: + args = SimpleNamespace( + raw_path=raw_path, + output_dir=output_dir, + arm=arm, + fps=fps, + task_name=task_name, + task_description=task_description, + chunk_timesteps=chunk_timesteps, + image_compressed=image_compressed, + save_mp4=save_mp4, + debug=False, + ) + + return aria_main(args) + + +class EvaRay(EmbodimentRay): + def __init__( + self, + processed_local_root: Path, + log_root: Path, + ): + super().__init__( + processed_local_root, + log_root, + ) + self.raw_remote_prefix = os.environ.get( + "RAW_REMOTE_PREFIX", "s3://rldb/raw_v2/test_eva" + ).rstrip("/") + self.processed_remote_prefix = os.environ.get( + "PROCESSED_REMOTE_PREFIX", "s3://rldb/processed_v3/test_eva" + ).rstrip("/") + self.bucket = os.environ.get("BUCKET", "rldb") + self.resources_small = {"eva_small": 1} + self.resources_big = {"eva_big": 1} + + def iter_bundles(self) -> Iterator[Tuple[S3Path, str]]: + """Walk R2 for *.hdf5 files.""" + s3_client = get_cloudpathlib_s3_client() + root = S3Path(self.raw_remote_prefix, client=s3_client) + for hdf5 in sorted(root.glob("*.hdf5"), key=lambda p: p.name): + name = hdf5.stem + # Unused for now + # meta_json_s3 = root / f"{name}_metadata.json" + # try: + # if meta_json_s3.exists(): + # obj = json.loads(meta_json_s3.read_text()) + # except Exception: + # pass + # taskname and arm are inferred from row in run_converion.py + args = { + "processed_local_root": str(self.processed_local_root), + "processed_remote_prefix": str(self.processed_remote_prefix), + "bucket": self.bucket, + "raw_remote_prefix": str(self.raw_remote_prefix), + "log_root": str(self.log_root), + "data_h5_s3": str(hdf5), + "out_dir": str(self.processed_local_root), + "fps": 30, + "chunk_timesteps": 100, + "save_mp4": True, + } + name = hdf5.stem + yield name, args + + @staticmethod + def convert_one_bundle( + processed_local_root: str, + processed_remote_prefix: str, + bucket: str, + raw_remote_prefix: str, + log_root: str, + data_h5_s3: str, + out_dir: str, + arm: str, + fps: int, + task_name: str, + task_description: str, + chunk_timesteps: int, + save_mp4: bool, + ) -> tuple[str, str, int]: + s3_client = get_boto3_s3_client() + processed_local_root = Path(processed_local_root) + processed_remote_prefix = processed_remote_prefix.rstrip("/") + bucket = bucket.rstrip("/") + log_root = Path(log_root) + raw_remote_prefix = raw_remote_prefix.rstrip("/") + hdf5_s3 = S3Path(data_h5_s3) + stem = hdf5_s3.stem + + log_root.mkdir(parents=True, exist_ok=True) + log_path = log_root / f"{stem}-{uuid.uuid4().hex[:8]}.log" + + tmp_dir = Path.home() / "temp_eva_processing" / f"{stem}-{uuid.uuid4().hex[:6]}" + tmp_dir.mkdir(parents=True, exist_ok=True) + + with log_path.open("a", encoding="utf-8") as log_fh: + tee_out = _Tee(sys.stdout, log_fh) + tee_err = _Tee(sys.stderr, log_fh) + with ( + contextlib.redirect_stdout(tee_out), + contextlib.redirect_stderr(tee_err), + ): + print(f"[LOG] {stem}: {log_path}", flush=True) + + raw_bucket, raw_prefix = _parse_s3_uri( + raw_remote_prefix, default_bucket=bucket + ) + raw_root = S3Path(raw_remote_prefix) + + rel = hdf5_s3.relative_to(raw_root).as_posix() + t_key = f"{raw_prefix.rstrip('/')}/{rel}".strip("/") + local_hdf5 = tmp_dir / hdf5_s3.name + try: + s3_client.download_file(raw_bucket, t_key, str(local_hdf5)) + except Exception as e: + print( + f"[ERR] aws download failed for {data_h5_s3}: {e}", flush=True + ) + shutil.rmtree(tmp_dir, ignore_errors=True) + return "", "", -1 + + ds_parent = Path(out_dir) + ds_parent.mkdir(parents=True, exist_ok=True) + ds_path = ds_parent + + try: + print( + f"[INFO] Converting: {stem} → {ds_path} (arm={arm})", + flush=True, + ) + job_kwargs = dict( + raw_path=str(local_hdf5), + output_dir=str(ds_parent), + arm=arm, + fps=fps, + task_name=task_name, + task_description=task_description, + chunk_timesteps=chunk_timesteps, + save_mp4=save_mp4, + ) + zarr_path, mp4_path = EvaRay.zarr_job(**job_kwargs) + frames = -1 + zarr_store_path = zarr_path + info = zarr_store_path / "zarr.json" + print(f"[DEBUG] Zarr metadata path: {info}", flush=True) + if info.exists(): + try: + meta = json.loads(info.read_text()) + print( + f"[DEBUG] Zarr metadata keys: {list(meta.keys())}", + flush=True, + ) + frames = int( + meta.get("attributes", {}).get("total_frames", -1) + ) + except Exception as e: + print( + f"[ERR] Failed to parse zarr metadata {info}: {e}", + flush=True, + ) + frames = -1 + else: + print(f"[ERR] Zarr metadata not found: {info}", flush=True) + + try: + out_bucket, out_prefix = _parse_s3_uri( + processed_remote_prefix, default_bucket=bucket + ) + zarr_filename = Path(zarr_path).stem + zarr_s3_key = ( + f"{out_prefix.rstrip('/')}/{zarr_filename}.zarr".strip("/") + ) + upload_dir_to_s3( + str(zarr_store_path), out_bucket, prefix=zarr_s3_key + ) + shutil.rmtree(str(zarr_store_path), ignore_errors=True) + print( + f"[CLEANUP] Removed local zarr store: {zarr_store_path}", + flush=True, + ) + if mp4_path: + mp4_s3_key = ( + f"{out_prefix.rstrip('/')}/{Path(mp4_path).name}" + ) + s3_client.upload_file(str(mp4_path), out_bucket, mp4_s3_key) + Path(mp4_path).unlink(missing_ok=True) + print( + f"[CLEANUP] Removed local mp4: {mp4_path}", flush=True + ) + except Exception as e: + print( + f"[ERR] Failed to upload {zarr_store_path} to S3: {e}", + flush=True, + ) + return "", "", -2 + + return str(zarr_path), str(mp4_path), frames + + except Exception as e: + err_msg = f"[FAIL] {stem}: {e}\n{traceback.format_exc()}" + print(err_msg, flush=True) + return "", "", -1 + finally: + shutil.rmtree(tmp_dir, ignore_errors=True) + + @staticmethod + def zarr_job( + raw_path: str | Path, + output_dir: str | Path, + arm: str, + fps: int = 30, + save_mp4: bool = True, + task_name: str = "", + task_description: str = "", + chunk_timesteps: int = 100, + ) -> tuple[Path, Path] | None: + """ + Convert one trio to a Zarr dataset. + """ + raw_path = Path(raw_path).expanduser().resolve() + output_dir = Path(output_dir).expanduser().resolve() + + args = SimpleNamespace( + raw_path=raw_path, + output_dir=output_dir, + arm=arm, + fps=fps, + save_mp4=save_mp4, + task_name=task_name, + task_description=task_description, + chunk_timesteps=chunk_timesteps, + ) + + return eva_main(args) diff --git a/egomimic/scripts/run_conversion.py b/egomimic/scripts/run_conversion.py new file mode 100644 index 00000000..8fb140da --- /dev/null +++ b/egomimic/scripts/run_conversion.py @@ -0,0 +1,459 @@ +#!/usr/bin/env python3 + +from __future__ import annotations + +import argparse +import csv +import os +import time +from pathlib import Path +from typing import Any, Dict + +import ray +from cloudpathlib import S3Path +from ray.exceptions import OutOfMemoryError, RayTaskError, WorkerCrashedError + +from egomimic.scripts.ray_helper import AriaRay, EmbodimentRay, EvaRay +from egomimic.utils.aws.aws_data_utils import ( + get_cloudpathlib_s3_client, + load_env, +) +from egomimic.utils.aws.aws_sql import ( + TableRow, + create_default_engine, + episode_hash_to_table_row, + episode_table_to_df, + timestamp_ms_to_episode_hash, + update_episode, +) + +PROCESSED_LOCAL_ROOT = Path( + os.environ.get("PROCESSED_LOCAL_ROOT", "/home/ubuntu/processed") +).resolve() + +LOG_ROOT = Path( + os.environ.get( + "CONVERSION_LOG_ROOT", + str(PROCESSED_LOCAL_ROOT / "conversion_logs"), + ) +).resolve() + + +def ensure_path_ready(p: str | Path | S3Path, retries: int = 30) -> bool: + if isinstance(p, str): + if p.startswith("s3://"): + s3_client = get_cloudpathlib_s3_client() + p = S3Path(p, client=s3_client) + else: + p = Path(p) + for _ in range(retries): + try: + if p.exists(): + return True + except Exception: + pass + time.sleep(1) + return False + + +def _map_processed_local_to_remote(p: str | Path, processed_remote_prefix: str) -> str: + """Map any path under PROCESSED_LOCAL_ROOT → PROCESSED_REMOTE_PREFIX/relative.""" + if not p: + return "" + p = Path(p).resolve() + try: + rel = p.relative_to(PROCESSED_LOCAL_ROOT) # raises if not under root + except Exception: + return str(p) + return ( + f"{processed_remote_prefix}/{rel.as_posix()}" + if processed_remote_prefix + else str(p) + ) + + +def infer_arm_from_row(row: TableRow) -> str: + """ + Infer arm from SQL row.robot_name (e.g., 'aria_left', 'aria_right', 'aria_bimanual'). + Falls back to 'bimanual'. + """ + emb = (row.robot_name or "").lower() + if "left" in emb: + return "left" + if "right" in emb: + return "right" + if "bimanual" in emb: + return "both" + return "both" + + +def infer_task_name_from_row(row: TableRow) -> str: + """ + Infer task name from SQL row.task_name (e.g., 'task_name'). + Falls back to ''. + """ + return row.task or "unknown" + + +def infer_task_description_from_row(row: TableRow) -> str: + """ + Infer task description from SQL row.task_description (e.g., 'task_description'). + Falls back to ''. + """ + return row.task_description or "" + + +def _is_oom_exception(e: Exception) -> bool: + if isinstance(e, OutOfMemoryError): + return True + if isinstance(e, (RayTaskError, WorkerCrashedError)): + s = str(e).lower() + return ( + ("outofmemory" in s) + or ("out of memory" in s) + or ("oom" in s) + or ("killed" in s) + ) + s = str(e).lower() + return ("outofmemory" in s) or ("out of memory" in s) or ("oom" in s) + + +class _Tee: + def __init__(self, *streams): + self._streams = streams + + def write(self, data: str) -> int: + for s in self._streams: + s.write(data) + s.flush() + return len(data) + + def flush(self) -> None: + for s in self._streams: + s.flush() + + def isatty(self) -> bool: + return False + + +# --- Ray task ---------------------------------------------------------------- +def submit_convert(embodiment_ray: EmbodimentRay, size: str, **kwargs): + embodiment_ray_cls = embodiment_ray.__class__ + if size == "small": + num_cpus = embodiment_ray.num_cpus_small + resources = embodiment_ray.resources_small + + else: + num_cpus = embodiment_ray.num_cpus_big + resources = embodiment_ray.resources_big + + return ( + ray.remote(embodiment_ray_cls.convert_one_bundle) + .options(num_cpus=num_cpus, resources=resources) + .remote(**kwargs) + ) + + +# --- Driver ------------------------------------------------------------------ +def launch( + embodiment: str, + dry: bool = False, + skip_if_done: bool = False, + episode_hashes: list[str] | None = None, +): + embodiment_ray = None + if embodiment == "aria": + embodiment_ray = AriaRay( + PROCESSED_LOCAL_ROOT, + LOG_ROOT, + ) + elif embodiment == "eva": + embodiment_ray = EvaRay( + PROCESSED_LOCAL_ROOT, + LOG_ROOT, + ) + else: + raise ValueError(f"Invalid embodiment: {embodiment}") + + engine = create_default_engine() + pending: Dict[ray.ObjectRef, Dict[str, Any]] = {} + + benchmark_rows = [] + + df = episode_table_to_df(engine) + + for name, args in embodiment_ray.iter_bundles(): + # IMPORTANT: episode_hash is TEXT in DB; do not cast to int + episode_key = timestamp_ms_to_episode_hash(int(name)) + row = df[df["episode_hash"] == episode_key] + if len(row) == 1: + row = row.iloc[0] + elif len(row) > 1: + print("[WARNING] Duplicate episode hash") + else: + row = None + + if not episode_key: + print(f"[SKIP] {name}: could not parse episode_hash from stem", flush=True) + continue + + if episode_hashes is not None and episode_key not in episode_hashes: + print( + f"[SKIP] {name}: episode_key '{episode_key}' not in provided episode_hashes list", + flush=True, + ) + continue + + if row is None: + print(f"[SKIP] {name}: no matching row in SQL (app.episodes)", flush=True) + continue + + processed_path = (row.zarr_processed_path or "").strip() + if skip_if_done and len(processed_path) > 0: + print( + f"[SKIP] {name}: already has zarr_processed_path='{processed_path}'", + flush=True, + ) + continue + + if row.zarr_processing_error != "": + print( + f"[INFO] skipping episode hash: {row.episode_hash} due to zarr processing error", + flush=True, + ) + continue + + if row.is_deleted: + print(f"[SKIP] {name}: episode marked as deleted in SQL", flush=True) + continue + + print(f"[INFO] processing {name}: episode_key={episode_key}", flush=True) + + arm = infer_arm_from_row(row) + task_name = infer_task_name_from_row(row) + task_description = infer_task_description_from_row(row) + # TODO: add dry run, right now the script does not work with this dry run code + # out_dir = PROCESSED_LOCAL_ROOT + # if dry: + # ds_path = (PROCESSED_LOCAL_ROOT / dataset_name).resolve() + # stem = name + # mp4_candidate = PROCESSED_LOCAL_ROOT / f"{stem}_video.mp4" + + # mapped_ds = _map_processed_local_to_remote( + # ds_path, embodiment_ray.processed_remote_prefix + # ) + # mapped_mp4 = _map_processed_local_to_remote( + # mp4_candidate, embodiment_ray.processed_remote_prefix + # ) + + # print( + # f"[DRY] {name}: arm={arm} | out_dir={out_dir}/{dataset_name}\n" + # f" would write to SQL:\n" + # f" zarr_processed_path={mapped_ds}\n" + # f" zarr_mp4_path={mapped_mp4}", + # flush=True, + # ) + # continue + + args["arm"] = arm + args["task_name"] = task_name + args["task_description"] = task_description + start_time = time.time() + ref = submit_convert(embodiment_ray, "small", **args) + pending[ref] = { + "episode_key": episode_key, + "start_time": start_time, + "size": "small", + "args": args, + } + + if dry or not pending: + return + + # Collect and update SQL (with OOM retry on BIG) + while pending: + done_refs, _ = ray.wait(list(pending.keys()), num_returns=1) + ref = done_refs[0] + info = pending.pop(ref) + + episode_key = info["episode_key"] + start_time = info["start_time"] + duration_sec = time.time() - start_time + + row = episode_hash_to_table_row(engine, episode_key) + if row is None: + print( + f"[WARN] Episode {episode_key}: row disappeared before update?", + flush=True, + ) + continue + + try: + ds_path, mp4_path, frames = ray.get( + ref + ) # can throw (OOM, index error, etc.) + + row.num_frames = int(frames) if frames is not None else -1 + if row.num_frames > 0: + row.zarr_processed_path = _map_processed_local_to_remote( + ds_path, embodiment_ray.processed_remote_prefix + ) + row.zarr_mp4_path = _map_processed_local_to_remote( + mp4_path, embodiment_ray.processed_remote_prefix + ) + row.zarr_processing_error = "" + elif row.num_frames == -2: + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = "Upload Failed" + elif row.num_frames == -1: + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = "Zero Frames" + else: + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = "Conversion Failed Unhandled Error" + + update_episode(engine, row) + print( + f"[OK] Updated SQL for {episode_key}: " + f"zarr_processed_path={row.zarr_processed_path}, num_frames={row.num_frames}, " + f"duration_sec={duration_sec:.2f}", + flush=True, + ) + + if row.num_frames > 0 and row.zarr_processed_path: + benchmark_rows.append( + { + "episode_key": episode_key, + "processed_path": row.zarr_processed_path, + "mp4_path": row.zarr_mp4_path, + "num_frames": row.num_frames, + "duration_sec": duration_sec, + } + ) + + except Exception as e: + # If OOM on small, retry once on big + if _is_oom_exception(e) and info.get("size") == "small": + print( + f"[OOM] Episode {episode_key} failed on SMALL. Retrying on BIG...", + flush=True, + ) + args = info["args"] + ref2 = submit_convert(embodiment_ray, "big", **args) + pending[ref2] = { + **info, + "start_time": time.time(), + "size": "big", + } + continue + + print( + f"[FAIL] Episode {episode_key} task failed ({info.get('size', '?')}): " + f"{type(e).__name__}: {e}", + flush=True, + ) + + # mark failed in SQL (so skip-if-done won't think it's done) + row.num_frames = -1 + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = f"{type(e).__name__}: {e}" + try: + update_episode(engine, row) + print( + f"[FAIL] Marked SQL failed for {episode_key} (cleared zarr_processed_path)", + flush=True, + ) + except Exception as ee: + print( + f"[ERR] SQL update failed for failed episode {episode_key}: {ee}", + flush=True, + ) + + if benchmark_rows: + timing_file = Path(f"{embodiment}_conversion_timings.csv") + file_exists = timing_file.exists() + fieldnames = [ + "episode_key", + "processed_path", + "mp4_path", + "num_frames", + "duration_sec", + ] + try: + with timing_file.open("a", newline="") as f: + writer = csv.DictWriter(f, fieldnames=fieldnames) + if not file_exists: + writer.writeheader() + for bench_row in benchmark_rows: + writer.writerow(bench_row) + print( + f"[BENCH] wrote {len(benchmark_rows)} entries → {timing_file.resolve()}", + flush=True, + ) + except Exception as e: + print(f"[ERR] Failed to write benchmark CSV {timing_file}: {e}", flush=True) + + +# --- CLI --------------------------------------------------------------------- +def main(): + p = argparse.ArgumentParser() + p.add_argument("--embodiment", type=str, required=True, choices=["aria", "eva"]) + p.add_argument("--dry-run", action="store_true") + p.add_argument( + "--skip-if-done", + action="store_true", + help="Skip episodes that already have a zarr_processed_path in SQL", + ) + p.add_argument( + "--ray-address", default="auto", help="Ray cluster address (default: auto)" + ) + p.add_argument( + "--episode-hash", + action="append", + dest="episode_hashes", + help="Episode hash to process. Can be specified multiple times to process multiple episodes.", + ) + p.add_argument("--debug", action="store_true") + args = p.parse_args() + + env_vars = {} + load_env() + for k in [ + "R2_ACCESS_KEY_ID", + "R2_SECRET_ACCESS_KEY", + "R2_SESSION_TOKEN", # optional + "R2_ENDPOINT_URL", # optional; include if your helper expects it + ]: + v = os.environ.get(k) + if v: + env_vars[k] = v + + if args.debug: + runtime_env = { + "working_dir": "/home/ubuntu/EgoVerse", + "excludes": [ + "**/.git/**", + "external/openpi/third_party/aloha/**", + "**/*.pack", + "**/__pycache__/**", + "external/openpi/**", + ], + } + else: + runtime_env = {} + runtime_env["env_vars"] = env_vars + ray.init(address=args.ray_address, runtime_env=runtime_env) + launch( + embodiment=args.embodiment, + dry=args.dry_run, + skip_if_done=args.skip_if_done, + episode_hashes=args.episode_hashes, + ) + + +if __name__ == "__main__": + main() diff --git a/egomimic/utils/aws/load_episode.sh b/egomimic/utils/aws/load_episode.sh new file mode 100755 index 00000000..c5dba4f6 --- /dev/null +++ b/egomimic/utils/aws/load_episode.sh @@ -0,0 +1,45 @@ +set -a +source ~/.egoverse_env +set +a + +export AWS_ACCESS_KEY_ID="$R2_ACCESS_KEY_ID" +export AWS_SECRET_ACCESS_KEY="$R2_SECRET_ACCESS_KEY" +export AWS_DEFAULT_REGION="auto" +export AWS_REGION="auto" + +# aria download id +ID=1766002714075 +path="/home/ubuntu/eva_proc_download2" + + +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" sync \ +# "s3://rldb/raw_v2/test_aria/mps_${ID}_vrs/**" \ +# "${path}/mps_${ID}_vrs/" + +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" cp "s3://rldb/raw_v2/test_aria/${ID}.json" $path +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" cp "s3://rldb/raw_v2/test_aria/${ID}.vrs" $path +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" cp "s3://rldb/raw_v2/test_aria/${ID}_metadata.json" $path + +# delete processed directories +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" rm "s3://rldb/raw_v2/test_eva2/**" + +# download processed zarrs and mp4s +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" sync \ +# "s3://rldb/processed_v3/test_eva2/**" \ +# "$path" + +eva_ID=1766773834183 +# eva move hdf5 +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" cp "s3://rldb/raw_v2/eva/${eva_ID}.hdf5" "s3://rldb/raw_v2/test_eva2/${eva_ID}.hdf5" + +s5cmd --endpoint-url "$R2_ENDPOINT_URL" cp \ + "s3://rldb/raw_v2/eva/${eva_ID}.hdf5" \ + "/home/ubuntu/up_res_download/${eva_ID}.hdf5" + +# Then upload to destination +# s5cmd --endpoint-url "$R2_ENDPOINT_URL" cp \ +# "/tmp/1772514686461.hdf5" \ +# "s3://rldb/raw_v2/test_eva2/1772514686461.hdf5" + +# Clean up +# rm /tmp/1772514686461.hdf5 \ No newline at end of file