diff --git a/.github/workflows/python-tests.yml b/.github/workflows/ci.yml similarity index 53% rename from .github/workflows/python-tests.yml rename to .github/workflows/ci.yml index 4ec196f..c600f77 100644 --- a/.github/workflows/python-tests.yml +++ b/.github/workflows/ci.yml @@ -1,10 +1,10 @@ -name: Python Tests and Coverage +name: CI on: push: - branches: [ main, feature/* ] + branches: [main, feature/*] pull_request: - branches: [ main ] + branches: [main] jobs: lint-pr-title: @@ -23,23 +23,29 @@ jobs: exit 1 fi - test: + quality: runs-on: ubuntu-latest - steps: - - uses: actions/checkout@v4 - - - name: Set up Python - uses: actions/setup-python@v5 - with: - python-version: '3.9' - - - name: Install dependencies - run: | - python -m pip install --upgrade pip - pip install -r requirements.txt - - - name: Run tests with coverage - run: | - export PYTHONPATH=$PYTHONPATH:. - pytest --cov=. --cov-report=term-missing --cov-fail-under=80 tests/ + - uses: actions/checkout@v4 + + - uses: actions/setup-python@v5 + with: + python-version: "3.9" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install ruff mypy types-requests -r requirements.txt + + - name: Lint and format check + run: | + ruff check . + ruff format --check . + + - name: Type check + run: mypy + + - name: Tests and coverage + run: | + export PYTHONPATH=$PYTHONPATH:. + pytest --cov=. --cov-report=term-missing --cov-fail-under=80 tests/ diff --git a/GEMINI.md b/CLAUDE.md similarity index 71% rename from GEMINI.md rename to CLAUDE.md index 92e06f9..3003f1f 100644 --- a/GEMINI.md +++ b/CLAUDE.md @@ -11,14 +11,14 @@ This document serves as the foundational mandate for all development work perfor ## 2. Python Standards -* **Type Safety**: Use Python 3.10+ type hints for all function signatures and complex variables. -* **Style**: Adhere strictly to PEP 8. Use `ruff` or `black` for formatting. +* **Type Safety**: Use Python 3.9+ type hints for all function signatures and complex variables. Use `from __future__ import annotations` where needed for forward references. +* **Style**: Adhere strictly to PEP 8. Use `ruff` for linting and formatting (replaces black/flake8/isort). * **Documentation**: Every function must have a docstring (Google or NumPy style) explaining its purpose, parameters, and return values. * **Environment**: Always use the virtual environment (`venv/`) and keep `requirements.txt` updated. ## 3. Data & Privacy (Mandatory) -* **Anonymity**: Never hardcode personal data (locations, usernames, credentials) into the codebase. +* **Anonymity**: Never hardcode personal data (locations, usernames, credentials) into the codebase. * **Externalize Assumptions**: Any personal identifying data or location assumptions must reside in external JSON files (e.g., `default_assumptions.json.example`) or environment variables. * **Credential Protection**: Use the `AUTOBIO_` environment variable prefix for all configuration. Never log or print API keys or secrets. @@ -27,7 +27,7 @@ This document serves as the foundational mandate for all development work perfor * **Framework**: Use `pytest` for all tests. * **Granularity**: Prefer unit tests for utility logic (`analysis_utils.py`) and integration tests for UI/CLI flows (`visualize.py`, `record_flythrough.py`). * **Mocks**: Properly mock external dependencies (Last.fm API, Streamlit UI components) to ensure tests are fast, deterministic, and can run in CI. -* **Validation Step**: AI agents MUST run the full test suite (`.\venv\Scripts\python -m pytest tests/`) before proposing any change. +* **Validation Step**: AI agents MUST run the full test suite and all static analysis tools before proposing any change (see Section 7). ## 5. Caching & Efficiency @@ -39,5 +39,34 @@ This document serves as the foundational mandate for all development work perfor 1. **Research**: Map dependencies and identify the minimal path to implementation. 2. **Strategy**: Formulate a plan that prioritizes the least disruptive, most maintainable change. 3. **Act**: Apply surgical edits. Use `replace` for targeted updates to large files. -4. **Validate**: Run tests, check linting, and verify manual use cases. +4. **Validate**: Run the full local gate (Section 7) before committing or pushing. +## 7. Local Quality Gate (Required Before Every Commit or Push) + +All of the following must pass with zero errors before any commit or push to GitHub. AI agents must run these in order and fix all failures before proceeding. + +```bash +# 1. Lint and format check +ruff check . +ruff format --check . + +# 2. Type checking +mypy . + +# 3. Security scan +bandit -r . -x venv,tests + +# 4. Tests with coverage +pytest --cov=. --cov-report=term-missing --cov-fail-under=80 tests/ +``` + +To auto-fix ruff lint and format issues before checking: +```bash +ruff check --fix . +ruff format . +``` + +Install all tools into the venv if not present: +```bash +pip install ruff mypy bandit +``` diff --git a/analysis_utils.py b/analysis_utils.py index cc14f1b..d03feef 100644 --- a/analysis_utils.py +++ b/analysis_utils.py @@ -1,67 +1,72 @@ -import pandas as pd -import os -import json import glob import hashlib +import json +import os +from typing import Any, Optional + import numpy as np -from typing import Optional, Dict, List, Any -from bisect import bisect_right -from datetime import datetime, timedelta +import pandas as pd + -def get_cache_key(lastfm_file: str, swarm_dir: Optional[str] = None, assumptions_file: Optional[str] = None) -> str: +def get_cache_key( + lastfm_file: str, swarm_dir: Optional[str] = None, assumptions_file: Optional[str] = None +) -> str: """Generate a unique cache key based on input files and their modification times.""" if not os.path.exists(lastfm_file): return "none" - + lastfm_mtime = os.path.getmtime(lastfm_file) key_parts = [lastfm_file, str(lastfm_mtime)] - + if swarm_dir and os.path.isdir(swarm_dir): # Sort files to ensure deterministic key swarm_files = sorted(glob.glob(os.path.join(swarm_dir, "checkins*.json"))) for f in swarm_files: key_parts.append(f) key_parts.append(str(os.path.getmtime(f))) - + if assumptions_file and os.path.exists(assumptions_file): key_parts.append(assumptions_file) key_parts.append(str(os.path.getmtime(assumptions_file))) - + # Include version to invalidate cache if logic changes - key_parts.append("v1.4") - - return hashlib.md5("".join(key_parts).encode()).hexdigest() + key_parts.append("v1.4") + + return hashlib.md5("".join(key_parts).encode(), usedforsecurity=False).hexdigest() # noqa: S324 + def get_cached_data(cache_key: str, cache_dir: str = "data/cache") -> Optional[pd.DataFrame]: """Retrieve processed data from cache if it exists.""" if cache_key == "none": return None - + cache_path = os.path.join(cache_dir, f"{cache_key}.csv.gz") if os.path.exists(cache_path): try: - df = pd.read_csv(cache_path, compression='gzip') - if 'date_text' in df.columns: - df['date_text'] = pd.to_datetime(df['date_text']) + df = pd.read_csv(cache_path, compression="gzip") + if "date_text" in df.columns: + df["date_text"] = pd.to_datetime(df["date_text"]) return df - except Exception: - pass + except Exception as e: + print(f"Warning: failed to read cache at {cache_path}: {e}") return None -def save_to_cache(df: pd.DataFrame, cache_key: str, cache_dir: str = "data/cache"): + +def save_to_cache(df: pd.DataFrame, cache_key: str, cache_dir: str = "data/cache") -> None: """Save processed data to cache.""" if cache_key == "none": return - + if not os.path.exists(cache_dir): os.makedirs(cache_dir, exist_ok=True) cache_path = os.path.join(cache_dir, f"{cache_key}.csv.gz") try: - df.to_csv(cache_path, index=False, compression='gzip') + df.to_csv(cache_path, index=False, compression="gzip") except Exception as e: print(f"Error saving to cache: {e}") -def load_assumptions(assumptions_file: Optional[str]) -> Dict[str, Any]: + +def load_assumptions(assumptions_file: Optional[str]) -> dict[str, Any]: """Load location assumptions from a JSON file.""" default_data = { "defaults": { @@ -70,116 +75,125 @@ def load_assumptions(assumptions_file: Optional[str]) -> Dict[str, Any]: "country": "Iceland", "lat": 64.1265, "lng": -21.8174, - "timezone": "Atlantic/Reykjavik" + "timezone": "Atlantic/Reykjavik", }, "holidays": [], "trips": [], - "residency": [] + "residency": [], } - + if not assumptions_file or not os.path.exists(assumptions_file): return default_data - + try: - with open(assumptions_file, 'r') as f: + with open(assumptions_file) as f: user_data = json.load(f) # Merge with defaults to ensure all keys exist for key in default_data: if key not in user_data: user_data[key] = default_data[key] - return user_data + return user_data # type: ignore[no-any-return] except Exception as e: print(f"Error loading assumptions: {e}") return default_data + def load_listening_data(file_path: str) -> Optional[pd.DataFrame]: """Load and preprocess listening history from CSV.""" if not os.path.exists(file_path): return None try: df = pd.read_csv(file_path) - if 'date_text' in df.columns: - df['date_text'] = pd.to_datetime(df['date_text']) - + if "date_text" in df.columns: + df["date_text"] = pd.to_datetime(df["date_text"]) + # Ensure we have a unix timestamp for lookup (Last.fm 'uts') - if 'timestamp' not in df.columns and 'date_text' in df.columns: - df['timestamp'] = df['date_text'].astype('int64') // 10**9 - + if "timestamp" not in df.columns and "date_text" in df.columns: + df["timestamp"] = df["date_text"].astype("int64") // 10**9 + return df except Exception: return None + def load_swarm_data(swarm_dir: str) -> pd.DataFrame: """Load and parse Swarm checkin data from JSON files.""" all_checkins = [] if not swarm_dir or not os.path.exists(swarm_dir): - return pd.DataFrame(columns=['timestamp', 'offset', 'city', 'state', 'country', 'venue', 'lat', 'lng']) + return pd.DataFrame( + columns=["timestamp", "offset", "city", "state", "country", "venue", "lat", "lng"] + ) json_files = glob.glob(os.path.join(swarm_dir, "checkins*.json")) for file_path in json_files: try: - with open(file_path, 'r', encoding='utf-8') as f: + with open(file_path, encoding="utf-8") as f: data = json.load(f) - items = data.get('items', []) + items = data.get("items", []) for item in items: - raw_created_at = item.get('createdAt') + raw_created_at = item.get("createdAt") if raw_created_at is None: continue - + try: if isinstance(raw_created_at, (int, float)): - created_at = pd.to_datetime(raw_created_at, unit='s', utc=True) + created_at = pd.to_datetime(raw_created_at, unit="s", utc=True) else: created_at = pd.to_datetime(raw_created_at, utc=True) ts = int(created_at.timestamp()) except (ValueError, TypeError): continue - - offset = item.get('timeZoneOffset', 0) - venue = item.get('venue') or {} - location = venue.get('location') or {} - - city = location.get('city') - state = location.get('state') - country = location.get('country') - + + offset = item.get("timeZoneOffset", 0) + venue = item.get("venue") or {} + location = venue.get("location") or {} + + city = location.get("city") + state = location.get("state") + country = location.get("country") + if not city: - city = state or country or venue.get('name', 'Unknown') + city = state or country or venue.get("name", "Unknown") if not state: - state = country or 'Unknown' + state = country or "Unknown" if not country: - country = 'Unknown' - - lat = item.get('lat') or location.get('lat') - lng = item.get('lng') or location.get('lng') - - all_checkins.append({ - 'timestamp': ts, - 'offset': offset, - 'city': city, - 'state': state, - 'country': country, - 'venue': venue.get('name', 'Unknown'), - 'lat': lat, - 'lng': lng - }) + country = "Unknown" + + lat = item.get("lat") or location.get("lat") + lng = item.get("lng") or location.get("lng") + + all_checkins.append( + { + "timestamp": ts, + "offset": offset, + "city": city, + "state": state, + "country": country, + "venue": venue.get("name", "Unknown"), + "lat": lat, + "lng": lng, + } + ) except Exception as e: print(f"Error loading {file_path}: {e}") - + if not all_checkins: - return pd.DataFrame(columns=['timestamp', 'offset', 'city', 'state', 'country', 'venue', 'lat', 'lng']) - + return pd.DataFrame( + columns=["timestamp", "offset", "city", "state", "country", "venue", "lat", "lng"] + ) + df = pd.DataFrame(all_checkins) - df = df.sort_values('timestamp').drop_duplicates('timestamp') + df = df.sort_values("timestamp").drop_duplicates("timestamp") return df -def get_assumption_location(ts: int, assumptions: Dict[str, Any]) -> Optional[Dict[str, Any]]: + +def get_assumption_location(ts: int, assumptions: dict[str, Any]) -> Optional[dict[str, Any]]: """ Get location and offset based on runtime assumptions (Issue #39). This is a non-vectorized version mainly used for tests and single lookups. """ - dt_utc = pd.to_datetime([ts], unit='s', utc=True) - + dt_utc = pd.to_datetime([ts], unit="s", utc=True) + # Simple recurring holiday check for holiday in assumptions.get("holidays", []): tz = holiday.get("timezone", "UTC") @@ -193,7 +207,7 @@ def get_assumption_location(ts: int, assumptions: Dict[str, Any]) -> Optional[Di "state": holiday.get("state", holiday.get("city")), "country": holiday.get("country", "Unknown"), "lat": holiday.get("lat"), - "lng": holiday.get("lng") + "lng": holiday.get("lng"), } # Trip check @@ -209,7 +223,7 @@ def get_assumption_location(ts: int, assumptions: Dict[str, Any]) -> Optional[Di "state": trip.get("state", trip.get("city")), "country": trip.get("country", "Unknown"), "lat": trip.get("lat"), - "lng": trip.get("lng") + "lng": trip.get("lng"), } # Residency check @@ -223,13 +237,18 @@ def get_assumption_location(ts: int, assumptions: Dict[str, Any]) -> Optional[Di local_time = dt_utc.tz_convert(tz)[0] cond = rule.get("condition") if cond == "work_hours": - if local_time.weekday() < 5 and ((local_time.hour == 8 and local_time.minute >= 30) or (9 <= local_time.hour < 16) or (local_time.hour == 16 and local_time.minute <= 30)): + if local_time.weekday() < 5 and ( + (local_time.hour == 8 and local_time.minute >= 30) + or (9 <= local_time.hour < 16) + or (local_time.hour == 16 and local_time.minute <= 30) + ): return { "offset": int(local_time.utcoffset().total_seconds() / 60), - "city": rule.get("city"), + "city": rule.get("city"), "state": rule.get("state", rule.get("city")), "country": rule.get("country", "Unknown"), - "lat": rule.get("lat"), "lng": rule.get("lng") + "lat": rule.get("lat"), + "lng": rule.get("lng"), } elif cond == "home_logic": home_1_end = pd.to_datetime(rule.get("home_1_end")).replace(tzinfo=None) @@ -237,21 +256,29 @@ def get_assumption_location(ts: int, assumptions: Dict[str, Any]) -> Optional[Di return { "offset": int(local_time.utcoffset().total_seconds() / 60), "city": rule.get("city_1") if use_home_1 else rule.get("city_2"), - "state": (rule.get("state_1") if use_home_1 else rule.get("state_2")) or (rule.get("city_1") if use_home_1 else rule.get("city_2")), + "state": (rule.get("state_1") if use_home_1 else rule.get("state_2")) + or (rule.get("city_1") if use_home_1 else rule.get("city_2")), "country": rule.get("country", "Unknown"), "lat": rule.get("lat_1") if use_home_1 else rule.get("lat_2"), - "lng": rule.get("lng_1") if use_home_1 else rule.get("lng_2") + "lng": rule.get("lng_1") if use_home_1 else rule.get("lng_2"), } return { - "offset": 0, - "city": res.get("city"), + "offset": 0, + "city": res.get("city"), "state": res.get("state", res.get("city")), "country": res.get("country", "Unknown"), - "lat": res.get("lat"), "lng": res.get("lng") + "lat": res.get("lat"), + "lng": res.get("lng"), } return None -def apply_swarm_offsets(lastfm_df: pd.DataFrame, swarm_df: pd.DataFrame, assumptions: Dict[str, Any], max_age_days: int = 30) -> pd.DataFrame: + +def apply_swarm_offsets( + lastfm_df: pd.DataFrame, + swarm_df: pd.DataFrame, + assumptions: dict[str, Any], + max_age_days: int = 30, +) -> pd.DataFrame: """ Adjust Last.fm track timestamps and locations based on Swarm checkins or runtime assumptions. Highly optimized vectorized implementation (Issue #39 optimization). @@ -269,44 +296,44 @@ def apply_swarm_offsets(lastfm_df: pd.DataFrame, swarm_df: pd.DataFrame, assumpt DEFAULT_TZ = defaults.get("timezone", "Atlantic/Reykjavik") # 1. Pre-calculate UTC timestamps and local variants for checks - dt_utc = pd.to_datetime(df['timestamp'], unit='s', utc=True) - + dt_utc = pd.to_datetime(df["timestamp"], unit="s", utc=True) + # Initialize result columns with defaults - df['tz_offset_min'] = 0 - df['city'] = DEFAULT_CITY - df['state'] = DEFAULT_STATE - df['country'] = DEFAULT_COUNTRY - df['lat'] = DEFAULT_LAT - df['lng'] = DEFAULT_LNG - + df["tz_offset_min"] = 0 + df["city"] = DEFAULT_CITY + df["state"] = DEFAULT_STATE + df["country"] = DEFAULT_COUNTRY + df["lat"] = DEFAULT_LAT + df["lng"] = DEFAULT_LNG + # Track which rows have been geocoded to avoid overwriting - geocoded_mask = np.zeros(len(df), dtype=bool) + geocoded_mask: np.ndarray = np.zeros(len(df), dtype=bool) # 2. Try Swarm Data (Fastest Lookup) if not swarm_df.empty: - swarm_ts = swarm_df['timestamp'].values + swarm_ts = swarm_df["timestamp"].values max_age_sec = max_age_days * 24 * 60 * 60 - + # Use binary search to find the most recent checkin for every track - indices = np.searchsorted(swarm_ts, df['timestamp'].values, side='right') - 1 - + indices = np.searchsorted(swarm_ts, df["timestamp"].values, side="right") - 1 + # Filter indices that are within range and not too old - valid_indices_mask = (indices >= 0) + valid_indices_mask = indices >= 0 if valid_indices_mask.any(): checkin_ts = swarm_ts[indices[valid_indices_mask]] - age_mask = (df['timestamp'].values[valid_indices_mask] - checkin_ts) <= max_age_sec - + age_mask = (df["timestamp"].values[valid_indices_mask] - checkin_ts) <= max_age_sec + final_swarm_mask = valid_indices_mask.copy() final_swarm_mask[valid_indices_mask] = age_mask - + if final_swarm_mask.any(): match_indices = indices[final_swarm_mask] - df.loc[final_swarm_mask, 'tz_offset_min'] = swarm_df['offset'].values[match_indices] - df.loc[final_swarm_mask, 'city'] = swarm_df['city'].values[match_indices] - df.loc[final_swarm_mask, 'state'] = swarm_df['state'].values[match_indices] - df.loc[final_swarm_mask, 'country'] = swarm_df['country'].values[match_indices] - df.loc[final_swarm_mask, 'lat'] = swarm_df['lat'].values[match_indices] - df.loc[final_swarm_mask, 'lng'] = swarm_df['lng'].values[match_indices] + df.loc[final_swarm_mask, "tz_offset_min"] = swarm_df["offset"].values[match_indices] + df.loc[final_swarm_mask, "city"] = swarm_df["city"].values[match_indices] + df.loc[final_swarm_mask, "state"] = swarm_df["state"].values[match_indices] + df.loc[final_swarm_mask, "country"] = swarm_df["country"].values[match_indices] + df.loc[final_swarm_mask, "lat"] = swarm_df["lat"].values[match_indices] + df.loc[final_swarm_mask, "lng"] = swarm_df["lng"].values[match_indices] geocoded_mask[final_swarm_mask] = True # 3. Apply Runtime Assumptions (Residency, Trips, Holidays) @@ -316,138 +343,167 @@ def apply_swarm_offsets(lastfm_df: pd.DataFrame, swarm_df: pd.DataFrame, assumpt processed_trips = [] for t in assumptions.get("trips", []): t_copy = t.copy() - t_copy['_start'] = pd.to_datetime(t.get('start')).date() - t_copy['_end'] = pd.to_datetime(t.get('end')).date() + t_copy["_start"] = pd.to_datetime(t.get("start")).date() + t_copy["_end"] = pd.to_datetime(t.get("end")).date() processed_trips.append(t_copy) - + processed_residency = [] for r in assumptions.get("residency", []): r_copy = r.copy() - r_copy['_start'] = pd.to_datetime(r.get('start')).replace(tzinfo=None) - r_copy['_end'] = pd.to_datetime(r.get('end')).replace(tzinfo=None) + r_copy["_start"] = pd.to_datetime(r.get("start")).replace(tzinfo=None) + r_copy["_end"] = pd.to_datetime(r.get("end")).replace(tzinfo=None) processed_residency.append(r_copy) - # For efficiency, we'll only compute the local time once per unique timezone used in assumptions + # For efficiency, compute local time once per unique timezone used in assumptions tz_to_local = {} - + # Apply Holidays (recurring) for holiday in assumptions.get("holidays", []): - if not remaining_mask.any(): break + if not remaining_mask.any(): + break tz = holiday.get("timezone", "UTC") if tz not in tz_to_local: tz_to_local[tz] = dt_utc.dt.tz_convert(tz) - + local_time = tz_to_local[tz] month = holiday.get("month") day_range = holiday.get("day_range", []) - - holiday_mask = remaining_mask & (local_time.dt.month == month) & \ - (local_time.dt.day >= day_range[0]) & (local_time.dt.day <= day_range[1]) - + + holiday_mask = ( + remaining_mask + & (local_time.dt.month == month) + & (local_time.dt.day >= day_range[0]) + & (local_time.dt.day <= day_range[1]) + ) + if holiday_mask.any(): - holiday_offsets = (local_time[holiday_mask].dt.tz_localize(None) - dt_utc[holiday_mask].dt.tz_localize(None)).dt.total_seconds() / 60 - df.loc[holiday_mask, 'tz_offset_min'] = holiday_offsets - df.loc[holiday_mask, 'city'] = holiday.get("city") - df.loc[holiday_mask, 'state'] = holiday.get("state", holiday.get("city")) - df.loc[holiday_mask, 'country'] = holiday.get("country", "Unknown") - df.loc[holiday_mask, 'lat'] = holiday.get("lat") - df.loc[holiday_mask, 'lng'] = holiday.get("lng") + holiday_offsets = ( + local_time[holiday_mask].dt.tz_localize(None) + - dt_utc[holiday_mask].dt.tz_localize(None) + ).dt.total_seconds() / 60 + df.loc[holiday_mask, "tz_offset_min"] = holiday_offsets + df.loc[holiday_mask, "city"] = holiday.get("city") + df.loc[holiday_mask, "state"] = holiday.get("state", holiday.get("city")) + df.loc[holiday_mask, "country"] = holiday.get("country", "Unknown") + df.loc[holiday_mask, "lat"] = holiday.get("lat") + df.loc[holiday_mask, "lng"] = holiday.get("lng") geocoded_mask[holiday_mask] = True remaining_mask = ~geocoded_mask # Apply Trips for trip in processed_trips: - if not remaining_mask.any(): break + if not remaining_mask.any(): + break tz = trip.get("timezone", "UTC") if tz not in tz_to_local: tz_to_local[tz] = dt_utc.dt.tz_convert(tz) - + local_time = tz_to_local[tz] local_date = local_time.dt.date - trip_mask = remaining_mask & (local_date >= trip['_start']) & (local_date <= trip['_end']) - + trip_mask = ( + remaining_mask & (local_date >= trip["_start"]) & (local_date <= trip["_end"]) + ) + if trip_mask.any(): - trip_offsets = (local_time[trip_mask].dt.tz_localize(None) - dt_utc[trip_mask].dt.tz_localize(None)).dt.total_seconds() / 60 - df.loc[trip_mask, 'tz_offset_min'] = trip_offsets - df.loc[trip_mask, 'city'] = trip.get("city") - df.loc[trip_mask, 'state'] = trip.get("state", trip.get("city")) - df.loc[trip_mask, 'country'] = trip.get("country", "Unknown") - df.loc[trip_mask, 'lat'] = trip.get("lat") - df.loc[trip_mask, 'lng'] = trip.get("lng") + trip_offsets = ( + local_time[trip_mask].dt.tz_localize(None) + - dt_utc[trip_mask].dt.tz_localize(None) + ).dt.total_seconds() / 60 + df.loc[trip_mask, "tz_offset_min"] = trip_offsets + df.loc[trip_mask, "city"] = trip.get("city") + df.loc[trip_mask, "state"] = trip.get("state", trip.get("city")) + df.loc[trip_mask, "country"] = trip.get("country", "Unknown") + df.loc[trip_mask, "lat"] = trip.get("lat") + df.loc[trip_mask, "lng"] = trip.get("lng") geocoded_mask[trip_mask] = True remaining_mask = ~geocoded_mask # Apply Residency (with sub-rules) dt_naive = dt_utc.dt.tz_localize(None) for res in processed_residency: - if not remaining_mask.any(): break - res_mask = remaining_mask & (dt_naive >= res['_start']) & (dt_naive <= res['_end']) - + if not remaining_mask.any(): + break + res_mask = remaining_mask & (dt_naive >= res["_start"]) & (dt_naive <= res["_end"]) + if res_mask.any(): # Apply sub-rules within this residency period res_remaining = res_mask.copy() for rule in res.get("sub_rules", []): - if not res_remaining.any(): break + if not res_remaining.any(): + break tz = rule.get("timezone", "UTC") if tz not in tz_to_local: tz_to_local[tz] = dt_utc.dt.tz_convert(tz) - + local_time = tz_to_local[tz] cond = rule.get("condition") - + if cond == "work_hours": # Mon-Fri, 8:30 - 16:30 - work_mask = res_remaining & (local_time.dt.weekday < 5) & ( - ((local_time.dt.hour == 8) & (local_time.dt.minute >= 30)) | - ((local_time.dt.hour >= 9) & (local_time.dt.hour < 16)) | - ((local_time.dt.hour == 16) & (local_time.dt.minute <= 30)) + work_mask = ( + res_remaining + & (local_time.dt.weekday < 5) + & ( + ((local_time.dt.hour == 8) & (local_time.dt.minute >= 30)) + | ((local_time.dt.hour >= 9) & (local_time.dt.hour < 16)) + | ((local_time.dt.hour == 16) & (local_time.dt.minute <= 30)) + ) ) if work_mask.any(): - work_offsets = (local_time[work_mask].dt.tz_localize(None) - dt_utc[work_mask].dt.tz_localize(None)).dt.total_seconds() / 60 - df.loc[work_mask, 'tz_offset_min'] = work_offsets - df.loc[work_mask, 'city'] = rule.get("city") - df.loc[work_mask, 'state'] = rule.get("state", rule.get("city")) - df.loc[work_mask, 'country'] = rule.get("country", "Unknown") - df.loc[work_mask, 'lat'] = rule.get("lat") - df.loc[work_mask, 'lng'] = rule.get("lng") + work_offsets = ( + local_time[work_mask].dt.tz_localize(None) + - dt_utc[work_mask].dt.tz_localize(None) + ).dt.total_seconds() / 60 + df.loc[work_mask, "tz_offset_min"] = work_offsets + df.loc[work_mask, "city"] = rule.get("city") + df.loc[work_mask, "state"] = rule.get("state", rule.get("city")) + df.loc[work_mask, "country"] = rule.get("country", "Unknown") + df.loc[work_mask, "lat"] = rule.get("lat") + df.loc[work_mask, "lng"] = rule.get("lng") geocoded_mask[work_mask] = True res_remaining &= ~work_mask - + elif cond == "home_logic": home_1_end = pd.to_datetime(rule.get("home_1_end")).replace(tzinfo=None) h1_mask = res_remaining & (dt_naive <= home_1_end) h2_mask = res_remaining & (dt_naive > home_1_end) - + if h1_mask.any(): - h1_offsets = (local_time[h1_mask].dt.tz_localize(None) - dt_utc[h1_mask].dt.tz_localize(None)).dt.total_seconds() / 60 - df.loc[h1_mask, 'tz_offset_min'] = h1_offsets - df.loc[h1_mask, 'city'] = rule.get("city_1") - df.loc[h1_mask, 'state'] = rule.get("state_1", rule.get("city_1")) - df.loc[h1_mask, 'country'] = rule.get("country", "Unknown") - df.loc[h1_mask, 'lat'] = rule.get("lat_1") - df.loc[h1_mask, 'lng'] = rule.get("lng_1") + h1_offsets = ( + local_time[h1_mask].dt.tz_localize(None) + - dt_utc[h1_mask].dt.tz_localize(None) + ).dt.total_seconds() / 60 + df.loc[h1_mask, "tz_offset_min"] = h1_offsets + df.loc[h1_mask, "city"] = rule.get("city_1") + df.loc[h1_mask, "state"] = rule.get("state_1", rule.get("city_1")) + df.loc[h1_mask, "country"] = rule.get("country", "Unknown") + df.loc[h1_mask, "lat"] = rule.get("lat_1") + df.loc[h1_mask, "lng"] = rule.get("lng_1") geocoded_mask[h1_mask] = True if h2_mask.any(): - h2_offsets = (local_time[h2_mask].dt.tz_localize(None) - dt_utc[h2_mask].dt.tz_localize(None)).dt.total_seconds() / 60 - df.loc[h2_mask, 'tz_offset_min'] = h2_offsets - df.loc[h2_mask, 'city'] = rule.get("city_2") - df.loc[h2_mask, 'state'] = rule.get("state_2", rule.get("city_2")) - df.loc[h2_mask, 'country'] = rule.get("country", "Unknown") - df.loc[h2_mask, 'lat'] = rule.get("lat_2") - df.loc[h2_mask, 'lng'] = rule.get("lng_2") + h2_offsets = ( + local_time[h2_mask].dt.tz_localize(None) + - dt_utc[h2_mask].dt.tz_localize(None) + ).dt.total_seconds() / 60 + df.loc[h2_mask, "tz_offset_min"] = h2_offsets + df.loc[h2_mask, "city"] = rule.get("city_2") + df.loc[h2_mask, "state"] = rule.get("state_2", rule.get("city_2")) + df.loc[h2_mask, "country"] = rule.get("country", "Unknown") + df.loc[h2_mask, "lat"] = rule.get("lat_2") + df.loc[h2_mask, "lng"] = rule.get("lng_2") geocoded_mask[h2_mask] = True res_remaining &= ~(h1_mask | h2_mask) # Final fallback for residency if no sub-rules matched if res_remaining.any(): - df.loc[res_remaining, 'tz_offset_min'] = 0 # Default offset - df.loc[res_remaining, 'city'] = res.get("city") - df.loc[res_remaining, 'state'] = res.get("state", res.get("city")) - df.loc[res_remaining, 'country'] = res.get("country", "Unknown") - df.loc[res_remaining, 'lat'] = res.get("lat") - df.loc[res_remaining, 'lng'] = res.get("lng") + df.loc[res_remaining, "tz_offset_min"] = 0 # Default offset + df.loc[res_remaining, "city"] = res.get("city") + df.loc[res_remaining, "state"] = res.get("state", res.get("city")) + df.loc[res_remaining, "country"] = res.get("country", "Unknown") + df.loc[res_remaining, "lat"] = res.get("lat") + df.loc[res_remaining, "lng"] = res.get("lng") geocoded_mask[res_remaining] = True - + remaining_mask = ~geocoded_mask # 4. Final Default (remaining tracks) @@ -455,154 +511,176 @@ def apply_swarm_offsets(lastfm_df: pd.DataFrame, swarm_df: pd.DataFrame, assumpt if remaining_mask.any(): # Compute default timezone once for all remaining default_local = dt_utc[remaining_mask].dt.tz_convert(DEFAULT_TZ) - default_offsets = (default_local.dt.tz_localize(None) - dt_utc[remaining_mask].dt.tz_localize(None)).dt.total_seconds() / 60 - df.loc[remaining_mask, 'tz_offset_min'] = default_offsets - df.loc[remaining_mask, 'city'] = DEFAULT_CITY - df.loc[remaining_mask, 'state'] = DEFAULT_STATE - df.loc[remaining_mask, 'country'] = DEFAULT_COUNTRY - df.loc[remaining_mask, 'lat'] = DEFAULT_LAT - df.loc[remaining_mask, 'lng'] = DEFAULT_LNG + default_offsets = ( + default_local.dt.tz_localize(None) - dt_utc[remaining_mask].dt.tz_localize(None) + ).dt.total_seconds() / 60 + df.loc[remaining_mask, "tz_offset_min"] = default_offsets + df.loc[remaining_mask, "city"] = DEFAULT_CITY + df.loc[remaining_mask, "state"] = DEFAULT_STATE + df.loc[remaining_mask, "country"] = DEFAULT_COUNTRY + df.loc[remaining_mask, "lat"] = DEFAULT_LAT + df.loc[remaining_mask, "lng"] = DEFAULT_LNG # Apply the computed offsets to date_text - df['local_date'] = pd.to_datetime(df['timestamp'], unit='s') + pd.to_timedelta(df['tz_offset_min'], unit='m') - df['original_date_text'] = df['date_text'] - df['date_text'] = df['local_date'] - + df["local_date"] = pd.to_datetime(df["timestamp"], unit="s") + pd.to_timedelta( + df["tz_offset_min"], unit="m" + ) + df["original_date_text"] = df["date_text"] + df["date_text"] = df["local_date"] + return df -def get_top_entities(df: pd.DataFrame, entity: str = 'artist', limit: int = 10) -> pd.DataFrame: + +def get_top_entities(df: pd.DataFrame, entity: str = "artist", limit: int = 10) -> pd.DataFrame: """Get the top n most played entities (artist, album, track).""" if entity not in df.columns: return pd.DataFrame() top = df[entity].value_counts().head(limit).reset_index() - top.columns = [entity, 'Plays'] + top.columns = [entity, "Plays"] return top -def get_unique_entities(subset_df: pd.DataFrame, full_df: pd.DataFrame, entity: str = 'artist', limit: int = 10) -> pd.DataFrame: + +def get_unique_entities( + subset_df: pd.DataFrame, full_df: pd.DataFrame, entity: str = "artist", limit: int = 10 +) -> pd.DataFrame: """ Identify entities that are uniquely prominent in the subset compared to the full dataset. Uses a simple 'Over-representation' score: (Subset Frequency / Total Frequency). """ if subset_df.empty or full_df.empty or entity not in full_df.columns: return pd.DataFrame() - + subset_counts = subset_df[entity].value_counts() full_counts = full_df[entity].value_counts() - + # Filter to only entities present in subset relevant_full = full_counts[subset_counts.index] - + # Score = (subset count) / (total count) # This favors entities that appear ONLY in this subset scores = subset_counts / relevant_full - - unique_data = pd.DataFrame({ - entity: scores.index, - 'Uniqueness': scores.values, - 'Plays': subset_counts.values - }).sort_values('Uniqueness', ascending=False).head(limit) - + + unique_data = ( + pd.DataFrame( + {entity: scores.index, "Uniqueness": scores.values, "Plays": subset_counts.values} + ) + .sort_values("Uniqueness", ascending=False) + .head(limit) + ) + return unique_data -def get_listening_intensity(df: pd.DataFrame, freq: str = 'D') -> pd.DataFrame: + +def get_listening_intensity(df: pd.DataFrame, freq: str = "D") -> pd.DataFrame: """Calculate play counts per specified frequency ('D' for day, 'W' for week, 'ME' for month).""" - if 'date_text' not in df.columns or df.empty: + if "date_text" not in df.columns or df.empty: return pd.DataFrame() df_copy = df.copy() - df_copy['date_group'] = df_copy['date_text'].dt.to_period(freq).dt.to_timestamp() - intensity = df_copy.groupby('date_group').size().reset_index(name='Plays') - intensity.rename(columns={'date_group': 'date'}, inplace=True) + df_copy["date_group"] = df_copy["date_text"].dt.to_period(freq).dt.to_timestamp() + intensity = df_copy.groupby("date_group").size().reset_index(name="Plays") + intensity.rename(columns={"date_group": "date"}, inplace=True) return intensity -def get_milestones(df: pd.DataFrame, intervals: List[int] = [1000, 5000, 10000, 50000]) -> pd.DataFrame: + +def get_milestones(df: pd.DataFrame, intervals: Optional[list[int]] = None) -> pd.DataFrame: """Find tracks that hit specific volume milestones.""" + if intervals is None: + intervals = [1000, 5000, 10000, 50000] if df.empty: return pd.DataFrame() - df_sorted = df.sort_values('date_text').reset_index(drop=True) + df_sorted = df.sort_values("date_text").reset_index(drop=True) milestones = [] for interval in intervals: if len(df_sorted) >= interval: track = df_sorted.iloc[interval - 1] - milestones.append({ - 'Milestone': f"{interval:,} Tracks", - 'Artist': track['artist'], - 'Track': track['track'], - 'Date': track['date_text'] - }) + milestones.append( + { + "Milestone": f"{interval:,} Tracks", + "Artist": track["artist"], + "Track": track["track"], + "Date": track["date_text"], + } + ) return pd.DataFrame(milestones) -def get_listening_streaks(df: pd.DataFrame) -> Dict: + +def get_listening_streaks(df: pd.DataFrame) -> dict: """Find the longest streak of consecutive days with at least one play.""" if df.empty: - return {'longest_streak': 0, 'current_streak': 0} - - dates = pd.to_datetime(df['date_text']).dt.date.unique() + return {"longest_streak": 0, "current_streak": 0} + + dates = pd.to_datetime(df["date_text"]).dt.date.unique() dates = sorted(dates) - + if not dates: - return {'longest_streak': 0, 'current_streak': 0} - + return {"longest_streak": 0, "current_streak": 0} + longest = 1 current = 1 - + for i in range(1, len(dates)): - if (dates[i] - dates[i-1]).days == 1: + if (dates[i] - dates[i - 1]).days == 1: current += 1 else: longest = max(longest, current) current = 1 - + longest = max(longest, current) today = pd.Timestamp.now().date() is_active = (today - dates[-1]).days <= 1 - + return { - 'longest_streak': longest, - 'current_streak': current if is_active else 0, - 'last_active': dates[-1] + "longest_streak": longest, + "current_streak": current if is_active else 0, + "last_active": dates[-1], } -def get_forgotten_favorites(df: pd.DataFrame, top_n: int = 10, months_threshold: int = 6) -> pd.DataFrame: + +def get_forgotten_favorites( + df: pd.DataFrame, top_n: int = 10, months_threshold: int = 6 +) -> pd.DataFrame: """Identify artists that were once favorites but haven't been heard recently.""" if df.empty: return pd.DataFrame() - - latest_date = df['date_text'].max() + + latest_date = df["date_text"].max() threshold_date = latest_date - pd.DateOffset(months=months_threshold) - - past_df = df[df['date_text'] < threshold_date] - recent_df = df[df['date_text'] >= threshold_date] - + + past_df = df[df["date_text"] < threshold_date] + recent_df = df[df["date_text"] >= threshold_date] + if past_df.empty: return pd.DataFrame() - - past_top = past_df['artist'].value_counts().head(top_n * 2) - recent_artists = set(recent_df['artist'].unique()) - + + past_top = past_df["artist"].value_counts().head(top_n * 2) + recent_artists = set(recent_df["artist"].unique()) + forgotten = [] for artist, count in past_top.items(): if artist not in recent_artists: - forgotten.append({'Artist': artist, 'Past Plays': count}) + forgotten.append({"Artist": artist, "Past Plays": count}) if len(forgotten) >= top_n: break - + return pd.DataFrame(forgotten) + def get_cumulative_plays(df: pd.DataFrame) -> pd.DataFrame: """Calculate cumulative plays over time.""" - if 'date_text' not in df.columns or df.empty: + if "date_text" not in df.columns or df.empty: return pd.DataFrame() - df_copy = df.sort_values('date_text') - df_copy['date'] = df_copy['date_text'].dt.date - daily = df_copy.groupby('date').size().reset_index(name='DailyPlays') - daily['CumulativePlays'] = daily['DailyPlays'].cumsum() + df_copy = df.sort_values("date_text") + df_copy["date"] = df_copy["date_text"].dt.date + daily = df_copy.groupby("date").size().reset_index(name="DailyPlays") + daily["CumulativePlays"] = daily["DailyPlays"].cumsum() return daily + def get_hourly_distribution(df: pd.DataFrame) -> pd.DataFrame: """Calculate the distribution of plays throughout the hours of the day.""" - if 'date_text' not in df.columns: + if "date_text" not in df.columns: return pd.DataFrame() df_copy = df.copy() - df_copy['hour'] = df_copy['date_text'].dt.hour - hourly = df_copy.groupby('hour').size().reset_index(name='Plays') + df_copy["hour"] = df_copy["date_text"].dt.hour + hourly = df_copy.groupby("hour").size().reset_index(name="Plays") return hourly diff --git a/autobiographer.py b/autobiographer.py index bd9b49b..95af382 100644 --- a/autobiographer.py +++ b/autobiographer.py @@ -1,9 +1,11 @@ -import os -import requests -import pandas as pd import argparse +import os import time -from typing import List, Dict, Optional +from typing import Optional + +import pandas as pd +import requests + class Autobiographer: BASE_URL = "http://ws.audioscrobbler.com/2.0/" @@ -13,90 +15,98 @@ def __init__(self, api_key: str, api_secret: str, username: str): self.api_secret = api_secret self.username = username - def _fetch_page(self, method: str, params: Dict) -> Dict: + def _fetch_page(self, method: str, params: dict) -> dict: """Helper to fetch a single page from Last.fm API.""" - params.update({ - 'method': method, - 'api_key': self.api_key, - 'format': 'json', - 'user': self.username - }) - response = requests.get(self.BASE_URL, params=params) + params.update( + {"method": method, "api_key": self.api_key, "format": "json", "user": self.username} + ) + response = requests.get(self.BASE_URL, params=params, timeout=30) response.raise_for_status() - return response.json() + return response.json() # type: ignore[no-any-return] - def fetch_recent_tracks(self, limit: int = 200, pages: Optional[int] = None, from_ts: Optional[int] = None, to_ts: Optional[int] = None) -> List[Dict]: + def fetch_recent_tracks( + self, + limit: int = 200, + pages: Optional[int] = None, + from_ts: Optional[int] = None, + to_ts: Optional[int] = None, + ) -> list[dict]: """Fetch recent tracks for the user.""" all_tracks = [] current_page = 1 - + while True: print(f"Fetching page {current_page}...") - params = { - 'limit': limit, - 'page': current_page - } + params = {"limit": limit, "page": current_page} if from_ts: - params['from'] = from_ts + params["from"] = from_ts if to_ts: - params['to'] = to_ts + params["to"] = to_ts - data = self._fetch_page('user.getrecenttracks', params) - - tracks = data.get('recenttracks', {}).get('track', []) + data = self._fetch_page("user.getrecenttracks", params) + + tracks = data.get("recenttracks", {}).get("track", []) if not tracks: break - + # Filter out currently playing track if any - tracks = [t for t in tracks if not t.get('@attr', {}).get('nowplaying') == 'true'] + tracks = [t for t in tracks if not t.get("@attr", {}).get("nowplaying") == "true"] all_tracks.extend(tracks) - - total_pages = int(data.get('recenttracks', {}).get('@attr', {}).get('totalPages', 1)) + + total_pages = int(data.get("recenttracks", {}).get("@attr", {}).get("totalPages", 1)) if pages and current_page >= pages: break if current_page >= total_pages: break - + current_page += 1 time.sleep(0.25) # Rate limiting - + return all_tracks - def save_tracks_to_csv(self, tracks: List[Dict], filename: Optional[str] = None): + def save_tracks_to_csv(self, tracks: list[dict], filename: Optional[str] = None) -> None: """Clean and save tracks to a CSV file.""" if not filename: filename = f"data/lastfm_{self.username}_tracks.csv" - + flat_data = [] for track in tracks: - flat_data.append({ - 'artist': track.get('artist', {}).get('#text'), - 'album': track.get('album', {}).get('#text'), - 'track': track.get('name'), - 'timestamp': track.get('date', {}).get('uts'), - 'date_text': track.get('date', {}).get('#text') - }) - + flat_data.append( + { + "artist": track.get("artist", {}).get("#text"), + "album": track.get("album", {}).get("#text"), + "track": track.get("name"), + "timestamp": track.get("date", {}).get("uts"), + "date_text": track.get("date", {}).get("#text"), + } + ) + df = pd.DataFrame(flat_data) os.makedirs(os.path.dirname(filename), exist_ok=True) df.to_csv(filename, index=False) print(f"Saved {len(df)} tracks to {filename}") -def main(): + +def main() -> None: parser = argparse.ArgumentParser(description="Fetch Last.fm listening history.") - parser.add_argument("--user", help="Last.fm username (defaults to AUTOBIO_LASTFM_USERNAME env var)") + parser.add_argument( + "--user", help="Last.fm username (defaults to AUTOBIO_LASTFM_USERNAME env var)" + ) parser.add_argument("--pages", type=int, help="Limit number of pages to fetch") parser.add_argument("--from_date", help="Start date (YYYY-MM-DD)") parser.add_argument("--to_date", help="End date (YYYY-MM-DD)") - + args = parser.parse_args() - + api_key = os.getenv("AUTOBIO_LASTFM_API_KEY") api_secret = os.getenv("AUTOBIO_LASTFM_API_SECRET") username = args.user or os.getenv("AUTOBIO_LASTFM_USERNAME") - + if not all([api_key, api_secret, username]): - print("Error: AUTOBIO_LASTFM_API_KEY, AUTOBIO_LASTFM_API_SECRET, and AUTOBIO_LASTFM_USERNAME must be set.") + print( + "Error: AUTOBIO_LASTFM_API_KEY, AUTOBIO_LASTFM_API_SECRET, and " + "AUTOBIO_LASTFM_USERNAME must be set." + ) return from_ts = None @@ -118,10 +128,12 @@ def main(): print(f"Error: Invalid to_date format '{args.to_date}'. Use YYYY-MM-DD.") return + if not api_key or not api_secret or not username: + return visualizer = Autobiographer(api_key, api_secret, username) tracks = visualizer.fetch_recent_tracks(pages=args.pages, from_ts=from_ts, to_ts=to_ts) visualizer.save_tracks_to_csv(tracks) + if __name__ == "__main__": main() - diff --git a/find_checkin.py b/find_checkin.py index 01f9f46..8a2c0b1 100644 --- a/find_checkin.py +++ b/find_checkin.py @@ -1,39 +1,49 @@ -import os -import json -import glob +from __future__ import annotations + import argparse +import glob +import json +import os -def find_checkins(swarm_dir, pattern): + +def find_checkins(swarm_dir: str, pattern: str) -> list[tuple]: """Find check-ins matching a pattern in the given directory.""" if not os.path.exists(swarm_dir): return [] - + json_files = glob.glob(os.path.join(swarm_dir, "checkins*.json")) found = [] for file_path in json_files: try: - with open(file_path, 'r', encoding='utf-8') as f: + with open(file_path, encoding="utf-8") as f: data = json.load(f) - items = data.get('items', []) + items = data.get("items", []) for item in items: - venue_name = item.get('venue', {}).get('name', '') + venue_name = item.get("venue", {}).get("name", "") if pattern.lower() in venue_name.lower(): - created_at = item.get('createdAt') + created_at = item.get("createdAt") found.append((created_at, venue_name)) - except Exception: - pass + except Exception as e: + print(f"Warning: failed to parse {file_path}: {e}") return found -def main(): + +def main() -> None: parser = argparse.ArgumentParser(description="Find specific Swarm check-ins.") - parser.add_argument("--dir", default=r"G:\My Drive\Projects\Swarm Foursquare JFS 2026-02", help="Swarm data directory") - parser.add_argument("--pattern", default="Holiday Inn Express Fremont", help="Venue name pattern to search for") - + parser.add_argument( + "--dir", + default=r"G:\My Drive\Projects\Swarm Foursquare JFS 2026-02", + help="Swarm data directory", + ) + parser.add_argument( + "--pattern", default="Holiday Inn Express Fremont", help="Venue name pattern to search for" + ) + args = parser.parse_args() - + results = find_checkins(args.dir, args.pattern) - + if results: print(f"Found {len(results)} check-ins for '{args.pattern}':") for dt, name in sorted(results): @@ -41,5 +51,6 @@ def main(): else: print(f"No check-ins found for '{args.pattern}'.") + if __name__ == "__main__": main() diff --git a/notebooks/autobiographer_analysis.ipynb b/notebooks/autobiographer_analysis.ipynb index 8ed05d4..d70194e 100644 --- a/notebooks/autobiographer_analysis.ipynb +++ b/notebooks/autobiographer_analysis.ipynb @@ -18,14 +18,19 @@ "source": [ "%load_ext autoreload\n", "%autoreload 2\n", - "import pandas as pd\n", - "import plotly.express as px\n", "import os\n", "import sys\n", "\n", + "import plotly.express as px\n", + "\n", "# Add the parent directory to sys.path to import analysis_utils\n", - "sys.path.append('..')\n", - "from analysis_utils import load_listening_data, get_top_entities, get_listening_intensity, get_hourly_distribution" + "sys.path.append(\"..\")\n", + "from analysis_utils import (\n", + " get_hourly_distribution,\n", + " get_listening_intensity,\n", + " get_top_entities,\n", + " load_listening_data,\n", + ")" ] }, { @@ -44,13 +49,13 @@ "metadata": {}, "outputs": [], "source": [ - "data_file = '../data/lastfm_yourusername_tracks.csv' # Update this path\n", + "data_file = \"../data/lastfm_yourusername_tracks.csv\" # Update this path\n", "if not os.path.exists(data_file):\n", - " print(f'File {data_file} not found. Available files:')\n", - " print([f for f in os.listdir('../data') if f.endswith('.csv')])\n", + " print(f\"File {data_file} not found. Available files:\")\n", + " print([f for f in os.listdir(\"../data\") if f.endswith(\".csv\")])\n", "else:\n", " df = load_listening_data(data_file)\n", - " print(f'Loaded {len(df)} tracks.')\n", + " print(f\"Loaded {len(df)} tracks.\")\n", " display(df.head())" ] }, @@ -69,12 +74,12 @@ "metadata": {}, "outputs": [], "source": [ - "if 'df' in locals():\n", - " print(f'Total Tracks: {len(df)}')\n", - " print(f'Unique Artists: {df[\"artist\"].nunique()}')\n", - " print(f'Unique Albums: {df[\"album\"].nunique()}')\n", + "if \"df\" in locals():\n", + " print(f\"Total Tracks: {len(df)}\")\n", + " print(f\"Unique Artists: {df['artist'].nunique()}\")\n", + " print(f\"Unique Albums: {df['album'].nunique()}\")\n", " if not df.empty:\n", - " print(f'Date Range: {df[\"date_text\"].min()} to {df[\"date_text\"].max()}')" + " print(f\"Date Range: {df['date_text'].min()} to {df['date_text'].max()}\")" ] }, { @@ -92,10 +97,10 @@ "metadata": {}, "outputs": [], "source": [ - "if 'df' in locals():\n", - " top_artists = get_top_entities(df, 'artist', limit=20)\n", - " fig = px.bar(top_artists, x='Plays', y='artist', orientation='h', title='Top 20 Artists')\n", - " fig.update_layout(yaxis={'categoryorder':'total ascending'})\n", + "if \"df\" in locals():\n", + " top_artists = get_top_entities(df, \"artist\", limit=20)\n", + " fig = px.bar(top_artists, x=\"Plays\", y=\"artist\", orientation=\"h\", title=\"Top 20 Artists\")\n", + " fig.update_layout(yaxis={\"categoryorder\": \"total ascending\"})\n", " fig.show()" ] }, @@ -115,9 +120,9 @@ "metadata": {}, "outputs": [], "source": [ - "if 'df' in locals():\n", + "if \"df\" in locals():\n", " hourly = get_hourly_distribution(df)\n", - " fig = px.bar(hourly, x='hour', y='Plays', title='Listening Intensity by Hour of Day')\n", + " fig = px.bar(hourly, x=\"hour\", y=\"Plays\", title=\"Listening Intensity by Hour of Day\")\n", " fig.show()" ] }, @@ -137,9 +142,9 @@ "metadata": {}, "outputs": [], "source": [ - "if 'df' in locals():\n", + "if \"df\" in locals():\n", " intensity = get_listening_intensity(df)\n", - " fig = px.line(intensity, x='date', y='Plays', title='Daily Listening Activity')\n", + " fig = px.line(intensity, x=\"date\", y=\"Plays\", title=\"Daily Listening Activity\")\n", " fig.show()" ] } diff --git a/pyproject.toml b/pyproject.toml index c3777a9..21a7eb0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -17,3 +17,43 @@ tag_format = "v{version}" allowed_tags = ["feat", "fix", "perf", "refactor", "docs", "style", "test", "chore", "ci", "build"] minor_tags = ["feat"] patch_tags = ["fix", "perf"] + +[tool.ruff] +target-version = "py39" +line-length = 100 +exclude = ["venv", ".venv"] + +[tool.ruff.lint] +select = [ + "E", # pycodestyle errors + "W", # pycodestyle warnings + "F", # pyflakes + "I", # isort + "B", # flake8-bugbear + "S", # flake8-bandit (security) + "UP", # pyupgrade +] +ignore = [ + "S101", # assert statements (used in tests) +] + +[tool.ruff.lint.per-file-ignores] +"tests/*" = ["S", "B"] +"record_flythrough.py" = ["E501"] # long lines are embedded JS inside f-strings + +[tool.mypy] +python_version = "3.9" +files = ["autobiographer.py", "analysis_utils.py", "visualize.py", "record_flythrough.py", "find_checkin.py", "tools"] +ignore_missing_imports = true +warn_unused_ignores = true +warn_return_any = true +disallow_untyped_defs = true + +[[tool.mypy.overrides]] +module = ["streamlit.*", "pydeck.*", "plotly.*", "geopandas.*", "moviepy.*", "imageio.*", "playwright.*"] +ignore_errors = true +follow_imports = "skip" + +[tool.bandit] +exclude_dirs = ["venv", "tests"] +skips = ["B101"] # assert statements diff --git a/record_flythrough.py b/record_flythrough.py index 9e53336..79ca982 100644 --- a/record_flythrough.py +++ b/record_flythrough.py @@ -1,41 +1,53 @@ -import pandas as pd -import pydeck as pdk -import os -import json +from __future__ import annotations + import argparse -import sys import asyncio -import numpy as np +import json +import math +import os from datetime import datetime -from playwright.async_api import async_playwright +from typing import Any + +import numpy as np +import pandas as pd +import pydeck as pdk from moviepy import ImageSequenceClip -import math +from playwright.async_api import async_playwright -def haversine(lat1, lon1, lat2, lon2): + +def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float: """Calculate the great circle distance between two points in kilometers.""" R = 6371 phi1, phi2 = math.radians(lat1), math.radians(lat2) dphi = math.radians(lat2 - lat1) dlambda = math.radians(lon2 - lon1) - a = math.sin(dphi / 2)**2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2)**2 + a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2 return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a)) -def filter_data(df, artist=None, start_date=None, end_date=None): + +def filter_data( + df: pd.DataFrame, + artist: str | None = None, + start_date: str | None = None, + end_date: str | None = None, +) -> pd.DataFrame: """Apply filters to the dataframe.""" filtered_df = df.copy() - if not pd.api.types.is_datetime64_any_dtype(filtered_df['date_text']): - filtered_df['date_text'] = pd.to_datetime(filtered_df['date_text']) + if not pd.api.types.is_datetime64_any_dtype(filtered_df["date_text"]): + filtered_df["date_text"] = pd.to_datetime(filtered_df["date_text"]) if artist and artist != "All": - filtered_df = filtered_df[filtered_df['artist'] == artist] + filtered_df = filtered_df[filtered_df["artist"] == artist] if start_date: - filtered_df = filtered_df[filtered_df['date_text'] >= pd.to_datetime(start_date)] + filtered_df = filtered_df[filtered_df["date_text"] >= pd.to_datetime(start_date)] if end_date: - filtered_df = filtered_df[filtered_df['date_text'] <= pd.to_datetime(end_date)] + filtered_df = filtered_df[filtered_df["date_text"] <= pd.to_datetime(end_date)] return filtered_df -def interpolate_views(start, end, n_frames, easing="sine"): + +def interpolate_views(start: dict, end: dict, n_frames: int, easing: str = "sine") -> list[dict]: """Interpolate between two view states, including timestamp.""" - if n_frames <= 0: return [] + if n_frames <= 0: + return [] t = np.linspace(0, 1, n_frames) if easing == "sine": t_eased = (1 - np.cos(t * np.pi)) / 2 @@ -43,7 +55,7 @@ def interpolate_views(start, end, n_frames, easing="sine"): t_eased = np.where(t < 0.5, 4 * t**3, 1 - np.power(-2 * t + 2, 3) / 2) else: t_eased = t - + res = [] for i in range(n_frames): view = { @@ -59,12 +71,18 @@ def interpolate_views(start, end, n_frames, easing="sine"): res.append(view) return res -async def capture_frames(html_path, frames_dir, view_states, viewport=(1920, 1080)): + +async def capture_frames( + html_path: str, + frames_dir: str, + view_states: list[dict], + viewport: tuple[int, int] = (1920, 1080), +) -> None: """Capture PNG frames using Playwright.""" os.makedirs(frames_dir, exist_ok=True) - with open(html_path, 'r', encoding='utf-8') as f: + with open(html_path, encoding="utf-8") as f: html_content = f.read() - + # Hijack script + Overlay setup hijack_script = """ """ html_content = html_content.replace("", f"{hijack_script}") - with open(html_path, 'w', encoding='utf-8') as f: + with open(html_path, "w", encoding="utf-8") as f: f.write(html_content) - print(f"Starting frame capture...") + print("Starting frame capture...") async with async_playwright() as p: - browser = await p.chromium.launch(args=[ - "--disable-web-security", "--allow-file-access-from-files", - "--use-gl=angle", "--use-angle=gl", "--ignore-gpu-blocklist", "--disable-gpu-allowlist", - ]) + browser = await p.chromium.launch( + args=[ + "--disable-web-security", + "--allow-file-access-from-files", + "--use-gl=angle", + "--use-angle=gl", + "--ignore-gpu-blocklist", + "--disable-gpu-allowlist", + ] + ) page = await browser.new_page(viewport={"width": viewport[0], "height": viewport[1]}) - + abs_html_path = f"file:///{os.path.abspath(html_path)}".replace("\\", "/") await page.goto(abs_html_path, wait_until="networkidle") - + try: await page.wait_for_function("window.deckglInstance !== undefined", timeout=30000) except Exception: print("Error: Map instance not found.") - - await asyncio.sleep(10) # Wait for tiles + + await asyncio.sleep(10) # Wait for tiles await page.add_style_tag(content=".deck-tooltip { display: none !important; }") for i, vs in enumerate(view_states): if i % 50 == 0: print(f"Capturing frame {i}/{len(view_states)}...") - + # Format date string for the overlay date_str = "" if "timestamp" in vs: @@ -141,105 +165,176 @@ async def capture_frames(html_path, frames_dir, view_states, viewport=(1920, 108 const overlay = document.getElementById('date-overlay'); if(overlay) overlay.innerText = {json.dumps(date_str)}; """) - - await asyncio.sleep(0.05) - await page.screenshot(path=os.path.join(frames_dir, f"frame_{i:04d}.png"), type='png') - + + await asyncio.sleep(0.05) + await page.screenshot(path=os.path.join(frames_dir, f"frame_{i:04d}.png"), type="png") + await browser.close() -def sanitize_native(val): + +def sanitize_native(val: Any) -> Any: """Convert numpy types to native Python types for JSON serializability.""" - if hasattr(val, 'item'): return val.item() - if isinstance(val, dict): return {k: sanitize_native(v) for k, v in val.items()} - if isinstance(val, (list, tuple)): return [sanitize_native(x) for x in val] + if hasattr(val, "item"): + return val.item() + if isinstance(val, dict): + return {k: sanitize_native(v) for k, v in val.items()} + if isinstance(val, (list, tuple)): + return [sanitize_native(x) for x in val] return val -def create_recording_assets(csv_path=None, artist=None, start_date=None, end_date=None, - marker_zoom=3.0, swarm_dir=None, assumptions_path=None): + +def create_recording_assets( + csv_path: str | None = None, + artist: str | None = None, + start_date: str | None = None, + end_date: str | None = None, + marker_zoom: float = 3.0, + swarm_dir: str | None = None, + assumptions_path: str | None = None, +) -> tuple | None: """Load data and prepare the PyDeck object and keyframes.""" if not csv_path: data_dir = os.getenv("AUTOBIO_LASTFM_DATA_DIR", "data") if os.path.exists(data_dir): files = [f for f in os.listdir(data_dir) if f.endswith("_tracks.csv")] - if files: csv_path = os.path.join(data_dir, files[0]) - else: return None, None - else: return None, None + if files: + csv_path = os.path.join(data_dir, files[0]) + else: + return None, None + else: + return None, None + + from analysis_utils import ( + apply_swarm_offsets, + load_assumptions, + load_listening_data, + load_swarm_data, + ) - from analysis_utils import load_listening_data, load_assumptions, apply_swarm_offsets, load_swarm_data df = load_listening_data(csv_path) - if df is None: return None, None - - if 'lat' not in df.columns or df['lat'].isna().all(): + if df is None: + return None, None + + if "lat" not in df.columns or df["lat"].isna().all(): swarm_dir = swarm_dir or os.getenv("AUTOBIO_SWARM_DIR") - assumptions_path = assumptions_path or os.getenv("AUTOBIO_ASSUMPTIONS_FILE", "default_assumptions.json") - swarm_df = load_swarm_data(swarm_dir) if swarm_dir and os.path.exists(swarm_dir) else pd.DataFrame() + assumptions_path = assumptions_path or os.getenv( + "AUTOBIO_ASSUMPTIONS_FILE", "default_assumptions.json" + ) + swarm_df = ( + load_swarm_data(swarm_dir) + if swarm_dir and os.path.exists(swarm_dir) + else pd.DataFrame() + ) assumptions = load_assumptions(assumptions_path) df = apply_swarm_offsets(df, swarm_df, assumptions) df = filter_data(df, artist, start_date, end_date) - if df.empty: return None, None - - geo_data = df.groupby(['lat', 'lng', 'city']).size().reset_index(name='Plays') - geo_data['elevation_log'] = np.log1p(geo_data['Plays']) - max_log = geo_data['elevation_log'].max() - - dynamic_radius = (50000 / (2 ** (marker_zoom - 1))) - geo_data['elevation'] = (geo_data['elevation_log'] / max_log) * (1.4 * dynamic_radius) if max_log > 0 else 0 - - def get_color(val, max_val): + if df.empty: + return None, None + + geo_data = df.groupby(["lat", "lng", "city"]).size().reset_index(name="Plays") + geo_data["elevation_log"] = np.log1p(geo_data["Plays"]) + max_log = geo_data["elevation_log"].max() + + dynamic_radius = 50000 / (2 ** (marker_zoom - 1)) + geo_data["elevation"] = ( + (geo_data["elevation_log"] / max_log) * (1.4 * dynamic_radius) if max_log > 0 else 0 + ) + + def get_color(val: float, max_val: float) -> list[int]: ratio = val / max_val if max_val > 0 else 0 - if ratio < 0.5: r, g, b = 236 + (166 - 236) * (ratio * 2), 226 + (189 - 226) * (ratio * 2), 240 + (219 - 240) * (ratio * 2) - else: r, g, b = 166 + (28 - 166) * ((ratio - 0.5) * 2), 189 + (144 - 189) * ((ratio - 0.5) * 2), 219 + (153 - 219) * ((ratio - 0.5) * 2) + if ratio < 0.5: + r, g, b = ( + 236 + (166 - 236) * (ratio * 2), + 226 + (189 - 226) * (ratio * 2), + 240 + (219 - 240) * (ratio * 2), + ) + else: + r, g, b = ( + 166 + (28 - 166) * ((ratio - 0.5) * 2), + 189 + (144 - 189) * ((ratio - 0.5) * 2), + 219 + (153 - 219) * ((ratio - 0.5) * 2), + ) return [int(r), int(g), int(b), 200] - geo_data['color'] = geo_data['elevation_log'].apply(lambda x: get_color(x, max_log)) - + geo_data["color"] = geo_data["elevation_log"].apply(lambda x: get_color(x, max_log)) + # Ensure geodata uses standard Python types for serializability (Issue #46 refinement) # pd.DataFrame.to_dict('records') helps convert to native types - records = geo_data.to_dict('records') + records = geo_data.to_dict("records") records = [sanitize_native(r) for r in records] layer = pdk.Layer( - "ColumnLayer", records, - get_position=["lng", "lat"], get_elevation="elevation", - elevation_scale=10, radius=float(dynamic_radius), - get_fill_color="color", pickable=True, + "ColumnLayer", + records, + get_position=["lng", "lat"], + get_elevation="elevation", + elevation_scale=10, + radius=float(dynamic_radius), + get_fill_color="color", + pickable=True, ) # Sort locations chronologically based on first visit - first_visits = df.groupby(['lat', 'lng', 'city'])['timestamp'].min().reset_index() - ordered_locations = first_visits.sort_values('timestamp') - + first_visits = df.groupby(["lat", "lng", "city"])["timestamp"].min().reset_index() + ordered_locations = first_visits.sort_values("timestamp") + keyframes = [] # Start Global - keyframes.append(sanitize_native({ - "latitude": geo_data['lat'].mean(), "longitude": geo_data['lng'].mean(), "zoom": 2, "pitch": 0, "bearing": 0, - "timestamp": ordered_locations['timestamp'].iloc[0] - })) + keyframes.append( + sanitize_native( + { + "latitude": geo_data["lat"].mean(), + "longitude": geo_data["lng"].mean(), + "zoom": 2, + "pitch": 0, + "bearing": 0, + "timestamp": ordered_locations["timestamp"].iloc[0], + } + ) + ) # Tour through locations for i, (_, row) in enumerate(ordered_locations.iterrows()): angle = (i * 45) % 360 - keyframes.append(sanitize_native({ - "latitude": row['lat'], "longitude": row['lng'], "zoom": 11, "pitch": 45 + (i % 3) * 5, "bearing": angle - 180, - "timestamp": row['timestamp'] - })) + keyframes.append( + sanitize_native( + { + "latitude": row["lat"], + "longitude": row["lng"], + "zoom": 11, + "pitch": 45 + (i % 3) * 5, + "bearing": angle - 180, + "timestamp": row["timestamp"], + } + ) + ) # End Global - keyframes.append(sanitize_native({ - "latitude": geo_data['lat'].mean(), "longitude": geo_data['lng'].mean(), "zoom": 3, "pitch": 45, "bearing": 0, - "timestamp": ordered_locations['timestamp'].iloc[-1] - })) + keyframes.append( + sanitize_native( + { + "latitude": geo_data["lat"].mean(), + "longitude": geo_data["lng"].mean(), + "zoom": 3, + "pitch": 45, + "bearing": 0, + "timestamp": ordered_locations["timestamp"].iloc[-1], + } + ) + ) # Remove timestamp for ViewState but keep it in our keyframes list for interpolation vs_init = keyframes[0].copy() - if "timestamp" in vs_init: del vs_init["timestamp"] - + if "timestamp" in vs_init: + del vs_init["timestamp"] + deck = pdk.Deck(layers=[layer], initial_view_state=pdk.ViewState(**vs_init), map_style="light") return deck, keyframes -def main(): + +def main() -> None: parser = argparse.ArgumentParser(description="Generate a cinematic fly-through video.") - parser.add_argument("csv", nargs='?', help="Path to track CSV") + parser.add_argument("csv", nargs="?", help="Path to track CSV") parser.add_argument("--output", default="flythrough.mp4", help="Output file") parser.add_argument("--artist", help="Filter by artist name") parser.add_argument("--start_date", help="Start date (YYYY-MM-DD)") @@ -251,44 +346,68 @@ def main(): parser.add_argument("--assumptions", help="Path to location assumptions JSON") parser.add_argument("--swarm_dir", help="Path to Swarm data directory") parser.add_argument("--keep_frames", action="store_true", help="Keep temp frames") - + args = parser.parse_args() - deck, keyframes = create_recording_assets(args.csv, artist=args.artist, start_date=args.start_date, end_date=args.end_date, marker_zoom=args.marker_zoom, swarm_dir=args.swarm_dir, assumptions_path=args.assumptions) - - if not deck: return + result = create_recording_assets( + args.csv, + artist=args.artist, + start_date=args.start_date, + end_date=args.end_date, + marker_zoom=args.marker_zoom, + swarm_dir=args.swarm_dir, + assumptions_path=args.assumptions, + ) + + if result is None: + return + deck, keyframes = result + if not deck: + return if args.output.endswith(".html"): deck.to_html(args.output) return html_temp, frames_dir = "temp_render.html", "temp_frames" deck.to_html(html_temp) - + full_path = [] - for i in range(len(keyframes)-1): - p1, p2 = keyframes[i], keyframes[i+1] - dist = haversine(p1['latitude'], p1['longitude'], p2['latitude'], p2['longitude']) - - if dist < 10: duration = 1.0 - elif dist > 50: duration = 5.0 - else: duration = 3.0 - + for i in range(len(keyframes) - 1): + p1, p2 = keyframes[i], keyframes[i + 1] + dist = haversine(p1["latitude"], p1["longitude"], p2["latitude"], p2["longitude"]) + + if dist < 10: + duration = 1.0 + elif dist > 50: + duration = 5.0 + else: + duration = 3.0 + segment = interpolate_views(p1, p2, int(args.fps * duration)) - full_path.extend(segment[(1 if i>0 else 0):]) - + full_path.extend(segment[(1 if i > 0 else 0) :]) + if dist > 50: full_path.extend([p2] * (args.fps * 2)) - + try: - asyncio.run(capture_frames(html_temp, frames_dir, full_path, viewport=(args.width, args.height))) - frames = [os.path.join(frames_dir, f) for f in sorted(os.listdir(frames_dir)) if f.endswith(".png")] + asyncio.run( + capture_frames(html_temp, frames_dir, full_path, viewport=(args.width, args.height)) + ) + frames = [ + os.path.join(frames_dir, f) + for f in sorted(os.listdir(frames_dir)) + if f.endswith(".png") + ] if frames: clip = ImageSequenceClip(frames, fps=args.fps) clip.write_videofile(args.output, codec="libx264", audio=False) finally: - if os.path.exists(html_temp): os.remove(html_temp) + if os.path.exists(html_temp): + os.remove(html_temp) if not args.keep_frames and os.path.exists(frames_dir): import shutil + shutil.rmtree(frames_dir) + if __name__ == "__main__": main() diff --git a/tests/test_analysis_utils.py b/tests/test_analysis_utils.py index d9e6b60..0fd5299 100644 --- a/tests/test_analysis_utils.py +++ b/tests/test_analysis_utils.py @@ -1,28 +1,33 @@ +import os import unittest + import pandas as pd -import os + from analysis_utils import ( - load_listening_data, - get_top_entities, - get_listening_intensity, - get_hourly_distribution, get_cumulative_plays, - get_milestones, + get_forgotten_favorites, + get_hourly_distribution, + get_listening_intensity, get_listening_streaks, - get_forgotten_favorites + get_milestones, + get_top_entities, + load_listening_data, ) + class TestAnalysisUtils(unittest.TestCase): def setUp(self): self.test_csv = "data/test_analysis_utils.csv" os.makedirs("data", exist_ok=True) - self.df = pd.DataFrame({ - 'artist': ['Artist 1', 'Artist 2', 'Artist 1'], - 'album': ['Album 1', 'Album 2', 'Album 1'], - 'track': ['Track 1', 'Track 2', 'Track 3'], - 'timestamp': [1610000000, 1610000100, 1610000200], - 'date_text': ['2021-01-01 10:00', '2021-01-01 10:01', '2021-01-01 11:02'] - }) + self.df = pd.DataFrame( + { + "artist": ["Artist 1", "Artist 2", "Artist 1"], + "album": ["Album 1", "Album 2", "Album 1"], + "track": ["Track 1", "Track 2", "Track 3"], + "timestamp": [1610000000, 1610000100, 1610000200], + "date_text": ["2021-01-01 10:00", "2021-01-01 10:01", "2021-01-01 11:02"], + } + ) self.df.to_csv(self.test_csv, index=False) def tearDown(self): @@ -33,25 +38,25 @@ def test_load_listening_data(self): df = load_listening_data(self.test_csv) self.assertIsNotNone(df) self.assertEqual(len(df), 3) - self.assertTrue(pd.api.types.is_datetime64_any_dtype(df['date_text'])) + self.assertTrue(pd.api.types.is_datetime64_any_dtype(df["date_text"])) def test_get_top_entities(self): - top_artists = get_top_entities(self.df, entity='artist') + top_artists = get_top_entities(self.df, entity="artist") self.assertEqual(len(top_artists), 2) - self.assertEqual(top_artists.iloc[0]['artist'], 'Artist 1') - self.assertEqual(top_artists.iloc[0]['Plays'], 2) + self.assertEqual(top_artists.iloc[0]["artist"], "Artist 1") + self.assertEqual(top_artists.iloc[0]["Plays"], 2) def test_get_listening_intensity(self): df_loaded = load_listening_data(self.test_csv) - intensity_day = get_listening_intensity(df_loaded, freq='D') + intensity_day = get_listening_intensity(df_loaded, freq="D") self.assertEqual(len(intensity_day), 1) - self.assertEqual(intensity_day.iloc[0]['Plays'], 3) - - intensity_week = get_listening_intensity(df_loaded, freq='W') + self.assertEqual(intensity_day.iloc[0]["Plays"], 3) + + intensity_week = get_listening_intensity(df_loaded, freq="W") self.assertEqual(len(intensity_week), 1) def test_get_listening_intensity_empty(self): - empty_df = pd.DataFrame(columns=['artist', 'date_text']) + empty_df = pd.DataFrame(columns=["artist", "date_text"]) intensity = get_listening_intensity(empty_df) self.assertTrue(intensity.empty) @@ -59,54 +64,54 @@ def test_get_milestones(self): # Create a df with enough tracks for a milestone data = [] for i in range(1001): - data.append({ - 'artist': f'Artist {i}', - 'track': f'Track {i}', - 'date_text': pd.Timestamp('2021-01-01') + pd.Timedelta(minutes=i) - }) + data.append( + { + "artist": f"Artist {i}", + "track": f"Track {i}", + "date_text": pd.Timestamp("2021-01-01") + pd.Timedelta(minutes=i), + } + ) df = pd.DataFrame(data) milestones = get_milestones(df, intervals=[1000]) self.assertEqual(len(milestones), 1) - self.assertEqual(milestones.iloc[0]['Milestone'], "1,000 Tracks") + self.assertEqual(milestones.iloc[0]["Milestone"], "1,000 Tracks") def test_get_listening_streaks(self): # 3 consecutive days dates = [ - '2021-01-01 10:00', - '2021-01-02 10:00', - '2021-01-03 10:00', - '2021-01-05 10:00' # Gap + "2021-01-01 10:00", + "2021-01-02 10:00", + "2021-01-03 10:00", + "2021-01-05 10:00", # Gap ] - df = pd.DataFrame({'date_text': pd.to_datetime(dates)}) + df = pd.DataFrame({"date_text": pd.to_datetime(dates)}) streaks = get_listening_streaks(df) - self.assertEqual(streaks['longest_streak'], 3) + self.assertEqual(streaks["longest_streak"], 3) def test_get_forgotten_favorites(self): # Artist 1 played 6 months ago, not recently now = pd.Timestamp.now() past = now - pd.DateOffset(months=7) recent = now - pd.DateOffset(days=1) - - df = pd.DataFrame({ - 'artist': ['Artist 1', 'Artist 2'], - 'date_text': [past, recent] - }) + + df = pd.DataFrame({"artist": ["Artist 1", "Artist 2"], "date_text": [past, recent]}) forgotten = get_forgotten_favorites(df, months_threshold=6) self.assertEqual(len(forgotten), 1) - self.assertEqual(forgotten.iloc[0]['Artist'], 'Artist 1') + self.assertEqual(forgotten.iloc[0]["Artist"], "Artist 1") def test_get_cumulative_plays(self): df_loaded = load_listening_data(self.test_csv) cumulative = get_cumulative_plays(df_loaded) self.assertEqual(len(cumulative), 1) - self.assertEqual(cumulative.iloc[0]['CumulativePlays'], 3) + self.assertEqual(cumulative.iloc[0]["CumulativePlays"], 3) def test_get_hourly_distribution(self): df_loaded = load_listening_data(self.test_csv) hourly = get_hourly_distribution(df_loaded) self.assertEqual(len(hourly), 2) # Hour 10 and 11 - self.assertEqual(hourly[hourly['hour'] == 10].iloc[0]['Plays'], 2) - self.assertEqual(hourly[hourly['hour'] == 11].iloc[0]['Plays'], 1) + self.assertEqual(hourly[hourly["hour"] == 10].iloc[0]["Plays"], 2) + self.assertEqual(hourly[hourly["hour"] == 11].iloc[0]["Plays"], 1) + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_autobiographer.py b/tests/test_autobiographer.py index 1615003..ed91e1f 100644 --- a/tests/test_autobiographer.py +++ b/tests/test_autobiographer.py @@ -1,10 +1,13 @@ import os -import unittest import time -from unittest.mock import patch, MagicMock +import unittest +from unittest.mock import MagicMock, patch + import pandas as pd + from autobiographer import Autobiographer + class TestAutobiographer(unittest.TestCase): def setUp(self): self.api_key = "test_key" @@ -12,135 +15,136 @@ def setUp(self): self.username = "test_user" self.visualizer = Autobiographer(self.api_key, self.api_secret, self.username) - @patch('requests.get') + @patch("requests.get") def test_fetch_recent_tracks(self, mock_get): # Mock response from Last.fm mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { - 'recenttracks': { - 'track': [ + "recenttracks": { + "track": [ { - 'artist': {'#text': 'Artist 1'}, - 'album': {'#text': 'Album 1'}, - 'name': 'Track 1', - 'date': {'uts': '1610000000', '#text': 'Date 1'} + "artist": {"#text": "Artist 1"}, + "album": {"#text": "Album 1"}, + "name": "Track 1", + "date": {"uts": "1610000000", "#text": "Date 1"}, } ], - '@attr': {'totalPages': '1'} + "@attr": {"totalPages": "1"}, } } mock_get.return_value = mock_response tracks = self.visualizer.fetch_recent_tracks(pages=1) - + self.assertEqual(len(tracks), 1) - self.assertEqual(tracks[0]['name'], 'Track 1') - self.assertEqual(tracks[0]['artist']['#text'], 'Artist 1') + self.assertEqual(tracks[0]["name"], "Track 1") + self.assertEqual(tracks[0]["artist"]["#text"], "Artist 1") - @patch('requests.get') + @patch("requests.get") def test_fetch_page(self, mock_get): # Mock response from Last.fm mock_response = MagicMock() mock_response.status_code = 200 - mock_response.json.return_value = {'test': 'data'} + mock_response.json.return_value = {"test": "data"} mock_get.return_value = mock_response - data = self.visualizer._fetch_page('user.getinfo', {}) - - self.assertEqual(data, {'test': 'data'}) + data = self.visualizer._fetch_page("user.getinfo", {}) + + self.assertEqual(data, {"test": "data"}) mock_get.assert_called_once() def test_save_tracks_to_csv(self): # Sample data to save tracks = [ { - 'artist': {'#text': 'Artist 1'}, - 'album': {'#text': 'Album 1'}, - 'name': 'Track 1', - 'date': {'uts': '1610000000', '#text': 'Date 1'} + "artist": {"#text": "Artist 1"}, + "album": {"#text": "Album 1"}, + "name": "Track 1", + "date": {"uts": "1610000000", "#text": "Date 1"}, } ] test_filename = "data/test_tracks.csv" - + # Save to CSV self.visualizer.save_tracks_to_csv(tracks, filename=test_filename) - + # Verify file exists and content is correct self.assertTrue(os.path.exists(test_filename)) df = pd.read_csv(test_filename) self.assertEqual(len(df), 1) - self.assertEqual(df.iloc[0]['artist'], 'Artist 1') - self.assertEqual(df.iloc[0]['track'], 'Track 1') - + self.assertEqual(df.iloc[0]["artist"], "Artist 1") + self.assertEqual(df.iloc[0]["track"], "Track 1") + # Cleanup os.remove(test_filename) - @patch('requests.get') + @patch("requests.get") def test_fetch_recent_tracks_with_dates(self, mock_get): # Mock response from Last.fm mock_response = MagicMock() mock_response.status_code = 200 mock_response.json.return_value = { - 'recenttracks': { - 'track': [], - '@attr': {'totalPages': '1'} - } + "recenttracks": {"track": [], "@attr": {"totalPages": "1"}} } mock_get.return_value = mock_response self.visualizer.fetch_recent_tracks(from_ts=1610000000, to_ts=1610000100) - + # Verify that the correct parameters were passed to requests.get args, kwargs = mock_get.call_args - params = kwargs.get('params', {}) - self.assertEqual(params.get('from'), 1610000000) - self.assertEqual(params.get('to'), 1610000100) - - @patch('autobiographer.Autobiographer.fetch_recent_tracks') - @patch('autobiographer.Autobiographer.save_tracks_to_csv') - @patch('os.getenv') - @patch('argparse.ArgumentParser.parse_args') + params = kwargs.get("params", {}) + self.assertEqual(params.get("from"), 1610000000) + self.assertEqual(params.get("to"), 1610000100) + + @patch("autobiographer.Autobiographer.fetch_recent_tracks") + @patch("autobiographer.Autobiographer.save_tracks_to_csv") + @patch("os.getenv") + @patch("argparse.ArgumentParser.parse_args") def test_main_with_to_date(self, mock_args, mock_getenv, mock_save, mock_fetch): # Mock CLI arguments with a specific to_date - mock_args.return_value = MagicMock(user='test_user', pages=None, from_date=None, to_date='2026-01-01') + mock_args.return_value = MagicMock( + user="test_user", pages=None, from_date=None, to_date="2026-01-01" + ) mock_getenv.side_effect = lambda k: { - 'AUTOBIO_LASTFM_API_KEY': 'key', - 'AUTOBIO_LASTFM_API_SECRET': 'secret', - 'AUTOBIO_LASTFM_USERNAME': 'test_user' + "AUTOBIO_LASTFM_API_KEY": "key", + "AUTOBIO_LASTFM_API_SECRET": "secret", + "AUTOBIO_LASTFM_USERNAME": "test_user", }.get(k) - + from autobiographer import main + main() - + # Verify to_ts is end of day for 2026-01-01 # 2026-01-01 00:00:00 local timestamp + 86399 - expected_to_struct = time.strptime('2026-01-01', "%Y-%m-%d") + expected_to_struct = time.strptime("2026-01-01", "%Y-%m-%d") expected_to_ts = int(time.mktime(expected_to_struct)) + 86399 - + mock_fetch.assert_called_with(pages=None, from_ts=None, to_ts=expected_to_ts) - @patch('autobiographer.Autobiographer.fetch_recent_tracks') - @patch('autobiographer.Autobiographer.save_tracks_to_csv') - @patch('os.getenv') - @patch('argparse.ArgumentParser.parse_args') + @patch("autobiographer.Autobiographer.fetch_recent_tracks") + @patch("autobiographer.Autobiographer.save_tracks_to_csv") + @patch("os.getenv") + @patch("argparse.ArgumentParser.parse_args") def test_main(self, mock_args, mock_getenv, mock_save, mock_fetch): # Mock CLI arguments and env vars - mock_args.return_value = MagicMock(user='test_user', pages=1, from_date=None, to_date=None) + mock_args.return_value = MagicMock(user="test_user", pages=1, from_date=None, to_date=None) mock_getenv.side_effect = lambda k: { - 'AUTOBIO_LASTFM_API_KEY': 'key', - 'AUTOBIO_LASTFM_API_SECRET': 'secret', - 'AUTOBIO_LASTFM_USERNAME': 'user' + "AUTOBIO_LASTFM_API_KEY": "key", + "AUTOBIO_LASTFM_API_SECRET": "secret", + "AUTOBIO_LASTFM_USERNAME": "user", }.get(k) - + mock_fetch.return_value = [] - + from autobiographer import main + main() - + mock_fetch.assert_called_with(pages=1, from_ts=None, to_ts=None) mock_save.assert_called_once() -if __name__ == '__main__': - unittest.main() +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_caching.py b/tests/test_caching.py index ea8ecae..667fb3a 100644 --- a/tests/test_caching.py +++ b/tests/test_caching.py @@ -1,27 +1,31 @@ -import unittest -import pandas as pd import os import shutil import time -import glob +import unittest + +import pandas as pd + from analysis_utils import get_cache_key, get_cached_data, save_to_cache + class TestCaching(unittest.TestCase): def setUp(self): self.test_dir = "data/test_cache_dir" self.cache_dir = "data/test_cache" os.makedirs(self.test_dir, exist_ok=True) os.makedirs(self.cache_dir, exist_ok=True) - + self.lastfm_file = os.path.join(self.test_dir, "test_tracks.csv") - self.df = pd.DataFrame({ - 'artist': ['Artist 1', 'Artist 2'], - 'track': ['Track 1', 'Track 2'], - 'timestamp': [1610000000, 1610000100], - 'date_text': ['2021-01-01 10:00', '2021-01-01 10:01'] - }) + self.df = pd.DataFrame( + { + "artist": ["Artist 1", "Artist 2"], + "track": ["Track 1", "Track 2"], + "timestamp": [1610000000, 1610000100], + "date_text": ["2021-01-01 10:00", "2021-01-01 10:01"], + } + ) self.df.to_csv(self.lastfm_file, index=False) - + self.swarm_dir = os.path.join(self.test_dir, "swarm") os.makedirs(self.swarm_dir, exist_ok=True) with open(os.path.join(self.swarm_dir, "checkins_1.json"), "w") as f: @@ -40,37 +44,38 @@ def test_cache_key_consistency(self): def test_cache_key_changes_on_lastfm_update(self): key1 = get_cache_key(self.lastfm_file, self.swarm_dir) - + # Wait more than 1s to ensure mtime changes even on 1s resolution filesystems time.sleep(1.1) with open(self.lastfm_file, "a") as f: f.write("\n") - + key2 = get_cache_key(self.lastfm_file, self.swarm_dir) self.assertNotEqual(key1, key2) def test_cache_key_changes_on_swarm_update(self): key1 = get_cache_key(self.lastfm_file, self.swarm_dir) - + time.sleep(1.1) with open(os.path.join(self.swarm_dir, "checkins_2.json"), "w") as f: f.write('{"items": []}') - + key2 = get_cache_key(self.lastfm_file, self.swarm_dir) self.assertNotEqual(key1, key2) def test_save_and_load_cache(self): key = get_cache_key(self.lastfm_file, self.swarm_dir) save_to_cache(self.df, key, cache_dir=self.cache_dir) - + loaded_df = get_cached_data(key, cache_dir=self.cache_dir) self.assertIsNotNone(loaded_df) self.assertEqual(len(loaded_df), 2) - self.assertEqual(loaded_df.iloc[0]['artist'], 'Artist 1') - + self.assertEqual(loaded_df.iloc[0]["artist"], "Artist 1") + def test_invalid_cache_key(self): df = get_cached_data("nonexistent_key", cache_dir=self.cache_dir) self.assertIsNone(df) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_find_checkin.py b/tests/test_find_checkin.py index 895e771..f3db99a 100644 --- a/tests/test_find_checkin.py +++ b/tests/test_find_checkin.py @@ -1,29 +1,28 @@ -import unittest -import os import json -import tempfile +import os import shutil +import tempfile +import unittest + from find_checkin import find_checkins + class TestFindCheckin(unittest.TestCase): def setUp(self): self.test_dir = tempfile.mkdtemp() self.checkins_file = os.path.join(self.test_dir, "checkins_test.json") - + self.mock_data = { "items": [ { "createdAt": "2026-01-01T12:00:00Z", - "venue": {"name": "Holiday Inn Express Fremont"} + "venue": {"name": "Holiday Inn Express Fremont"}, }, - { - "createdAt": "2026-01-02T12:00:00Z", - "venue": {"name": "Coffee Shop"} - } + {"createdAt": "2026-01-02T12:00:00Z", "venue": {"name": "Coffee Shop"}}, ] } - - with open(self.checkins_file, "w", encoding='utf-8') as f: + + with open(self.checkins_file, "w", encoding="utf-8") as f: json.dump(self.mock_data, f) def tearDown(self): @@ -33,7 +32,7 @@ def test_find_checkins(self): results = find_checkins(self.test_dir, "Holiday Inn") self.assertEqual(len(results), 1) self.assertEqual(results[0][1], "Holiday Inn Express Fremont") - + def test_find_checkins_no_match(self): results = find_checkins(self.test_dir, "Pizza Hut") self.assertEqual(len(results), 0) @@ -42,5 +41,6 @@ def test_find_checkins_invalid_dir(self): results = find_checkins("non_existent_dir", "Any") self.assertEqual(results, []) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_location_fallbacks.py b/tests/test_location_fallbacks.py index 0364288..68073ee 100644 --- a/tests/test_location_fallbacks.py +++ b/tests/test_location_fallbacks.py @@ -1,8 +1,10 @@ import unittest -import pandas as pd -import os from datetime import datetime, timezone -from analysis_utils import get_assumption_location, apply_swarm_offsets, load_assumptions + +import pandas as pd + +from analysis_utils import apply_swarm_offsets, get_assumption_location, load_assumptions + class TestLocationFallbacks(unittest.TestCase): @classmethod @@ -16,9 +18,9 @@ def test_perth_fallback(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Perth, AU") + self.assertEqual(location["city"], "Perth, AU") # Perth is UTC+8 - self.assertEqual(location['offset'], 480) + self.assertEqual(location["offset"], 480) def test_cairo_fallback(self): # March 25, 2022 (During Cairo trip) @@ -26,9 +28,9 @@ def test_cairo_fallback(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Cairo, EG") + self.assertEqual(location["city"], "Cairo, EG") # Cairo is UTC+2 - self.assertEqual(location['offset'], 120) + self.assertEqual(location["offset"], 120) def test_oslo_svalbard_overlap(self): # Aug 14, 2020 - Should be Oslo (first in list) @@ -36,14 +38,14 @@ def test_oslo_svalbard_overlap(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Oslo, NO") - + self.assertEqual(location["city"], "Oslo, NO") + # Aug 15, 2020 - Should be Svalbard dt = datetime(2020, 8, 15, 12, 0, tzinfo=timezone.utc) ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Svalbard, NO") + self.assertEqual(location["city"], "Svalbard, NO") def test_athens_fallback(self): # Nov 7, 2020 @@ -51,9 +53,9 @@ def test_athens_fallback(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Athens, GR") + self.assertEqual(location["city"], "Athens, GR") # Athens is UTC+2 in November - self.assertEqual(location['offset'], 120) + self.assertEqual(location["offset"], 120) def test_residency_fallback_still_works(self): # Oct 12, 2020 (Monday) @@ -63,7 +65,7 @@ def test_residency_fallback_still_works(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Co-working Space") + self.assertEqual(location["city"], "Co-working Space") def test_apply_swarm_offsets_comprehensive(self): # Create a Last.fm df covering many cases @@ -73,27 +75,34 @@ def test_apply_swarm_offsets_comprehensive(self): # 2. Trip: Perth {"dt": datetime(2022, 5, 10, 12, 0, tzinfo=timezone.utc), "expected": "Perth, AU"}, # 3. Residency Work Hours: Oct 12, 2020 10:00 AM EAT (07:00 UTC) - {"dt": datetime(2020, 10, 12, 7, 0, tzinfo=timezone.utc), "expected": "Co-working Space"}, + { + "dt": datetime(2020, 10, 12, 7, 0, tzinfo=timezone.utc), + "expected": "Co-working Space", + }, # 4. Residency Home 1: Jan 3, 2016 (Nairobi) - SUNDAY to avoid work_hours {"dt": datetime(2016, 1, 3, 12, 0, tzinfo=timezone.utc), "expected": "Nairobi, KE"}, # 5. Residency Home 2: Jan 5, 2020 (Sunday) (Mombasa) {"dt": datetime(2020, 1, 5, 12, 0, tzinfo=timezone.utc), "expected": "Mombasa, KE"}, # 6. Default: 2026 (After residency ends) - {"dt": datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc), "expected": "Reykjavik, IS"} + {"dt": datetime(2026, 1, 1, 12, 0, tzinfo=timezone.utc), "expected": "Reykjavik, IS"}, ] - - df = pd.DataFrame({ - 'timestamp': [int(tc['dt'].timestamp()) for tc in test_cases], - 'date_text': [tc['dt'].strftime('%Y-%m-%d %H:%M') for tc in test_cases], - 'artist': ['A'] * len(test_cases), - 'track': ['T'] * len(test_cases) - }) - - swarm_df = pd.DataFrame(columns=['timestamp', 'offset', 'city', 'venue', 'lat', 'lng']) + + df = pd.DataFrame( + { + "timestamp": [int(tc["dt"].timestamp()) for tc in test_cases], + "date_text": [tc["dt"].strftime("%Y-%m-%d %H:%M") for tc in test_cases], + "artist": ["A"] * len(test_cases), + "track": ["T"] * len(test_cases), + } + ) + + swarm_df = pd.DataFrame(columns=["timestamp", "offset", "city", "venue", "lat", "lng"]) result_df = apply_swarm_offsets(df, swarm_df, self.assumptions) - + for i, tc in enumerate(test_cases): - self.assertEqual(result_df.iloc[i]['city'], tc['expected'], f"Failed case {i}: {tc['expected']}") + self.assertEqual( + result_df.iloc[i]["city"], tc["expected"], f"Failed case {i}: {tc['expected']}" + ) def test_dublin_fallback(self): # Jul 17, 2021 (During Dublin trip) @@ -101,7 +110,7 @@ def test_dublin_fallback(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Dublin, IE") + self.assertEqual(location["city"], "Dublin, IE") def test_stockholm_fallback(self): # May 17, 2023 @@ -109,7 +118,8 @@ def test_stockholm_fallback(self): ts = int(dt.timestamp()) location = get_assumption_location(ts, self.assumptions) self.assertIsNotNone(location) - self.assertEqual(location['city'], "Stockholm, SE") + self.assertEqual(location["city"], "Stockholm, SE") + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_record_flythrough.py b/tests/test_record_flythrough.py index c3a1319..1dccb3b 100644 --- a/tests/test_record_flythrough.py +++ b/tests/test_record_flythrough.py @@ -1,28 +1,33 @@ -import unittest -import pandas as pd import os import shutil +import unittest +from unittest.mock import patch + +import pandas as pd import pydeck as pdk -from unittest.mock import patch, MagicMock -from record_flythrough import filter_data, create_recording_assets + +from record_flythrough import create_recording_assets, filter_data + class TestRecordFlythrough(unittest.TestCase): def setUp(self): self.test_dir = "data_test_fly" os.makedirs(self.test_dir, exist_ok=True) self.test_csv = os.path.join(self.test_dir, "test_tracks.csv") - - self.df = pd.DataFrame({ - 'artist': ['Artist A', 'Artist B', 'Artist A'], - 'album': ['Album 1', 'Album 2', 'Album 1'], - 'track': ['Track 1', 'Track 2', 'Track 3'], - 'timestamp': [1610000000, 1610000100, 1610000200], - 'date_text': ['2021-01-01 10:00', '2021-01-01 10:01', '2021-01-01 11:02'], - 'lat': [41.0, 42.0, 41.0], - 'lng': [-87.0, -88.0, -87.0], - 'city': ['Reykjavik', 'Perth', 'Reykjavik'] - }) - self.df['date_text'] = pd.to_datetime(self.df['date_text']) + + self.df = pd.DataFrame( + { + "artist": ["Artist A", "Artist B", "Artist A"], + "album": ["Album 1", "Album 2", "Album 1"], + "track": ["Track 1", "Track 2", "Track 3"], + "timestamp": [1610000000, 1610000100, 1610000200], + "date_text": ["2021-01-01 10:00", "2021-01-01 10:01", "2021-01-01 11:02"], + "lat": [41.0, 42.0, 41.0], + "lng": [-87.0, -88.0, -87.0], + "city": ["Reykjavik", "Perth", "Reykjavik"], + } + ) + self.df["date_text"] = pd.to_datetime(self.df["date_text"]) self.df.to_csv(self.test_csv, index=False) def tearDown(self): @@ -30,15 +35,15 @@ def tearDown(self): shutil.rmtree(self.test_dir) def test_filter_data_artist(self): - filtered = filter_data(self.df, artist='Artist A') + filtered = filter_data(self.df, artist="Artist A") self.assertEqual(len(filtered), 2) - self.assertTrue((filtered['artist'] == 'Artist A').all()) + self.assertTrue((filtered["artist"] == "Artist A").all()) def test_filter_data_dates(self): # All tracks are in 2021-01-01 - filtered = filter_data(self.df, start_date='2021-01-01 10:30') + filtered = filter_data(self.df, start_date="2021-01-01 10:30") self.assertEqual(len(filtered), 1) - self.assertEqual(filtered.iloc[0]['track'], 'Track 3') + self.assertEqual(filtered.iloc[0]["track"], "Track 3") def test_create_recording_assets_success(self): deck, keyframes = create_recording_assets(self.test_csv) @@ -47,28 +52,32 @@ def test_create_recording_assets_success(self): self.assertIsNotNone(keyframes) self.assertTrue(len(keyframes) >= 2) - @patch('analysis_utils.load_swarm_data') - @patch('analysis_utils.apply_swarm_offsets') - @patch('os.path.exists') - def test_create_recording_assets_geocoding_trigger(self, mock_exists, mock_apply, mock_load_swarm): + @patch("analysis_utils.load_swarm_data") + @patch("analysis_utils.apply_swarm_offsets") + @patch("os.path.exists") + def test_create_recording_assets_geocoding_trigger( + self, mock_exists, mock_apply, mock_load_swarm + ): # Create CSV without geodata no_geo_csv = os.path.join(self.test_dir, "no_geo.csv") - self.df.drop(columns=['lat', 'lng', 'city']).to_csv(no_geo_csv, index=False) - - mock_load_swarm.return_value = pd.DataFrame({'timestamp': [1]}) - mock_apply.return_value = self.df # Return the one with geodata - + self.df.drop(columns=["lat", "lng", "city"]).to_csv(no_geo_csv, index=False) + + mock_load_swarm.return_value = pd.DataFrame({"timestamp": [1]}) + mock_apply.return_value = self.df # Return the one with geodata + # Configure mock_exists to return True for the CSV and the swarm_dir def exists_side_effect(path): - if path in [no_geo_csv, 'mock_swarm', 'default_assumptions.json']: + if path in [no_geo_csv, "mock_swarm", "default_assumptions.json"]: return True return False + mock_exists.side_effect = exists_side_effect - - create_recording_assets(no_geo_csv, swarm_dir='mock_swarm') - + + create_recording_assets(no_geo_csv, swarm_dir="mock_swarm") + self.assertTrue(mock_load_swarm.called) self.assertTrue(mock_apply.called) -if __name__ == '__main__': + +if __name__ == "__main__": unittest.main() diff --git a/tests/test_swarm_integration.py b/tests/test_swarm_integration.py index 26e2e7c..d8ee7a3 100644 --- a/tests/test_swarm_integration.py +++ b/tests/test_swarm_integration.py @@ -1,46 +1,50 @@ -import unittest -import pandas as pd -import os import json -import tempfile +import os import shutil -from analysis_utils import load_swarm_data, apply_swarm_offsets +import tempfile +import unittest + +import pandas as pd + +from analysis_utils import apply_swarm_offsets, load_swarm_data + class TestSwarmIntegration(unittest.TestCase): def setUp(self): self.test_dir = tempfile.mkdtemp() - + # Create a mock Swarm checkins file self.swarm_data = { "items": [ { "createdAt": "2026-01-01 12:00:00.000000", - "timeZoneOffset": 540, # JST (UTC+9) - "venue": { - "name": "Ramen Shop", - "location": {"city": "Tokyo"} - }, + "timeZoneOffset": 540, # JST (UTC+9) + "venue": {"name": "Ramen Shop", "location": {"city": "Tokyo"}}, "lat": 35.6762, - "lng": 139.6503 + "lng": 139.6503, }, { "createdAt": "2026-01-02 12:00:00.000000", - "timeZoneOffset": 660, # AEDT (UTC+11) - "venue": { - "name": "Opera House", - "location": {"city": "Sydney"} - }, + "timeZoneOffset": 660, # AEDT (UTC+11) + "venue": {"name": "Opera House", "location": {"city": "Sydney"}}, "lat": -33.8568, - "lng": 151.2153 - } + "lng": 151.2153, + }, ] } - + with open(os.path.join(self.test_dir, "checkins1.json"), "w") as f: json.dump(self.swarm_data, f) - + # Mock assumptions - self.assumptions = {"defaults": {"city": "Reykjavik, IS", "lat": 64.1265, "lng": -21.8174, "timezone": "Atlantic/Reykjavik"}} + self.assumptions = { + "defaults": { + "city": "Reykjavik, IS", + "lat": 64.1265, + "lng": -21.8174, + "timezone": "Atlantic/Reykjavik", + } + } def tearDown(self): shutil.rmtree(self.test_dir) @@ -48,82 +52,90 @@ def tearDown(self): def test_load_swarm_data(self): df = load_swarm_data(self.test_dir) self.assertEqual(len(df), 2) - self.assertIn('offset', df.columns) - self.assertEqual(df.iloc[0]['city'], 'Tokyo') - self.assertEqual(df.iloc[1]['city'], 'Sydney') - self.assertEqual(df.iloc[0]['offset'], 540) + self.assertIn("offset", df.columns) + self.assertEqual(df.iloc[0]["city"], "Tokyo") + self.assertEqual(df.iloc[1]["city"], "Sydney") + self.assertEqual(df.iloc[0]["offset"], 540) def test_apply_swarm_offsets(self): swarm_df = load_swarm_data(self.test_dir) - + # Last.fm tracks in UTC tracks = [ # 1 hour after first checkin (2026-01-01 12:00 UTC) - {"timestamp": int(pd.to_datetime("2026-01-01 13:00:00", utc=True).timestamp()), "date_text": "2026-01-01 13:00:00"}, + { + "timestamp": int(pd.to_datetime("2026-01-01 13:00:00", utc=True).timestamp()), + "date_text": "2026-01-01 13:00:00", + }, # 1 hour after second checkin (2026-01-02 12:00 UTC) - {"timestamp": int(pd.to_datetime("2026-01-02 13:00:00", utc=True).timestamp()), "date_text": "2026-01-02 13:00:00"} + { + "timestamp": int(pd.to_datetime("2026-01-02 13:00:00", utc=True).timestamp()), + "date_text": "2026-01-02 13:00:00", + }, ] lastfm_df = pd.DataFrame(tracks) - lastfm_df['date_text'] = pd.to_datetime(lastfm_df['date_text']) - + lastfm_df["date_text"] = pd.to_datetime(lastfm_df["date_text"]) + adjusted_df = apply_swarm_offsets(lastfm_df, swarm_df, self.assumptions) - + # Check first track: 13:00 UTC + 9 hours (JST) = 22:00 - self.assertEqual(adjusted_df.iloc[0]['date_text'].hour, 22) - self.assertEqual(adjusted_df.iloc[0]['city'], 'Tokyo') - + self.assertEqual(adjusted_df.iloc[0]["date_text"].hour, 22) + self.assertEqual(adjusted_df.iloc[0]["city"], "Tokyo") + # Check second track: 13:00 UTC + 11 hours (AEDT) = 00:00 next day (hour 0) - self.assertEqual(adjusted_df.iloc[1]['date_text'].hour, 0) - self.assertEqual(adjusted_df.iloc[1]['city'], 'Sydney') + self.assertEqual(adjusted_df.iloc[1]["date_text"].hour, 0) + self.assertEqual(adjusted_df.iloc[1]["city"], "Sydney") def test_swarm_location_fallbacks(self): # Mock data with missing city but has state json_data = { - 'items': [{ - 'createdAt': 1334000000, - 'timeZoneOffset': 480, - 'venue': { - 'name': 'Western Australia Venue', - 'location': { - 'state': 'Western Australia', - 'lat': -31.9505, - 'lng': 115.8605 - } + "items": [ + { + "createdAt": 1334000000, + "timeZoneOffset": 480, + "venue": { + "name": "Western Australia Venue", + "location": { + "state": "Western Australia", + "lat": -31.9505, + "lng": 115.8605, + }, + }, } - }] + ] } - with open(os.path.join(self.test_dir, 'checkins_fallback.json'), 'w') as f: + with open(os.path.join(self.test_dir, "checkins_fallback.json"), "w") as f: json.dump(json_data, f) df = load_swarm_data(self.test_dir) - self.assertEqual(df.iloc[0]['city'], 'Western Australia') - self.assertEqual(df.iloc[0]['lat'], -31.9505) + self.assertEqual(df.iloc[0]["city"], "Western Australia") + self.assertEqual(df.iloc[0]["lat"], -31.9505) # Ensure timestamp is correct (1334000000) - self.assertEqual(df.iloc[0]['timestamp'], 1334000000) + self.assertEqual(df.iloc[0]["timestamp"], 1334000000) def test_swarm_venue_fallback(self): # Mock data with only venue name json_data = { - 'items': [{ - 'createdAt': 1335000000, - 'timeZoneOffset': 0, - 'venue': { - 'name': 'Greenwich Observatory', - 'location': { - 'lat': 51.4769, - 'lng': 0.0005 - } + "items": [ + { + "createdAt": 1335000000, + "timeZoneOffset": 0, + "venue": { + "name": "Greenwich Observatory", + "location": {"lat": 51.4769, "lng": 0.0005}, + }, } - }] + ] } - with open(os.path.join(self.test_dir, 'checkins_venue.json'), 'w') as f: + with open(os.path.join(self.test_dir, "checkins_venue.json"), "w") as f: json.dump(json_data, f) df = load_swarm_data(self.test_dir) - self.assertEqual(df.iloc[0]['city'], 'Greenwich Observatory') - self.assertEqual(df.iloc[0]['lat'], 51.4769) + self.assertEqual(df.iloc[0]["city"], "Greenwich Observatory") + self.assertEqual(df.iloc[0]["lat"], 51.4769) + -if __name__ == '__main__': +if __name__ == "__main__": unittest.main() diff --git a/tests/test_visualize.py b/tests/test_visualize.py index b56b908..0876803 100644 --- a/tests/test_visualize.py +++ b/tests/test_visualize.py @@ -1,163 +1,212 @@ -import unittest -import pandas as pd import os import shutil -from unittest.mock import patch, MagicMock, ANY -from visualize import render_top_charts, render_timeline_analysis, render_spatial_analysis, render_insights_and_narrative, main -from analysis_utils import load_listening_data +import unittest +from unittest.mock import ANY, MagicMock, patch + +import pandas as pd + +from visualize import ( + main, + render_insights_and_narrative, + render_spatial_analysis, + render_timeline_analysis, + render_top_charts, +) + class TestVisualize(unittest.TestCase): def setUp(self): self.test_dir = "data_test" os.makedirs(self.test_dir, exist_ok=True) self.test_csv = os.path.join(self.test_dir, "test_user_tracks.csv") - - self.df = pd.DataFrame({ - 'artist': ['Artist 1', 'Artist 2', 'Artist 1'], - 'album': ['Album 1', 'Album 2', 'Album 1'], - 'track': ['Track 1', 'Track 2', 'Track 3'], - 'timestamp': [1610000000, 1610000100, 1610000200], - 'date_text': ['2021-01-01 10:00', '2021-01-01 10:01', '2021-01-01 11:02'], - 'lat': [41.0, 42.0, 41.0], - 'lng': [-87.0, -88.0, -87.0], - 'city': ['Reykjavik', 'Perth', 'Reykjavik'], - 'state': ['IS', 'WA', 'IS'], - 'country': ['Iceland', 'Australia', 'Iceland'] - }) - self.df['date_text'] = pd.to_datetime(self.df['date_text']) + + self.df = pd.DataFrame( + { + "artist": ["Artist 1", "Artist 2", "Artist 1"], + "album": ["Album 1", "Album 2", "Album 1"], + "track": ["Track 1", "Track 2", "Track 3"], + "timestamp": [1610000000, 1610000100, 1610000200], + "date_text": ["2021-01-01 10:00", "2021-01-01 10:01", "2021-01-01 11:02"], + "lat": [41.0, 42.0, 41.0], + "lng": [-87.0, -88.0, -87.0], + "city": ["Reykjavik", "Perth", "Reykjavik"], + "state": ["IS", "WA", "IS"], + "country": ["Iceland", "Australia", "Iceland"], + } + ) + self.df["date_text"] = pd.to_datetime(self.df["date_text"]) self.df.to_csv(self.test_csv, index=False) def tearDown(self): if os.path.exists(self.test_dir): shutil.rmtree(self.test_dir) - @patch('streamlit.header') - @patch('streamlit.selectbox') - @patch('streamlit.date_input') - @patch('streamlit.columns') - @patch('streamlit.pydeck_chart') - @patch('streamlit.dataframe') - @patch('streamlit.slider') - def test_render_spatial_analysis(self, mock_slider, mock_df, mock_deck, mock_cols, mock_date, mock_select, mock_header): - mock_select.return_value = 'All' - mock_date.return_value = [self.df['date_text'].min().date(), self.df['date_text'].max().date()] + @patch("streamlit.header") + @patch("streamlit.selectbox") + @patch("streamlit.date_input") + @patch("streamlit.columns") + @patch("streamlit.pydeck_chart") + @patch("streamlit.dataframe") + @patch("streamlit.slider") + def test_render_spatial_analysis( + self, mock_slider, mock_df, mock_deck, mock_cols, mock_date, mock_select, mock_header + ): + mock_select.return_value = "All" + mock_date.return_value = [ + self.df["date_text"].min().date(), + self.df["date_text"].max().date(), + ] mock_cols.side_effect = [ - [MagicMock(), MagicMock()], # col_f1, col_f2 - [MagicMock(), MagicMock(), MagicMock()], # col_a, col_b, col_c - [MagicMock(), MagicMock()] # fly_col1, fly_col2 + [MagicMock(), MagicMock()], # col_f1, col_f2 + [MagicMock(), MagicMock(), MagicMock()], # col_a, col_b, col_c + [MagicMock(), MagicMock()], # fly_col1, fly_col2 ] mock_slider.return_value = 3.0 - + mock_state = MagicMock() mock_state.__contains__.return_value = False - - with patch('streamlit.session_state', mock_state): + + with patch("streamlit.session_state", mock_state): render_spatial_analysis(self.df) - + mock_header.assert_called_with("Spatial Music Explorer") mock_deck.assert_called_once() mock_df.assert_called_once() - @patch('streamlit.header') - @patch('streamlit.radio') - @patch('streamlit.slider') - @patch('streamlit.columns') - @patch('streamlit.plotly_chart') - def test_render_top_charts(self, mock_plotly, mock_columns, mock_slider, mock_radio, mock_header): - mock_radio.return_value = 'artist' + @patch("streamlit.header") + @patch("streamlit.radio") + @patch("streamlit.slider") + @patch("streamlit.columns") + @patch("streamlit.plotly_chart") + def test_render_top_charts( + self, mock_plotly, mock_columns, mock_slider, mock_radio, mock_header + ): + mock_radio.return_value = "artist" mock_slider.return_value = 10 mock_columns.return_value = [MagicMock(), MagicMock()] - + render_top_charts(self.df) - + mock_header.assert_called_with("Top Charts") self.assertEqual(mock_plotly.call_count, 2) - mock_plotly.assert_any_call(ANY, width='stretch') - - @patch('streamlit.header') - @patch('streamlit.selectbox') - @patch('streamlit.plotly_chart') - @patch('streamlit.subheader') - def test_render_timeline_analysis(self, mock_subheader, mock_plotly, mock_selectbox, mock_header): - mock_selectbox.return_value = 'Daily' - + mock_plotly.assert_any_call(ANY, width="stretch") + + @patch("streamlit.header") + @patch("streamlit.selectbox") + @patch("streamlit.plotly_chart") + @patch("streamlit.subheader") + def test_render_timeline_analysis( + self, mock_subheader, mock_plotly, mock_selectbox, mock_header + ): + mock_selectbox.return_value = "Daily" + render_timeline_analysis(self.df) - + mock_header.assert_called_with("Activity Over Time") self.assertEqual(mock_plotly.call_count, 2) - mock_plotly.assert_any_call(ANY, width='stretch') - - @patch('streamlit.header') - @patch('streamlit.subheader') - @patch('streamlit.selectbox') - @patch('streamlit.columns') - @patch('streamlit.plotly_chart') - @patch('streamlit.dataframe') - @patch('streamlit.tabs') - @patch('streamlit.metric') - def test_render_insights_and_narrative(self, mock_metric, mock_tabs, mock_df, mock_plotly, mock_cols, mock_select, mock_subheader, mock_header): - mock_select.return_value = 'All' + mock_plotly.assert_any_call(ANY, width="stretch") + + @patch("streamlit.header") + @patch("streamlit.subheader") + @patch("streamlit.selectbox") + @patch("streamlit.columns") + @patch("streamlit.plotly_chart") + @patch("streamlit.dataframe") + @patch("streamlit.tabs") + @patch("streamlit.metric") + def test_render_insights_and_narrative( + self, + mock_metric, + mock_tabs, + mock_df, + mock_plotly, + mock_cols, + mock_select, + mock_subheader, + mock_header, + ): + mock_select.return_value = "All" # Provide lists for each st.columns call mock_cols.side_effect = [ - [MagicMock()] * 4, # col_filter1-4 - [MagicMock()] * 2, # col_top1-2 - [MagicMock()] * 2, # col_pat1-2 - [MagicMock()] * 2 # col_nar1-2 + [MagicMock()] * 4, # col_filter1-4 + [MagicMock()] * 2, # col_top1-2 + [MagicMock()] * 2, # col_pat1-2 + [MagicMock()] * 2, # col_nar1-2 ] mock_tabs.return_value = [MagicMock(), MagicMock(), MagicMock()] - + render_insights_and_narrative(self.df) - + mock_header.assert_called_with("Insights & Narrative") self.assertTrue(mock_select.called) - @patch('streamlit.set_page_config') - @patch('streamlit.title') - @patch('streamlit.sidebar.selectbox') - @patch('visualize.load_listening_data') - @patch('streamlit.tabs') - @patch('streamlit.sidebar.header') - @patch('streamlit.sidebar.text_input') - @patch('streamlit.sidebar.date_input') - @patch('streamlit.sidebar.button') - @patch('streamlit.spinner') - def test_main_success(self, mock_spinner, mock_button, mock_date_input, mock_text_input, mock_sidebar_header, mock_tabs, mock_load, mock_selectbox, mock_title, mock_config): + @patch("streamlit.set_page_config") + @patch("streamlit.title") + @patch("streamlit.sidebar.selectbox") + @patch("visualize.load_listening_data") + @patch("streamlit.tabs") + @patch("streamlit.sidebar.header") + @patch("streamlit.sidebar.text_input") + @patch("streamlit.sidebar.date_input") + @patch("streamlit.sidebar.button") + @patch("streamlit.spinner") + def test_main_success( + self, + mock_spinner, + mock_button, + mock_date_input, + mock_text_input, + mock_sidebar_header, + mock_tabs, + mock_load, + mock_selectbox, + mock_title, + mock_config, + ): original_exists = os.path.exists - mock_date_input.return_value = [self.df['date_text'].min().date(), self.df['date_text'].max().date()] + mock_date_input.return_value = [ + self.df["date_text"].min().date(), + self.df["date_text"].max().date(), + ] mock_button.return_value = False - + mock_spinner.return_value.__enter__.return_value = None mock_spinner.return_value.__exit__.return_value = None - - with patch('os.listdir') as mock_listdir, \ - patch('visualize.render_top_charts'), \ - patch('visualize.render_timeline_analysis'), \ - patch('visualize.render_spatial_analysis'), \ - patch('visualize.render_insights_and_narrative'): - - with patch('os.path.exists') as mock_exists: + + with ( + patch("os.listdir") as mock_listdir, + patch("visualize.render_top_charts"), + patch("visualize.render_timeline_analysis"), + patch("visualize.render_spatial_analysis"), + patch("visualize.render_insights_and_narrative"), + ): + with patch("os.path.exists") as mock_exists: + def side_effect(path): - if path == 'data': return True - if path == 'default_assumptions.json.example': return True + if path == "data": + return True + if path == "default_assumptions.json.example": + return True return original_exists(path) + mock_exists.side_effect = side_effect - - mock_listdir.return_value = ['test_user_tracks.csv'] - mock_selectbox.return_value = 'test_user_tracks.csv' - mock_text_input.side_effect = ['data', '', 'default_assumptions.json.example'] + + mock_listdir.return_value = ["test_user_tracks.csv"] + mock_selectbox.return_value = "test_user_tracks.csv" + mock_text_input.side_effect = ["data", "", "default_assumptions.json.example"] mock_load.return_value = self.df - + # Updated to 4 tabs tab1, tab2, tab3, tab4 = MagicMock(), MagicMock(), MagicMock(), MagicMock() mock_tabs.return_value = [tab1, tab2, tab3, tab4] - + main() - + mock_title.assert_called_with("Autobiographer: Interactive Data Explorer") mock_load.assert_called_once() self.assertEqual(len(mock_tabs.return_value), 4) self.assertTrue(mock_date_input.called) -if __name__ == '__main__': - unittest.main() +if __name__ == "__main__": + unittest.main() diff --git a/tools/add_audio_to_video.py b/tools/add_audio_to_video.py index 7c33d92..d6b7a49 100644 --- a/tools/add_audio_to_video.py +++ b/tools/add_audio_to_video.py @@ -1,9 +1,10 @@ import argparse import os -import sys -from moviepy import VideoFileClip, AudioFileClip -def add_audio_to_video(video_path, audio_path, output_path): +from moviepy import AudioFileClip, VideoFileClip + + +def add_audio_to_video(video_path: str, audio_path: str, output_path: str) -> bool: """ Merges an audio file with a video file, trimming the audio to match the video duration. """ @@ -17,38 +18,44 @@ def add_audio_to_video(video_path, audio_path, output_path): try: video = VideoFileClip(video_path) audio = AudioFileClip(audio_path) - + # Trim audio if it's longer than the video if audio.duration > video.duration: print(f"Trimming audio from {audio.duration:.2f}s to match video {video.duration:.2f}s") audio = audio.subclipped(0, video.duration) - + # Attach audio to video final_video = video.with_audio(audio) - + # Write the result print(f"Writing final video to '{output_path}'...") final_video.write_videofile(output_path, codec="libx264", audio_codec="aac") - + # Close clips to free resources video.close() audio.close() - + print("Success!") return True except Exception as e: print(f"An error occurred: {e}") return False -def main(): + +def main() -> None: parser = argparse.ArgumentParser(description="Helper script to add audio to a video file.") parser.add_argument("--video", required=True, help="Path to the input MP4 video file") - parser.add_argument("--audio", required=True, help="Path to the input audio file (mp3, wav, etc.)") - parser.add_argument("--output", default="output_with_audio.mp4", help="Path for the output video file") + parser.add_argument( + "--audio", required=True, help="Path to the input audio file (mp3, wav, etc.)" + ) + parser.add_argument( + "--output", default="output_with_audio.mp4", help="Path for the output video file" + ) args = parser.parse_args() - + add_audio_to_video(args.video, args.audio, args.output) + if __name__ == "__main__": main() diff --git a/visualize.py b/visualize.py index d56d11f..0f8452e 100644 --- a/visualize.py +++ b/visualize.py @@ -1,110 +1,118 @@ -import streamlit as st +import io +import os + +import geopandas as gpd import pandas as pd import plotly.express as px -import plotly.graph_objects as go import pydeck as pdk -import os -import io -import geopandas as gpd +import streamlit as st from shapely.geometry import Point + from analysis_utils import ( - load_listening_data, - get_top_entities, - get_unique_entities, - get_listening_intensity, + apply_swarm_offsets, + get_cache_key, + get_cached_data, get_cumulative_plays, + get_forgotten_favorites, get_hourly_distribution, - get_milestones, + get_listening_intensity, get_listening_streaks, - get_forgotten_favorites, - get_cache_key, - get_cached_data, - save_to_cache, + get_milestones, + get_top_entities, + get_unique_entities, load_assumptions, + load_listening_data, load_swarm_data, - apply_swarm_offsets + save_to_cache, ) -def render_spatial_analysis(df: pd.DataFrame): + +def render_spatial_analysis(df: pd.DataFrame) -> None: """Render 3D geographical visualization of listening history.""" st.header("Spatial Music Explorer") - - if 'lat' not in df.columns or df['lat'].isna().all(): - st.warning("No geographic data found. Please provide a Swarm data directory to enable this view.") + + if "lat" not in df.columns or df["lat"].isna().all(): + st.warning( + "No geographic data found. Please provide a Swarm data directory to enable this view." + ) return # Sidebar-like filters within the tab for artist and timeframe col_f1, col_f2 = st.columns(2) with col_f1: - artists = ["All"] + sorted(df['artist'].dropna().unique().tolist()) + artists = ["All"] + sorted(df["artist"].dropna().unique().tolist()) selected_artist = st.selectbox("Filter by Artist", artists) with col_f2: - min_date = df['date_text'].min().date() - max_date = df['date_text'].max().date() - date_range = st.date_input("Filter by Date Range", [min_date, max_date], key="spatial_date_range") + min_date = df["date_text"].min().date() + max_date = df["date_text"].max().date() + date_range = st.date_input( + "Filter by Date Range", [min_date, max_date], key="spatial_date_range" + ) # Filter data map_df = df.copy() if selected_artist != "All": - map_df = map_df[map_df['artist'] == selected_artist] - + map_df = map_df[map_df["artist"] == selected_artist] + if len(date_range) == 2: - map_df = map_df[(map_df['date_text'].dt.date >= date_range[0]) & - (map_df['date_text'].dt.date <= date_range[1])] + map_df = map_df[ + (map_df["date_text"].dt.date >= date_range[0]) + & (map_df["date_text"].dt.date <= date_range[1]) + ] if map_df.empty: st.info("No data matches the selected filters.") return # Aggregate by location - geo_data = map_df.groupby(['lat', 'lng', 'city']).size().reset_index(name='Plays') - + geo_data = map_df.groupby(["lat", "lng", "city"]).size().reset_index(name="Plays") + # Initialize view state in session state if not present if "spatial_view_state" not in st.session_state: st.session_state.spatial_view_state = pdk.ViewState( - latitude=geo_data['lat'].mean(), - longitude=geo_data['lng'].mean(), + latitude=geo_data["lat"].mean(), + longitude=geo_data["lng"].mean(), zoom=3, pitch=45, - bearing=0 + bearing=0, ) # Map control sliders col_a, col_b, col_c = st.columns(3) - + # Robustly get current values from view state - def get_view_val(attr, default): + def get_view_val(attr: str, default: float) -> float: val = getattr(st.session_state.spatial_view_state, attr, default) return float(val) if val is not None else float(default) with col_a: zoom_level = st.slider( - "Map Zoom", - min_value=1.0, - max_value=15.0, - value=get_view_val('zoom', 3.0), + "Map Zoom", + min_value=1.0, + max_value=15.0, + value=get_view_val("zoom", 3.0), step=0.5, - key="zoom_slider" + key="zoom_slider", ) with col_b: bearing = st.slider( "Rotation", min_value=-180.0, max_value=180.0, - value=get_view_val('bearing', 0.0), + value=get_view_val("bearing", 0.0), step=5.0, - key="bearing_slider" + key="bearing_slider", ) with col_c: pitch = st.slider( "Tilt", min_value=0.0, max_value=90.0, - value=get_view_val('pitch', 45.0), + value=get_view_val("pitch", 45.0), step=5.0, - key="pitch_slider" + key="pitch_slider", ) - + # Update the session state st.session_state.spatial_view_state.zoom = zoom_level st.session_state.spatial_view_state.bearing = bearing @@ -116,20 +124,24 @@ def get_view_val(attr, default): with fly_col1: if st.button("🎬 Play Fly-through"): # Get top 5 cities for the tour - top_cities = geo_data.sort_values('Plays', ascending=False).head(5) + top_cities = geo_data.sort_values("Plays", ascending=False).head(5) # Add a "World View" at the end keyframes = [] for _, row in top_cities.iterrows(): - keyframes.append({ - "lat": row['lat'], "lng": row['lng'], - "zoom": 12, "pitch": 60, "bearing": 30 - }) + keyframes.append( + {"lat": row["lat"], "lng": row["lng"], "zoom": 12, "pitch": 60, "bearing": 30} + ) # Add global view - keyframes.append({ - "lat": geo_data['lat'].mean(), "lng": geo_data['lng'].mean(), - "zoom": 2, "pitch": 0, "bearing": 0 - }) - + keyframes.append( + { + "lat": geo_data["lat"].mean(), + "lng": geo_data["lng"].mean(), + "zoom": 2, + "pitch": 0, + "bearing": 0, + } + ) + st.session_state.fly_keyframes = keyframes st.session_state.fly_index = 0 st.rerun() @@ -138,24 +150,43 @@ def get_view_val(attr, default): # Issue: Export Recording HTML directly from UI if st.button("💾 Export Recording HTML"): import json + # Use top 8 for export - top_cities = geo_data.sort_values('Plays', ascending=False).head(8) + top_cities = geo_data.sort_values("Plays", ascending=False).head(8) export_keyframes = [] # Start wide - export_keyframes.append({ - "latitude": geo_data['lat'].mean(), "longitude": geo_data['lng'].mean(), - "zoom": 2, "pitch": 0, "bearing": 0, "duration": 2000 - }) + export_keyframes.append( + { + "latitude": geo_data["lat"].mean(), + "longitude": geo_data["lng"].mean(), + "zoom": 2, + "pitch": 0, + "bearing": 0, + "duration": 2000, + } + ) for _, row in top_cities.iterrows(): - export_keyframes.append({ - "latitude": row['lat'], "longitude": row['lng'], - "zoom": 11, "pitch": 60, "bearing": 45, "duration": 4000 - }) + export_keyframes.append( + { + "latitude": row["lat"], + "longitude": row["lng"], + "zoom": 11, + "pitch": 60, + "bearing": 45, + "duration": 4000, + } + ) # End wide - export_keyframes.append({ - "latitude": geo_data['lat'].mean(), "longitude": geo_data['lng'].mean(), - "zoom": 3, "pitch": 45, "bearing": 0, "duration": 4000 - }) + export_keyframes.append( + { + "latitude": geo_data["lat"].mean(), + "longitude": geo_data["lng"].mean(), + "zoom": 3, + "pitch": 45, + "bearing": 0, + "duration": 4000, + } + ) # Create the deck for export export_deck = pdk.Deck( @@ -171,11 +202,11 @@ def get_view_val(attr, default): ) ], initial_view_state=pdk.ViewState( - latitude=export_keyframes[0]['latitude'], - longitude=export_keyframes[0]['longitude'], - zoom=export_keyframes[0]['zoom'] + latitude=export_keyframes[0]["latitude"], + longitude=export_keyframes[0]["longitude"], + zoom=export_keyframes[0]["zoom"], ), - map_style="light" + map_style="light", ) html_content = export_deck.to_html(as_string=True) @@ -201,28 +232,31 @@ def get_view_val(attr, default): """ html_content = html_content.replace("", f"{animation_script}") - + st.download_button( label="📥 Download Fly-through HTML", data=html_content, file_name="music_flythrough.html", - mime="text/html" + mime="text/html", ) - if "fly_keyframes" in st.session_state and st.session_state.fly_index < len(st.session_state.fly_keyframes): + if "fly_keyframes" in st.session_state and st.session_state.fly_index < len( + st.session_state.fly_keyframes + ): kf = st.session_state.fly_keyframes[st.session_state.fly_index] # Set transition on the view state st.session_state.spatial_view_state = pdk.ViewState( - latitude=kf['lat'], - longitude=kf['lng'], - zoom=kf['zoom'], - pitch=kf['pitch'], - bearing=kf['bearing'], + latitude=kf["lat"], + longitude=kf["lng"], + zoom=kf["zoom"], + pitch=kf["pitch"], + bearing=kf["bearing"], transition_duration=3000, - transition_interp='FLY_TO' + transition_interp="FLY_TO", ) st.session_state.fly_index += 1 import time + time.sleep(3.2) st.rerun() elif "fly_keyframes" in st.session_state: @@ -232,8 +266,9 @@ def get_view_val(attr, default): st.success("Fly-through complete!") # Color Spectrum Implementation - def get_spectrum_color(val, max_val): - if max_val == 0: return [236, 226, 240, 200] + def get_spectrum_color(val: float, max_val: float) -> list[int]: + if max_val == 0: + return [236, 226, 240, 200] ratio = val / max_val if ratio < 0.5: r = 236 + (166 - 236) * (ratio * 2) @@ -246,19 +281,22 @@ def get_spectrum_color(val, max_val): return [int(r), int(g), int(b), 220] # Calculate radius based on zoom (balanced to maintain visual size) - dynamic_radius = (50000 / (2 ** (zoom_level - 1))) + dynamic_radius = 50000 / (2 ** (zoom_level - 1)) import numpy as np - geo_data['elevation_log'] = np.log1p(geo_data['Plays']) - max_log = geo_data['elevation_log'].max() - + + geo_data["elevation_log"] = np.log1p(geo_data["Plays"]) + max_log = geo_data["elevation_log"].max() + # Scale log elevation to maintain a 7:1 height-to-width ratio for the tallest marker. - geo_data['elevation'] = (geo_data['elevation_log'] / max_log) * (1.4 * dynamic_radius) if max_log > 0 else 0 - geo_data['color'] = geo_data['elevation_log'].apply(lambda x: get_spectrum_color(x, max_log)) + geo_data["elevation"] = ( + (geo_data["elevation_log"] / max_log) * (1.4 * dynamic_radius) if max_log > 0 else 0 + ) + geo_data["color"] = geo_data["elevation_log"].apply(lambda x: get_spectrum_color(x, max_log)) # Process Map Highlights @st.cache_data - def get_highlighted_map(geo_points_json): + def get_highlighted_map(geo_points_json: str) -> tuple: countries_path = os.path.join("assets", "countries.geojson") states_path = os.path.join("assets", "states.geojson") if not os.path.exists(countries_path): @@ -268,39 +306,85 @@ def get_highlighted_map(geo_points_json): geometry = [Point(xy) for xy in zip(points_df.lng, points_df.lat)] points_gpd = gpd.GeoDataFrame(points_df, geometry=geometry, crs="EPSG:4326") countries_with_points = gpd.sjoin(world, points_gpd, how="inner", predicate="intersects") - def country_color_logic(country_idx): + + def country_color_logic(country_idx: int) -> list[int]: if country_idx in countries_with_points.index: - return [166, 189, 219, 130] + return [166, 189, 219, 130] return [240, 240, 240, 30] - world['fill_color'] = world.index.map(country_color_logic) + + world["fill_color"] = world.index.map(country_color_logic) return world, states_path world_gdf, states_geojson_path = get_highlighted_map(geo_data.to_json()) layers = [] if world_gdf is not None: - layers.append(pdk.Layer("GeoJsonLayer", world_gdf, stroked=True, filled=True, get_fill_color="fill_color", get_line_color=[100, 100, 100], get_line_width=1)) + layers.append( + pdk.Layer( + "GeoJsonLayer", + world_gdf, + stroked=True, + filled=True, + get_fill_color="fill_color", + get_line_color=[100, 100, 100], + get_line_width=1, + ) + ) if states_geojson_path and os.path.exists(states_geojson_path): - layers.append(pdk.Layer("GeoJsonLayer", states_geojson_path, stroked=True, filled=False, get_line_color=[150, 150, 150, 100], get_line_width=1)) - - layers.extend([ - pdk.Layer("ScatterplotLayer", geo_data, get_position=["lng", "lat"], get_fill_color="color", radius=dynamic_radius * 1.2, pickable=True), - pdk.Layer("ColumnLayer", geo_data, get_position=["lng", "lat"], get_elevation="elevation", elevation_scale=10, radius=dynamic_radius, get_fill_color="color", pickable=True, auto_highlight=True) - ]) + layers.append( + pdk.Layer( + "GeoJsonLayer", + states_geojson_path, + stroked=True, + filled=False, + get_line_color=[150, 150, 150, 100], + get_line_width=1, + ) + ) - r = pdk.Deck(layers=layers, initial_view_state=st.session_state.spatial_view_state, tooltip={"text": "{city}: {Plays} plays"}, map_style="light") + layers.extend( + [ + pdk.Layer( + "ScatterplotLayer", + geo_data, + get_position=["lng", "lat"], + get_fill_color="color", + radius=dynamic_radius * 1.2, + pickable=True, + ), + pdk.Layer( + "ColumnLayer", + geo_data, + get_position=["lng", "lat"], + get_elevation="elevation", + elevation_scale=10, + radius=dynamic_radius, + get_fill_color="color", + pickable=True, + auto_highlight=True, + ), + ] + ) + + r = pdk.Deck( + layers=layers, + initial_view_state=st.session_state.spatial_view_state, + tooltip={"text": "{city}: {Plays} plays"}, + map_style="light", + ) st.pydeck_chart(r, key="spatial_map") - + st.markdown(""" **Map Navigation Gestures:** - **Pan**: Left-click and drag. - **Rotate/Tilt**: Right-click and drag (or use the sliders above). - **Zoom**: Mouse wheel or pinch gesture. """) - - st.dataframe(geo_data.sort_values('Plays', ascending=False), hide_index=True) -def render_top_charts(df: pd.DataFrame): + st.dataframe(geo_data.sort_values("Plays", ascending=False), hide_index=True) + + +def render_top_charts(df: pd.DataFrame) -> None: """Render top entity charts with toggle.""" st.header("Top Charts") entity_type = st.radio("Select chart type", ["artist", "album", "track"], horizontal=True) @@ -308,39 +392,49 @@ def render_top_charts(df: pd.DataFrame): top_data = get_top_entities(df, entity_type, limit=limit) col1, col2 = st.columns([2, 1]) with col1: - fig_bar = px.bar(top_data, x='Plays', y=entity_type, orientation='h', title=f"Top {limit} {entity_type.capitalize()}s") - fig_bar.update_layout(yaxis={'categoryorder':'total ascending'}) - st.plotly_chart(fig_bar, width='stretch') + fig_bar = px.bar( + top_data, + x="Plays", + y=entity_type, + orientation="h", + title=f"Top {limit} {entity_type.capitalize()}s", + ) + fig_bar.update_layout(yaxis={"categoryorder": "total ascending"}) + st.plotly_chart(fig_bar, width="stretch") with col2: - fig_pie = px.pie(top_data.head(10), values='Plays', names=entity_type, title=f"Market Share (Top 10)") - st.plotly_chart(fig_pie, width='stretch') + fig_pie = px.pie( + top_data.head(10), values="Plays", names=entity_type, title="Market Share (Top 10)" + ) + st.plotly_chart(fig_pie, width="stretch") + -def render_timeline_analysis(df: pd.DataFrame): +def render_timeline_analysis(df: pd.DataFrame) -> None: """Render various timeline and activity charts.""" st.header("Activity Over Time") freq_map = {"Daily": "D", "Weekly": "W", "Monthly": "ME"} freq_label = st.selectbox("Select grouping frequency", list(freq_map.keys())) intensity = get_listening_intensity(df, freq_map[freq_label]) - fig_intensity = px.line(intensity, x='date', y='Plays', title=f"Plays per {freq_label}") - st.plotly_chart(fig_intensity, width='stretch') + fig_intensity = px.line(intensity, x="date", y="Plays", title=f"Plays per {freq_label}") + st.plotly_chart(fig_intensity, width="stretch") st.subheader("Cumulative Growth") cumulative = get_cumulative_plays(df) - fig_cumulative = px.area(cumulative, x='date', y='CumulativePlays', title="Total Plays Growth") - st.plotly_chart(fig_cumulative, width='stretch') + fig_cumulative = px.area(cumulative, x="date", y="CumulativePlays", title="Total Plays Growth") + st.plotly_chart(fig_cumulative, width="stretch") -def render_insights_and_narrative(df: pd.DataFrame): + +def render_insights_and_narrative(df: pd.DataFrame) -> None: """Merged tab for Patterns, Narrative, and granular filtering.""" st.header("Insights & Narrative") - + # Granular Filters (Issue #41) st.subheader("Explore Patterns by Time & Location") - + # Prepare filter options - years = ["All"] + sorted(df['date_text'].dt.year.unique().astype(str).tolist(), reverse=True) + years = ["All"] + sorted(df["date_text"].dt.year.unique().astype(str).tolist(), reverse=True) months = ["All"] + list(range(1, 13)) - countries = ["All"] + sorted(df['country'].unique().tolist()) - states = ["All"] + sorted(df['state'].unique().tolist()) - + countries = ["All"] + sorted(df["country"].unique().tolist()) + states = ["All"] + sorted(df["state"].unique().tolist()) + col_filter1, col_filter2, col_filter3, col_filter4 = st.columns(4) with col_filter1: selected_year = st.selectbox("Year", years) @@ -350,48 +444,57 @@ def render_insights_and_narrative(df: pd.DataFrame): selected_country = st.selectbox("Country", countries) with col_filter4: selected_state = st.selectbox("State", states) - + # Apply filters to a local copy for analysis filtered_df = df.copy() if selected_year != "All": - filtered_df = filtered_df[filtered_df['date_text'].dt.year == int(selected_year)] + filtered_df = filtered_df[filtered_df["date_text"].dt.year == int(selected_year)] if selected_month != "All": - filtered_df = filtered_df[filtered_df['date_text'].dt.month == int(selected_month)] + filtered_df = filtered_df[filtered_df["date_text"].dt.month == int(selected_month)] if selected_country != "All": - filtered_df = filtered_df[filtered_df['country'] == selected_country] + filtered_df = filtered_df[filtered_df["country"] == selected_country] if selected_state != "All": - filtered_df = filtered_df[filtered_df['state'] == selected_state] - + filtered_df = filtered_df[filtered_df["state"] == selected_state] + if filtered_df.empty: st.warning("No data found for the selected granular filters.") else: # 1. Top & Unique Analysis (Issue #41) col_top1, col_top2 = st.columns(2) - + with col_top1: - st.markdown(f"### Top Artists & Tracks") + st.markdown("### Top Artists & Tracks") tabs_top = st.tabs(["Artists", "Tracks", "Albums"]) with tabs_top[0]: - st.dataframe(get_top_entities(filtered_df, 'artist'), hide_index=True, width='stretch') + st.dataframe( + get_top_entities(filtered_df, "artist"), hide_index=True, width="stretch" + ) with tabs_top[1]: - st.dataframe(get_top_entities(filtered_df, 'track'), hide_index=True, width='stretch') + st.dataframe( + get_top_entities(filtered_df, "track"), hide_index=True, width="stretch" + ) with tabs_top[2]: - st.dataframe(get_top_entities(filtered_df, 'album'), hide_index=True, width='stretch') - + st.dataframe( + get_top_entities(filtered_df, "album"), hide_index=True, width="stretch" + ) + with col_top2: - st.markdown(f"### Most Unique to this Selection") - st.caption("Entities that are more characteristic of this filter compared to your overall history.") + st.markdown("### Most Unique to this Selection") + st.caption( + "Entities that are more characteristic of this filter " + "compared to your overall history." + ) tabs_unique = st.tabs(["Artists", "Tracks"]) with tabs_unique[0]: - unique_artists = get_unique_entities(filtered_df, df, 'artist') + unique_artists = get_unique_entities(filtered_df, df, "artist") if not unique_artists.empty: - st.dataframe(unique_artists, hide_index=True, width='stretch') + st.dataframe(unique_artists, hide_index=True, width="stretch") else: st.info("Not enough data for uniqueness score.") with tabs_unique[1]: - unique_tracks = get_unique_entities(filtered_df, df, 'track') + unique_tracks = get_unique_entities(filtered_df, df, "track") if not unique_tracks.empty: - st.dataframe(unique_tracks, hide_index=True, width='stretch') + st.dataframe(unique_tracks, hide_index=True, width="stretch") else: st.info("Not enough data for uniqueness score.") @@ -401,18 +504,35 @@ def render_insights_and_narrative(df: pd.DataFrame): col_pat1, col_pat2 = st.columns(2) with col_pat1: hourly = get_hourly_distribution(filtered_df) - fig_hourly = px.bar(hourly, x='hour', y='Plays', title="Listening by Hour of Day") - st.plotly_chart(fig_hourly, width='stretch') + fig_hourly = px.bar(hourly, x="hour", y="Plays", title="Listening by Hour of Day") + st.plotly_chart(fig_hourly, width="stretch") with col_pat2: df_copy = filtered_df.copy() - df_copy['day_of_week'] = df_copy['date_text'].dt.day_name() - df_copy['hour'] = df_copy['date_text'].dt.hour - heatmap_data = df_copy.groupby(['day_of_week', 'hour']).size().reset_index(name='Plays') - days_order = ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] - heatmap_data['day_of_week'] = pd.Categorical(heatmap_data['day_of_week'], categories=days_order, ordered=True) - heatmap_pivot = heatmap_data.pivot(index='day_of_week', columns='hour', values='Plays').fillna(0) - fig_heatmap = px.imshow(heatmap_pivot, labels=dict(x="Hour of Day", y="Day of Week", color="Plays"), title="Listening Intensity (Day vs Hour)", aspect="auto") - st.plotly_chart(fig_heatmap, width='stretch') + df_copy["day_of_week"] = df_copy["date_text"].dt.day_name() + df_copy["hour"] = df_copy["date_text"].dt.hour + heatmap_data = df_copy.groupby(["day_of_week", "hour"]).size().reset_index(name="Plays") + days_order = [ + "Monday", + "Tuesday", + "Wednesday", + "Thursday", + "Friday", + "Saturday", + "Sunday", + ] + heatmap_data["day_of_week"] = pd.Categorical( + heatmap_data["day_of_week"], categories=days_order, ordered=True + ) + heatmap_pivot = heatmap_data.pivot( + index="day_of_week", columns="hour", values="Plays" + ).fillna(0) + fig_heatmap = px.imshow( + heatmap_pivot, + labels=dict(x="Hour of Day", y="Day of Week", color="Plays"), + title="Listening Intensity (Day vs Hour)", + aspect="auto", + ) + st.plotly_chart(fig_heatmap, width="stretch") # 3. Narrative Elements (Original Narrative tab) st.markdown("---") @@ -422,7 +542,7 @@ def render_insights_and_narrative(df: pd.DataFrame): st.markdown("#### Milestones") milestones = get_milestones(filtered_df) if not milestones.empty: - st.dataframe(milestones, hide_index=True, width='stretch') + st.dataframe(milestones, hide_index=True, width="stretch") else: st.info("No major milestones in this selection.") with col_nar2: @@ -430,7 +550,7 @@ def render_insights_and_narrative(df: pd.DataFrame): streaks = get_listening_streaks(filtered_df) st.metric("Longest Streak", f"{streaks['longest_streak']} days") st.metric("Current Streak", f"{streaks['current_streak']} days") - + st.markdown("#### Forgotten Favorites") st.write("Artists you loved overall but haven't heard in this period (or recently):") forgotten = get_forgotten_favorites(filtered_df) if not filtered_df.empty else pd.DataFrame() @@ -439,57 +559,64 @@ def render_insights_and_narrative(df: pd.DataFrame): else: st.info("No forgotten favorites identified.") -def main(): + +def main() -> None: st.set_page_config(page_title="Autobiographer", layout="wide") st.title("Autobiographer: Interactive Data Explorer") - + # Issue 21: Support custom data directory default_data_dir = os.getenv("AUTOBIO_LASTFM_DATA_DIR", "data") - + st.sidebar.header("Data Sources") data_dir = st.sidebar.text_input("Last.fm Data Directory", default_data_dir) - + if not os.path.exists(data_dir): st.error(f"Data directory '{data_dir}' not found.") return - + files = [f for f in os.listdir(data_dir) if f.endswith("_tracks.csv")] if not files: st.warning(f"No tracking data found in {data_dir}.") return - + selected_file = st.sidebar.selectbox("Select a data file", files) file_path = os.path.join(data_dir, selected_file) - + # Issue 20: Support Swarm data directory default_swarm_dir = os.getenv("AUTOBIO_SWARM_DIR", "") swarm_dir = st.sidebar.text_input("Swarm Data Directory (Optional)", default_swarm_dir) - + # Issue 39: Runtime Assumptions default_assumptions_path = os.getenv("AUTOBIO_ASSUMPTIONS_FILE", "default_assumptions.json") - assumptions_path = st.sidebar.text_input("Location Assumptions File (JSON)", default_assumptions_path) + assumptions_path = st.sidebar.text_input( + "Location Assumptions File (JSON)", default_assumptions_path + ) assumptions = load_assumptions(assumptions_path) - + # Issue 37: Data Caching st.sidebar.header("Cache Management") cache_key = get_cache_key(file_path, swarm_dir, assumptions_path) df = get_cached_data(cache_key) - + if df is None: df = load_listening_data(file_path) if df is not None: # Apply Swarm offsets and assumptions with st.spinner("Adjusting timezones and geocoding..."): - swarm_df = load_swarm_data(swarm_dir) if swarm_dir and os.path.exists(swarm_dir) else pd.DataFrame() + swarm_df = ( + load_swarm_data(swarm_dir) + if swarm_dir and os.path.exists(swarm_dir) + else pd.DataFrame() + ) df = apply_swarm_offsets(df, swarm_df, assumptions) - + if not swarm_df.empty: st.sidebar.success(f"Applied offsets from {len(swarm_df)} checkins.") elif os.path.exists(assumptions_path): st.sidebar.info("Applied location assumptions from file.") else: st.sidebar.warning("No Swarm or assumptions found; using Reykjavik default.") - + # Save to cache save_to_cache(df, cache_key) st.sidebar.info("Data processed and cached locally.") @@ -500,46 +627,48 @@ def main(): cache_dir = "data/cache" if os.path.exists(cache_dir): import shutil + shutil.rmtree(cache_dir) st.sidebar.success("Cache cleared!") st.rerun() - + if df is not None: # Global Filters st.sidebar.header("Global Filters") - min_date = df['date_text'].min().date() - max_date = df['date_text'].max().date() + min_date = df["date_text"].min().date() + max_date = df["date_text"].max().date() date_range = st.sidebar.date_input("Filter by Date Range", [min_date, max_date]) if len(date_range) == 2: - df = df[(df['date_text'].dt.date >= date_range[0]) & - (df['date_text'].dt.date <= date_range[1])] + df = df[ + (df["date_text"].dt.date >= date_range[0]) + & (df["date_text"].dt.date <= date_range[1]) + ] if df.empty: st.warning("No data found for the selected date range.") return tabs = st.tabs(["Overview", "Timeline", "Spatial", "Insights & Narrative"]) - + with tabs[0]: col1, col2, col3 = st.columns(3) col1.metric("Total Tracks", len(df)) - col2.metric("Unique Artists", df['artist'].nunique()) - col3.metric("Unique Albums", df['album'].nunique()) + col2.metric("Unique Artists", df["artist"].nunique()) + col3.metric("Unique Albums", df["album"].nunique()) render_top_charts(df) - + with tabs[1]: render_timeline_analysis(df) - + with tabs[2]: render_spatial_analysis(df) - + with tabs[3]: render_insights_and_narrative(df) else: st.error("Failed to load data.") + if __name__ == "__main__": main() - -