diff --git a/joint_angle_detection/1b_video_analytic.py b/joint_angle_detection/1b_video_analytic.py index b0123ec..0271720 100644 --- a/joint_angle_detection/1b_video_analytic.py +++ b/joint_angle_detection/1b_video_analytic.py @@ -2,17 +2,20 @@ import argparse from pathlib import Path +import pandas as pd import cv2 +import numpy as np from ultralytics import YOLO -from core import detect_joints, compute_angles, show_frame_with_data +from core import detect_joints, compute_angles, show_frame_with_data def parse_args(): p = argparse.ArgumentParser(description="Joint angle detection from video file") - p.add_argument("--video", type=str, required=True, help="Path to input video file") - p.add_argument("--model", type=str, default=None, help="Path to yolov8 pose weights (.pt)") + p.add_argument("--input", type=str, required=True, help="Path to input video file") + p.add_argument("--model", type=str, required=False, help="Path to YOLOv8 pose model") + # Bike angle detection removed p.add_argument("--side", type=str, default="auto", choices=["auto", "left", "right"], help="Which side to track") p.add_argument("--min_conf", type=float, default=0.5, help="Min keypoint confidence") p.add_argument("--output", type=str, default=None, help="Path to save output video (optional)") @@ -21,41 +24,34 @@ def parse_args(): def main(): args = parse_args() - - # Check video file exists - video_path = Path(args.video) - if not video_path.exists(): - print(f"Error: Video file not found: {video_path}") + input_path = Path(args.input) + + if not input_path.exists(): + print(f"Error: Input file not found: {input_path}") return - # load the pose model (default: yolov8m-pose for better accuracy) + # Load pose model default_model = Path(__file__).parent.parent / "models" / "yolov8m-pose.pt" model_path = Path(args.model) if args.model else default_model - - # Download if not exists if not model_path.exists(): print(f"Model not found at {model_path}, will download yolov8m-pose.pt...") model_path = "yolov8m-pose.pt" - model = YOLO(str(model_path)) - # Open video file - cap = cv2.VideoCapture(str(video_path)) + + + # Open video + cap = cv2.VideoCapture(str(input_path)) if not cap.isOpened(): - print(f"Error: could not open video file: {video_path}") + print(f"Error: could not open video file: {input_path}") return - # Get video properties fps = cap.get(cv2.CAP_PROP_FPS) width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) - - print(f"Video: {video_path.name}") - print(f"Resolution: {width}x{height}, FPS: {fps:.2f}, Frames: {total_frames}") print("Press 'q' to quit, 'space' to pause/resume.") - # Setup video writer if output specified writer = None if args.output: fourcc = cv2.VideoWriter_fourcc(*'mp4v') @@ -64,6 +60,7 @@ def main(): frame_num = 0 paused = False + results_data = [] try: while True: @@ -74,18 +71,29 @@ def main(): break frame_num += 1 - # 1) image -> joints joints = detect_joints(frame, model, side=args.side, min_conf=args.min_conf) - - # 2) joints -> angles angles = compute_angles(joints) if joints is not None else {} + + + show_frame_with_data(frame, joints, angles, window_name=f"Video Analysis - {input_path.name}") + + row_data = { + 'source_video': input_path.name, + 'frame_number': frame_num, + 'detected_crank_angle': angles.get('crank_angle'), + 'detected_knee_angle': angles.get('knee_angle'), + 'detected_hip_angle': angles.get('hip_angle'), + 'detected_elbow_angle': angles.get('elbow_angle'), + 'foot_x': joints['foot'][0] if joints and 'foot' in joints else None, + 'foot_y': joints['foot'][1] if joints and 'foot' in joints else None, + 'opposite_foot_x': joints['opposite_foot'][0] if joints and 'opposite_foot' in joints else None, + 'opposite_foot_y': joints['opposite_foot'][1] if joints and 'opposite_foot' in joints else None, + 'foot_conf': joints['foot'][2] if joints and 'foot' in joints else None, + 'opposite_foot_conf': joints['opposite_foot'][2] if joints and 'opposite_foot' in joints else None, + } + results_data.append(row_data) - # 3) show frame with overlay - show_frame_with_data(frame, joints, angles, window_name=f"Video Analysis - {video_path.name}") - - # Write to output if specified if writer: - # Need to get the visualized frame vis = frame.copy() if joints: for _, (x, y, conf) in joints.items(): @@ -93,11 +101,9 @@ def main(): cv2.circle(vis, (int(x), int(y)), 4, (0, 255, 0), -1) writer.write(vis) - # Show progress if frame_num % 30 == 0: print(f"Frame {frame_num}/{total_frames} ({100*frame_num/total_frames:.1f}%)") - # Handle key presses key = cv2.waitKey(1 if not paused else 0) & 0xFF if key == ord("q"): break @@ -110,6 +116,14 @@ def main(): if writer: writer.release() cv2.destroyAllWindows() + + # Save results to CSV + output_dir = Path(__file__).parent / "output" + output_dir.mkdir(exist_ok=True) + output_csv = output_dir / "synchronized_dataset.csv" + results_df = pd.DataFrame(results_data) + results_df.to_csv(output_csv, index=False) + print(f"Saved CSV to: {output_csv}") print(f"Processed {frame_num} frames.") diff --git a/joint_angle_detection/1c_csv_analytic.py b/joint_angle_detection/1c_csv_analytic.py new file mode 100644 index 0000000..cb8027c --- /dev/null +++ b/joint_angle_detection/1c_csv_analytic.py @@ -0,0 +1,93 @@ +# src/BikeFitting/video_analytic.py + +import argparse +from pathlib import Path +import pandas as pd + +import cv2 +import numpy as np +from ultralytics import YOLO + + +from core import detect_joints, compute_angles, show_frame_with_data + +def parse_args(): + p = argparse.ArgumentParser(description="Joint angle detection from video file") + p.add_argument("--input", type=str, required=False, default="../bike_angle_detection_model/data/dataset2.csv", help="Path to input CSV dataset") + p.add_argument("--frames_dir", type=str, required=False, default="../create_labeled_dataset/output/frames", help="Directory containing frame images") + p.add_argument("--model", type=str, required=False,default="./yolov8m-pose.pt", help="Path to YOLOv8 pose model") + # Bike angle detection removed + p.add_argument("--side", type=str, default="auto", choices=["auto", "left", "right"], help="Which side to track") + p.add_argument("--min_conf", type=float, default=0.5, help="Min keypoint confidence") + p.add_argument("--output", type=str, default="output/synchronized_dataset.csv", help="Path to save output video (optional)") + return p.parse_args() + + +def main(): + args = parse_args() + + input_csv = Path(args.input) + frames_dir = Path(args.frames_dir) + if not input_csv.exists(): + print(f"Error: Input CSV not found: {input_csv}") + return + if not frames_dir.exists(): + print(f"Error: Frames directory not found: {frames_dir}") + return + + # Load CSV and model + df = pd.read_csv(input_csv) + model = YOLO(args.model) + + updated_rows = [] + + for idx, row in df.iterrows(): + frame_path = frames_dir / Path(row['original_frame_path']).name if 'original_frame_path' in row else None + if frame_path is None or not frame_path.exists(): + print(f"Frame not found for row {idx}: {frame_path}") + continue + frame = cv2.imread(str(frame_path)) + if frame is None: + print(f"Could not read frame: {frame_path}") + continue + + if 'bike_angle_deg' in row: + bike_angle = row['bike_angle_deg'] + else : + bike_angle = None + + joints = detect_joints(frame, model, side=args.side, min_conf=args.min_conf) + angles = compute_angles(joints,bike_angle) if joints is not None else {} + + row_data = { + 'source_video': row['source_video'] if 'source_video' in row else None, + 'frame_path': row['frame_path'] if 'frame_path' in row else None, + 'frame_number': row['frame_number'] if 'frame_number' in row else None, + 'bike_angle_deg': bike_angle, + 'detected_knee_angle': angles.get('knee_angle'), + 'detected_hip_angle': angles.get('hip_angle'), + 'detected_elbow_angle': angles.get('elbow_angle'), + 'detected_crank_angle': angles.get('crank_angle'), + 'foot_x': joints['foot'][0] if joints and 'foot' in joints else None, + 'foot_y': joints['foot'][1] if joints and 'foot' in joints else None, + 'opposite_foot_x': joints['opposite_foot'][0] if joints and 'opposite_foot' in joints else None, + 'opposite_foot_y': joints['opposite_foot'][1] if joints and 'opposite_foot' in joints else None, + 'foot_conf': joints['foot'][2] if joints and 'foot' in joints else None, + 'opposite_foot_conf': joints['opposite_foot'][2] if joints and 'opposite_foot' in joints else None, + } + + if any(value is None for value in row_data.values()): + continue + + updated_rows.append(row_data) + # show frame + # show_frame_with_data(frame, joints, angles, window_name=f"Frame {idx}") + + + updated_df = pd.DataFrame(updated_rows) + updated_df.to_csv(args.output, index=False) + print(f"Saved updated CSV to: {args.output}") + + +if __name__ == "__main__": + main() diff --git a/joint_angle_detection/2_fill_crank_angle.py b/joint_angle_detection/2_fill_crank_angle.py index fd13020..2668428 100644 --- a/joint_angle_detection/2_fill_crank_angle.py +++ b/joint_angle_detection/2_fill_crank_angle.py @@ -14,26 +14,30 @@ Then uses a GP to attempt at filling in missing crank angle ''' +REQUIRED_COLS = [ + 'detected_knee_angle', 'detected_hip_angle', 'detected_elbow_angle', + 'foot_x', 'foot_y', 'opposite_foot_x', 'opposite_foot_y', + 'foot_conf', 'opposite_foot_conf' + ] + def parse_args(): p = argparse.ArgumentParser() p.add_argument("--method", type=str, default="gp", choices=["linear", "gp"]) + p.add_argument("--input_csv", type=str, required=False, default="output/synchronized_dataset.csv") + p.add_argument("--min_conf", type=float, default=0.8, help="Minimum confidence to consider a point valid") return p.parse_args() -def remove_erroneous_points(df: pd.DataFrame, min_conf: float = 0.8) -> pd.DataFrame: +def remove_erroneous_points(df: pd.DataFrame, min_conf: float) -> pd.DataFrame: df = df.copy() - required_cols = [ - 'detected_knee_angle', 'detected_hip_angle', 'detected_elbow_angle', - 'foot_x', 'foot_y', 'opposite_foot_x', 'opposite_foot_y', - 'foot_conf', 'opposite_foot_conf' - ] + cols = REQUIRED_COLS.copy() #if bike_angle_deg is in df, include if 'bike_angle_deg' in df.columns: - required_cols.insert(0, 'bike_angle_deg') + cols.insert(0, 'bike_angle_deg') # Remove rows with missing required features - missing_features = df[required_cols].isna().any(axis=1) + missing_features = df[cols].isna().any(axis=1) df = df[~missing_features].reset_index(drop=True) # Remove rows with low confidence @@ -87,8 +91,8 @@ def interpolate_crank_angle_gp(video_df: pd.DataFrame) -> pd.DataFrame: ] #if we have real or interpolated bike angle, include - if 'bike_angle_deg' in result_df.columns: - feature_cols.insert(0, 'bike_angle_deg') + #if 'bike_angle_deg' in result_df.columns: + # feature_cols.insert(0, 'bike_angle_deg') valid_mask = ~pd.isna(result_df["detected_crank_angle"]) missing_mask = ~valid_mask @@ -172,13 +176,13 @@ def interpolate_crank_angle(df: pd.DataFrame, method: str = "linear") -> pd.Data def main(): args = parse_args() - - input_path = Path("output/synchronized_dataset.csv") + + input_path = args.input_csv output_path = Path("output/synchronized_dataset_filled.csv") df = pd.read_csv(input_path) - df = remove_erroneous_points(df, min_conf=0.8) - + df = remove_erroneous_points(df, min_conf=args.min_conf) + if "detected_crank_angle" not in df.columns: print("Error: 'detected_crank_angle' not found") return diff --git a/joint_angle_detection/README.md b/joint_angle_detection/README.md index f69f826..179ee03 100644 --- a/joint_angle_detection/README.md +++ b/joint_angle_detection/README.md @@ -34,6 +34,25 @@ Adjust detection sensitivity: ``` python 1_live_camera.py --min_conf 0.3 ``` +There are scripts for running this from a locally saved video +``` +python 1b_video_analytic.py --input path/to/video +``` +Or from a csv including a list of frames such as that created by bike_angle_detection_model and create_labeled_dataset +``` +python 1b_csv_analytic.py +``` + +Further we have +``` +python 2_fill_crank_angle.py --min_conf .8 +``` +Which applies the Gaussian Process to low-confidence foot measurements + +``` +python o_plot_angles.py +``` +Which provides some charts of the relevant gaussian processes. ## Requirements diff --git a/joint_angle_detection/core.py b/joint_angle_detection/core.py index f5a7452..9b5dca6 100644 --- a/joint_angle_detection/core.py +++ b/joint_angle_detection/core.py @@ -23,6 +23,7 @@ def detect_joints( image: np.ndarray, model: YOLO, side: str = "auto", # "right", "left" or "auto" + bike_angle: float = None, min_conf: float = 0.5, ) -> Optional[Dict[str, Tuple[float, float, float]]]: """ @@ -142,7 +143,8 @@ def _angle(a: Tuple[float, float], b: Tuple[float, float], c: Tuple[float, float def compute_angles( - joints: Dict[str, Tuple[float, float, float]] + joints: Dict[str, Tuple[float, float, float]], + bike_angle: float = None ) -> Dict[str, float]: """ Takes the joint dict from detect_joints and returns a dict of angles @@ -153,11 +155,22 @@ def compute_angles( - "elbow_angle" (shoulder–elbow–hand) - "crank_angle" (knee–foot–opposite_foot) """ - + def adjust_from_bike_angle(vector: np.ndarray, bike_angle: float) -> np.ndarray: + if bike_angle is None: + return vector[0,0],vector[0,1] + #print("VECTOR:", vector) + # Simple adjustment: rotate the vector by the bike angle + rotation_matrix = cv2.getRotationMatrix2D((0, 0), bike_angle, 1) + adjusted_vector = cv2.warpAffine(vector, rotation_matrix, (vector.shape[1], vector.shape[0])) + #print("ADJUSTED VECTOR:", adjusted_vector) + + return adjusted_vector[0, 0], adjusted_vector[0, 1] + def pt(name: str) -> Optional[Tuple[float, float]]: if name not in joints: return None x, y, _ = joints[name] + x, y = adjust_from_bike_angle(np.array([[x, y]]), bike_angle) return (x, y) def maybe_angle(a: str, b: str, c: str) -> Optional[float]: @@ -190,6 +203,7 @@ def maybe_angle(a: str, b: str, c: str) -> Optional[float]: return angles + # 3) CAMERA INPUT (SOURCE 0) def open_camera(source: int = 0) -> cv2.VideoCapture: """ @@ -204,6 +218,7 @@ def show_frame_with_data( frame: np.ndarray, joints: Optional[Dict[str, Tuple[float, float, float]]], angles: Dict[str, float], + bike_angle: Optional[float] = None, window_name: str = "Bike Fitting", ) -> None: """ @@ -212,6 +227,13 @@ def show_frame_with_data( """ vis = frame.copy() + # Draw bike angle in top right if available + if bike_angle is not None: + text = f"Bike: {bike_angle:.1f} deg" + (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, 0.7, 2) + cv2.putText(vis, text, (vis.shape[1] - tw - 10, 30), + cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 255), 2, cv2.LINE_AA) + # Draw joints if joints: for _, (x, y, conf) in joints.items():