diff --git a/egoloc_2D_demo.py b/egoloc_2D_demo.py index db3e96e..df462cc 100755 --- a/egoloc_2D_demo.py +++ b/egoloc_2D_demo.py @@ -492,17 +492,7 @@ def select_top_n_frames_from_json(json_path, n, frame_index=None, flag=None, rec else: return invalid_list, top_n_frames -def process_task( - credentials, - video_path, - action, - grid_size, - total_frames, - search_anchor, - speed_folder, - frame_index=None, - flag=None - ): +def process_task(credentials, video_path, action, grid_size, total_frames, search_anchor, speed_folder, frame_index=None, flag=None): """ Build a grid image, ask GPT-4V to pick the frame where the action starts or ends, and return the chosen frame index. diff --git a/egoloc_3D_demo.py b/egoloc_3D_demo.py index b1b586e..93d2697 100755 --- a/egoloc_3D_demo.py +++ b/egoloc_3D_demo.py @@ -8,9 +8,7 @@ └── Video-Depth-Anything/ ← git clone https://github.com/DepthAnything/Video-Depth-Anything.git """ -# --------------------------------------------------------------------------- # Imports -# --------------------------------------------------------------------------- import argparse import base64 import json @@ -35,9 +33,7 @@ except ImportError: # Allow import on machines without torch torch = None -# --------------------------------------------------------------------------- # External repos that *must* exist inside EgoLoc root -# --------------------------------------------------------------------------- try: import hamer # type: ignore except ImportError as e: @@ -57,9 +53,7 @@ warnings.filterwarnings("ignore", category=FutureWarning) plt.switch_backend("Agg") # headless plotting -# --------------------------------------------------------------------------- -# Helper – unpack Video‑Depth‑Anything *_depths.npz → per‑frame .npy -# --------------------------------------------------------------------------- +# Helper to unpack Video-Depth-Anything *_depths.npz to per-frame .npy def _unpack_depth_npz(depth_dir: Path) -> None: """Convert VDA’s *_depths.npz to individual .npy, skipping ones that exist.""" npz_files = list(depth_dir.glob("*_depths.npz")) @@ -79,9 +73,7 @@ def _unpack_depth_npz(depth_dir: Path) -> None: -# --------------------------------------------------------------------------- # Paths & repo‑root helpers -# --------------------------------------------------------------------------- SCRIPT_DIR = Path(__file__).resolve().parent REPO_ROOT = next( p @@ -91,12 +83,10 @@ def _unpack_depth_npz(depth_dir: Path) -> None: VDA_DIR = REPO_ROOT / "Video-Depth-Anything" -DEPTH_SCALE_M = 3.0 # pixel value 255 ↔ 3 m (linear scaling) +DEPTH_SCALE_M = 3.0 # pixel value 255 corresponds to 3m (linear scaling) MAX_FEEDBACKS = 1 -# --------------------------------------------------------------------------- # Depth-quality helpers -# --------------------------------------------------------------------------- def _is_invalid_inv(inv: np.ndarray) -> bool: """ Return True when the inverse-depth tensor is all-NaN/Inf or nearly flat. 
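As an illustrative aside (not part of the patch): the depth-quality gate and the linear depth convention above are easy to sanity-check in isolation. The sketch below assumes only NumPy, mirrors the all-NaN/Inf-or-nearly-flat rule described for `_is_invalid_inv`, and applies the `DEPTH_SCALE_M` conversion the way the later `_load_depth` helper does; the toy tensors and the standalone function name are made up for the example.

import numpy as np

DEPTH_SCALE_M = 3.0  # same linear scale as above: inverse depth -> metres

def is_invalid_inv(inv: np.ndarray) -> bool:
    # unusable when every value is NaN/Inf, or when the tensor is nearly flat
    return (not np.isfinite(inv).any()) or np.nanstd(inv) < 1e-4

flat = np.full((4, 4), 0.5, dtype=np.float32)            # nearly constant -> rejected
usable = np.random.rand(4, 4).astype(np.float32) + 0.1   # varied inverse depth -> kept

print(is_invalid_inv(flat))    # True
print(is_invalid_inv(usable))  # False

metric = DEPTH_SCALE_M / (usable + 1e-6)  # larger inverse depth = nearer object
print(float(metric.min()), float(metric.max()))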
@@ -111,21 +101,17 @@ def count_bad_depth_tensors(depth_dir: Path) -> Tuple[int, int]: bad, total = 0, 0 for f in depth_dir.glob("pred_depth_*.npy"): total += 1 - inv = np.load(f, mmap_mode="r") # cheap, no RAM blow-up + inv = np.load(f, mmap_mode="r") # memory-mapped to avoid loading everything if _is_invalid_inv(inv): bad += 1 return bad, total -# --------------------------------------------------------------------------- # Basic helpers -# --------------------------------------------------------------------------- def get_json_path(video_name: str, base_dir: str): return os.path.join(base_dir, f"{video_name}_speed.json") -# -------------------------------------------------------------------------- # IMAGE HELPERS -# -------------------------------------------------------------------------- def image_resize(image, width=None, height=None, inter=cv2.INTER_AREA): """Resize *image* while preserving aspect ratio.""" if width is None and height is None: @@ -159,9 +145,7 @@ def image_resize_for_vlm(frame, inter=cv2.INTER_AREA): new_h = int(new_w / aspect) return cv2.resize(frame, (new_w, new_h), interpolation=inter) -# -------------------------------------------------------------------------- # VISION–LANGUAGE MODEL WRAPPER -# -------------------------------------------------------------------------- def extract_json_part(text_output: str) -> Optional[str]: """Extract the JSON fragment {"points":[...]} from GPT text output.""" text = text_output.strip().replace(" ", "").replace("\n", "") @@ -254,9 +238,7 @@ def scene_understanding(credentials: Dict[str, Any], frame: np.ndarray, prompt: return -1, content -# -------------------------------------------------------------------------- # GRID BUILDERS & FRAME-SELECTION UTILITIES (verbatim from 2-D) -# -------------------------------------------------------------------------- def create_frame_grid_with_keyframe(video_path: str, frame_indices: List[int], grid_size: int) -> np.ndarray: """Return a numbered frame grid (BGR uint8).""" spacer = 0 @@ -398,9 +380,7 @@ def select_and_filter_keyframes_with_anchor(sel: List[int], total_idx: List[int] return sorted(filtered) -# -------------------------------------------------------------------------- # FEEDBACK + TASK-SELECTION (verbatim logic) -# -------------------------------------------------------------------------- def determine_by_state(credentials, video_path, action, grid_size, total_frames, frame_index, anchor, speed_folder): """Ask GPT-4o if *frame_index* is valid contact/separation.""" prompt = ( @@ -596,9 +576,7 @@ def process_task(credentials, video_path, action, grid_size, total_frames, ancho return used[idx] -# --------------------------------------------------------------------------- # Depth video generation via Video‑Depth‑Anything -# --------------------------------------------------------------------------- def generate_depth_video_vda(video_path: str, depth_out_path: str, *, device: str = "cuda", encoder: str = "vits") -> Path: """Run Video‑Depth‑Anything once and save the raw depth video.""" video_path = Path(video_path).resolve() @@ -634,9 +612,7 @@ def generate_depth_video_vda(video_path: str, depth_out_path: str, *, device: st return depth_out_path -# --------------------------------------------------------------------------- # DEPTH-TENSOR REPAIR HELPERS -# --------------------------------------------------------------------------- def _invalid_depth_indices(depth_dir: Path) -> List[int]: """Return a list of frame indices whose depth tensors are unusable.""" bad_idx = [] @@ -655,17 
+631,13 @@ def _remove_depth_tensors(depth_dir: Path, indices: List[int]) -> None: if f.exists(): f.unlink() -# --------------------------------------------------------------------------- # Debug logging helpers removed -# --------------------------------------------------------------------------- # _log_zero_speed no longer needed – provide empty stub to avoid NameError if # residual references remain. def _log_zero_speed(*args, **kwargs): pass -# ------------------------------------------------------------------------- # Depth loading helper -# --------------------------------------------------------------------------- def _load_depth(depth_dir: Path, idx: int) -> Optional[np.ndarray]: """ VDA stores **inverse depth** (bigger = nearer). @@ -677,12 +649,10 @@ def _load_depth(depth_dir: Path, idx: int) -> Optional[np.ndarray]: inv = np.load(f).astype(np.float32) # (H, W) if _is_invalid_inv(inv): # ← early-reject unusable tensor return None - depth = DEPTH_SCALE_M / (inv + 1e-6) # invert once, not twice + depth = DEPTH_SCALE_M / (inv + 1e-6) # invert once, not twice return depth -# --------------------------------------------------------------------------- # Simple camera projection helper -# --------------------------------------------------------------------------- def _pixel_to_camera(u: float, v: float, z: float, W: int, H: int): fx = fy = max(W, H) cx, cy = W / 2.0, H / 2.0 @@ -690,9 +660,7 @@ def _pixel_to_camera(u: float, v: float, z: float, W: int, H: int): Y = (v - cy) * z / fy return X, Y, z -# --------------------------------------------------------------------------- # HaMeR / ViTPose – created once, reused -# --------------------------------------------------------------------------- _HAMER_CACHE: Dict[str, ViTPoseModel] = {} @@ -720,9 +688,7 @@ def _get_vitpose_model(device: str = "cuda") -> ViTPoseModel: return _HAMER_CACHE["cpm"] -# --------------------------------------------------------------------------- # Wrist detection helper (depth‑guided + ViTPose) -# --------------------------------------------------------------------------- def _wrist_from_frame(frame_bgr: np.ndarray, gray_depth: np.ndarray, cpm: ViTPoseModel): # 1) Depth‑based hand ROI – nearest object in view nearest = gray_depth < np.percentile(gray_depth, 10) # closest 25 % @@ -730,7 +696,7 @@ def _wrist_from_frame(frame_bgr: np.ndarray, gray_depth: np.ndarray, cpm: ViTPos if n_lbl == 0: return None - # largest blob → hand / forearm + # largest blob is hand / forearm sizes = ndimage.sum(nearest, labels, range(1, n_lbl + 1)) hand_lbl = 1 + int(np.argmax(sizes)) mask = labels == hand_lbl @@ -746,7 +712,7 @@ def _wrist_from_frame(frame_bgr: np.ndarray, gray_depth: np.ndarray, cpm: ViTPos ) pose = cpm.predict_pose(roi_bgr[:, :, ::-1], [bbox])[0] - hand_kpts = pose["keypoints"][-21:] # right‑hand keypoints + hand_kpts = pose["keypoints"][-21:] # right-hand keypoints valid = hand_kpts[:, 2] > 0.35 if valid.sum() <= 3: return None @@ -755,9 +721,7 @@ def _wrist_from_frame(frame_bgr: np.ndarray, gray_depth: np.ndarray, cpm: ViTPos wrist_v = y0 + hand_kpts[0, 1] return float(wrist_u), float(wrist_v) -# --------------------------------------------------------------------------- # Hand Position Registration With ICP and Frame 0 Alignment Helper -# --------------------------------------------------------------------------- def register_hand_positions(pcd_root, hand3d_root, save_reg_root, threshold=0.03): """ Align per-frame 3-D hand positions to the first frame using point-to-point @@ -775,68 +739,60 @@ def 
register_hand_positions(pcd_root, hand3d_root, save_reg_root, threshold=0.03 Returns: None """ - os.makedirs(save_reg_root, exist_ok=True) # ensure output dir exists - for video in sorted(os.listdir(pcd_root)): # iterate each video folder - pcd_dir = os.path.join(pcd_root, video) # path to this video’s PLYs - hand3d_path = os.path.join(hand3d_root, f"{video}.json") # path to camera-frame hand JSON - if not os.path.isdir(pcd_dir) or not os.path.isfile(hand3d_path): # skip if either missing + os.makedirs(save_reg_root, exist_ok=True) + for video in sorted(os.listdir(pcd_root)): + pcd_dir = os.path.join(pcd_root, video) + hand3d_path = os.path.join(hand3d_root, f"{video}.json") + if not os.path.isdir(pcd_dir) or not os.path.isfile(hand3d_path): continue - hand3d = json.load(open(hand3d_path)) # load camera-frame keypoints + hand3d = json.load(open(hand3d_path)) plys = sorted([f for f in os.listdir(pcd_dir) if f.endswith('.ply')], - key=lambda x: int(os.path.splitext(x)[0])) # list PLYs in order - - first_pcd = None # reference cloud (frame 1) - odoms = [] # unused – kept from orig code - reg_hand_dict = {} # output dict: frameID → (x,y,z) - prev_pcd = None # helps store first_pcd - - for i, ply in enumerate(plys): # walk over every cloud - frame_id = str(i + 1) # JSON is 1-based indexing - pcd = o3d.io.read_point_cloud(os.path.join(pcd_dir, ply)) # load current cloud - pts = np.asarray(pcd.points) # numpy view of XYZ - pcd.points = o3d.utility.Vector3dVector(pts) # write back to Open3D cloud - - if i == 0: # first frame: no registration - odoms.append(np.eye(4)) # placeholder (unused) - if frame_id in hand3d: # store original hand if exists - h0 = np.array(hand3d[frame_id]) # camera-frame wrist point - reg_hand_dict[frame_id] = h0.tolist() # frame 1 becomes origin + key=lambda x: int(os.path.splitext(x)[0])) + + first_pcd = None # reference cloud (frame 1) + odoms = [] # unused - kept from orig code + reg_hand_dict = {} # output dict: frameID to (x,y,z) + prev_pcd = None # helps store first_pcd + + for i, ply in enumerate(plys): + frame_id = str(i + 1) # JSON is 1-based indexing + pcd = o3d.io.read_point_cloud(os.path.join(pcd_dir, ply)) + pts = np.asarray(pcd.points) + pcd.points = o3d.utility.Vector3dVector(pts) + + if i == 0: # first frame: no registration + odoms.append(np.eye(4)) # placeholder (unused) + if frame_id in hand3d: + h0 = np.array(hand3d[frame_id]) # camera-frame wrist point + reg_hand_dict[frame_id] = h0.tolist() # frame 1 becomes origin else: - if first_pcd is None: # cache reference once + if first_pcd is None: # cache reference once first_pcd = prev_pcd - reg = o3d.pipelines.registration.registration_icp( # ICP: current → first - pcd, # source cloud - first_pcd, # target cloud - threshold, # correspondence distance (m) - np.eye(4), # initial guess = identity - o3d.pipelines.registration. 
- TransformationEstimationPointToPoint() + reg = o3d.pipelines.registration.registration_icp( + pcd, # source cloud + first_pcd, # target cloud + threshold, # correspondence distance (m) + np.eye(4), # initial guess = identity + o3d.pipelines.registration.TransformationEstimationPointToPoint() ) - T = reg.transformation # 4×4 rigid transform + T = reg.transformation # 4x4 rigid transform - h = np.array(hand3d.get(str(i + 1), [np.nan, np.nan, np.nan])) # wrist in camera frame - h4 = np.append(h, 1) # homogeneous coordinate - h_reg = (T @ h4)[:3].tolist() # apply ICP transform - if frame_id in hand3d: # store if key exists + h = np.array(hand3d.get(str(i + 1), [np.nan, np.nan, np.nan])) + h4 = np.append(h, 1) # homogeneous coordinate + h_reg = (T @ h4)[:3].tolist() # apply ICP transform + if frame_id in hand3d: reg_hand_dict[frame_id] = h_reg - prev_pcd = pcd # keep for first_pcd assignment + prev_pcd = pcd # keep for first_pcd assignment # save globally registered trajectory for this video with open(os.path.join(save_reg_root, f"{video}.json"), 'w') as f: json.dump(reg_hand_dict, f, indent=4) print(f"Computed globally registered 3-D hand trajectory for {video}") -# --------------------------------------------------------------------------- # Robust percentile-flavoured detector -# --------------------------------------------------------------------------- -def contact_separation_from_speed( - speed_dict: Dict[int, float], - *, - active_pct: float = 30.0, # % of frames considered “active” - sigma: int = 5 -) -> Tuple[int, int]: +def contact_separation_from_speed(speed_dict: Dict[int, float], *, active_pct: float = 30.0, sigma: int = 5) -> Tuple[int, int]: """ Return first contact & last separation indices in a *very flat* speed curve. @@ -861,9 +817,7 @@ def contact_separation_from_speed( separation = int(len(active) - np.argmax(active[::-1])) return contact, separation -# --------------------------------------------------------------------------- # VLM Refinement -# --------------------------------------------------------------------------- def refine_with_vlm(current_idx: int, creds: Dict[str, Any], video_path: str, prompt: str) -> int: """Ask GPT‑4(o) to confirm or shift the key frame. 
Fall back to current_idx.""" cap = cv2.VideoCapture(video_path) @@ -876,9 +830,7 @@ def refine_with_vlm(current_idx: int, creds: Dict[str, Any], video_path: str, pr new_idx = scene_understanding(creds, frame, prompt) return current_idx if new_idx in (-1, None) else int(new_idx) -# --------------------------------------------------------------------------- # 3‑D hand‑speed extraction + visualisation -# --------------------------------------------------------------------------- def extract_3d_speed_and_visualize(video_path: str, output_dir: str, *, device: str = "cuda", encoder: str = "vits"): cpm = _get_vitpose_model(device) @@ -918,7 +870,6 @@ def extract_3d_speed_and_visualize(video_path: str, output_dir: str, *, device: else: raise RuntimeError("Depth repair failed after two attempts – aborting.") - # ── NEW: build coloured point clouds ───────────────────────────────── pcd_dir = output_dir / "pointclouds" / video_name if not (pcd_dir / "0.ply").exists(): print("[PCD] Building point clouds …") @@ -926,12 +877,10 @@ def extract_3d_speed_and_visualize(video_path: str, output_dir: str, *, device: else: print("[PCD] Reusing cached point clouds in", pcd_dir) - # ── NEW: global wrist registration ────────────────────────────────── cam_hand_dir = output_dir / "hand3d_cam" cam_hand_dir.mkdir(exist_ok=True) cam_hand_json = cam_hand_dir / f"{video_name}.json" - # --- write raw camera-frame wrist coords while scanning frames ------- cap_rgb = cv2.VideoCapture(video_path) if not cap_rgb.isOpened(): raise RuntimeError("Could not open RGB video.") @@ -993,7 +942,6 @@ def extract_3d_speed_and_visualize(video_path: str, output_dir: str, *, device: with open(reg_out_dir / f"{video_name}.json") as f: reg_hand = json.load(f) -# ── compute world-frame speed ──────────────────────────────────────── speed_dict = {} prev_xyz = None for idx in range(total_frames): @@ -1015,7 +963,6 @@ def extract_3d_speed_and_visualize(video_path: str, output_dir: str, *, device: speed = float(np.linalg.norm([dx,dy,dz])) # true world speed prev_xyz = xyz speed_dict[idx] = speed -# -------------------------------------------------------------------- speed_json_path = output_dir / f"{video_name}_speed.json" with open(speed_json_path, "w") as f: @@ -1032,9 +979,7 @@ def extract_3d_speed_and_visualize(video_path: str, output_dir: str, *, device: return speed_dict, str(speed_json_path), str(speed_vis_path), str(depth_vis_path) -# --------------------------------------------------------------------------- # Point Cloud Generation -# --------------------------------------------------------------------------- def _generate_pointclouds(depth_dir: Path, video_path: str, pcd_out_dir: Path, intrinsics: Optional[Tuple[float,float,float,float]] = None): """ Create a coloured .ply point cloud for every frame whose depth tensor exists. 
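As an illustrative aside (not part of the patch): the world-frame speed block above boils down to differencing consecutive ICP-registered wrist positions. The sketch below uses only NumPy, made-up coordinates, and a simplified treatment of missing frames, so it shows the idea rather than reproducing the exact loop.

import numpy as np

# frame_id (1-based, as in the registered JSON) -> wrist position in metres
reg_hand = {
    "1": [0.10, 0.00, 0.50],
    "2": [0.12, 0.01, 0.50],
    "4": [0.20, 0.05, 0.48],   # frame 3 missing (e.g. no wrist detection)
}

total_frames = 5
speed_dict = {}
prev_xyz = None
for idx in range(total_frames):
    xyz = reg_hand.get(str(idx + 1))
    if xyz is None or prev_xyz is None:
        speed = 0.0                                   # nothing to difference yet
    else:
        dx, dy, dz = np.asarray(xyz) - np.asarray(prev_xyz)
        speed = float(np.linalg.norm([dx, dy, dz]))   # metres per frame
    if xyz is not None:
        prev_xyz = xyz
    speed_dict[idx] = speed

print(speed_dict)  # {0: 0.0, 1: ~0.022, 2: 0.0, 3: ~0.092, 4: 0.0}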
@@ -1076,15 +1021,11 @@ def _generate_pointclouds(depth_dir: Path, video_path: str, pcd_out_dir: Path, depth_trunc=4.0, convert_rgb_to_intensity=False ) pcd = o3d.geometry.PointCloud.create_from_rgbd_image(rgbd, intrinsic) - # flip to keep +Z forward, +Y up (matches your earlier flip in register func) -# pcd.transform([[1,0,0,0],[0,-1,0,0],[0,0,-1,0],[0,0,0,1]]) # < -------------------------- o3d.io.write_point_cloud(str(pcd_out_dir / f"{idx}.ply"), pcd, write_ascii=False) cap.release() print(f"[PCD] Generated {len(depth_files)} point clouds in {pcd_out_dir}") -# --------------------------------------------------------------------------- # Video Convert -# --------------------------------------------------------------------------- def convert_video(video_path: str, action: str, credentials: Dict[str, Any], grid_size: int, speed_folder: str, max_feedbacks: int = MAX_FEEDBACKS, repeat_times: int = 3): """Driver wrapper (identical logic to 2-D).""" cap = cv2.VideoCapture(video_path) @@ -1142,9 +1083,7 @@ def convert_video(video_path: str, action: str, credentials: Dict[str, Any], gri final_separate = -1 if not separate_list else int(round(np.mean(separate_list))) return final_contact, final_separate -# --------------------------------------------------------------------------- # Tiny visual helper -# --------------------------------------------------------------------------- def visualize_frame(video_path: str, idx: int, out_path: str, label: Optional[str] = None): if idx < 0: return @@ -1161,9 +1100,7 @@ def visualize_frame(video_path: str, idx: int, out_path: str, label: Optional[st print(f"Visualized frame {idx} to {out_path}") -# --------------------------------------------------------------------------- # CLI entry point (with feedback loop) -# --------------------------------------------------------------------------- def main(): parser = argparse.ArgumentParser("EgoLoc 3-D Demo (lite edition)") parser.add_argument("--video_path", required=True, help="Input video") @@ -1191,7 +1128,6 @@ def main(): ) args = parser.parse_args() - # ------------------- device selection ------------------- if args.device == "auto": device = "cuda" if torch and torch.cuda.is_available() else "cpu" elif args.device == "cuda" and (not torch or not torch.cuda.is_available()): @@ -1200,7 +1136,6 @@ def main(): else: device = args.device - # ------------------- step 1: 3-D speed ------------------- print("\n [1/3] Extracting 3-D hand speed and visualizing ...") speed_dict, speed_json, speed_vis, depth_vid = extract_3d_speed_and_visualize( args.video_path, @@ -1209,7 +1144,6 @@ def main(): encoder=args.encoder, ) - # ------------- step 2: temporal localisation ------------- print("\n [2/3] Locating contact/separation frames and visualizing ...") credentials = dotenv.dotenv_values(args.credentials) @@ -1224,14 +1158,12 @@ def main(): ) print(f"Resolved ➜ contact: {contact_idx}, separation: {separation_idx}") - # ------------- visualise keyframes ----------------------- video_name = Path(args.video_path).stem contact_vis = Path(args.output_dir) / f"{video_name}_contact_frame.png" separation_vis = Path(args.output_dir) / f"{video_name}_separation_frame.png" visualize_frame(args.video_path, contact_idx, str(contact_vis), "Contact") visualize_frame(args.video_path, separation_idx, str(separation_vis), "Separation") - # ---------------- step 3: save results ------------------- print("\n [3/3] Saving results ...") result = { "contact_frame": contact_idx, @@ -1241,7 +1173,6 @@ def main(): with open(result_path, "w") as 
f: json.dump(result, f, indent=2) - # ------------------- final console log ------------------- print("EgoLoc output\n", result) print(f"Result json : {result_path}") print(f"3-D speed json : {speed_json}") diff --git a/egoloc_bimanual_3d.py b/egoloc_bimanual_3d.py new file mode 100644 index 0000000..78fa6bb --- /dev/null +++ b/egoloc_bimanual_3d.py @@ -0,0 +1,1621 @@ +""" +EgoLoc-Plus 3‑D Demo – *bimanual* edition +Assumed repo layout (relative to this file): +EgoLoc/ +│ +├── ./ ← this script lives here (anywhere inside EgoLoc) +├── HaMeR/ ← git clone https://github.com/geopavlakos/hamer.git +└── Video-Depth-Anything/ ← git clone https://github.com/DepthAnything/Video-Depth-Anything.git +""" + +import argparse +import base64 +import json +import math +import os +import subprocess +import time +import warnings +from pathlib import Path +import open3d as o3d + +import cv2 +import dotenv # read .env creds for GPT-4o +import matplotlib.pyplot as plt +import numpy as np +from scipy import ndimage # connected‑component helper +from scipy.ndimage import gaussian_filter1d +from typing import List, Dict, Tuple, Optional, Any + +try: + import torch +except ImportError: # Allow import on machines without torch + torch = None + +# External repos that *must* exist inside EgoLoc root +try: + import hamer # type: ignore +except ImportError as e: + raise ImportError( + "HaMeR repo not found. Make sure you cloned it inside the EgoLoc root." + ) from e + +try: + from hamer.vitpose_model import ViTPoseModel # type: ignore +except ImportError as e: + raise ImportError( + "vitpose_model.py not found. It ships with HaMeR; " + "confirm your PYTHONPATH contains that folder." + ) from e + +warnings.filterwarnings("ignore", category=UserWarning) +warnings.filterwarnings("ignore", category=FutureWarning) +plt.switch_backend("Agg") # headless plotting + +# Helper to unpack Video-Depth-Anything *_depths.npz to per-frame .npy +def _unpack_depth_npz(depth_dir: Path) -> None: + """Convert VDA’s *_depths.npz to individual .npy, skipping ones that exist.""" + npz_files = list(depth_dir.glob("*_depths.npz")) + if not npz_files: + return # nothing to unpack + + arr = np.load(npz_files[0])["depths"] # (N, H, W) + created = 0 + for i, depth in enumerate(arr): + out_f = depth_dir / f"pred_depth_{i:06d}.npy" + if out_f.exists(): # keep the good tensor we already have + continue + np.save(out_f, depth.astype(np.float32)) + created += 1 + if created: + print(f"[VDA] Unpacked {created} new tensors") + + + +# Paths & repo‑root helpers +SCRIPT_DIR = Path(__file__).resolve().parent +REPO_ROOT = next( + p + for p in [SCRIPT_DIR] + list(SCRIPT_DIR.parents) + if (p / "Video-Depth-Anything").exists() +) + +VDA_DIR = REPO_ROOT / "Video-Depth-Anything" + +DEPTH_SCALE_M = 3.0 # pixel value 255 corresponds to 3m (linear scaling) +MAX_FEEDBACKS = 1 + +# Depth-quality helpers +def _is_invalid_inv(inv: np.ndarray) -> bool: + """ + Return True when the inverse-depth tensor is all-NaN/Inf or nearly flat. + """ + return (not np.isfinite(inv).any()) or np.nanstd(inv) < 1e-4 + +def count_bad_depth_tensors(depth_dir: Path) -> Tuple[int, int]: + """ + Scan *.npy files in *depth_dir* and count how many are unusable. 
+ """ + bad, total = 0, 0 + for f in depth_dir.glob("pred_depth_*.npy"): + total += 1 + inv = np.load(f, mmap_mode="r") # memory-mapped to avoid loading everything + if _is_invalid_inv(inv): + bad += 1 + return bad, total + +# Basic helpers +def get_json_path(video_name: str, base_dir: str): + return os.path.join(base_dir, f"{video_name}_speed.json") + +def get_json_path_for_hand(video_name: str, base_dir: str, hand: str) -> str: + """Return speed JSON path for a specific hand or combined. + + hand ∈ {"right", "left", "either"}; "either" maps to the combined file. + """ + hand = (hand or "either").lower() + if hand == "either": + return os.path.join(base_dir, f"{video_name}_speed.json") + if hand not in {"right", "left"}: + raise ValueError("hand must be one of: right, left, either") + return os.path.join(base_dir, f"{video_name}_speed_{hand}.json") + +def image_resize(image, width=None, height=None, inter=cv2.INTER_AREA): + """Resize *image* while preserving aspect ratio.""" + if width is None and height is None: + return image + (h, w) = image.shape[:2] + if width is None: + r = height / float(h) + dim = (int(w * r), height) + else: + r = width / float(w) + dim = (width, int(h * r)) + return cv2.resize(image, dim, interpolation=inter) + +def image_resize_for_vlm(frame, inter=cv2.INTER_AREA): + """Resize an image so the short side ≤ 768 px and long side ≤ 2000 px.""" + h, w = frame.shape[:2] + aspect = w / h + max_short, max_long = 768, 2000 + if aspect > 1: + new_w = min(w, max_long) + new_h = int(new_w / aspect) + if new_h > max_short: + new_h = max_short + new_w = int(new_h * aspect) + else: + new_h = min(h, max_long) + new_w = int(new_h * aspect) + if new_w > max_short: + new_w = max_short + new_h = int(new_w / aspect) + return cv2.resize(frame, (new_w, new_h), interpolation=inter) + +def extract_json_part(text_output: str) -> Optional[str]: + """Extract the JSON fragment {"points":[...]} from GPT text output.""" + text = text_output.strip().replace(" ", "").replace("\n", "") + try: + idx = text.index('{"points":') + frag = text[idx:] + end = frag.index("}") + 1 + return frag[:end] + except ValueError: + print("Text received:", text_output) + return None + +def scene_understanding(credentials: Dict[str, Any], frame: np.ndarray, prompt: str, *, flag: Optional[str] = None, raw: bool = False): + """ + Vision-language helper. 
+ + ── Return modes ─────────────────────────────────────────────── + • default ( flag is None and raw is False ): + → (first_point, full_text) + • feedback / “state” checks ( flag is not None or raw is True ): + → full_text (str) + """ + + frame = image_resize_for_vlm(frame) + _, buf = cv2.imencode(".jpg", frame) + b64 = base64.b64encode(buf).decode("utf-8") + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": prompt}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{b64}", + "detail": "high", + }, + }, + ], + } + ] + + import openai + + if credentials.get("AZURE_OPENAI_API_KEY"): + from openai import AzureOpenAI + + client = AzureOpenAI( + api_version="2024-02-01", + azure_endpoint=credentials["AZURE_OPENAI_ENDPOINT"], + api_key=credentials["AZURE_OPENAI_API_KEY"], + ) + params = {"model": credentials["AZURE_OPENAI_DEPLOYMENT_NAME"]} + else: + from openai import OpenAI + + client = OpenAI( + api_key=credentials["OPENAI_API_KEY"], + base_url="https://api.chatanywhere.tech/v1", + ) + params = {"model": "gpt-4o"} + + params.update(dict(messages=messages, max_tokens=200, temperature=0.1, top_p=0.5)) + + retries = 0 + while retries < 5: + try: + result = client.chat.completions.create(**params) + break + except (openai.RateLimitError, openai.APIStatusError): + time.sleep(2) + retries += 1 + else: + raise RuntimeError("Failed to obtain GPT-4o response.") + + content = result.choices[0].message.content + + if raw or flag is not None: + return content + + frag = extract_json_part(content) + if frag is None: + return -1, content + try: + pts = json.loads(frag)["points"] + return (pts[0] if pts else -1), content + except Exception: # malformed JSON or no "points" + return -1, content + +def create_frame_grid_with_keyframe(video_path: str, frame_indices: List[int], grid_size: int) -> np.ndarray: + """Return a numbered frame grid (BGR uint8).""" + spacer = 0 + cap = cv2.VideoCapture(video_path) + frames = [] + for idx in frame_indices: + cap.set(cv2.CAP_PROP_POS_FRAMES, idx) + ok, frm = cap.read() + if not ok: + frm = ( + np.zeros_like(frames[0]) + if frames + else np.zeros((200, 200, 3), np.uint8) + ) + frm = image_resize(frm, width=200) + frames.append(frm) + cap.release() + + while len(frames) < grid_size**2: + frames.append(np.zeros_like(frames[0])) + + fh, fw = frames[0].shape[:2] + grid_h = grid_size * fh + (grid_size - 1) * spacer + grid_w = grid_size * fw + (grid_size - 1) * spacer + grid = np.ones((grid_h, grid_w, 3), np.uint8) * 255 + + for i in range(grid_size): + for j in range(grid_size): + k = i * grid_size + j + frm = frames[k] + max_d = int(min(frm.shape[:2]) * 0.5) + cc = (frm.shape[1] - max_d // 2, max_d // 2) + overlay = frm.copy() + cv2.circle(overlay, cc, max_d // 2, (255, 255, 255), -1) + frm = cv2.addWeighted(overlay, 0.3, frm, 0.7, 0) + cv2.circle(frm, cc, max_d // 2, (255, 255, 255), 2) + fs = max_d / 50 + txtsz = cv2.getTextSize(str(k + 1), cv2.FONT_HERSHEY_SIMPLEX, fs, 2)[0] + tx = frm.shape[1] - txtsz[0] // 2 - max_d // 2 + ty = txtsz[1] // 2 + max_d // 2 + cv2.putText( + frm, str(k + 1), (tx, ty), cv2.FONT_HERSHEY_SIMPLEX, fs, (0, 0, 0), 2 + ) + y1, y2 = i * (fh + spacer), (i + 1) * fh + i * spacer + x1, x2 = j * (fw + spacer), (j + 1) * fw + j * spacer + grid[y1:y2, x1:x2] = frm + return grid + + +def select_top_n_frames_from_json(json_path: str, n: int, frame_index: Optional[int] = None, flag: Optional[str] = None, receive_flag: Optional[str] = None): + """Return indices with lowest speed; behaviour altered by 
*flag*.""" + with open(json_path, "r") as f: + data = json.load(f) + items = list(data.items()) + if frame_index is None: + valid = [(int(i), s) for i, s in items if s != 0 and not math.isnan(s)] + else: + if flag == "feedback": + valid = [ + (int(i), s) + for i, s in items + if s != 0 and not math.isnan(s) and int(i) > frame_index + ] + inval = [ + int(i) + for i, s in items + if s != 0 and not math.isnan(s) and int(i) <= frame_index + ] + elif flag == "speed": + valid = [ + (int(i), s) + for i, s in items + if s != 0 and not math.isnan(s) and s < frame_index + ] + inval = [ + int(i) + for i, s in items + if s != 0 and not math.isnan(s) and s >= frame_index + ] + else: + valid = [ + (int(i), s) + for i, s in items + if s != 0 and not math.isnan(s) and int(i) != frame_index + ] + inval = [ + int(i) + for i, s in items + if s != 0 and not math.isnan(s) and int(i) == frame_index + ] + top = [idx for idx, _ in sorted(valid, key=lambda x: x[1])[:n]] + # Fallback: if no valid frames found, use evenly spaced frames + if not top: + print(f"[WARNING] No valid speed data found in {json_path}, using fallback frames") + total_frames = len(items) if items else n + if total_frames > 0: + # Generate evenly spaced frame indices + step = max(1, total_frames // n) + top = list(range(0, min(total_frames, n * step), step))[:n] + else: + top = list(range(n)) # Last resort fallback + + if receive_flag is None: + return top + + # Handle inval for receive_flag case + if 'inval' not in locals(): + inval = [] + return inval, top + + +def select_frames_near_average(indices: List[int], grid_size: int, total_frames: int, invalid: List[int], min_index: Optional[int] = None): + """Return *grid_size²* unique frames centred on average of *indices*.""" + + if not indices or len(indices) == 0: + avg = total_frames // 2 + else: + mean_val = np.mean(indices) + if np.isnan(mean_val) or np.isinf(mean_val): + # Use middle of video as fallback + avg = total_frames // 2 + else: + avg = round(mean_val) + + # Ensure avg is within valid range + avg = max(0, min(avg, total_frames - 1)) + used = [] + if avg not in invalid and (min_index is None or avg > min_index): + used.append(avg) + left, right = avg - 1, avg + 1 + while len(used) < grid_size**2: + if left >= 0: + if left not in invalid and (min_index is None or left > min_index): + used.insert(0, left) + left -= 1 + if len(used) >= grid_size**2: + break + if right < total_frames: + if right not in invalid and (min_index is None or right > min_index): + used.append(right) + right += 1 + return used[: grid_size**2] + + +def select_and_filter_keyframes_with_anchor(sel: List[int], total_idx: List[int], grid_size: int, anchor: str, video_path: str): + """Keep frames in first/second half depending on *anchor* ('start'/'end').""" + cap = cv2.VideoCapture(video_path) + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + cap.release() + if anchor == "start": + filtered = [i for i in sel if i < total_frames // 2] + if len(filtered) < grid_size: + extra = [ + i for i in total_idx if i not in filtered and i < total_frames // 2 + ] + filtered.extend(extra[: grid_size - len(filtered)]) + elif anchor == "end": + filtered = [i for i in sel if i >= total_frames // 2] + if len(filtered) < grid_size: + extra = [ + i for i in total_idx if i not in filtered and i >= total_frames // 2 + ] + filtered.extend(extra[: grid_size - len(filtered)]) + else: + raise ValueError("anchor must be 'start' or 'end'") + return sorted(filtered) + + +def determine_by_state(credentials, video_path, action, grid_size, 
total_frames, frame_index, anchor, speed_folder): + """Ask GPT-4o if *frame_index* is valid contact/separation.""" + prompt = ( + ( + "I will show an image of hand-object interaction. " + "You need to determine whether the hand and the object are in obvious contact. " + ) + if anchor == "start" + else ( + "I will show an image of hand-object interaction. " + "You need to determine whether the hand and the object are clearly separate. " + ) + ) + "Return only a single character: 1 for yes, 0 for no." + + grid = create_frame_grid_with_keyframe(video_path, [frame_index], 1) + res = scene_understanding(credentials, grid, prompt, raw=True) + s = str(res).strip() + if (s[:1] == "1") or frame_index > total_frames - 5: + return frame_index + return process_task( + credentials, + video_path, + action, + grid_size, + total_frames, + anchor, + speed_folder, + frame_index, + flag="feedback", + ) + + +def determine_by_speed(credentials, video_path, action, grid_size, total_frames, frame_index, anchor, speed_folder, *, hand: str = "either"): + """Reject frames whose speed exceeds 30-percentile threshold.""" + video_name = os.path.splitext(os.path.basename(video_path))[0] + json_path = get_json_path_for_hand(video_name, speed_folder, hand) + with open(json_path, "r") as f: + data = json.load(f) + valid = [(int(i), s) for i, s in data.items() if s != 0 and not math.isnan(s)] + if not valid: + return process_task( + credentials, + video_path, + action, + grid_size, + total_frames, + anchor, + speed_folder, + frame_index, + flag="speed", + ) + valid.sort(key=lambda x: x[1]) + threshold = valid[int(0.3 * len(valid))][1] + cur_speed = data.get(str(frame_index), 0) + if cur_speed <= threshold: + return frame_index + return process_task( + credentials, + video_path, + action, + grid_size, + total_frames, + anchor, + speed_folder, + frame_index, + flag="speed", + hand=hand, + ) + + +def feedback_contact(credentials, video_path, action, grid_size, total_frames, frame_start, max_fb, anchor, speed_folder, *, hand: str = "either"): + cnt = 0 + cur = frame_start + while cnt < max_fb: + new = determine_by_state( + credentials, + video_path, + action, + grid_size, + total_frames, + cur, + anchor, + speed_folder, + ) + if new is None: + return None + if new == cur: + new_sp = determine_by_speed( + credentials, + video_path, + action, + grid_size, + total_frames, + cur, + anchor, + speed_folder, + hand=hand, + ) + if new_sp == cur: + return cur + cur = new_sp + else: + cur = new + cnt += 1 + return cur + + +def feedback_separation(credentials, video_path, action, grid_size, total_frames, frame_end, max_fb, anchor, speed_folder, *, hand: str = "either"): + return feedback_contact( + credentials, + video_path, + action, + grid_size, + total_frames, + frame_end, + max_fb, + anchor, + speed_folder, + hand=hand, + ) + + +def process_task(credentials, video_path, action, grid_size, total_frames, anchor, speed_folder, frame_index=None, flag=None, *, hand: str = "either"): + """Core routine that builds frame grid, asks GPT-4o for best frame.""" + prompt_start = ( + f"I will show an image grid of numbered frames showing a hand-object action. " + f"Pick the number closest to when the action ({action}) STARTS. " + f"If the action is absent, choose -1. " + 'Return JSON strictly as {"points": []} with no extra text.' + ) + prompt_end = ( + f"I will show an image grid of numbered frames showing a hand-object action. " + f"Pick the number closest to when the action ({action}) ENDS. " + f"If the action has not ended, choose -1. 
" + 'Return JSON strictly as {"points": []} with no extra text.' + ) + prompt = prompt_start if anchor == "start" else prompt_end + + video_name = os.path.splitext(os.path.basename(video_path))[0] + json_path = get_json_path_for_hand(video_name, speed_folder, hand) + + if frame_index is None: + top = select_top_n_frames_from_json(json_path, 4) + total_idx = select_top_n_frames_from_json(json_path, total_frames) + filt = select_and_filter_keyframes_with_anchor( + top, total_idx, 4, anchor, video_path + ) or sorted(top) + used = select_frames_near_average(filt, grid_size, total_frames, []) + else: + if flag == "feedback": + invalid, top = select_top_n_frames_from_json( + json_path, 4, frame_index, flag, receive_flag="right" + ) + total_idx = select_top_n_frames_from_json( + json_path, total_frames, frame_index, flag + ) + filt = select_and_filter_keyframes_with_anchor( + top, total_idx, 4, anchor, video_path + ) or sorted(top) + used = select_frames_near_average( + filt, grid_size, total_frames, invalid, min_index=frame_index + ) + elif flag == "speed": + invalid, top = select_top_n_frames_from_json( + json_path, 4, frame_index, flag, receive_flag="right" + ) + total_idx = select_top_n_frames_from_json( + json_path, total_frames, frame_index, flag + ) + filt = select_and_filter_keyframes_with_anchor( + top, total_idx, 4, anchor, video_path + ) or sorted(top) + used = select_frames_near_average(filt, grid_size, total_frames, invalid) + else: + invalid, top = select_top_n_frames_from_json( + json_path, 4, frame_index, receive_flag="right" + ) + total_idx = select_top_n_frames_from_json( + json_path, total_frames, frame_index + ) + filt = select_and_filter_keyframes_with_anchor( + top, total_idx, 4, anchor, video_path + ) or sorted(top) + used = select_frames_near_average(filt, grid_size, total_frames, invalid) + + grid = create_frame_grid_with_keyframe(video_path, used, grid_size) + result = scene_understanding(credentials, grid, prompt) + if isinstance(result, tuple): + choice, _ = result + else: + choice = result + if choice == -1: + return None + idx = max(0, min(int(choice) - 1, len(used) - 1)) + return used[idx] + + +# Depth video generation via Video‑Depth‑Anything +def generate_depth_video_vda(video_path: str, depth_out_path: str, *, device: str = "cuda", encoder: str = "vits") -> Path: + """Run Video‑Depth‑Anything once and save the raw depth video.""" + video_path = Path(video_path).resolve() + depth_out_path = Path(depth_out_path).resolve() + depth_out_path.mkdir(parents=True, exist_ok=True) + + cmd = [ + "python", + "run.py", + "--input_video", + str(video_path), + "--output_dir", + str(depth_out_path), + "--encoder", + encoder, # or "vitl" if you want the large model + "--save_npz", # save per‑frame tensors + ] + if device == "cpu": + cmd.append("--fp32") # avoids half‑precision on CPU + + env = os.environ.copy() + env["PYTHONPATH"] = f"{VDA_DIR}:{env.get('PYTHONPATH', '')}" + + print("[VDA] Running:", " ".join(cmd)) + subprocess.run(cmd, check=True, cwd=VDA_DIR, env=env) + + # Unpack & sanity‑check + _unpack_depth_npz(depth_out_path) + if not any(depth_out_path.glob("pred_depth_*.npy")): + raise RuntimeError( + "[VDA] No pred_depth_*.npy tensors were produced – check the VDA output above." 
+ ) + + return depth_out_path + +def _invalid_depth_indices(depth_dir: Path) -> List[int]: + """Return a list of frame indices whose depth tensors are unusable.""" + bad_idx = [] + for f in depth_dir.glob("pred_depth_*.npy"): + idx = int(f.stem.split("_")[-1]) + inv = np.load(f, mmap_mode="r") + if _is_invalid_inv(inv): + bad_idx.append(idx) + return bad_idx + + +def _remove_depth_tensors(depth_dir: Path, indices: List[int]) -> None: + """Delete pred_depth_XXXXXX.npy for the given indices (if they exist).""" + for idx in indices: + f = depth_dir / f"pred_depth_{idx:06d}.npy" + if f.exists(): + f.unlink() + +# Debug logging helpers removed +# _log_zero_speed no longer needed – provide empty stub to avoid NameError if +# residual references remain. +def _log_zero_speed(*args, **kwargs): + pass + +# Depth loading helper +def _load_depth(depth_dir: Path, idx: int) -> Optional[np.ndarray]: + """ + VDA stores **inverse depth** (bigger = nearer). + Convert to metric depth in metres and keep a useful range. + """ + f = depth_dir / f"pred_depth_{idx:06d}.npy" + if not f.exists(): + return None + inv = np.load(f).astype(np.float32) # (H, W) + if _is_invalid_inv(inv): # ← early-reject unusable tensor + return None + depth = DEPTH_SCALE_M / (inv + 1e-6) # invert once, not twice + return depth + +# Simple camera projection helper +def _pixel_to_camera(u: float, v: float, z: float, W: int, H: int): + fx = fy = max(W, H) + cx, cy = W / 2.0, H / 2.0 + X = (u - cx) * z / fx + Y = (v - cy) * z / fy + return X, Y, z + +# HaMeR / ViTPose – created once, reused +_HAMER_CACHE: Dict[str, ViTPoseModel] = {} + + +def _get_vitpose_model(device: str = "cuda") -> ViTPoseModel: + """Return a cached ViTPoseModel (no Detectron2 dependency).""" + if "cpm" in _HAMER_CACHE: + return _HAMER_CACHE["cpm"] + + import hamer.vitpose_model as _vpm # local import avoids side‑effects if unused + + _vpm.ROOT_DIR = "./hamer" + _vpm.VIT_DIR = "./hamer/third-party/ViTPose" + + cfg_rel = Path( + "configs/wholebody/2d_kpt_sview_rgb_img/topdown_heatmap/coco-wholebody/" + "ViTPose_huge_wholebody_256x192.py" + ) + ckpt_rel = Path("_DATA/vitpose_ckpts/vitpose+_huge/wholebody.pth") + + for _name, _dic in _vpm.ViTPoseModel.MODEL_DICT.items(): + _dic["config"] = str(Path(_vpm.VIT_DIR) / cfg_rel) + _dic["model"] = str(Path(_vpm.ROOT_DIR) / ckpt_rel) + + _HAMER_CACHE["cpm"] = ViTPoseModel(device) + return _HAMER_CACHE["cpm"] + + +# Wrist detection helper (depth‑guided + ViTPose) +def _wrist_from_frame( + frame_bgr: np.ndarray, + gray_depth: np.ndarray, + cpm: ViTPoseModel, +) -> Dict[str, Optional[Tuple[float, float]]]: + """ + Detect both wrists (left & right). + Returns: {"right": (u, v) | None, "left": (u, v) | None} + """ + # 1) Depth-guided candidate ROIs – expand to the closest *15 %* of depth + # pixels (empirically more tolerant when the hand is not the single + # nearest object). If this still fails we will fall back to a global + # ViTPose pass later. 
+ nearest_pct = 15.0 # tweak here if needed (10 to 15 helps many cases) + nearest = gray_depth < np.percentile(gray_depth, nearest_pct) + labels, n_lbl = ndimage.label(nearest) + if n_lbl == 0: + # no obvious near-depth blobs – let global fallback handle it + n_lbl = 0 + + sizes = ndimage.sum(nearest, labels, range(1, n_lbl + 1)) + blobs = 1 + np.argsort(sizes)[-2:] # label IDs + + wrists = {"right": None, "left": None} + for lbl in blobs: + mask = labels == lbl + ys, xs = np.where(mask) + y0, y1 = ys.min(), ys.max() + x0, x1 = xs.min(), xs.max() + roi_bgr = frame_bgr[y0 : y1 + 1, x0 : x1 + 1] + + bbox = np.array([[0, 0, roi_bgr.shape[1] - 1, roi_bgr.shape[0] - 1, 1.0]], + dtype=np.float32) + pose = cpm.predict_pose(roi_bgr[:, :, ::-1], [bbox])[0] + kpts = pose["keypoints"] + kpts_l, kpts_r = kpts[-42:-21], kpts[-21:] + + for tag, hk in (("left", kpts_l), ("right", kpts_r)): + # lowered confidence threshold to 0.25 (more tolerant) + valid = hk[:, 2] > 0.25 + if valid.sum() <= 3: + continue + u = x0 + hk[0, 0] + v = y0 + hk[0, 1] + # if two candidates, keep the nearer one + if wrists[tag] is None or gray_depth[int(v), int(u)] < gray_depth[int(wrists[tag][1]), int(wrists[tag][0])]: + wrists[tag] = (float(u), float(v)) + + # 2) Fallback: if either wrist is still missing, run ViTPose on the + # full frame. This is slower but guarantees at least one attempt. + if wrists["left"] is None or wrists["right"] is None: + full_bbox = np.array([[0, 0, frame_bgr.shape[1] - 1, frame_bgr.shape[0] - 1, 1.0]], + dtype=np.float32) + pose_full = cpm.predict_pose(frame_bgr[:, :, ::-1], [full_bbox])[0] + kpts_full = pose_full["keypoints"] + kpts_l_full, kpts_r_full = kpts_full[-42:-21], kpts_full[-21:] + + for tag, hk in (("left", kpts_l_full), ("right", kpts_r_full)): + if wrists[tag] is not None: + continue # already found via depth-guided ROI + valid = hk[:, 2] > 0.25 + if valid.sum() <= 3: + continue + u, v = hk[0, 0], hk[0, 1] + wrists[tag] = (float(u), float(v)) + return wrists + +# Hand Position Registration With ICP and Frame 0 Alignment Helper +def register_hand_positions(pcd_root, hand3d_root, save_reg_root, threshold=0.03): + """ + Align per-frame 3-D hand positions to the first frame using point-to-point + ICP. + + Args: + pcd_root (str): Directory containing colored point clouds organised as + `