diff --git a/preprocessing/sports/tracking_data/tracking_class.py b/preprocessing/sports/tracking_data/tracking_class.py index 06b74c1..ff6ac95 100644 --- a/preprocessing/sports/tracking_data/tracking_class.py +++ b/preprocessing/sports/tracking_data/tracking_class.py @@ -4,7 +4,7 @@ class Tracking_data: soccer_data_provider = ["soccer"] - ultimate_data_provider = ["ultimate_track"] + ultimate_data_provider = ["UltimateTrack", "UFA"] handball_data_provider = [] rocket_league_data_provider = [] @@ -22,64 +22,120 @@ def __new__(cls, data_provider, *args, **kwargs): if __name__ == "__main__": + import argparse import os - # Test Soccer tracking data - print("Testing Soccer tracking data...") - game_id = 0 # Select the index from the list of files in the data_path. - data_path = os.getcwd() + "/test/sports/event_data/data/datastadium/" - - try: - # Call the function for soccer directly - soccer_tracker = Soccer_tracking_data() - tracking_home, tracking_away, jerseynum_df = ( - soccer_tracker.process_datadium_tracking_data(game_id, data_path, test=True) - ) - - os.makedirs(os.path.dirname(data_path), exist_ok=True) - tracking_home.to_csv( - os.getcwd() - + "/test/sports/event_data/data/datastadium/test_tracking_home.csv", - index=False, - ) - tracking_away.to_csv( - os.getcwd() - + "/test/sports/event_data/data/datastadium/test_tracking_away.csv", - index=False, - ) - jerseynum_df.to_csv( - os.getcwd() + "/test/sports/event_data/data/datastadium/test_jerseynum.csv", - index=False, - ) - print("Soccer test completed successfully!") - except Exception as e: - print(f"Soccer test failed: {e}") - - # Test Ultimate Track data - print("\nTesting Ultimate Track data...") - ultimate_game_id = 0 # Select the first CSV file - ultimate_data_path = os.getcwd() + "/test/sports/tracking_data/data/ultimatetrack/" - - try: - # Call the function for Ultimate Track directly - ultimate_tracker = Ultimate_tracking_data() - tracking_offense, tracking_defense, team_info_df = ( - 
ultimate_tracker.process_ultimatetrack_tracking_data( - ultimate_game_id, ultimate_data_path, test=True + parser = argparse.ArgumentParser(description="Test tracking data processing") + parser.add_argument( + "data_provider", + nargs="?", # makes the positional optional so the default below actually applies + default="soccer", + help="Data provider to use (e.g., 'soccer', 'UltimateTrack', 'UFA')", + ) + args = parser.parse_args() + + data_provider = args.data_provider + + if data_provider in Tracking_data.soccer_data_provider: + # Test Soccer tracking data + print("Testing Soccer tracking data...") + game_id = 0 # Select the index from the list of files in the data_path. + data_path = os.getcwd() + "/test/sports/event_data/data/datastadium/" + + try: + # Call the function for soccer directly + soccer_tracker = Soccer_tracking_data() + tracking_home, tracking_away, jerseynum_df = ( + soccer_tracker.process_datadium_tracking_data( + game_id, data_path, test=True + ) + ) + + os.makedirs(os.path.dirname(data_path), exist_ok=True) + tracking_home.to_csv( + os.getcwd() + + "/test/sports/event_data/data/datastadium/test_tracking_home.csv", + index=False, + ) + tracking_away.to_csv( + os.getcwd() + + "/test/sports/event_data/data/datastadium/test_tracking_away.csv", + index=False, + ) + jerseynum_df.to_csv( + os.getcwd() + + "/test/sports/event_data/data/datastadium/test_jerseynum.csv", + index=False, + ) + print("Soccer test completed successfully!") + except Exception as e: + print(f"Soccer test failed: {e}") + + elif data_provider in Tracking_data.ultimate_data_provider: + if data_provider == "UFA": + # Test UFA data + print("\nTesting UFA data...") + data_path = os.getcwd() + "/test/sports/tracking_data/UFA/2_1.txt" + + try: + # Call the function for UFA directly + ufa_tracker = Ultimate_tracking_data("UFA", data_path) + tracking_offense, tracking_defense, events_df = ( + ufa_tracker.preprocessing() + ) + + # Create output directory if it doesn't exist + output_dir = os.getcwd() + "/test/sports/tracking_data/UFA/metrica/" + base_name = 
os.path.splitext(os.path.basename(data_path))[0] + os.makedirs(output_dir, exist_ok=True) + + tracking_offense.to_csv( + os.path.join(output_dir, f"{base_name}_home.csv"), index=False + ) + tracking_defense.to_csv( + os.path.join(output_dir, f"{base_name}_away.csv"), index=False + ) + events_df.to_csv( + os.path.join(output_dir, f"{base_name}_events.csv"), index=False + ) + print("UFA test completed successfully!") + print(f"UFA data path: {data_path}") + except Exception as e: + print(f"UFA test failed: {e}") + + elif data_provider == "UltimateTrack": + # Test Ultimate Track data + print("\nTesting Ultimate Track data...") + data_path = ( + os.getcwd() + "/test/sports/tracking_data/UltimateTrack/1_1_1.csv" ) - ) - - # Create output directory if it doesn't exist - os.makedirs(ultimate_data_path, exist_ok=True) - - tracking_offense.to_csv( - ultimate_data_path + "test_tracking_offense.csv", index=False - ) - tracking_defense.to_csv( - ultimate_data_path + "test_tracking_defense.csv", index=False - ) - team_info_df.to_csv(ultimate_data_path + "test_team_info.csv", index=False) - print("Ultimate Track test completed successfully!") - print(f"Ultimate Track data path: {ultimate_data_path}") - except Exception as e: - print(f"Ultimate Track test failed: {e}") + + try: + # Call the function for Ultimate Track directly + ultimatetrack_tracker = Ultimate_tracking_data( + "UltimateTrack", data_path + ) + tracking_offense, tracking_defense, events_df = ( + ultimatetrack_tracker.preprocessing() + ) + + # Create output directory if it doesn't exist + output_dir = ( + os.getcwd() + "/test/sports/tracking_data/UltimateTrack/metrica/" + ) + base_name = os.path.splitext(os.path.basename(data_path))[0] + os.makedirs(output_dir, exist_ok=True) + + tracking_offense.to_csv( + os.path.join(output_dir, f"{base_name}_home.csv"), index=False + ) + tracking_defense.to_csv( + os.path.join(output_dir, f"{base_name}_away.csv"), index=False + ) + events_df.to_csv( + os.path.join(output_dir, 
f"{base_name}_events.csv"), index=False + ) + print("Ultimate Track test completed successfully!") + print(f"Ultimate Track data path: {data_path}") + except Exception as e: + print(f"Ultimate Track test failed: {e}") diff --git a/preprocessing/sports/tracking_data/ultimate/README.md b/preprocessing/sports/tracking_data/ultimate/README.md new file mode 100644 index 0000000..d1db731 --- /dev/null +++ b/preprocessing/sports/tracking_data/ultimate/README.md @@ -0,0 +1,77 @@ +# Ultimate Frisbee Tracking Data Preprocessing + +This module provides preprocessing functionality for Ultimate Frisbee tracking data from multiple data providers, converting them into a standardized Metrica format for analysis. + +## Overview + +The Ultimate tracking data preprocessing system supports two main data providers: +- **UFA (Ultimate Frisbee Association)**: Professional Ultimate game data +- **Ultimate Track**: Research-grade tracking data with detailed motion features + +## Data Providers + +### UFA Data Provider +- **Input Format**: CSV/TXT files with player and disc positions +- **Features**: Player positions, velocities, disc tracking, holder identification +- **Output**: Metrica format with home/away team separation + +### Ultimate Track Data Provider +- **Input Format**: CSV files with raw tracking data +- **Features**: Enhanced motion analysis with velocity/acceleration calculations +- **Output**: Metrica format with calculated motion features + +## Architecture + +``` +ultimate/ +├── ultimate_tracking_class.py # Main interface class +├── ufa_preprocessing/ # UFA data processing +│ ├── preprocessing.py # UFA-specific preprocessing functions +│ ├── preprocess_config.py # UFA configuration constants +│ └── README.md # UFA module documentation +└── ultimatetrack_preprocessing/ # Ultimate Track data processing + ├── preprocessing.py # Ultimate Track preprocessing functions + ├── preprocess_config.py # Ultimate Track configuration constants + └── __init__.py +``` + +## Usage + 
+```python +from preprocessing.sports.tracking_data.ultimate import Ultimate_tracking_data + +# For UFA data +ufa_tracker = Ultimate_tracking_data("UFA", "/path/to/ufa_data.csv") +home_df, away_df, events_df = ufa_tracker.preprocessing() + +# For Ultimate Track data +ut_tracker = Ultimate_tracking_data("UltimateTrack", "/path/to/ut_data.csv") +home_df, away_df, events_df = ut_tracker.preprocessing() +``` + +## Output Format + +All data providers output data in Metrica format with three DataFrames: + +### Home/Away DataFrames +- **MultiIndex columns**: Team, Player ID, Coordinate (X/Y) +- **Data**: Player positions over time with disc tracking +- **Frequency**: Configurable based on data provider (10 Hz for UFA, 15 Hz for Ultimate Track) + +### Events DataFrame +- **Columns**: Team, Type, Subtype, Period, Start Frame, Start Time [s], End Frame, End Time [s], From, To, Start X, Start Y, End X, End Y +- **Data**: Disc possession events and position data per frame + +## Configuration + +Each data provider has its own configuration file defining: +- Field dimensions and specifications +- Players per team (7 for Ultimate Frisbee) +- Tracking frequency and coordinate scaling +- Column mappings and processing parameters + +## Dependencies + +- pandas: Data manipulation and analysis +- numpy: Numerical computations +- Standard Python libraries (os, argparse for CLI usage) \ No newline at end of file diff --git a/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/__init__.py b/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/__init__.py new file mode 100644 index 0000000..b1c8b26 --- /dev/null +++ b/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/__init__.py @@ -0,0 +1,6 @@ +# UFA preprocessing module +from .preprocessing import preprocessing_for_ufa + +__all__ = [ + "preprocessing_for_ufa", +] diff --git a/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/preprocess_config.py 
b/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/preprocess_config.py new file mode 100644 index 0000000..b59184d --- /dev/null +++ b/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/preprocess_config.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +"""Configuration for UFA data processing. + +Attributes +---------- +field_length : float + The length of an Ultimate field (109.73 meters). +field_width : float + The width of an Ultimate field (48.77 meters). +players_per_team : int + Number of players per team in Ultimate (7). +tracking_herz : int + Frequency of tracking data (10 frames per second). +coordinate_scale : float + Scale factor for coordinate conversion. +""" + +# Ultimate field specifications (in meters for UFA data) +FIELD_LENGTH: float = 109.73 # 109.73 meters total field length +FIELD_WIDTH: float = 48.77 # 48.77 meters width +PLAYING_FIELD_LENGTH: float = 73.15 # 73.15 meters playing field (without end zones) +END_ZONE_LENGTH: float = 18.29 # 18.29 meters each end zone + +# Player configuration +PLAYERS_PER_TEAM: int = 7 # Standard Ultimate has 7 players per team +MAX_SUBSTITUTIONS: int = 0 # Unlimited substitutions in Ultimate (0 presumably means "no limit" - TODO confirm) + +# Data processing configuration +TRACKING_HERZ: int = 10 # UFA data frame rate (10 fps) +COORDINATE_SCALE: float = 1.0 # UFA data is in meters + +# Team identifiers +OFFENSE_TEAM: str = "offense" +DEFENSE_TEAM: str = "defense" +DISC_ENTITY: str = "disc" + +# Data columns mapping for UFA +UFA_COLUMNS = { + "frame": "frame", + "id": "id", + "x": "x", + "y": "y", + "vx": "vx", + "vy": "vy", + "ax": "ax", + "ay": "ay", + "v_mag": "v_mag", + "a_mag": "a_mag", + "v_angle": "v_angle", + "a_angle": "a_angle", + "diff_v_a_angle": "diff_v_a_angle", + "diff_v_angle": "diff_v_angle", + "diff_a_angle": "diff_a_angle", + "class": "class", + "holder": "holder", + "closest": "closest", + "selected": "selected", + "prev_holder": "prev_holder", + "def_selected": "def_selected", +} + +# Columns to remove from UFA data 
+COLUMNS_TO_REMOVE = ["selected", "prev_holder", "def_selected"] + +# File name patterns +DEFAULT_FILE_PATTERN = r"(\d+)_(\d+)\.txt" # quarter_possession.txt diff --git a/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/preprocessing.py b/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/preprocessing.py new file mode 100644 index 0000000..1600ec2 --- /dev/null +++ b/preprocessing/sports/tracking_data/ultimate/ufa_preprocessing/preprocessing.py @@ -0,0 +1,264 @@ +import numpy as np +import pandas as pd + +from .preprocess_config import ( + COLUMNS_TO_REMOVE, + COORDINATE_SCALE, + PLAYERS_PER_TEAM, + TRACKING_HERZ, +) + + +def preprocessing_for_ufa(data_path): + """ + Preprocessing function specifically for UFA data provider + + Args: + data_path: Path to the UFA data file (CSV/TXT format) + + Returns: + Tuple of (home_df, away_df, events_df): DataFrames in Metrica format + - home_df: Home team tracking data with MultiIndex columns + - away_df: Away team tracking data with MultiIndex columns + - events_df: Events data with disc position and holder information + """ + + # Load UFA data + raw_data = pd.read_csv(data_path) + + # Remove unnecessary columns from UFA data + existing_columns_to_remove = [ + col for col in COLUMNS_TO_REMOVE if col in raw_data.columns + ] + + if existing_columns_to_remove: + print(f"Removing columns: {existing_columns_to_remove}") + processed_data = raw_data.drop(columns=existing_columns_to_remove) + else: + processed_data = raw_data.copy() + print("No columns to remove from UFA data") + + # Apply coordinate scaling using config values + if "x" in processed_data.columns and "y" in processed_data.columns: + processed_data["x"] = processed_data["x"] * COORDINATE_SCALE + processed_data["y"] = processed_data["y"] * COORDINATE_SCALE + + # Convert UFA data (intermediate file format) to Metrica format + home_df, away_df, events_df = convert_to_metrica_format(processed_data) + + return home_df, away_df, events_df + + +def 
convert_to_metrica_format(intermediate_df): + """ + Convert UFA intermediate data to Metrica format + + Args: + intermediate_df: DataFrame with intermediate format containing calculated motion features + + Returns: + Tuple of (home_df, away_df, events_df): Metrica format DataFrames + - home_df: Home team tracking data with MultiIndex columns + - away_df: Away team tracking data with MultiIndex columns + - events_df: Events data with disc position and holder information + """ + # Create the Metrica DataFrame for events + events_df = create_events_metrica(intermediate_df) + + # Create the Metrica DataFrame for Home and Away + home_df = create_tracking_metrica(intermediate_df, "Home") + away_df = create_tracking_metrica(intermediate_df, "Away") + + # Drop rows that have no frame number + events_df.dropna(subset=["Start Frame"], inplace=True) + home_df.dropna(subset=[("", "", "Frame")], inplace=True) + away_df.dropna(subset=[("", "", "Frame")], inplace=True) + + return home_df, away_df, events_df + + +def create_events_metrica(df): + """ + Create the Metrica format DataFrame for events from UFA data + + Args: + df (DataFrame): The UFA intermediate DataFrame containing tracking data + with columns: frame, class, x, y, id, holder + + Returns: + DataFrame: Events DataFrame in Metrica format with columns: + Team, Type, Subtype, Period, Start Frame, Start Time [s], + End Frame, End Time [s], From, To, Start X, Start Y, End X, End Y. + Contains disc position data and holder information per frame. 
+ """ + # Define the columns of the DataFrame + columns = [ + "Team", + "Type", + "Subtype", + "Period", + "Start Frame", + "Start Time [s]", + "End Frame", + "End Time [s]", + "From", + "To", + "Start X", + "Start Y", + "End X", + "End Y", + ] + + # Get the min and max frame + min_frame = df["frame"].min() + max_frame = df["frame"].max() + + # Get the DataFrame of the disc + disc_df = df[df["class"] == "disc"] + + # Create NaN column + nan_column = pd.Series([np.nan] * (max_frame - min_frame + 1)) + + # Create columns + start_frame = pd.Series(np.arange(min_frame, max_frame + 1)) + start_time = (start_frame / TRACKING_HERZ).round(6) + start_x = disc_df["x"].round(2).reset_index(drop=True) + start_y = disc_df["y"].round(2).reset_index(drop=True) + offense_ids = sorted(df.loc[df["class"] == "offense", "id"].unique()) + + # Get holder information + holder_data = df.loc[df["holder"]] + if not holder_data.empty: + to_id = ( + holder_data["id"] + .map(lambda x: offense_ids.index(x) if x in offense_ids else np.nan) + .reset_index(drop=True) + ) + else: + to_id = pd.Series([np.nan] * len(start_frame)) + + # Create the DataFrame for events + events_df = pd.concat( + [ + nan_column, + nan_column, + nan_column, + nan_column, + start_frame, + start_time, + nan_column, + nan_column, + to_id, + nan_column, + start_x, + start_y, + nan_column, + nan_column, + ], + axis=1, + ) + events_df.columns = columns + + return events_df + + +def create_tracking_metrica(df, team): + """ + Create the Metrica format DataFrame for team tracking data from UFA data + + Args: + df (DataFrame): The UFA intermediate DataFrame containing tracking data + with columns: frame, class, x, y, id, closest + team (str): Team designation ("Home" for offense, "Away" for defense) + + Returns: + DataFrame: Tracking DataFrame in Metrica format with MultiIndex columns: + - Level 0: "" for general columns, team name for player columns + - Level 1: Player indices for player columns + - Level 2: "Period", "Frame", 
"Time [s]", player position names, "Disc__" + Contains position data for up to 7 players plus disc position. + """ + # Define the levels of the MultiIndex using config values + player_columns = PLAYERS_PER_TEAM * 2 # x, y for each player + level_0 = [""] * 3 + [team] * player_columns + [""] * 2 + level_1 = [""] * 3 + [i // 2 for i in range(player_columns)] + [""] * 2 + + # Generate player column names using config + player_names = [] + for i in range(PLAYERS_PER_TEAM): + player_names.extend([f"Player{i}", f"Player{i}"]) + + level_2 = ( + [ + "Period", + "Frame", + "Time [s]", + ] + + player_names + + [ + "Disc__", + "Disc__", + ] + ) + + # Create the MultiIndex + multi_columns = pd.MultiIndex.from_arrays([level_0, level_1, level_2]) + + min_frame = df["frame"].min() + max_frame = df["frame"].max() + + nan_column = pd.Series([np.nan] * (max_frame - min_frame + 1)) + + frame = pd.Series(np.arange(min_frame, max_frame + 1)) + time = (frame / TRACKING_HERZ).round(6) + + offense_ids = sorted(df.loc[df["class"] == "offense", "id"].unique()) + if team == "Home": + player_ids = offense_ids + else: + # For Away team, use defense players closest to each offense player + player_ids = [] + for offense_id in offense_ids: + closest_defense = ( + df.loc[ + (df["class"] == "offense") & (df["id"] == offense_id), "closest" + ].iloc[0] + if len(df.loc[(df["class"] == "offense") & (df["id"] == offense_id)]) + > 0 + else None + ) + if closest_defense is not None: + player_ids.append(closest_defense) + + positions = [] + for i, player_id in enumerate(player_ids[:PLAYERS_PER_TEAM]): # Limit to 7 players + if team == "Home": + player_df = df[(df["id"] == player_id) & (df["class"] == "offense")] + else: + player_df = df[(df["id"] == player_id) & (df["class"] == "defense")] + + if not player_df.empty: + x = player_df["x"].round(2).reset_index(drop=True) + y = player_df["y"].round(2).reset_index(drop=True) + else: + x = pd.Series([np.nan] * len(frame)) + y = pd.Series([np.nan] * len(frame)) 
+ positions.append(x) + positions.append(y) + + # Add remaining player columns if less than 7 players + while len(positions) < PLAYERS_PER_TEAM * 2: + positions.append(pd.Series([np.nan] * len(frame))) + + disc_x = df.loc[df["class"] == "disc", "x"].round(2).reset_index(drop=True) + disc_y = df.loc[df["class"] == "disc", "y"].round(2).reset_index(drop=True) + positions.append(disc_x) + positions.append(disc_y) + + positions_df = pd.concat(positions, axis=1) + + tracking_df = pd.concat([nan_column, frame, time, positions_df], axis=1) + tracking_df.columns = multi_columns + + return tracking_df diff --git a/preprocessing/sports/tracking_data/ultimate/ultimate_tracking_class.py b/preprocessing/sports/tracking_data/ultimate/ultimate_tracking_class.py index a242607..8a5b4f8 100644 --- a/preprocessing/sports/tracking_data/ultimate/ultimate_tracking_class.py +++ b/preprocessing/sports/tracking_data/ultimate/ultimate_tracking_class.py @@ -1,38 +1,27 @@ -from .ultimatetrack_preprocessing.preprocessing import ( - process_tracking_data as process_ultimatetrack_tracking_data, -) +from .ufa_preprocessing.preprocessing import preprocessing_for_ufa +from .ultimatetrack_preprocessing.preprocessing import preprocessing_for_ultimatetrack class Ultimate_tracking_data: - """ - Ultimate Track データ処理クラス + def __init__(self, data_provider, data_path, *args, **kwargs): + self.data_provider = data_provider + self.data_path = data_path - フィールド仕様: - - 長さ: 94m - - 幅: 37m - - フレームレート: 15fps - """ + def preprocessing(self): + """Preprocess ``data_path`` with the provider-specific pipeline. + + Returns (tracking_offense, tracking_defense, events_df); raises ValueError + for an unknown provider. + """ + if self.data_provider == "UltimateTrack": + tracking_offense, tracking_defense, events_df = ( + preprocessing_for_ultimatetrack(self.data_path) + ) + elif self.data_provider == "UFA": + tracking_offense, tracking_defense, events_df = preprocessing_for_ufa( + self.data_path + ) + else: + raise ValueError(f"Unsupported data provider: {self.data_provider!r}") - # Ultimate Track specifications - FIELD_LENGTH = 94.0 # meters - FIELD_WIDTH = 37.0 # meters - FRAME_RATE = 15 # fps - PLAYERS_PER_TEAM = 7 - - @staticmethod - def 
process_ultimatetrack_tracking_data(*args, **kwargs): - """Ultimate Track トラッキングデータの処理""" - tracking_offense, tracking_defense, tracking_disc, team_info_df = ( - process_ultimatetrack_tracking_data(*args, **kwargs) - ) - return tracking_offense, tracking_defense, tracking_disc, team_info_df - - @classmethod - def get_field_info(cls): - """フィールド情報を取得""" - return { - "length": cls.FIELD_LENGTH, - "width": cls.FIELD_WIDTH, - "frame_rate": cls.FRAME_RATE, - "players_per_team": cls.PLAYERS_PER_TEAM, - } + return tracking_offense, tracking_defense, events_df diff --git a/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/__init__.py b/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/__init__.py index a2e97ad..b66b4d5 100644 --- a/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/__init__.py +++ b/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/__init__.py @@ -1,12 +1,6 @@ # Ultimate Track preprocessing module -from .preprocessing import ( - analyze_possession_patterns, - calculate_team_metrics, - process_tracking_data, -) +from .preprocessing import preprocessing_for_ultimatetrack __all__ = [ - "process_tracking_data", - "analyze_possession_patterns", - "calculate_team_metrics", + "preprocessing_for_ultimatetrack", ] diff --git a/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocess_config.py b/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocess_config.py index bdfddcb..05b1806 100644 --- a/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocess_config.py +++ b/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocess_config.py @@ -20,13 +20,12 @@ # Ultimate Frisbee field specifications (in meters for Ultimate Track data) FIELD_LENGTH: float = 94.0 # 94 meters total field length FIELD_WIDTH: float = 37.0 # 37 meters width -PLAYING_FIELD_LENGTH: float = 64.0 # 64 meters 
playing field (without end zones) -END_ZONE_LENGTH: float = 15.0 # 15 meters each end zone +PLAYING_FIELD_LENGTH: float = 58.0 # 58 meters playing field (without end zones) +END_ZONE_LENGTH: float = 18.0 # 18 meters each end zone # Player configuration -ULTIMATE_PLAYERS_PER_TEAM: int = 7 # Standard Ultimate has 7 players per team +PLAYERS_PER_TEAM: int = 7 # Standard Ultimate has 7 players per team MAX_SUBSTITUTIONS: int = 0 # Unlimited substitutions in Ultimate -TOTAL_ROSTER_SIZE: int = 20 # Typical roster size # Data processing configuration TRACKING_HERZ: int = 15 # Ultimate Track data frame rate (15 fps) @@ -37,7 +36,7 @@ DEFENSE_TEAM: str = "defense" DISC_ENTITY: str = "disc" -# Data columns mapping +# Data columns mapping for UltimateTrack ULTIMATE_TRACK_COLUMNS = { "frame": "frame", "id": "id", @@ -52,18 +51,5 @@ "holder": "holder", } - -# Output column templates -def get_tracking_columns(team_name: str) -> list: - """Generate column names for tracking data output.""" - base_columns = ["Period", "Time [s]"] - player_columns = [] - - for i in range(1, ULTIMATE_PLAYERS_PER_TEAM + 1): - player_columns.extend([f"{team_name}_{i}_x", f"{team_name}_{i}_y"]) - - return base_columns + player_columns + ["disc_x", "disc_y"] - - # File name patterns -DEFAULT_FILE_PATTERN = r"(\d+)_(\d+)_(\d+)\.csv" # game_half_point.csv +DEFAULT_FILE_PATTERN = r"(\d+)_(\d+)_(\d+)\.csv" # court_game_possession.csv diff --git a/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocessing.py b/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocessing.py index ead2918..54998b0 100644 --- a/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocessing.py +++ b/preprocessing/sports/tracking_data/ultimate/ultimatetrack_preprocessing/preprocessing.py @@ -1,128 +1,25 @@ -import os - import numpy as np import pandas as pd -from tqdm import tqdm -from .preprocess_config import FIELD_LENGTH, FIELD_WIDTH, TRACKING_HERZ 
+from .preprocess_config import COORDINATE_SCALE, PLAYERS_PER_TEAM, TRACKING_HERZ -def process_tracking_data(game_id, data_path, test=False): +def preprocessing_for_ultimatetrack(data_path): """ - Process Ultimate Track tracking data + Complete pipeline: process Ultimate Track data -> create intermediate file -> convert to Metrica format Args: - game_id: Game identifier (file index) - data_path: Path to data directory containing CSV files - test: Whether this is a test run + data_path: Path to Ultimate Track CSV data file Returns: - tracking_offense: DataFrame with offense team tracking data - tracking_defense: DataFrame with defense team tracking data - team_info_df: DataFrame with team composition info + Tuple of (home_df, away_df, events_df): DataFrames in Metrica format + - home_df: Home team tracking data with MultiIndex columns + - away_df: Away team tracking data with MultiIndex columns + - events_df: Events data with disc position and holder information """ - def get_csv_files(data_path): - """Get list of CSV files in data directory""" - csv_files = [f for f in os.listdir(data_path) if f.endswith(".csv")] - csv_files.sort() - return csv_files - - def create_tracking_dataframe(frames, num_players, team_type): - """Create tracking dataframe structure for Ultimate Track""" - columns = ["frame", "period", "Time [s]"] - - # Add disc coordinates - columns.extend(["disc_x", "disc_y"]) - - # Add player coordinates - for i in range(1, num_players + 1): - columns.extend([f"{team_type}_{i}_x", f"{team_type}_{i}_y"]) - - # Add velocity columns - columns.extend(["disc_vx", "disc_vy"]) - for i in range(1, num_players + 1): - columns.extend([f"{team_type}_{i}_vx", f"{team_type}_{i}_vy"]) - - # Add acceleration columns - columns.extend(["disc_ax", "disc_ay"]) - for i in range(1, num_players + 1): - columns.extend([f"{team_type}_{i}_ax", f"{team_type}_{i}_ay"]) - - df = pd.DataFrame(columns=columns) - df = df.reindex(range(len(frames)), fill_value=np.nan) - df["frame"] = 
frames - df["period"] = 1 # Ultimate is typically one continuous period - - # Calculate time based on 15fps frame rate - df["Time [s]"] = (df["frame"] - df["frame"].min()) / TRACKING_HERZ - - return df - - def fill_tracking_data(tracking_df, raw_data, team_class, team_type): - """Fill tracking dataframe with actual data""" - team_data = raw_data[raw_data["class"] == team_class].copy() - disc_data = raw_data[raw_data["class"] == "disc"].copy() - - frames = tracking_df["frame"].unique() - - for idx, frame in enumerate(tqdm(frames, desc=f"Processing {team_type} team")): - frame_team_data = team_data[team_data["frame"] == frame] - frame_disc_data = disc_data[disc_data["frame"] == frame] - - # Fill disc data - if not frame_disc_data.empty: - disc_row = frame_disc_data.iloc[0] - tracking_df.loc[idx, "disc_x"] = disc_row["x"] - tracking_df.loc[idx, "disc_y"] = disc_row["y"] - tracking_df.loc[idx, "disc_vx"] = disc_row["vx"] - tracking_df.loc[idx, "disc_vy"] = disc_row["vy"] - tracking_df.loc[idx, "disc_ax"] = disc_row["ax"] - tracking_df.loc[idx, "disc_ay"] = disc_row["ay"] - - # Fill player data - player_positions = {} - for _, player_row in frame_team_data.iterrows(): - player_id = player_row["id"] - if player_id not in player_positions: - player_positions[player_id] = len(player_positions) + 1 - - player_num = player_positions[player_id] - if player_num <= 7: # Ultimate typically has 7 players per team - tracking_df.loc[idx, f"{team_type}_{player_num}_x"] = player_row[ - "x" - ] - tracking_df.loc[idx, f"{team_type}_{player_num}_y"] = player_row[ - "y" - ] - tracking_df.loc[idx, f"{team_type}_{player_num}_vx"] = player_row[ - "vx" - ] - tracking_df.loc[idx, f"{team_type}_{player_num}_vy"] = player_row[ - "vy" - ] - tracking_df.loc[idx, f"{team_type}_{player_num}_ax"] = player_row[ - "ax" - ] - tracking_df.loc[idx, f"{team_type}_{player_num}_ay"] = player_row[ - "ay" - ] - - return tracking_df - - # Get list of CSV files - csv_files = get_csv_files(data_path) - - if 
game_id >= len(csv_files): - raise ValueError( - f"Game ID {game_id} out of range. Available files: {len(csv_files)}" - ) - - # Load the specified CSV file - file_path = os.path.join(data_path, csv_files[game_id]) - print(f"Loading Ultimate Track data from: {file_path}") - - raw_data = pd.read_csv(file_path) + # Load Ultimate Track data + raw_data = pd.read_csv(data_path) # Validate required columns required_columns = [ @@ -142,201 +39,368 @@ def fill_tracking_data(tracking_df, raw_data, team_class, team_type): if missing_columns: raise ValueError(f"Missing required columns: {missing_columns}") - # Validate field dimensions (Ultimate Track: 94m x 37m) - print(f"Field dimensions - Length: {FIELD_LENGTH}m, Width: {FIELD_WIDTH}m") - x_range = raw_data["x"].max() - raw_data["x"].min() - y_range = raw_data["y"].max() - raw_data["y"].min() - print(f"Data coordinate range - X: {x_range:.1f}m, Y: {y_range:.1f}m") + # Apply coordinate scaling using config values + if "x" in raw_data.columns and "y" in raw_data.columns: + raw_data = raw_data.copy() + raw_data["x"] = raw_data["x"] * COORDINATE_SCALE + raw_data["y"] = raw_data["y"] * COORDINATE_SCALE - if x_range > FIELD_LENGTH * 1.2 or y_range > FIELD_WIDTH * 1.2: - print("Warning: Coordinate range exceeds expected field dimensions") + # Step 1: Create intermediate file with all required columns + print("Creating intermediate file with calculated features...") + intermediate_df = create_intermediate_file(raw_data) - # Get unique frames - frames = sorted(raw_data["frame"].unique()) - print(f"Processing {len(frames)} frames at {TRACKING_HERZ}fps") + # Step 2: Convert to Metrica format + print("Converting to Metrica format...") + home_df, away_df, events_df = convert_to_metrica_format(intermediate_df) - # Get team information - offense_players = raw_data[raw_data["class"] == "offense"]["id"].unique() - defense_players = raw_data[raw_data["class"] == "defense"]["id"].unique() + return home_df, away_df, events_df - print(f"Offense 
# Column layout of the intermediate table: raw kinematics, derived features,
# then the categorical/bookkeeping columns.
_RAW_HEAD_COLUMNS = ("frame", "id", "x", "y", "vx", "vy", "ax", "ay")
_FEATURE_COLUMNS = (
    "v_mag",
    "a_mag",
    "v_angle",
    "a_angle",
    "diff_v_a_angle",
    "diff_v_angle",
    "diff_a_angle",
)
_RAW_TAIL_COLUMNS = ("class", "holder", "closest")
_INTERMEDIATE_COLUMNS = [*_RAW_HEAD_COLUMNS, *_FEATURE_COLUMNS, *_RAW_TAIL_COLUMNS]

# Column layout of the events table produced by create_events_metrica.
_EVENT_COLUMNS = [
    "Team",
    "Type",
    "Subtype",
    "Period",
    "Start Frame",
    "Start Time [s]",
    "End Frame",
    "End Time [s]",
    "From",
    "To",
    "Start X",
    "Start Y",
    "End X",
    "End Y",
]


def create_intermediate_file(raw_data):
    """
    Create the intermediate table with motion features for every tracked row.

    Rows are processed in ascending frame order (rows of the same frame keep
    their input order). For each row the velocity/acceleration magnitudes and
    angles are computed, plus the change of each angle relative to the
    previous row of the same entity (same ``id`` and ``class``).

    Args:
        raw_data: DataFrame with columns
            frame, id, x, y, vx, vy, ax, ay, class, holder, closest

    Returns:
        DataFrame: one row per input row (rows whose frame is NaN are
            skipped), with the input columns plus v_mag, a_mag, v_angle,
            a_angle, diff_v_a_angle, diff_v_angle, diff_a_angle. An empty
            input yields an empty DataFrame that still carries these columns.
    """
    # One stable sort replaces re-masking the whole table once per frame.
    # Rows with a NaN frame are excluded explicitly: an equality filter on the
    # frame number can never select them.
    ordered = raw_data[raw_data["frame"].notna()].sort_values(
        "frame", kind="mergesort"
    )

    # Last (v_angle, a_angle) seen per entity. A string key is used so that an
    # entity whose id is NaN still maps to one stable key.
    previous_angles = {}
    records = []

    for row in ordered.to_dict("records"):
        entity_key = f"{row['id']}_{row['class']}"
        prev_v_angle, prev_a_angle = previous_angles.get(entity_key, (None, None))

        features = calculate_magnitude_angle_features(
            row["vx"],
            row["vy"],
            row["ax"],
            row["ay"],
            prev_v_angle,
            prev_a_angle,
        )

        record = {name: row[name] for name in _RAW_HEAD_COLUMNS}
        record.update(zip(_FEATURE_COLUMNS, features))
        record.update((name, row[name]) for name in _RAW_TAIL_COLUMNS)
        records.append(record)

        # features[2] / features[3] are v_angle / a_angle
        previous_angles[entity_key] = (features[2], features[3])

    return pd.DataFrame(records, columns=_INTERMEDIATE_COLUMNS)


def _magnitude_and_angle(comp_x, comp_y):
    """Return (magnitude, angle) of a 2D vector rounded to 2 decimals, or NaNs."""
    if pd.isna(comp_x) or pd.isna(comp_y):
        return np.nan, np.nan
    magnitude = round(np.sqrt(comp_x**2 + comp_y**2), 2)
    angle = round(np.arctan2(comp_y, comp_x), 2)
    return magnitude, angle


def _wrapped_angle_delta(angle, reference):
    """Return ``angle - reference`` wrapped via arctan2, rounded to 2 decimals.

    NaN is returned when either operand is missing (None or NaN).
    """
    if pd.isna(angle) or pd.isna(reference):
        return np.nan
    delta = angle - reference
    return round(np.arctan2(np.sin(delta), np.cos(delta)), 2)


def calculate_magnitude_angle_features(
    vx, vy, ax, ay, prev_v_angle=None, prev_a_angle=None
):
    """
    Calculate magnitude and angle features for one velocity/acceleration sample.

    Args:
        vx, vy: velocity components (None/NaN are treated as missing)
        ax, ay: acceleration components (None/NaN are treated as missing)
        prev_v_angle: velocity angle of the previous sample, or None
        prev_a_angle: acceleration angle of the previous sample, or None

    Returns:
        Tuple (v_mag, a_mag, v_angle, a_angle, diff_v_a_angle, diff_v_angle,
        diff_a_angle). Angles are in radians (``np.arctan2``); every value is
        rounded to 2 decimals and is NaN when one of its inputs is missing.
    """
    v_mag, v_angle = _magnitude_and_angle(vx, vy)
    a_mag, a_angle = _magnitude_and_angle(ax, ay)

    # Differences are taken between the already-rounded angles, i.e. the same
    # values that are stored in the v_angle / a_angle outputs.
    diff_v_a_angle = _wrapped_angle_delta(v_angle, a_angle)
    diff_v_angle = _wrapped_angle_delta(v_angle, prev_v_angle)
    diff_a_angle = _wrapped_angle_delta(a_angle, prev_a_angle)

    return (
        v_mag,
        a_mag,
        v_angle,
        a_angle,
        diff_v_a_angle,
        diff_v_angle,
        diff_a_angle,
    )


def convert_to_metrica_format(
    intermediate_df, *, tracking_herz=None, players_per_team=None
):
    """
    Convert Ultimate Track intermediate data to Metrica format

    Args:
        intermediate_df: DataFrame in the intermediate format (see
            create_intermediate_file)
        tracking_herz: optional frame-rate override; defaults to the module
            constant TRACKING_HERZ
        players_per_team: optional roster-size override; defaults to the
            module constant PLAYERS_PER_TEAM

    Returns:
        Tuple of (home_df, away_df, events_df):
            - home_df: tracking table built from the "offense" rows
            - away_df: tracking table built from the "defense" rows
            - events_df: per-frame disc position and holder slot
    """
    events_df = create_events_metrica(intermediate_df, tracking_herz=tracking_herz)
    home_df = create_tracking_metrica(
        intermediate_df,
        "Home",
        players_per_team=players_per_team,
        tracking_herz=tracking_herz,
    )
    away_df = create_tracking_metrica(
        intermediate_df,
        "Away",
        players_per_team=players_per_team,
        tracking_herz=tracking_herz,
    )

    # Drop rows (not columns) that carry no frame number.
    events_df = events_df.dropna(subset=["Start Frame"])
    home_df = home_df.dropna(subset=[("", "", "Frame")])
    away_df = away_df.dropna(subset=[("", "", "Frame")])

    return home_df, away_df, events_df


def _frame_range(df):
    """Return the contiguous frame numbers from the smallest to the largest frame."""
    return pd.RangeIndex(int(df["frame"].min()), int(df["frame"].max()) + 1)


def _first_row_per_frame(rows):
    """Index ``rows`` by frame, keeping only the first row of each frame."""
    return rows.drop_duplicates(subset="frame", keep="first").set_index("frame")


def _align_to_frames(values, frames):
    """Round a frame-indexed Series to 2 decimals and lay it out on ``frames``.

    Frames without a value become NaN; the result has a 0..n-1 index.
    """
    return pd.Series(values.round(2).reindex(frames).to_numpy())


def create_events_metrica(df, *, tracking_herz=None):
    """
    Create the Metrica DataFrame for events

    One row is produced for every frame number between the smallest and the
    largest frame in ``df``. Disc position and holder are matched to rows by
    frame number, so a frame without a disc row or without a holder gets NaN
    instead of shifting later values.

    Args:
        df (DataFrame): intermediate DataFrame with at least the columns
            frame, id, class, x, y, holder
        tracking_herz: optional frame-rate override; defaults to the module
            constant TRACKING_HERZ

    Returns:
        DataFrame: events table. "Start Frame", "Start Time [s]", "From",
            "Start X" and "Start Y" are filled; all other columns are NaN.
            Empty input returns an empty table with the same columns.
    """
    herz = TRACKING_HERZ if tracking_herz is None else tracking_herz

    if df.empty:
        return pd.DataFrame(columns=_EVENT_COLUMNS)

    frames = _frame_range(df)
    disc = _first_row_per_frame(df[df["class"] == "disc"])

    # Slot of each offense player = its position among the sorted offense ids.
    offense_ids = sorted(df.loc[df["class"] == "offense", "id"].unique())
    slot_of = {player_id: slot for slot, player_id in enumerate(offense_ids)}

    # eq(True) keeps rows flagged True and treats NaN/missing flags as False.
    holders = _first_row_per_frame(df[df["holder"].eq(True)])
    holder_slot = pd.Series(holders["id"].map(slot_of).reindex(frames).to_numpy())
    if holder_slot.isna().any():
        holder_slot = holder_slot.astype("float64")

    events_df = pd.DataFrame(
        np.nan, index=pd.RangeIndex(len(frames)), columns=_EVENT_COLUMNS
    )
    events_df["Start Frame"] = frames.to_numpy()
    events_df["Start Time [s]"] = (events_df["Start Frame"] / herz).round(6)
    # NOTE(review): the holder slot is written to "From", as in the original
    # positional layout, although it was held in a variable named `to_id` —
    # confirm whether "To" was intended.
    events_df["From"] = holder_slot
    events_df["Start X"] = _align_to_frames(disc["x"], frames)
    events_df["Start Y"] = _align_to_frames(disc["y"], frames)

    return events_df


def create_tracking_metrica(df, team, *, players_per_team=None, tracking_herz=None):
    """
    Create the Metrica format DataFrame for one team's tracking data

    Args:
        df (DataFrame): intermediate DataFrame with at least the columns
            frame, class, x, y, id, closest
        team (str): "Home" selects the offense players; any other value
            selects, for each offense player, the defender named in the
            ``closest`` column of that offense player's first row
        players_per_team: optional roster-size override; defaults to the
            module constant PLAYERS_PER_TEAM
        tracking_herz: optional frame-rate override; defaults to the module
            constant TRACKING_HERZ

    Returns:
        DataFrame: one row per frame number between the smallest and largest
            frame, with three-level MultiIndex columns:
            - Level 0: "" for general columns, team name for player columns
            - Level 1: player slot for player columns, "" otherwise
            - Level 2: "Period", "Frame", "Time [s]", "Player<i>" (x then y),
              "Disc__" (x then y)
            Positions are matched by frame number; missing samples and unused
            player slots are NaN. Empty input returns an empty table with the
            same columns.
    """
    roster_size = PLAYERS_PER_TEAM if players_per_team is None else players_per_team
    herz = TRACKING_HERZ if tracking_herz is None else tracking_herz

    # Two columns (x, y) per player slot.
    player_columns = roster_size * 2
    level_0 = [""] * 3 + [team] * player_columns + [""] * 2
    level_1 = [""] * 3 + [i // 2 for i in range(player_columns)] + [""] * 2
    player_names = [f"Player{slot}" for slot in range(roster_size) for _ in range(2)]
    level_2 = ["Period", "Frame", "Time [s]"] + player_names + ["Disc__", "Disc__"]
    multi_columns = pd.MultiIndex.from_arrays([level_0, level_1, level_2])

    if df.empty:
        return pd.DataFrame(columns=multi_columns)

    frames = _frame_range(df)
    row_count = len(frames)

    def _nan_series():
        return pd.Series(np.nan, index=pd.RangeIndex(row_count))

    frame_numbers = pd.Series(frames.to_numpy())
    time = (frame_numbers / herz).round(6)

    offense_rows = df[df["class"] == "offense"]
    offense_ids = sorted(offense_rows["id"].unique())

    if team == "Home":
        player_class = "offense"
        player_ids = offense_ids
    else:
        # Pair every offense player with the defender listed as closest in
        # that offense player's first row.
        first_rows = offense_rows.drop_duplicates(subset="id", keep="first")
        closest_of = dict(zip(first_rows["id"], first_rows["closest"]))
        player_class = "defense"
        player_ids = [
            closest
            for closest in (closest_of.get(pid) for pid in offense_ids)
            if closest is not None
        ]

    class_rows = df[df["class"] == player_class]
    series = [_nan_series(), frame_numbers, time]  # Period, Frame, Time [s]

    for player_id in player_ids[:roster_size]:
        track = _first_row_per_frame(class_rows[class_rows["id"] == player_id])
        series.append(_align_to_frames(track["x"], frames))
        series.append(_align_to_frames(track["y"], frames))

    # Pad unused player slots so the data matches the column header.
    while len(series) < 3 + player_columns:
        series.append(_nan_series())

    disc = _first_row_per_frame(df[df["class"] == "disc"])
    series.append(_align_to_frames(disc["x"], frames))
    series.append(_align_to_frames(disc["y"], frames))

    tracking_df = pd.concat(series, axis=1)
    tracking_df.columns = multi_columns

    return tracking_df