Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 44 additions & 30 deletions joint_angle_detection/1b_video_analytic.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@

import argparse
from pathlib import Path
import pandas as pd

import cv2
import numpy as np
from ultralytics import YOLO

from core import detect_joints, compute_angles, show_frame_with_data

from core import detect_joints, compute_angles, show_frame_with_data

def parse_args():
p = argparse.ArgumentParser(description="Joint angle detection from video file")
p.add_argument("--video", type=str, required=True, help="Path to input video file")
p.add_argument("--model", type=str, default=None, help="Path to yolov8 pose weights (.pt)")
p.add_argument("--input", type=str, required=True, help="Path to input video file")
p.add_argument("--model", type=str, required=False, help="Path to YOLOv8 pose model")
# Bike angle detection removed
p.add_argument("--side", type=str, default="auto", choices=["auto", "left", "right"], help="Which side to track")
p.add_argument("--min_conf", type=float, default=0.5, help="Min keypoint confidence")
p.add_argument("--output", type=str, default=None, help="Path to save output video (optional)")
Expand All @@ -21,41 +24,34 @@ def parse_args():

def main():
args = parse_args()

# Check video file exists
video_path = Path(args.video)
if not video_path.exists():
print(f"Error: Video file not found: {video_path}")
input_path = Path(args.input)

if not input_path.exists():
print(f"Error: Input file not found: {input_path}")
return

# load the pose model (default: yolov8m-pose for better accuracy)
# Load pose model
default_model = Path(__file__).parent.parent / "models" / "yolov8m-pose.pt"
model_path = Path(args.model) if args.model else default_model

# Download if not exists
if not model_path.exists():
print(f"Model not found at {model_path}, will download yolov8m-pose.pt...")
model_path = "yolov8m-pose.pt"

model = YOLO(str(model_path))

# Open video file
cap = cv2.VideoCapture(str(video_path))


# Open video
cap = cv2.VideoCapture(str(input_path))
if not cap.isOpened():
print(f"Error: could not open video file: {video_path}")
print(f"Error: could not open video file: {input_path}")
return

# Get video properties
fps = cap.get(cv2.CAP_PROP_FPS)
width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))

print(f"Video: {video_path.name}")
print(f"Resolution: {width}x{height}, FPS: {fps:.2f}, Frames: {total_frames}")
print("Press 'q' to quit, 'space' to pause/resume.")

# Setup video writer if output specified
writer = None
if args.output:
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
Expand All @@ -64,6 +60,7 @@ def main():

frame_num = 0
paused = False
results_data = []

try:
while True:
Expand All @@ -74,30 +71,39 @@ def main():
break
frame_num += 1

# 1) image -> joints
joints = detect_joints(frame, model, side=args.side, min_conf=args.min_conf)

# 2) joints -> angles
angles = compute_angles(joints) if joints is not None else {}


show_frame_with_data(frame, joints, angles, window_name=f"Video Analysis - {input_path.name}")

row_data = {
'source_video': input_path.name,
'frame_number': frame_num,
'detected_crank_angle': angles.get('crank_angle'),
'detected_knee_angle': angles.get('knee_angle'),
'detected_hip_angle': angles.get('hip_angle'),
'detected_elbow_angle': angles.get('elbow_angle'),
'foot_x': joints['foot'][0] if joints and 'foot' in joints else None,
'foot_y': joints['foot'][1] if joints and 'foot' in joints else None,
'opposite_foot_x': joints['opposite_foot'][0] if joints and 'opposite_foot' in joints else None,
'opposite_foot_y': joints['opposite_foot'][1] if joints and 'opposite_foot' in joints else None,
'foot_conf': joints['foot'][2] if joints and 'foot' in joints else None,
'opposite_foot_conf': joints['opposite_foot'][2] if joints and 'opposite_foot' in joints else None,
}
results_data.append(row_data)

# 3) show frame with overlay
show_frame_with_data(frame, joints, angles, window_name=f"Video Analysis - {video_path.name}")

# Write to output if specified
if writer:
# Need to get the visualized frame
vis = frame.copy()
if joints:
for _, (x, y, conf) in joints.items():
if conf >= 0.5:
cv2.circle(vis, (int(x), int(y)), 4, (0, 255, 0), -1)
writer.write(vis)

# Show progress
if frame_num % 30 == 0:
print(f"Frame {frame_num}/{total_frames} ({100*frame_num/total_frames:.1f}%)")

# Handle key presses
key = cv2.waitKey(1 if not paused else 0) & 0xFF
if key == ord("q"):
break
Expand All @@ -110,6 +116,14 @@ def main():
if writer:
writer.release()
cv2.destroyAllWindows()

# Save results to CSV
output_dir = Path(__file__).parent / "output"
output_dir.mkdir(exist_ok=True)
output_csv = output_dir / "synchronized_dataset.csv"
results_df = pd.DataFrame(results_data)
results_df.to_csv(output_csv, index=False)
print(f"Saved CSV to: {output_csv}")
print(f"Processed {frame_num} frames.")


Expand Down
93 changes: 93 additions & 0 deletions joint_angle_detection/1c_csv_analytic.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
# src/BikeFitting/video_analytic.py

import argparse
from pathlib import Path
import pandas as pd

import cv2
import numpy as np
from ultralytics import YOLO


from core import detect_joints, compute_angles, show_frame_with_data

def parse_args():
p = argparse.ArgumentParser(description="Joint angle detection from video file")
p.add_argument("--input", type=str, required=False, default="../bike_angle_detection_model/data/dataset2.csv", help="Path to input CSV dataset")
p.add_argument("--frames_dir", type=str, required=False, default="../create_labeled_dataset/output/frames", help="Directory containing frame images")
p.add_argument("--model", type=str, required=False,default="./yolov8m-pose.pt", help="Path to YOLOv8 pose model")
# Bike angle detection removed
p.add_argument("--side", type=str, default="auto", choices=["auto", "left", "right"], help="Which side to track")
p.add_argument("--min_conf", type=float, default=0.5, help="Min keypoint confidence")
p.add_argument("--output", type=str, default="output/synchronized_dataset.csv", help="Path to save output video (optional)")
return p.parse_args()


def main():
args = parse_args()

input_csv = Path(args.input)
frames_dir = Path(args.frames_dir)
if not input_csv.exists():
print(f"Error: Input CSV not found: {input_csv}")
return
if not frames_dir.exists():
print(f"Error: Frames directory not found: {frames_dir}")
return

# Load CSV and model
df = pd.read_csv(input_csv)
model = YOLO(args.model)

updated_rows = []

for idx, row in df.iterrows():
frame_path = frames_dir / Path(row['original_frame_path']).name if 'original_frame_path' in row else None
if frame_path is None or not frame_path.exists():
print(f"Frame not found for row {idx}: {frame_path}")
continue
frame = cv2.imread(str(frame_path))
if frame is None:
print(f"Could not read frame: {frame_path}")
continue

if 'bike_angle_deg' in row:
bike_angle = row['bike_angle_deg']
else :
bike_angle = None

joints = detect_joints(frame, model, side=args.side, min_conf=args.min_conf)
angles = compute_angles(joints,bike_angle) if joints is not None else {}

row_data = {
'source_video': row['source_video'] if 'source_video' in row else None,
'frame_path': row['frame_path'] if 'frame_path' in row else None,
'frame_number': row['frame_number'] if 'frame_number' in row else None,
'bike_angle_deg': bike_angle,
'detected_knee_angle': angles.get('knee_angle'),
'detected_hip_angle': angles.get('hip_angle'),
'detected_elbow_angle': angles.get('elbow_angle'),
'detected_crank_angle': angles.get('crank_angle'),
'foot_x': joints['foot'][0] if joints and 'foot' in joints else None,
'foot_y': joints['foot'][1] if joints and 'foot' in joints else None,
'opposite_foot_x': joints['opposite_foot'][0] if joints and 'opposite_foot' in joints else None,
'opposite_foot_y': joints['opposite_foot'][1] if joints and 'opposite_foot' in joints else None,
'foot_conf': joints['foot'][2] if joints and 'foot' in joints else None,
'opposite_foot_conf': joints['opposite_foot'][2] if joints and 'opposite_foot' in joints else None,
}

if any(value is None for value in row_data.values()):
continue

updated_rows.append(row_data)
# show frame
# show_frame_with_data(frame, joints, angles, window_name=f"Frame {idx}")


updated_df = pd.DataFrame(updated_rows)
updated_df.to_csv(args.output, index=False)
print(f"Saved updated CSV to: {args.output}")


if __name__ == "__main__":
main()
32 changes: 18 additions & 14 deletions joint_angle_detection/2_fill_crank_angle.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,26 +14,30 @@
Then uses a GP to attempt at filling in missing crank angle
'''

REQUIRED_COLS = [
'detected_knee_angle', 'detected_hip_angle', 'detected_elbow_angle',
'foot_x', 'foot_y', 'opposite_foot_x', 'opposite_foot_y',
'foot_conf', 'opposite_foot_conf'
]

def parse_args():
p = argparse.ArgumentParser()
p.add_argument("--method", type=str, default="gp", choices=["linear", "gp"])
p.add_argument("--input_csv", type=str, required=False, default="output/synchronized_dataset.csv")
p.add_argument("--min_conf", type=float, default=0.8, help="Minimum confidence to consider a point valid")
return p.parse_args()

def remove_erroneous_points(df: pd.DataFrame, min_conf: float = 0.8) -> pd.DataFrame:
def remove_erroneous_points(df: pd.DataFrame, min_conf: float) -> pd.DataFrame:
df = df.copy()

required_cols = [
'detected_knee_angle', 'detected_hip_angle', 'detected_elbow_angle',
'foot_x', 'foot_y', 'opposite_foot_x', 'opposite_foot_y',
'foot_conf', 'opposite_foot_conf'
]
cols = REQUIRED_COLS.copy()

#if bike_angle_deg is in df, include
if 'bike_angle_deg' in df.columns:
required_cols.insert(0, 'bike_angle_deg')
cols.insert(0, 'bike_angle_deg')

# Remove rows with missing required features
missing_features = df[required_cols].isna().any(axis=1)
missing_features = df[cols].isna().any(axis=1)
df = df[~missing_features].reset_index(drop=True)

# Remove rows with low confidence
Expand Down Expand Up @@ -87,8 +91,8 @@ def interpolate_crank_angle_gp(video_df: pd.DataFrame) -> pd.DataFrame:
]

#if we have real or interpolated bike angle, include
if 'bike_angle_deg' in result_df.columns:
feature_cols.insert(0, 'bike_angle_deg')
#if 'bike_angle_deg' in result_df.columns:
# feature_cols.insert(0, 'bike_angle_deg')

valid_mask = ~pd.isna(result_df["detected_crank_angle"])
missing_mask = ~valid_mask
Expand Down Expand Up @@ -172,13 +176,13 @@ def interpolate_crank_angle(df: pd.DataFrame, method: str = "linear") -> pd.Data

def main():
args = parse_args()
input_path = Path("output/synchronized_dataset.csv")

input_path = args.input_csv
output_path = Path("output/synchronized_dataset_filled.csv")

df = pd.read_csv(input_path)
df = remove_erroneous_points(df, min_conf=0.8)
df = remove_erroneous_points(df, min_conf=args.min_conf)

if "detected_crank_angle" not in df.columns:
print("Error: 'detected_crank_angle' not found")
return
Expand Down
19 changes: 19 additions & 0 deletions joint_angle_detection/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,25 @@ Adjust detection sensitivity:
```
python 1_live_camera.py --min_conf 0.3
```
There are scripts for running this from a locally saved video
```
python 1b_video_analytic.py --input path/to/video
```
Or from a csv including a list of frames such as that created by bike_angle_detection_model and create_labeled_dataset
```
python 1b_csv_analytic.py
```

Further we have
```
python 2_fill_crank_angle.py --min_conf .8
```
Which applies the Gaussian Process to low-confidence foot measurements

```
python o_plot_angles.py
```
Which provides some charts of the relevant gaussian processes.

## Requirements

Expand Down
Loading