From ad5f70b276b079c6940efef065ecffe32bbf14b7 Mon Sep 17 00:00:00 2001 From: RyanPCo Date: Wed, 11 Feb 2026 22:46:52 -0500 Subject: [PATCH] eva to zarr --- egomimic/scripts/eva_process/eva-cluster.yaml | 2 +- egomimic/scripts/eva_process/eva_helper.py | 59 +---- egomimic/scripts/eva_process/eva_to_zarr.py | 231 +++++++++--------- .../scripts/eva_process/run_eva_conversion.py | 202 +++++---------- egomimic/utils/aws/sql_tutorial.ipynb | 2 +- 5 files changed, 188 insertions(+), 308 deletions(-) diff --git a/egomimic/scripts/eva_process/eva-cluster.yaml b/egomimic/scripts/eva_process/eva-cluster.yaml index 60c5a948..6ccef049 100644 --- a/egomimic/scripts/eva_process/eva-cluster.yaml +++ b/egomimic/scripts/eva_process/eva-cluster.yaml @@ -115,7 +115,7 @@ head_setup_commands: | grep -v ray_worker_gaurdrails.py \ | grep -v ray_worker_gaurdrails.lock ; \ echo 'CRON_TZ=America/New_York'; \ - echo '0 22 * * * flock -n /tmp/run_eva_conversion.lock /bin/bash -lc "set -a; . /home/ubuntu/.egoverse_env; set +a; /usr/bin/python3 ~/EgoVerse/egomimic/scripts/eva_process/run_eva_conversion.py --skip-if-done" >> ~/eva_conversion_$(date +\%Y-\%m-\%d-\%H-\%M-\%S).log 2>&1'; \ + echo '0 22 * * * flock -n /tmp/run_eva_conversion.lock /bin/bash -lc "set -a; . /home/ubuntu/.egoverse_env; set +a; /usr/bin/python3 ~/EgoVerse/egomimic/scripts/eva_process/run_eva_conversion.py --skip-if-done --debug" >> ~/eva_conversion_$(date +\%Y-\%m-\%d-\%H-\%M-\%S).log 2>&1'; \ echo '*/10 * * * * flock -n /tmp/ray_worker_gaurdrails.lock /usr/bin/python3 /home/ubuntu/EgoVerse/egomimic/utils/aws/budget_guardrails/ray_worker_gaurdrails.py >> /home/ubuntu/ray_worker_gaurdrails.log 2>&1') \ | crontab - || true - crontab -l || true diff --git a/egomimic/scripts/eva_process/eva_helper.py b/egomimic/scripts/eva_process/eva_helper.py index ceb2c35e..ee967659 100644 --- a/egomimic/scripts/eva_process/eva_helper.py +++ b/egomimic/scripts/eva_process/eva_helper.py @@ -2,44 +2,6 @@ from types import SimpleNamespace from egomimic.scripts.eva_process.eva_to_zarr import main as zarr_main -from egomimic.scripts.eva_process.eva_to_lerobot import main as lerobot_main - -def lerobot_job( - *, - raw_path: str | Path, - output_dir: str | Path, - dataset_name: str, - arm: str, - description: str = "", - extrinsics_key: str = "x5Dec13_2", -) -> None: - - raw_path = Path(raw_path).expanduser().resolve() - output_dir = Path(output_dir).expanduser().resolve() - - args = SimpleNamespace( - name=dataset_name, - raw_path=raw_path, - dataset_repo_id=f"rpuns/{dataset_name}", - fps=30, - arm=arm, - extrinsics_key=extrinsics_key, - description=description, - image_compressed=False, - video_encoding=False, - prestack=True, - nproc=12, - nthreads=2, - output_dir=output_dir, - push=False, - private=False, - license="apache-2.0", - debug=False, - save_mp4=True, - ) - - lerobot_main(args) - def zarr_job( *, @@ -47,27 +9,26 @@ def zarr_job( output_dir: str | Path, dataset_name: str, arm: str, - description: str = "", extrinsics_key: str = "x5Dec13_2", + fps: int = 30, + description: str = "", chunk_timesteps: int = 100, + image_compressed: bool = False, + save_mp4: bool = True, ) -> None: - raw_path = Path(raw_path).expanduser().resolve() - output_dir = Path(output_dir).expanduser().resolve() - args = SimpleNamespace( raw_path=raw_path, output_dir=output_dir, - name=dataset_name, - fps=30, + dataset_name=dataset_name, arm=arm, extrinsics_key=extrinsics_key, + fps=fps, description=description, - image_compressed=False, - prestack=False, - debug=False, - save_mp4=True, chunk_timesteps=chunk_timesteps, + image_compressed=image_compressed, + save_mp4=save_mp4, ) - zarr_main(args) + zarr_path, mp4_path = zarr_main(args) + return zarr_path, mp4_path \ No newline at end of file diff --git a/egomimic/scripts/eva_process/eva_to_zarr.py b/egomimic/scripts/eva_process/eva_to_zarr.py index 7069741e..2b774e89 100644 --- a/egomimic/scripts/eva_process/eva_to_zarr.py +++ b/egomimic/scripts/eva_process/eva_to_zarr.py @@ -10,6 +10,7 @@ import traceback from pathlib import Path +import cv2 import numpy as np import torch from scipy.spatial.transform import Rotation as R @@ -34,6 +35,12 @@ # Preview MP4 # --------------------------------------------------------------------------- +def resize_video_thwc(video: np.ndarray) -> np.ndarray: + # video: (T,H,W,C) + out = np.empty((video.shape[0], 480, 640, video.shape[3]), dtype=video.dtype) + for t in range(video.shape[0]): + out[t] = cv2.resize(video[t], (640, 480), interpolation=cv2.INTER_AREA) + return out R_t_e = np.array([ [0, 0, 1], @@ -48,91 +55,91 @@ def eva_reorientation(quat: np.ndarray) -> np.ndarray: def save_preview_mp4( - images_tchw: np.ndarray, output_path: Path, fps: int + images_tchw: np.ndarray, + output_path: Path, + fps: int, + *, + half_res: bool = True, + crf: int = 23, + preset: str = "fast", ) -> None: - """Save a half-resolution, web-compatible MP4 from a (T,C,H,W) uint8 array. - - Tries torchvision.io.write_video first, then falls back to ffmpeg CLI. + import shutil + import subprocess - Parameters - ---------- - images_tchw : np.ndarray - Image array with shape (T, C, H, W), uint8. - output_path : Path - Destination path for the .mp4 file. - fps : int - Frames per second. - """ - import torch.nn.functional as F + output_path = Path(output_path) + output_path.parent.mkdir(parents=True, exist_ok=True) - imgs = torch.from_numpy(np.asarray(images_tchw)) # (T,C,H,W) - if imgs.ndim != 4 or len(imgs) == 0: - logger.warning("No valid frames for preview MP4 — skipping.") + arr = np.asarray(images_tchw) + if arr.ndim != 4 or arr.shape[0] == 0: return - _, _, H, W = imgs.shape + T, C, H, W = arr.shape - # Half-resolution with even dims for yuv420p - outW, outH = W // 2, H // 2 + if half_res: + outW, outH = W // 2, H // 2 + else: + outW, outH = W, H outW -= outW % 2 outH -= outH % 2 if outW <= 0 or outH <= 0: raise ValueError(f"Invalid output size: {outW}x{outH}") - output_path = Path(output_path) - output_path.parent.mkdir(parents=True, exist_ok=True) - - rgb_frames = [] - for chw in imgs: - t = chw.detach().cpu().to(torch.uint8) - if t.shape[0] == 1: - t = t.repeat(3, 1, 1) - t_resized = F.interpolate( - t.unsqueeze(0).float(), - size=(outH, outW), - mode="bilinear", - align_corners=False, - ).squeeze(0).to(torch.uint8) - rgb_frames.append(t_resized.permute(1, 2, 0).contiguous()) # (H,W,3) - - video_tensor = torch.stack(rgb_frames, dim=0) # (T,H,W,3) uint8 - - # Try torchvision first - try: - import torchvision.io - - torchvision.io.write_video( - str(output_path), video_tensor, fps=fps, - video_codec="libx264", - options={"crf": "23", "pix_fmt": "yuv420p"}, - ) - return - except Exception: - logger.debug("torchvision.io.write_video failed, trying ffmpeg CLI.") - - # Fallback: pipe raw frames to ffmpeg - import subprocess, shutil - ffmpeg = shutil.which("ffmpeg") if ffmpeg is None: - raise RuntimeError("Neither torchvision.io.write_video nor ffmpeg CLI available.") + raise RuntimeError("ffmpeg not found in PATH") cmd = [ ffmpeg, "-y", - "-f", "rawvideo", "-vcodec", "rawvideo", - "-s", f"{outW}x{outH}", "-pix_fmt", "rgb24", - "-r", str(fps), "-i", "-", - "-c:v", "libx264", "-pix_fmt", "yuv420p", - "-crf", "23", "-preset", "fast", + "-f", "rawvideo", + "-vcodec", "rawvideo", + "-pix_fmt", "rgb24", + "-s", f"{outW}x{outH}", + "-r", str(fps), + "-i", "-", + "-an", + "-c:v", "libx264", + "-pix_fmt", "yuv420p", + "-crf", str(crf), + "-preset", preset, str(output_path), ] + proc = subprocess.Popen(cmd, stdin=subprocess.PIPE, stderr=subprocess.PIPE) - proc.stdin.write(video_tensor.numpy().tobytes()) - proc.stdin.close() - _, stderr = proc.communicate() - if proc.returncode != 0: - raise RuntimeError(f"ffmpeg failed (rc={proc.returncode}): {stderr.decode()}") + try: + assert proc.stdin is not None + assert proc.stderr is not None + + for t in range(T): + frame_chw = arr[t] + + if frame_chw.dtype != np.uint8: + frame_chw = frame_chw.astype(np.uint8, copy=False) + + if frame_chw.shape[0] == 1: + frame_chw = np.repeat(frame_chw, 3, axis=0) + elif frame_chw.shape[0] == 4: + frame_chw = frame_chw[:3] + + frame_hwc = np.transpose(frame_chw, (1, 2, 0)) # view + + if frame_hwc.shape[0] != outH or frame_hwc.shape[1] != outW: + frame_hwc = cv2.resize(frame_hwc, (outW, outH), interpolation=cv2.INTER_AREA) + + proc.stdin.write(frame_hwc.tobytes()) + + proc.stdin.close() + + stderr = proc.stderr.read() + rc = proc.wait() + if rc != 0: + raise RuntimeError(f"ffmpeg failed (rc={rc}): {stderr.decode(errors='replace')}") + finally: + try: + if proc.poll() is None: + proc.kill() + except Exception: + pass # --------------------------------------------------------------------------- # Helpers @@ -179,7 +186,7 @@ def _separate_numeric_and_image(episode_feats: dict): # Transpose (T,C,H,W) -> (T,H,W,C) when needed if arr.ndim == 4 and arr.shape[1] in (1, 3, 4) and arr.shape[2] > arr.shape[1]: arr = arr.transpose(0, 2, 3, 1) - image_data[zarr_key] = arr + image_data[zarr_key] = resize_video_thwc(arr) # resize image to 640x480 else: numeric_data[zarr_key] = arr else: @@ -293,32 +300,36 @@ def _build_example_annotations( # --------------------------------------------------------------------------- def convert_episode( - hdf5_path: Path, - zarr_episode_path: Path, + raw_path: Path, + output_dir: Path, + dataset_name: str, arm: str, extrinsics_key: str, fps: int, description: str = "", - mp4_dir: Path | None = None, + save_mp4: bool = False, chunk_timesteps: int = 100, example_annotations: bool = False, -) -> Path: +) -> tuple[Path, Path]: """Process one HDF5 file and write a .zarr episode. Returns the zarr episode path on success. """ - hdf5_path = Path(hdf5_path) - zarr_episode_path = Path(zarr_episode_path) - extrinsics = EXTRINSICS[extrinsics_key] episode_feats = EvaHD5Extractor.process_episode( - episode_path=hdf5_path, + episode_path=raw_path, arm=arm, extrinsics=extrinsics, ) + front_key = "images.front_img_1" + obs = episode_feats.get("observations") or {} + images_tchw = None + if save_mp4 and front_key in obs: + images_tchw = np.asarray(obs[front_key]) + numeric_data, image_data = _separate_numeric_and_image(episode_feats) numeric_data = _split_per_arm(numeric_data, arm) total_frames = _infer_total_frames(numeric_data, image_data) @@ -330,8 +341,8 @@ def convert_episode( else [] ) - ZarrWriter.create_and_write( - episode_path=zarr_episode_path, + zarr_path = ZarrWriter.create_and_write( + episode_path=output_dir / f"{dataset_name}.zarr", numeric_data=numeric_data or None, image_data=image_data or None, embodiment=embodiment, @@ -342,24 +353,23 @@ def convert_episode( enable_sharding=True, ) - logger.info("Wrote zarr episode: %s", zarr_episode_path) + logger.info("Wrote zarr episode: %s", zarr_path) - # Optional preview MP4 - front_key = "images.front_img_1" - obs = episode_feats.get("observations") or {} - if mp4_dir is not None and front_key in obs: - images_tchw = np.asarray(obs[front_key]) - mp4_path = Path(mp4_dir) / f"{hdf5_path.stem}_video.mp4" + del episode_feats + del obs + mp4_path = None + if save_mp4 and images_tchw is not None: + mp4_path = output_dir / f"{dataset_name}.mp4" try: logger.info("Saving preview MP4 to: %s", mp4_path) - save_preview_mp4(images_tchw, mp4_path, fps) + save_preview_mp4(images_tchw, mp4_path, fps, half_res=False) logger.info("Saved preview MP4: %s", mp4_path) except Exception: logger.warning( "Failed to save preview MP4 at %s:\n%s", mp4_path, traceback.format_exc() ) - return zarr_episode_path + return zarr_path, mp4_path # --------------------------------------------------------------------------- @@ -374,41 +384,23 @@ def main(args) -> None: args : argparse.Namespace Parsed command-line arguments (same shape as eva_to_lerobot). """ - raw_path = Path(args.raw_path) - output_base = Path(args.output_dir) - - episode_list = sorted(raw_path.glob("*.hdf5")) - if not episode_list: - raise ValueError(f"No .hdf5 files found in {raw_path}") - - EvaHD5Extractor.check_format(episode_list, image_compressed=getattr(args, "image_compressed", False)) - mp4_dir = output_base if getattr(args, "save_mp4", False) else None - chunk_timesteps = getattr(args, "chunk_timesteps", 100) - example_annotations = getattr( - args, - "example_language_annotations", - getattr(args, "example_annotations", False), - ) - - for hdf5_path in episode_list: - stem = hdf5_path.stem - zarr_episode_path = output_base / f"{stem}.zarr" - try: - convert_episode( - hdf5_path=hdf5_path, - zarr_episode_path=zarr_episode_path, - arm=args.arm, - extrinsics_key=getattr(args, "extrinsics_key", "x5Dec13_2"), - fps=getattr(args, "fps", 30), - description=getattr(args, "description", ""), - mp4_dir=mp4_dir, - chunk_timesteps=chunk_timesteps, - example_annotations=example_annotations, - ) - except Exception: - logger.error("Error converting %s:\n%s", hdf5_path, traceback.format_exc()) - continue + try: + zarr_path, mp4_path = convert_episode( + raw_path=Path(args.raw_path), + output_dir=Path(args.output_dir), + dataset_name=args.dataset_name, + arm=args.arm, + extrinsics_key=getattr(args, "extrinsics_key", "x5Dec13_2"), + fps=getattr(args, "fps", 30), + description=getattr(args, "description", ""), + chunk_timesteps=getattr(args, "chunk_timesteps", 100), + save_mp4=args.save_mp4, + ) + return zarr_path, mp4_path + except Exception: + logger.error("Error converting %s:\n%s", args.raw_path, traceback.format_exc()) + return None, None # --------------------------------------------------------------------------- @@ -422,6 +414,7 @@ def argument_parse(): parser.add_argument("--raw-path", type=Path, required=True, help="Directory containing raw HDF5 files.") parser.add_argument("--fps", type=int, default=30, help="Frames per second.") parser.add_argument("--output-dir", type=Path, required=True, help="Root output directory.") + parser.add_argument("--dataset-name", type=str, required=True, help="Name for dataset.") parser.add_argument("--arm", type=str, choices=["left", "right", "both"], default="both") parser.add_argument("--extrinsics-key", type=str, default="x5Dec13_2") parser.add_argument("--image-compressed", type=str2bool, default=False) diff --git a/egomimic/scripts/eva_process/run_eva_conversion.py b/egomimic/scripts/eva_process/run_eva_conversion.py index 39ee009a..ad11a6aa 100644 --- a/egomimic/scripts/eva_process/run_eva_conversion.py +++ b/egomimic/scripts/eva_process/run_eva_conversion.py @@ -19,7 +19,7 @@ from cloudpathlib import S3Path from ray.exceptions import OutOfMemoryError, RayTaskError, WorkerCrashedError -from eva_helper import lerobot_job, zarr_job +from eva_helper import zarr_job from egomimic.utils.aws.aws_data_utils import ( get_boto3_s3_client, @@ -31,6 +31,7 @@ create_default_engine, episode_hash_to_table_row, episode_table_to_df, + timestamp_ms_to_episode_hash, update_episode, ) @@ -187,7 +188,6 @@ def convert_one_bundle_impl( arm: str, description: str, extrinsics_key: str, - backend: str = "zarr", ) -> tuple[str, str, int]: s3_client = get_boto3_s3_client() hdf5_s3 = S3Path(data_h5_s3) @@ -228,82 +228,53 @@ def convert_one_bundle_impl( flush=True, ) job_kwargs = dict( - raw_path=str(tmp_dir), + raw_path=str(local_hdf5), output_dir=str(ds_parent), dataset_name=dataset_name, arm=arm, description=description or "", extrinsics_key=extrinsics_key, ) - if backend == "zarr": - zarr_job(**job_kwargs) - else: - lerobot_job(**job_kwargs) + zarr_path, mp4_path = zarr_job(**job_kwargs) frames = -1 - mp4_str = "" - path_for_sql = str(ds_path) - zarr_store_path = None - if backend == "zarr": - zarr_store_path = ds_parent / f"{stem}.zarr" - info = zarr_store_path / "zarr.json" - print(f"[DEBUG] Zarr metadata path: {info}", flush=True) - if info.exists(): - try: - meta = json.loads(info.read_text()) - print(f"[DEBUG] Zarr metadata keys: {list(meta.keys())}", flush=True) - frames = int(meta.get("attributes", {}).get("total_frames", -1)) - except Exception as e: - print(f"[ERR] Failed to parse zarr metadata {info}: {e}", flush=True) - frames = -1 - else: - print(f"[ERR] Zarr metadata not found: {info}", flush=True) - path_for_sql = f"{PROCESSED_REMOTE_PREFIX}/{stem}.zarr" + zarr_store_path = zarr_path + info = zarr_store_path / "zarr.json" + print(f"[DEBUG] Zarr metadata path: {info}", flush=True) + if info.exists(): + try: + meta = json.loads(info.read_text()) + print(f"[DEBUG] Zarr metadata keys: {list(meta.keys())}", flush=True) + frames = int(meta.get("attributes", {}).get("total_frames", -1)) + except Exception as e: + print(f"[ERR] Failed to parse zarr metadata {info}: {e}", flush=True) + frames = -1 else: - info = ds_path / "meta/info.json" - if info.exists(): - try: - meta = json.loads(info.read_text()) - frames = int(meta.get("total_frames", -1)) - except Exception: - frames = -1 - - mp4_candidates = list(ds_parent.glob(f"*{stem}*_video.mp4")) + list( - ds_path.glob("**/*_video.mp4") - ) - mp4_str = str(mp4_candidates[0]) if mp4_candidates else "" + print(f"[ERR] Zarr metadata not found: {info}", flush=True) + path_for_sql = f"{PROCESSED_REMOTE_PREFIX}/{stem}.zarr" try: out_bucket, out_prefix = _parse_s3_uri(s3_processed_dir, default_bucket=BUCKET) - if backend == "zarr" and zarr_store_path is not None: - ds_s3_prefix = f"{out_prefix.rstrip('/')}/{stem}.zarr".strip("/") - upload_dir_to_s3(str(zarr_store_path), out_bucket, prefix=ds_s3_prefix) - shutil.rmtree(str(zarr_store_path), ignore_errors=True) - print(f"[CLEANUP] Removed local zarr store: {zarr_store_path}", flush=True) - else: - ds_rel = ds_path.resolve().relative_to(PROCESSED_LOCAL_ROOT).as_posix() - ds_s3_prefix = f"{out_prefix.rstrip('/')}/{ds_rel}".strip("/") - upload_dir_to_s3(str(ds_path), out_bucket, prefix=ds_s3_prefix) - shutil.rmtree(str(ds_path), ignore_errors=True) - print(f"[CLEANUP] Removed local dataset dir: {ds_path}", flush=True) - if mp4_str: - mp4_obj = Path(mp4_str) - if mp4_obj.exists(): - mp4_rel = mp4_obj.resolve().relative_to(PROCESSED_LOCAL_ROOT).as_posix() - mp4_s3_key = f"{out_prefix.rstrip('/')}/{mp4_rel}".strip("/") - s3_client.upload_file(str(mp4_obj), out_bucket, mp4_s3_key) - mp4_obj.unlink(missing_ok=True) - print(f"[CLEANUP] Removed local mp4: {mp4_obj}", flush=True) + ds_s3_prefix = f"{out_prefix.rstrip('/')}/{dataset_name}.zarr".strip("/") + upload_dir_to_s3(str(zarr_store_path), out_bucket, prefix=ds_s3_prefix) + shutil.rmtree(str(zarr_store_path), ignore_errors=True) + print(f"[CLEANUP] Removed local zarr store: {zarr_store_path}", flush=True) + mp4_obj = Path(mp4_path) + mp4_rel = mp4_obj.resolve().relative_to(PROCESSED_LOCAL_ROOT).as_posix() + mp4_s3_key = f"{out_prefix.rstrip('/')}/{mp4_rel}".strip("/") + s3_client.upload_file(str(mp4_obj), out_bucket, mp4_s3_key) + mp4_obj.unlink(missing_ok=True) + print(f"[CLEANUP] Removed local mp4: {mp4_obj}", flush=True) except Exception as e: - print(f"[ERR] Failed to upload {ds_path} to S3: {e}", flush=True) - return path_for_sql, mp4_str, -2 + print(f"[ERR] Failed to upload {zarr_store_path} to S3: {e}", flush=True) + return zarr_path, mp4_path, -2 - return path_for_sql, mp4_str, frames + return zarr_path, mp4_path, frames except Exception as e: err_msg = f"[FAIL] {stem}: {e}\n{traceback.format_exc()}" print(err_msg, flush=True) - return path_for_sql, "", -1 + return zarr_path, "", -1 finally: shutil.rmtree(tmp_dir, ignore_errors=True) @@ -321,7 +292,6 @@ def convert_one_bundle_big(*args, **kwargs): def launch( dry: bool = False, skip_if_done: bool = False, - backend: str = "zarr", episode_hashes: list[str] | None = None, ): engine = create_default_engine() @@ -359,14 +329,10 @@ def launch( print(f"[SKIP] {name}: no matching row in SQL (app.episodes)", flush=True) continue - if backend == "zarr": - processed_path = (row.zarr_processed_path or "").strip() - processing_error = row.zarr_processing_error - path_field_name = "zarr_processed_path" - else: - processed_path = (row.processed_path or "").strip() - processing_error = row.processing_error - path_field_name = "processed_path" + processed_path = (row.zarr_processed_path or "").strip() + processing_error = row.zarr_processing_error + path_field_name = "zarr_processed_path" + if skip_if_done and len(processed_path) > 0: print(f"[SKIP] {name}: already has {path_field_name}='{processed_path}'", flush=True) @@ -387,6 +353,7 @@ def launch( arm = infer_arm_from_robot_name(getattr(row, "robot_name", None)) dataset_name = hdf5_s3.stem + dataset_name = timestamp_ms_to_episode_hash(name) out_dir = PROCESSED_LOCAL_ROOT s3out_dir = PROCESSED_REMOTE_PREFIX description = row.task_description or "" @@ -394,14 +361,11 @@ def launch( if dry: ds_path = (PROCESSED_LOCAL_ROOT / dataset_name).resolve() mp4_candidate = PROCESSED_LOCAL_ROOT / f"{name}_video.mp4" - if backend == "zarr": - # Zarr stored flat under eva: prefix/.zarr - mapped_ds = f"{PROCESSED_REMOTE_PREFIX}/{name}.zarr" - else: - mapped_ds = _map_processed_local_to_remote(ds_path) + # Zarr stored flat under eva: prefix/.zarr + mapped_ds = f"{PROCESSED_REMOTE_PREFIX}/{name}.zarr" mapped_mp4 = _map_processed_local_to_remote(mp4_candidate) - path_field_name = "zarr_processed_path" if backend == "zarr" else "processed_path" - mp4_field_name = "zarr_mp4_path" if backend == "zarr" else "mp4_path" + path_field_name = "zarr_processed_path" + mp4_field_name = "zarr_mp4_path" print( f"[DRY] {name}: arm={arm} | out_dir={out_dir}/{dataset_name}\n" f" desc-bytes={len(description.encode('utf-8'))}\n" @@ -421,7 +385,6 @@ def launch( arm, description, extrinsics_key, - backend, ) start_time = time.time() @@ -432,7 +395,6 @@ def launch( "start_time": start_time, "size": "small", "args": args_tuple, - "backend": backend, } if dry or not pending: @@ -454,54 +416,32 @@ def launch( try: ds_path, mp4_path, frames = ray.get(ref) - backend = info.get("backend", "zarr") row.num_frames = int(frames) if frames is not None else -1 - if backend == "zarr": - mapped_ds = ds_path - else: - mapped_ds = _map_processed_local_to_remote(ds_path) + mapped_ds = _map_processed_local_to_remote(ds_path) mapped_mp4 = _map_processed_local_to_remote(mp4_path) - if backend == "zarr": - if row.num_frames > 0: - row.zarr_processed_path = mapped_ds - row.zarr_mp4_path = mapped_mp4 - row.zarr_processing_error = "" - elif row.num_frames == -2: - row.zarr_processed_path = "" - row.zarr_mp4_path = "" - row.zarr_processing_error = "Upload Failed" - elif row.num_frames == -1: - row.zarr_processed_path = "" - row.zarr_mp4_path = "" - row.zarr_processing_error = "Zero Frames" - else: - row.zarr_processed_path = "" - row.zarr_mp4_path = "" - row.zarr_processing_error = "Conversion Failed Unhandled Error" - path_value = row.zarr_processed_path + if row.num_frames > 0: + row.zarr_processed_path = mapped_ds + row.zarr_mp4_path = mapped_mp4 + row.zarr_processing_error = "" + elif row.num_frames == -2: + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = "Upload Failed" + elif row.num_frames == -1: + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = "Zero Frames" else: - if row.num_frames > 0: - row.processed_path = mapped_ds - row.mp4_path = mapped_mp4 - row.processing_error = "" - elif row.num_frames == -2: - row.processed_path = "" - row.mp4_path = "" - row.processing_error = "Upload Failed" - elif row.num_frames == -1: - row.processed_path = "" - row.mp4_path = "" - row.processing_error = "Zero Frames" - else: - row.processed_path = "" - row.mp4_path = "" - row.processing_error = "Conversion Failed Unhandled Error" - path_value = row.processed_path + row.zarr_processed_path = "" + row.zarr_mp4_path = "" + row.zarr_processing_error = "Conversion Failed Unhandled Error" + path_value = row.zarr_processed_path + update_episode(engine, row) - path_field_name = "zarr_processed_path" if backend == "zarr" else "processed_path" + path_field_name = "zarr_processed_path" print( f"[OK] Updated SQL for {episode_key}: " f"{path_field_name}={path_value}, num_frames={row.num_frames}, " @@ -510,7 +450,7 @@ def launch( ) if row.num_frames > 0 and path_value: - mp4_val = row.zarr_mp4_path if backend == "zarr" else row.mp4_path + mp4_val = row.zarr_mp4_path benchmark_rows.append( { "episode_key": episode_key, @@ -522,7 +462,6 @@ def launch( ) except Exception as e: - backend = info.get("backend", "zarr") if _is_oom_exception(e) and info.get("size") == "small": print( f"[OOM] Episode {episode_key} failed on SMALL. Retrying on BIG...", @@ -545,16 +484,11 @@ def launch( row.num_frames = -1 error_msg = f"{type(e).__name__}: {e}" - path_field_name = "zarr_processed_path" if backend == "zarr" else "processed_path" + path_field_name = "zarr_processed_path" - if backend == "zarr": - row.zarr_mp4_path = "" - row.zarr_processed_path = "" - row.zarr_processing_error = error_msg - else: - row.mp4_path = "" - row.processed_path = "" - row.processing_error = error_msg + row.zarr_mp4_path = "" + row.zarr_processed_path = "" + row.zarr_processing_error = error_msg try: update_episode(engine, row) @@ -598,13 +532,6 @@ def main(): action="store_true", help="Skip episodes that already have a processed_path in SQL", ) - p.add_argument( - "--backend", - type=str, - choices=["lerobot", "zarr"], - default="zarr", - help="Output backend: 'zarr' (default) or 'lerobot'", - ) p.add_argument( "--ray-address", default="auto", help="Ray cluster address (default: auto)" ) @@ -648,7 +575,6 @@ def main(): launch( dry=args.dry_run, skip_if_done=args.skip_if_done, - backend=args.backend, episode_hashes=args.episode_hashes, ) diff --git a/egomimic/utils/aws/sql_tutorial.ipynb b/egomimic/utils/aws/sql_tutorial.ipynb index 99a37616..27d41d62 100644 --- a/egomimic/utils/aws/sql_tutorial.ipynb +++ b/egomimic/utils/aws/sql_tutorial.ipynb @@ -241,4 +241,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file