# NFL Prediction System - Project Plan

Manage code related to data work using NFL datasets.
Build a data pipeline and ML system to predict NFL player stats and team outcomes (spreads, totals) using nflverse data, targeting next week's games with ~20 hours of development time.
- Database: Snowflake (primary compute/storage)
- ETL/Transformations: dbt + SQL-first approach
- Data Ingestion: Python with `nfl_data_py`, `polars`, `uv`
- ML: Python with `XGBoost`, `polars`, `scikit-learn`
- Orchestration: Dagster (for complex workflows), cron (for simple scheduling)
- Code Quality: `ruff` for linting/formatting
nfl-prediction-system/
├── README.md
├── pyproject.toml           # uv/ruff config
├── .env.example             # Environment variables template
├── requirements.txt         # Python dependencies
├── dagster_project/         # Dagster orchestration
│   ├── __init__.py
│   ├── assets/              # Data assets
│   ├── jobs/                # Job definitions
│   └── schedules/           # Scheduling
├── dbt_project/             # dbt transformations
│   ├── dbt_project.yml
│   ├── profiles.yml
│   ├── models/
│   │   ├── staging/         # Raw data cleaning
│   │   ├── intermediate/    # Feature engineering
│   │   └── marts/           # Final modeling datasets
│   ├── macros/              # Reusable SQL functions
│   └── tests/               # Data quality tests
├── src/
│   ├── ingestion/           # Data loading scripts
│   ├── ml/                  # ML model code
│   └── utils/               # Shared utilities
├── sql/                     # Ad-hoc SQL scripts
├── models/                  # Trained model artifacts
└── notebooks/               # Analysis/exploration
- Initialize GitHub repository with proper `.gitignore`
- Set up `uv` for dependency management
- Configure `ruff` for code formatting/linting
- Create basic `pyproject.toml` (see the sketch below)
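A minimal `pyproject.toml` along these lines would cover both the uv-managed dependencies and the ruff configuration; the package list and lint rule selection are starting-point assumptions, not a fixed spec.

```toml
# pyproject.toml (sketch)
[project]
name = "nfl-prediction-system"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
    "polars",
    "xgboost",
    "scikit-learn",
    "snowflake-connector-python",
    "nfl_data_py",
    "click",
]

[tool.ruff]
line-length = 100
target-version = "py311"

[tool.ruff.lint]
select = ["E", "F", "I"]  # pycodestyle, pyflakes, and import sorting
```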
- Create Snowflake trial account
- Set up database/schema structure:
CREATE DATABASE NFL_ANALYTICS;
CREATE SCHEMA NFL_ANALYTICS.RAW;          -- External tables pointing to nflverse
CREATE SCHEMA NFL_ANALYTICS.STAGING;      -- Cleaned data (views)
CREATE SCHEMA NFL_ANALYTICS.INTERMEDIATE; -- Feature engineering (selective tables)
CREATE SCHEMA NFL_ANALYTICS.MARTS;        -- Final ML datasets (tables)
CREATE SCHEMA NFL_ANALYTICS.ML;           -- ML artifacts/predictions
- Configure connection parameters
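The `get_connection_params()` helper imported by the training script later in this plan is never defined; a plausible version, reading the same variables `.env.example` would document (the variable names are assumptions), could be:

```python
# src/utils/snowflake_connection.py (sketch; env-var names are assumptions)
import os


def get_connection_params() -> dict:
    """Build Snowflake connection parameters from environment variables."""
    return {
        "account": os.environ["SNOWFLAKE_ACCOUNT"],
        "user": os.environ["SNOWFLAKE_USER"],
        "password": os.environ["SNOWFLAKE_PASSWORD"],
        "warehouse": os.environ.get("SNOWFLAKE_WAREHOUSE", "NFL_ANALYTICS_WH"),
        "database": os.environ.get("SNOWFLAKE_DATABASE", "NFL_ANALYTICS"),
        "schema": os.environ.get("SNOWFLAKE_SCHEMA", "RAW"),
    }
```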
- Create `training_data_loader.py` using polars + DuckDB for performance (see the loader sketch below)
- Set up Snowflake table structures for training data
- Bulk download historical data (10 years) from nflverse GitHub URLs:
- Load data to Snowflake native tables using fast parquet + COPY INTO method
- Validate data structure and completeness with Snowflake-powered analytics
- Verify 10 years of complete training data loaded successfully
Key Features:
- Training-focused: Simple bulk replacement loads, no incremental complexity
- High performance: polars + Snowflake-native instead of pandas for 5-10x speed improvement
- Future-ready: Easy to enhance for weekly incremental updates post-training
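The loader itself is not spelled out above. A minimal sketch is shown here under a few assumptions: the nflverse release URL pattern listed later in this plan, an internal stage named `NFLVERSE_STAGE`, and a pre-created `RAW.PLAY_BY_PLAY` table. DuckDB is skipped; polars alone handles the local sanity check.

```python
# src/ingestion/training_data_loader.py (sketch; URL pattern, stage, and table names are assumptions)
import urllib.request
from pathlib import Path

import polars as pl
import snowflake.connector

from utils.snowflake_connection import get_connection_params

SEASONS = range(2015, 2025)  # ~10 seasons of history
PBP_URL = "https://github.com/nflverse/nflverse-data/releases/download/pbp/play_by_play_{season}.parquet"


def download_season(season: int, dest_dir: Path) -> Path:
    """Download one season of play-by-play parquet from the nflverse release."""
    dest = dest_dir / f"play_by_play_{season}.parquet"
    if not dest.exists():
        urllib.request.urlretrieve(PBP_URL.format(season=season), dest)
    return dest


def load_to_snowflake(conn, files: list[Path]) -> None:
    """Stage the local parquet files and bulk load them with COPY INTO."""
    cur = conn.cursor()
    cur.execute("CREATE STAGE IF NOT EXISTS NFL_ANALYTICS.RAW.NFLVERSE_STAGE")
    for f in files:
        cur.execute(f"PUT file://{f.resolve()} @NFL_ANALYTICS.RAW.NFLVERSE_STAGE AUTO_COMPRESS=FALSE")
    cur.execute("""
        COPY INTO NFL_ANALYTICS.RAW.PLAY_BY_PLAY
        FROM @NFL_ANALYTICS.RAW.NFLVERSE_STAGE
        FILE_FORMAT = (TYPE = PARQUET)
        MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE
    """)


if __name__ == "__main__":
    dest_dir = Path("data/raw")
    dest_dir.mkdir(parents=True, exist_ok=True)
    files = [download_season(s, dest_dir) for s in SEASONS]
    print(pl.read_parquet(files[-1]).shape)  # quick local sanity check before loading
    conn = snowflake.connector.connect(**get_connection_params())
    load_to_snowflake(conn, files)
```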
- Install dbt-snowflake
- Initialize dbt project with Snowflake profile
- Configure sources to point to external tables
- Create basic model structure and testing framework
- Set up dbt documentation
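For the Snowflake profile, something like the following would let `dbt_project/profiles.yml` drive all models against the trial account; the profile name, role, and env-var names are assumptions.

```yaml
# dbt_project/profiles.yml (sketch)
nfl_analytics:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: "{{ env_var('SNOWFLAKE_ACCOUNT') }}"
      user: "{{ env_var('SNOWFLAKE_USER') }}"
      password: "{{ env_var('SNOWFLAKE_PASSWORD') }}"
      role: SYSADMIN            # assumption; use whichever role owns NFL_ANALYTICS
      warehouse: NFL_ANALYTICS_WH
      database: NFL_ANALYTICS
      schema: STAGING
      threads: 4
```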
Clean and standardize data from external tables (materialized as VIEWS):
-- models/staging/sources.yml
sources:
- name: raw
description: Native Snowflake tables loaded from nflverse via bulk Python loading
database: NFL_ANALYTICS
schema: RAW
tables:
- name: player_stats
- name: play_by_play
- name: schedules
- name: rosters
-- models/staging/stg_play_by_play.sql
-- Read from native Snowflake tables, standardize columns, basic filtering
SELECT
game_id,
season,
week,
UPPER(TRIM(posteam)) AS posteam,
CASE WHEN rush_attempt = 1 THEN TRUE ELSE FALSE END AS is_rush,
epa,
-- Include load metadata
load_type,
loaded_at
FROM {{ source('raw', 'play_by_play') }}
WHERE season >= {{ var('current_season') - var('lookback_seasons') }}
-- models/staging/stg_player_stats.sql
-- Clean player statistics, handle position changes
-- models/staging/stg_schedules.sql
-- Standardize team names, parse betting lines
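The schedules staging model is only described in a comment; a sketch in the same style as stg_play_by_play might look like the following. Column names follow common nflverse schedule fields and should be verified against the loaded raw table.

```sql
-- models/staging/stg_schedules.sql (sketch; verify column names against RAW.SCHEDULES)
SELECT
    game_id,
    season,
    week,
    gameday::DATE          AS game_date,
    UPPER(TRIM(home_team)) AS home_team,
    UPPER(TRIM(away_team)) AS away_team,
    home_score,
    away_score,
    spread_line,
    total_line,
    div_game::BOOLEAN      AS is_division_game
FROM {{ source('raw', 'schedules') }}
WHERE season >= {{ var('current_season') - var('lookback_seasons') }}
```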
-- models/staging/stg_rosters.sql
-- Clean roster data with player mappings

Selective materialization - only materialize complex calculations as TABLES
Player Features (TABLE - complex window functions):
-- models/intermediate/int_player_rolling_stats.sql
{{ config(materialized='table') }} -- Materialize for performance
WITH rolling_calculations AS (
SELECT
player_id,
game_date,
-- Heavy computation - worth materializing
AVG(fantasy_points) OVER (
PARTITION BY player_id
ORDER BY game_date
ROWS BETWEEN 4 PRECEDING AND 1 PRECEDING
) AS fantasy_points_5game_avg,
AVG(targets) OVER (
PARTITION BY player_id
ORDER BY game_date
ROWS BETWEEN 9 PRECEDING AND 1 PRECEDING
) AS targets_10game_avg
FROM {{ ref('stg_player_stats') }}
)
SELECT * FROM rolling_calculations
-- models/intermediate/int_player_situational_stats.sql
{{ config(materialized='view') }} -- Simple aggregations, keep as view
-- Red zone performance, down/distance splits
-- Weather/dome performance, prime time performance

Team Features (TABLE - aggregations across large datasets):
-- models/intermediate/int_team_offensive_metrics.sql
{{ config(materialized='table') }} -- Materialize team aggregations
WITH team_offense AS (
SELECT
posteam AS team,
season,
week,
AVG(epa) AS avg_epa_per_play,
SUM(CASE WHEN yards_gained >= 10 THEN 1 ELSE 0 END) / COUNT(*) AS explosive_play_rate,
SUM(CASE WHEN is_first_down THEN 1 ELSE 0 END) / COUNT(*) AS success_rate
FROM {{ ref('stg_play_by_play') }}
WHERE is_rush OR is_pass
GROUP BY posteam, season, week
)
SELECT * FROM team_offense
-- models/intermediate/int_team_defensive_metrics.sql
{{ config(materialized='table') }}
-- Defensive EPA allowed, pressure rates, coverage metrics
-- models/intermediate/int_team_situational_factors.sql
{{ config(materialized='view') }} -- Simple joins, keep as view
-- Home/away splits, division games, rest advantage

Marts: always materialized as TABLES for ML performance
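One way to encode this layering as project-wide defaults (the per-model `config()` blocks above still override them); the project and folder names are assumptions matching the repository layout shown earlier.

```yaml
# dbt_project/dbt_project.yml (excerpt, sketch)
models:
  nfl_analytics:
    staging:
      +materialized: view      # cheap column cleanup stays virtual
    intermediate:
      +materialized: view      # default; heavy models opt into tables via config()
    marts:
      +materialized: table     # ML-facing datasets are always persisted
      +schema: marts
```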
-- models/marts/fct_player_game_predictions.sql
{{ config(
materialized='table',
schema='marts'
) }}
-- Final dataset for player stat predictions
-- One row per player per upcoming game
WITH player_base AS (
SELECT DISTINCT
p.player_id,
p.player_name,
p.position,
p.team,
s.game_id,
s.game_date,
s.home_team,
s.away_team
FROM {{ ref('stg_rosters') }} p
JOIN {{ ref('stg_schedules') }} s ON p.team IN (s.home_team, s.away_team)
WHERE s.game_date > CURRENT_DATE() -- Only future games
),
enriched_features AS (
SELECT
pb.*,
-- Rolling stats
pr.fantasy_points_5game_avg,
pr.targets_10game_avg,
-- Team context
CASE WHEN pb.team = pb.home_team THEN tom.avg_epa_per_play ELSE toa.avg_epa_per_play END AS team_offensive_epa,
-- Opponent defense
CASE WHEN pb.team = pb.home_team THEN tda.avg_epa_allowed ELSE tdm.avg_epa_allowed END AS opp_defensive_epa
FROM player_base pb
LEFT JOIN {{ ref('int_player_rolling_stats') }} pr
ON pb.player_id = pr.player_id
LEFT JOIN {{ ref('int_team_offensive_metrics') }} tom
ON pb.home_team = tom.team
LEFT JOIN {{ ref('int_team_offensive_metrics') }} toa
ON pb.away_team = toa.team
LEFT JOIN {{ ref('int_team_defensive_metrics') }} tdm
ON pb.home_team = tdm.team
LEFT JOIN {{ ref('int_team_defensive_metrics') }} tda
ON pb.away_team = tda.team
)
SELECT * FROM enriched_features
-- models/marts/fct_team_game_predictions.sql
{{ config(
materialized='table',
schema='marts'
) }}
-- Final dataset for spread/total predictions
-- One row per game with both teams' features
WITH upcoming_games AS (
SELECT
game_id,
game_date,
home_team,
away_team,
spread_line,
total_line
FROM {{ ref('stg_schedules') }}
WHERE game_date > CURRENT_DATE()
),
team_features AS (
SELECT
ug.*,
-- Home team metrics
tom_h.avg_epa_per_play AS home_offensive_epa,
tdm_h.avg_epa_allowed AS home_defensive_epa,
-- Away team metrics
tom_a.avg_epa_per_play AS away_offensive_epa,
tdm_a.avg_epa_allowed AS away_defensive_epa,
-- Situational factors
tsf.rest_advantage,
tsf.is_division_game
FROM upcoming_games ug
LEFT JOIN {{ ref('int_team_offensive_metrics') }} tom_h ON ug.home_team = tom_h.team
LEFT JOIN {{ ref('int_team_defensive_metrics') }} tdm_h ON ug.home_team = tdm_h.team
LEFT JOIN {{ ref('int_team_offensive_metrics') }} tom_a ON ug.away_team = tom_a.team
LEFT JOIN {{ ref('int_team_defensive_metrics') }} tdm_a ON ug.away_team = tdm_a.team
LEFT JOIN {{ ref('int_team_situational_factors') }} tsf ON ug.game_id = tsf.game_id
)
SELECT * FROM team_features

# src/ml/base_model.py
from abc import ABC, abstractmethod
import polars as pl
import xgboost as xgb
import snowflake.connector
class BaseModel(ABC):
def __init__(self, connection_params: dict):
self.conn = snowflake.connector.connect(**connection_params)
def load_data_from_snowflake(self, query: str) -> pl.DataFrame:
"""Load data directly from Snowflake marts"""
cursor = self.conn.cursor()
cursor.execute(query)
results = cursor.fetchall()
columns = [desc[0] for desc in cursor.description]
return pl.DataFrame(results, schema=columns, orient="row")  # fetchall returns row-oriented tuples
@abstractmethod
def prepare_features(self, df: pl.DataFrame) -> pl.DataFrame:
pass
@abstractmethod
def train(self, df: pl.DataFrame) -> None:
pass
@abstractmethod
def predict(self, df: pl.DataFrame) -> pl.DataFrame:
pass

# src/ml/player_models.py
class PlayerStatModel(BaseModel):
"""Predict player statistics (yards, TDs, receptions, etc.)"""
def __init__(self, connection_params: dict, stat_type: str, position: str):
super().__init__(connection_params)
self.stat_type = stat_type # 'fantasy_points', 'receiving_yards', etc.
self.position = position # 'QB', 'RB', 'WR', 'TE'
self.model = xgb.XGBRegressor(
n_estimators=100,
max_depth=6,
learning_rate=0.1,
subsample=0.8
)
def load_training_data(self) -> pl.DataFrame:
"""Load historical player data for training"""
query = f"""
SELECT *
FROM NFL_ANALYTICS.MARTS.FCT_PLAYER_GAME_PREDICTIONS
WHERE position = '{self.position}'
AND game_date < CURRENT_DATE()
AND {self.stat_type} IS NOT NULL
ORDER BY game_date DESC
LIMIT 10000
"""
return self.load_data_from_snowflake(query)
def load_prediction_data(self) -> pl.DataFrame:
"""Load upcoming games for predictions"""
query = f"""
SELECT *
FROM NFL_ANALYTICS.MARTS.FCT_PLAYER_GAME_PREDICTIONS
WHERE position = '{self.position}'
AND game_date >= CURRENT_DATE()
"""
return self.load_data_from_snowflake(query)
def prepare_features(self, df: pl.DataFrame) -> pl.DataFrame:
"""Prepare features for ML model"""
feature_cols = [
'fantasy_points_5game_avg',
'targets_10game_avg',
'team_offensive_epa',
'opp_defensive_epa',
# Add more position-specific features
]
if self.position in ['WR', 'TE']:
feature_cols.extend([
'target_share_5game_avg',
'air_yards_share_5game_avg'
])
elif self.position == 'RB':
feature_cols.extend([
'carries_5game_avg',
'goal_line_carries_avg'
])
return df.select(feature_cols + [self.stat_type])
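`train()` and `predict()` are declared abstract on BaseModel but never shown. A minimal sketch of the two methods, meant to sit inside PlayerStatModel next to prepare_features and assuming the numeric feature frame that method returns, could be:

```python
    # Sketch only: concrete train/predict for PlayerStatModel (not in the original plan)
    def train(self, df: pl.DataFrame) -> None:
        """Fit the XGBoost regressor; the target column is self.stat_type."""
        features = df.drop(self.stat_type)
        self.feature_names = features.columns
        X = features.to_numpy().astype(float)  # nulls become NaN, which XGBoost handles
        y = df.get_column(self.stat_type).to_numpy().astype(float)
        self.model.fit(X, y)

    def predict(self, df: pl.DataFrame) -> pl.DataFrame:
        """Score rows and append a predicted_<stat_type> column."""
        X = df.select(self.feature_names).to_numpy().astype(float)
        preds = self.model.predict(X)
        return df.with_columns(pl.Series(f"predicted_{self.stat_type}", preds))
```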
# src/ml/team_models.py
class SpreadModel(BaseModel):
"""Predict point spreads"""
def __init__(self, connection_params: dict):
super().__init__(connection_params)
self.model = xgb.XGBRegressor(
objective='reg:squarederror',
n_estimators=200,
max_depth=8,
learning_rate=0.05
)
def load_training_data(self) -> pl.DataFrame:
"""Load historical game data with outcomes"""
query = """
WITH historical_games AS (
SELECT
tgp.*,
s.away_score - s.home_score AS actual_spread,
s.away_score + s.home_score AS actual_total
FROM NFL_ANALYTICS.MARTS.FCT_TEAM_GAME_PREDICTIONS tgp
JOIN NFL_ANALYTICS.STAGING.STG_SCHEDULES s
ON tgp.game_id = s.game_id
WHERE s.away_score IS NOT NULL -- Game has been played
AND s.game_date >= '2019-01-01'
)
SELECT * FROM historical_games
ORDER BY game_date DESC
"""
return self.load_data_from_snowflake(query)
def prepare_features(self, df: pl.DataFrame) -> pl.DataFrame:
"""Prepare team-level features"""
feature_cols = [
'home_offensive_epa',
'home_defensive_epa',
'away_offensive_epa',
'away_defensive_epa',
'rest_advantage',
'is_division_game',
'spread_line' # Market spread as feature
]
return df.select(feature_cols + ['actual_spread'])
class TotalModel(BaseModel):
"""Predict game totals (over/under)"""
def __init__(self, connection_params: dict):
super().__init__(connection_params)
self.model = xgb.XGBRegressor(
objective='reg:squarederror',
n_estimators=150,
max_depth=6
)
def prepare_features(self, df: pl.DataFrame) -> pl.DataFrame:
"""Prepare features for total points prediction"""
feature_cols = [
'home_offensive_epa',
'away_offensive_epa',
'home_defensive_epa',
'away_defensive_epa',
'total_line', # Market total as feature
'weather_wind_speed',
'is_dome_game'
]
return df.select(feature_cols + ['actual_total'])

Start with simple cron-based scheduling, upgrade to Dagster later if needed
# scripts/weekly_refresh.sh
#!/bin/bash
# Weekly data refresh - run Tuesdays after MNF
echo "Starting weekly refresh..."
# Refresh external tables (automatic with nflverse updates)
echo "External tables auto-refresh with nflverse updates"
# Run dbt transformations
cd dbt_project
dbt run --select marts
dbt test --select marts
# Retrain models
cd ../
python src/ml/train_models.py --retrain
# Generate predictions for upcoming week
python src/ml/predict.py --week next
echo "Weekly refresh complete!"# src/ml/train_models.py
"""Simple model training script"""
import click
from ml.player_models import PlayerStatModel
from ml.team_models import SpreadModel, TotalModel
from utils.snowflake_connection import get_connection_params
@click.command()
@click.option('--retrain', is_flag=True, help='Retrain all models')
@click.option('--position', help='Train models for specific position only')
def main(retrain: bool, position: str):
"""Train ML models"""
conn_params = get_connection_params()
if retrain or not position:
# Train team models
spread_model = SpreadModel(conn_params)
total_model = TotalModel(conn_params)
print("Training spread model...")
spread_data = spread_model.load_training_data()
spread_features = spread_model.prepare_features(spread_data)
spread_model.train(spread_features)
print("Training total model...")
total_data = total_model.load_training_data()
total_features = total_model.prepare_features(total_data)
total_model.train(total_features)
# Train player models by position
positions = [position] if position else ['QB', 'RB', 'WR', 'TE']
for pos in positions:
print(f"Training {pos} fantasy points model...")
player_model = PlayerStatModel(conn_params, 'fantasy_points', pos)
player_data = player_model.load_training_data()
player_features = player_model.prepare_features(player_data)
player_model.train(player_features)
if __name__ == "__main__":
main()
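`predict.py` is called by the refresh script but never specified; a sketch consistent with the classes above (the `--week` flag handling and the output handling are assumptions) might be:

```python
# src/ml/predict.py (sketch; output handling and CLI shape are assumptions)
import click

from ml.player_models import PlayerStatModel
from utils.snowflake_connection import get_connection_params


@click.command()
@click.option('--week', default='next', help="Which week to score (only 'next' is handled here)")
def main(week: str):
    """Generate player predictions for upcoming games."""
    conn_params = get_connection_params()
    for pos in ['QB', 'RB', 'WR', 'TE']:
        model = PlayerStatModel(conn_params, 'fantasy_points', pos)
        model.train(model.prepare_features(model.load_training_data()))
        preds = model.predict(model.prepare_features(model.load_prediction_data()))
        # Persisting to NFL_ANALYTICS.ML (or to models/ as artifacts) is left out of this sketch
        print(f"{pos}: {preds.height} predictions generated for week={week}")


if __name__ == "__main__":
    main()
```

Loading previously trained artifacts from models/ instead of retraining inline would avoid duplicating train_models.py; this sketch retrains for simplicity.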
-- sql/monitoring_queries.sql
-- Monitor external table query costs
SELECT
query_text,
execution_time,
bytes_scanned,
credits_used,
start_time
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text LIKE '%ext_%'
AND start_time >= DATEADD(day, -7, CURRENT_TIMESTAMP())
ORDER BY credits_used DESC;
-- Monitor dbt model performance
SELECT
query_text,
execution_time,
credits_used
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text LIKE '%dbt%'
AND start_time >= DATEADD(day, -1, CURRENT_TIMESTAMP())
ORDER BY execution_time DESC;
-- Check warehouse utilization
SELECT
warehouse_name,
credits_used,
SUM(credits_used) OVER (PARTITION BY warehouse_name) AS total_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE start_time >= DATEADD(month, -1, CURRENT_TIMESTAMP());

# src/utils/cost_monitor.py
"""Monitor Snowflake costs and performance"""
import snowflake.connector
from datetime import datetime, timedelta
class CostMonitor:
def __init__(self, connection_params: dict):
self.conn = snowflake.connector.connect(**connection_params)
def check_daily_costs(self) -> dict:
"""Check costs for the last 24 hours"""
query = """
SELECT
SUM(credits_used) AS total_credits,
AVG(execution_time) / 1000 AS avg_execution_time_seconds,
COUNT(*) AS total_queries
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD(day, -1, CURRENT_TIMESTAMP())
"""
cursor = self.conn.cursor()
cursor.execute(query)
result = cursor.fetchone()
return {
'credits_used': result[0] or 0,
'avg_execution_time': result[1] or 0,
'total_queries': result[2] or 0,
'estimated_daily_cost': (result[0] or 0) * 2 # Rough $2 per credit
}
def optimize_warehouses(self):
"""Auto-suspend idle warehouses"""
cursor = self.conn.cursor()
cursor.execute("ALTER WAREHOUSE NFL_ANALYTICS_WH SUSPEND")
print("Warehouse suspended to save costs")OLD: nfl_data_py β Python Processing β Snowflake Raw β dbt
NEW: GitHub nflverse β External Tables β dbt Staging β Selective Materialization
- External Tables: ~$5-10/month (query costs only)
- Selective Materialization: Only complex calculations stored as tables
- Views for Simple Logic: Column renames, filters stay as views
- Smart Warehouse Management: Auto-suspend, right-sizing
- Slower: Initial external table queries (2-3x slower than native)
- Faster: Complex feature engineering (materialized tables)
- Optimal: ML model training (marts are native tables)
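For the warehouse-management bullet above, the dedicated warehouse can be created small with aggressive auto-suspend; NFL_ANALYTICS_WH matches the name used in the monitoring code and is otherwise an assumption.

```sql
-- Sketch: X-Small warehouse with aggressive auto-suspend for cost control
CREATE WAREHOUSE IF NOT EXISTS NFL_ANALYTICS_WH
  WAREHOUSE_SIZE = 'XSMALL'
  AUTO_SUSPEND = 60            -- seconds of inactivity before suspending
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;
```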
- Complete Phase 1 (Infrastructure + External Tables)
- Have external tables connected to nflverse data
- Basic dbt project with staging models working
- Complete Phase 2 (dbt transformations)
- Feature engineering pipeline with selective materialization
- Data quality tests passing on marts
- Complete Phases 3-4 (ML + Simple Automation)
- First working models generating predictions
- Basic refresh scripts running
- Upgrade to Dagster orchestration if needed
- Advanced monitoring and alerting
- Model performance tuning
-- Calculate 5-game rolling averages for player performance
WITH player_game_stats AS (
SELECT
player_id,
game_id,
game_date,
passing_yards,
rushing_yards,
receiving_yards,
fantasy_points
FROM {{ ref('stg_player_stats') }}
),
rolling_stats AS (
SELECT
player_id,
game_id,
AVG(passing_yards) OVER (
PARTITION BY player_id
ORDER BY game_date
ROWS BETWEEN 4 PRECEDING AND 1 PRECEDING
) AS passing_yards_5game_avg,
AVG(fantasy_points) OVER (
PARTITION BY player_id
ORDER BY game_date
ROWS BETWEEN 4 PRECEDING AND 1 PRECEDING
) AS fantasy_points_5game_avg
FROM player_game_stats
)
SELECT * FROM rolling_stats

-- Team defensive rankings against specific positions
WITH defensive_rankings AS (
SELECT
team,
season,
RANK() OVER (PARTITION BY season ORDER BY avg_passing_yards_allowed) AS pass_def_rank,
RANK() OVER (PARTITION BY season ORDER BY avg_rushing_yards_allowed) AS rush_def_rank,
avg_fantasy_points_allowed_to_qb,
avg_fantasy_points_allowed_to_rb,
avg_fantasy_points_allowed_to_wr
FROM {{ ref('int_team_defensive_metrics') }}
)
SELECT * FROM defensive_rankings

- Complete Phases 1-2 (Infrastructure + Data Ingestion)
- Have raw nflverse data flowing into Snowflake
- Basic dbt project structure
- Complete Phase 3 (dbt transformations)
- Feature engineering pipeline fully built
- Data quality tests passing
- Complete Phases 4-5 (ML + Orchestration)
- First working models generating predictions
- Automated pipeline running
- External tables successfully read nflverse data with <$50/month total costs
- Models generate predictions for next week's games
- Pipeline processes 5+ years of historical data efficiently via selective materialization
- Code is well-documented and AI assistant-friendly
- Predictions include confidence intervals/uncertainty estimates
- dbt models run in <5 minutes for weekly refresh
- External table queries complete in <30 seconds for staging models
- ML models achieve >60% directional accuracy on player stats
- Team models pick against the market spread/total with >52% accuracy (roughly the break-even rate at standard -110 odds; see the evaluation sketch below)
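A small helper like the following could measure the against-the-market criterion on a held-out season; `predicted_spread` is an assumed column produced by SpreadModel, and all three columns must share the same sign convention.

```python
# Sketch: hit rate against the closing spread (ties with the line count as misses)
import polars as pl


def spread_hit_rate(df: pl.DataFrame) -> float:
    """Share of games where the model lands on the correct side of spread_line.

    Expects columns: predicted_spread, spread_line, actual_spread.
    """
    beat_market = (
        ((pl.col("predicted_spread") > pl.col("spread_line"))
         & (pl.col("actual_spread") > pl.col("spread_line")))
        | ((pl.col("predicted_spread") < pl.col("spread_line"))
           & (pl.col("actual_spread") < pl.col("spread_line")))
    )
    return df.select(beat_market.mean()).item()
```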
- Snowflake External Table Queries: ~$10-15/month (reading from GitHub)
- Materialized Table Storage: ~$5-10/month (only complex features stored)
- Compute for dbt/ML: ~$15-20/month (X-Small warehouse, auto-suspend)
- Python dependencies: Free (open source stack)
- Total Expected: ~$30-45/month (well under $50 budget)
- Development time: 20 hours target
-- Weekly cost check (run this every Tuesday)
SELECT
DATE_TRUNC('week', start_time) AS week,
SUM(credits_used) AS total_credits,
SUM(credits_used) * 2 AS estimated_cost_usd
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE start_time >= DATEADD(week, -4, CURRENT_TIMESTAMP())
GROUP BY week
ORDER BY week DESC;
-- External table usage specifically
SELECT
DATE(start_time) AS query_date,
COUNT(*) AS external_table_queries,
SUM(credits_used) AS external_table_credits
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE query_text ILIKE '%ext_%'
AND start_time >= DATEADD(week, -1, CURRENT_TIMESTAMP())
GROUP BY query_date
ORDER BY query_date DESC;

- Repository Setup: uv, ruff, .gitignore configured
- Snowflake Trial: Create account and run external table setup
- Find nflverse URLs: Research GitHub release URLs for parquet files
- Test External Tables: Verify connectivity and basic queries
- dbt Foundation: Initialize project and create first staging model
- First ML Model: Simple player stat prediction from marts
-- Template for external table creation (URLs need to be researched)
-- Check https://github.com/nflverse/nflverse-data/releases for actual URLs
CREATE EXTERNAL TABLE ext_play_by_play(
-- Column definitions from earlier artifact
)
LOCATION = 'https://github.com/nflverse/nflverse-data/releases/download/pbp/play_by_play_YYYY.parquet'
FILE_FORMAT = parquet_format
AUTO_REFRESH = TRUE;
-- Pattern likely to be:
-- pbp: 'https://github.com/nflverse/nflverse-data/releases/download/pbp/'
-- player_stats: 'https://github.com/nflverse/nflverse-data/releases/download/player_stats/'
-- schedules: 'https://github.com/nflverse/nflverse-data/releases/download/schedules/'
-- rosters: 'https://github.com/nflverse/nflverse-data/releases/download/rosters/'

If external tables become too expensive or slow:
- Hybrid Approach: External for historical, native for current season
- Selective Ingestion: Use Python loader for just recent data (current season only)
- Materialization Strategy: Increase what's stored as native tables vs. views
- Warehouse Scaling: Move to larger warehouse if query performance is too slow
This plan balances your requirements for SQL-first processing, modern Python tooling, external table efficiency, and cost-effectiveness while building toward production-ready ML predictions, with next week's games as the target output.