-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtag_fixer.py
More file actions
428 lines (380 loc) · 14.7 KB
/
tag_fixer.py
File metadata and controls
428 lines (380 loc) · 14.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
import os
import sys
import argparse
from pathlib import Path
from dataclasses import dataclass, field
from typing import Iterable, Callable, List, Optional
import pkgutil
import importlib
from mutagen import File as MutagenFile
from utils.audio_metadata_reader import read_tags
from utils.path_helpers import ensure_long_path
from plugins.base import MetadataPlugin
import sqlite3
from crash_watcher import record_event
from crash_logger import watcher
# ─── Database Helpers ─────────────────────────────────────────────────────
def init_db(path: str):
    """Create the SQLite database schema at ``path`` if it does not exist.

    Ensures the parent directory exists (when ``path`` has one), then creates
    the ``files`` table, an index on ``files.status`` and the ``fingerprints``
    table. Calling this on an already-initialised database is a no-op.

    Args:
        path: Filesystem location of the SQLite database file.
    """
    db_folder = os.path.dirname(path)
    # A bare filename has an empty dirname, and ``os.makedirs("")`` raises
    # FileNotFoundError, so only create the folder when there is one.
    if db_folder:
        os.makedirs(db_folder, exist_ok=True)

    conn = sqlite3.connect(path)
    try:
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS files (
                path TEXT PRIMARY KEY,
                status TEXT,
                score REAL,
                old_artist TEXT, new_artist TEXT,
                old_title TEXT, new_title TEXT,
                old_album TEXT, new_album TEXT,
                old_genres TEXT, new_genres TEXT
            );
            """
        )
        conn.execute("CREATE INDEX IF NOT EXISTS idx_status ON files(status);")
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS fingerprints (
                path TEXT PRIMARY KEY,
                duration INT,
                fingerprint TEXT
            );
            """
        )
        conn.commit()
    finally:
        # Release the file handle even if one of the statements failed.
        conn.close()
# ─── Configuration ────────────────────────────────────────────────────────
# Identification sent to the AcoustID web service.
# NOTE(review): the API key is committed in source; consider loading it from
# the environment or a config file instead.
ACOUSTID_API_KEY = "eBOqCZhyAx"
ACOUSTID_APP_NAME = "SoundVaultTagFixer"
ACOUSTID_APP_VERSION = "1.0.0"

# File extensions (lower-case, with leading dot) treated as audio files.
SUPPORTED_EXTS = {
    ".mp3",
    ".flac",
    ".m4a",
    ".aac",
    ".ogg",
    ".wav",
    ".opus",
}
# ─── Plugin Discovery ─────────────────────────────────────────────────────
# One instance of every concrete ``MetadataPlugin`` subclass found in the
# ``plugins`` package, in discovery order.
PLUGINS: List[MetadataPlugin] = []

# Classes already instantiated; a class re-exported by several plugin modules
# (e.g. via ``from plugins.x import Y``) must only be registered once.
_seen_plugin_classes: set = set()

# Resolve the search path from the package itself so discovery does not depend
# on the current working directory (a bare ``['plugins']`` is cwd-relative).
_plugins_pkg = importlib.import_module("plugins")
_plugins_path = getattr(_plugins_pkg, "__path__", ["plugins"])

for finder, name, _ in pkgutil.iter_modules(_plugins_path):
    try:
        module = importlib.import_module(f"plugins.{name}")
    except Exception as exc:
        # Best effort: one broken plugin must not stop the others from
        # loading, but report it instead of hiding the failure.
        print(
            f"tag_fixer: skipping plugin module {name!r}: {exc}",
            file=sys.stderr,
        )
        continue
    for attr in dir(module):
        obj = getattr(module, attr)
        if not (isinstance(obj, type) and issubclass(obj, MetadataPlugin)):
            continue
        if obj is MetadataPlugin or obj in _seen_plugin_classes:
            continue
        _seen_plugin_classes.add(obj)
        PLUGINS.append(obj())
# ─── Data Classes ────────────────────────────────────────────────────────
@dataclass
class FileRecord:
"""Representation of a single audio file and proposed tags."""
path: Path
status: str
score: Optional[float] = None
old_artist: Optional[str] = None
new_artist: Optional[str] = None
old_title: Optional[str] = None
new_title: Optional[str] = None
old_album: Optional[str] = None
new_album: Optional[str] = None
old_genres: List[str] = field(default_factory=list)
new_genres: List[str] = field(default_factory=list)
# Score thresholds (match scores are fractions in the 0..1 range).
# At or above this score a proposal may be applied automatically (>= 90%).
MIN_AUTOMATIC_SCORE = 0.90
# At or above this score, but below the automatic one, the user is asked (>= 75%).
MIN_INTERACTIVE_SCORE = 0.75
# ─── Utility Functions ────────────────────────────────────────────────────
def is_remix(audio_path):
    """Tell whether ``audio_path`` looks like a remix.

    The file name is checked first (case-insensitively); only when it does not
    contain "remix" are the file's tags read and the ``title`` tag inspected.
    A title that is not a string never counts as a remix.
    """
    filename = os.path.basename(audio_path).lower()
    if "remix" in filename:
        return True

    existing_title = read_tags(ensure_long_path(audio_path)).get("title")
    return isinstance(existing_title, str) and "remix" in existing_title.lower()
def find_files(root):
    """Collect the audio files reachable from ``root``.

    When ``root`` is an existing file it is returned as the only element,
    whatever its extension. Otherwise ``root`` is walked recursively and every
    file whose lower-cased extension is in ``SUPPORTED_EXTS`` is included.
    """
    if os.path.isfile(root):
        return [root]

    return [
        os.path.join(folder, filename)
        for folder, _subdirs, filenames in os.walk(root)
        for filename in filenames
        if os.path.splitext(filename)[1].lower() in SUPPORTED_EXTS
    ]
@watcher.traced
def update_tags(path: str, proposal: FileRecord, fields: List[str], log_callback):
    """Write selected tags from ``proposal`` into ``path``.

    Args:
        path: Audio file to modify.
        proposal: Record holding the old and proposed tag values.
        fields: Which tags may be written; recognised entries are
            ``"artist"``, ``"title"``, ``"album"`` and ``"genres"``.
        log_callback: Callable receiving human-readable progress messages.

    Returns:
        ``True`` if the file was saved with new tags; ``False`` if it could
        not be opened, nothing needed changing, or saving failed.
    """
    record_event(f"tag_fixer: updating tags for {path}")
    try:
        audio = MutagenFile(ensure_long_path(path), easy=True)
    except Exception as e:
        log_callback(f"Failed to read {path}: {e}")
        return False
    if audio is None:
        # Nothing to write to when mutagen hands back no file object.
        return False

    changed = False

    # Scalar tags: write only when a proposal exists and differs from the
    # value recorded as the old one.
    for tag_name in ("artist", "title", "album"):
        if tag_name not in fields:
            continue
        new_value = getattr(proposal, f"new_{tag_name}")
        old_value = getattr(proposal, f"old_{tag_name}")
        if new_value is not None and new_value != old_value:
            audio[tag_name] = [new_value]
            changed = True

    if "genres" in fields:
        existing = audio.tags.get("genre", []) if audio.tags else []
        old = proposal.old_genres or existing
        new = proposal.new_genres or []
        # Old genres first, then new ones, dropping duplicates but keeping
        # first-seen order.
        merged = list(dict.fromkeys(list(old) + list(new)))
        if merged != existing:
            # Stored as one "; "-separated value.
            audio["genre"] = ["; ".join(merged)]
            changed = True

    if not changed:
        # Explicit False rather than falling off the end with None.
        return False

    try:
        audio.save()
        log_callback(f"Updated tags for {path}")
        record_event(f"tag_fixer: updated tags for {path}")
        return True
    except Exception as e:
        log_callback(f"Failed to save {path}: {e}")
        record_event(f"tag_fixer: failed to save {path}")
        return False
def prompt_user_about_tags(f, old_artist, old_title, new_tags):
    """Print current vs proposed artist/title for ``f`` and ask for confirmation.

    Args:
        f: Path of the file being shown.
        old_artist: Artist currently stored in the file, or ``None``.
        old_title: Title currently stored in the file, or ``None``.
        new_tags: Mapping with ``"artist"`` and ``"title"`` keys holding the
            proposed values; either may be ``None``.

    Returns:
        ``True`` only when the user answers ``y`` (case-insensitive,
        surrounding whitespace ignored); any other answer is ``False``.
    """

    def cell(value):
        # A width spec cannot be applied to None (TypeError), so render
        # missing values as a dash and everything else as text.
        return str(value) if value else "—"

    new_artist = new_tags["artist"]
    new_title = new_tags["title"]

    print(f"\nFile: {f}")
    print(f"{'Field':10} │ {'Current':30} │ {'New from AcoustID':30}")
    print("-" * 75)
    print(f"{'Artist':10} │ {cell(old_artist):30} │ {cell(new_artist):30}")
    print(f"{'Title':10} │ {cell(old_title):30} │ {cell(new_title):30}")
    print()
    resp = input("Apply these changes? [y/N]: ").strip().lower()
    return resp == "y"
# ─── Main Tag-Fixing Logic ────────────────────────────────────────────────
def _coerce_genres(raw_genres) -> List[str]:
    """Normalise a raw ``genre`` tag value into a list of strings."""
    if raw_genres in (None, ""):
        return []
    if isinstance(raw_genres, (list, tuple)):
        # Non-string entries are dropped.
        return [str(v) for v in raw_genres if isinstance(v, str)]
    return [str(raw_genres)]


@watcher.traced
def build_file_records(
    root: str,
    *,
    db_conn: sqlite3.Connection,
    show_all: bool = False,
    log_callback: Callable[[str], None] | None = None,
    progress_callback: Callable[[int], None] | None = None,
) -> List[FileRecord]:
    """Return a list of ``FileRecord`` objects for ``root``.

    Every audio file under ``root`` is passed to each plugin in ``PLUGINS``;
    the highest-scoring result supplies the proposed artist/title/album when
    its score reaches ``MIN_INTERACTIVE_SCORE``, and genres from all plugin
    results are merged. Each record is written to the ``files`` table and the
    transaction is committed once at the end.

    Args:
        root: File or directory to scan.
        db_conn: Open connection to a database containing the ``files`` table.
        show_all: When false, files already marked ``"applied"`` and files
            that look like remixes are skipped.
        log_callback: Receives progress messages; defaults to ``print``.
        progress_callback: Called with the 1-based index of each file once it
            has been handled or skipped.

    Returns:
        The records built for the files that were not skipped.
    """
    record_event(f"tag_fixer: building records for {root}")
    if log_callback is None:
        log_callback = print

    records: List[FileRecord] = []

    # Probe connectivity once, using the first plugin that offers a check.
    unreachable_msg = "⚠ Cannot reach AcoustID service \u2013 genre lookups will be skipped"
    for plugin in PLUGINS:
        checker = getattr(plugin, "check_connection", None)
        if callable(checker):
            try:
                if not checker():
                    log_callback(unreachable_msg)
            except Exception:
                log_callback(unreachable_msg)
            break

    existing_status = dict(db_conn.execute("SELECT path, status FROM files"))
    files = find_files(root)

    for idx, f in enumerate(files, start=1):
        file_path = Path(f)
        # Rows are stored under ``str(Path(f))``, which may differ from the
        # raw string (e.g. a leading "./" or mixed separators). Look up with
        # the stored form first, then the raw string as a fallback.
        status = existing_status.get(
            str(file_path), existing_status.get(f, "new")
        )

        if not show_all and (status == "applied" or is_remix(f)):
            if progress_callback:
                progress_callback(idx)
            continue

        log_callback(f"Processing {f}")
        tags = read_tags(ensure_long_path(f))
        rec = FileRecord(
            path=file_path,
            status=status,
            score=None,
            old_artist=tags.get("artist"),
            new_artist=None,
            old_title=tags.get("title"),
            new_title=None,
            old_album=tags.get("album"),
            new_album=None,
            old_genres=_coerce_genres(tags.get("genre")),
            new_genres=[],
        )

        best_result = None
        best_score = 0.0
        merged_genres: set[str] = set()
        for plugin in PLUGINS:
            result = plugin.identify(f)
            if not result:
                continue
            if result.get('genres'):
                merged_genres.update(result['genres'])
            score = result.get('score', 0)
            if score > best_score:
                best_score = score
                best_result = result

        if best_result and best_score >= MIN_INTERACTIVE_SCORE:
            rec.new_artist = best_result.get('artist', rec.new_artist)
            rec.new_title = best_result.get('title', rec.new_title)
            rec.new_album = best_result.get('album', rec.new_album)
        if merged_genres:
            # Sorted so the stored and displayed order is reproducible; set
            # iteration order is not.
            rec.new_genres = sorted(merged_genres)
        rec.score = best_score

        if progress_callback:
            progress_callback(idx)

        nothing_proposed = (
            rec.new_artist is None
            and rec.new_title is None
            and rec.new_album is None
            and not rec.new_genres
        )
        if nothing_proposed:
            rec.status = "unmatched"
        else:
            all_match = (
                rec.old_artist == rec.new_artist
                and rec.old_title == rec.new_title
                and rec.old_album == rec.new_album
                and sorted(rec.old_genres or []) == sorted(rec.new_genres or [])
            )
            if all_match:
                rec.status = "no_diff"

        records.append(rec)

        vals = (
            str(rec.path), rec.status, rec.score,
            rec.old_artist, rec.new_artist,
            rec.old_title, rec.new_title,
            rec.old_album, rec.new_album,
            ";".join(rec.old_genres), ";".join(rec.new_genres),
        )
        # Columns are named so the statement does not depend on the table's
        # physical column order.
        db_conn.execute(
            "INSERT OR REPLACE INTO files ("
            "path, status, score, "
            "old_artist, new_artist, "
            "old_title, new_title, "
            "old_album, new_album, "
            "old_genres, new_genres"
            ") VALUES (?,?,?,?,?,?,?,?,?,?,?)",
            vals,
        )

    db_conn.commit()
    record_event(
        f"tag_fixer: built {len(records)} records for {root}"
    )
    return records
@watcher.traced
def apply_tag_proposals(
    selected: Iterable[FileRecord],
    *,
    fields: List[str] | None = None,
    log_callback: Callable[[str], None] | None = None,
) -> int:
    """Write each proposal in ``selected`` to its file and count the successes.

    ``fields`` defaults to artist and title; ``log_callback`` defaults to
    printing. The return value is the number of proposals for which
    ``update_tags`` reported a successful write.
    """
    proposals = list(selected)
    record_event(f"tag_fixer: applying {len(proposals)} proposals")

    if log_callback is None:
        log_callback = print
    if fields is None:
        fields = ["artist", "title"]

    # Proposals are processed in order; only truthy results are counted.
    updated = sum(
        1
        for proposal in proposals
        if update_tags(str(proposal.path), proposal, fields, log_callback)
    )

    record_event(f"tag_fixer: applied {updated} proposals")
    return updated
@watcher.traced
def fix_tags(target, log_callback=None, interactive=False):
    """Fill missing tags for files in target using AcoustID.

    Builds proposals for every audio file under ``target``, optionally asks
    the user to confirm each one, writes artist/title for the selected
    proposals, and records the outcome in ``Docs/.soundvault.db`` next to the
    target.

    Args:
        target: File or directory to process.
        log_callback: Receives progress messages; defaults to ``print``.
        interactive: When true, each proposal is shown and must be confirmed.

    Returns:
        A dict with ``"processed"`` (number of audio files found) and
        ``"updated"`` (number of files whose tags were written).
    """
    if log_callback is None:
        log_callback = print

    record_event(f"tag_fixer: fix_tags starting on {target}")
    files = find_files(target)
    if not files:
        log_callback("No audio files found.")
        record_event("tag_fixer: no audio files found")
        return {"processed": 0, "updated": 0}

    base_folder = target if os.path.isdir(target) else os.path.dirname(target)
    docs_dir = os.path.join(base_folder, "Docs")
    os.makedirs(docs_dir, exist_ok=True)
    db_path = os.path.join(docs_dir, ".soundvault.db")
    init_db(db_path)

    db_conn = sqlite3.connect(db_path)
    try:
        records = build_file_records(
            target,
            db_conn=db_conn,
            show_all=False,
            log_callback=log_callback,
        )
        db_conn.commit()
    finally:
        # Close the handle even when building records fails.
        db_conn.close()

    candidates = [r for r in records if r.status != "no_diff"]
    if interactive:
        selected = []
        for p in candidates:
            # Missing proposals are passed as "" because the prompt applies a
            # width format spec, which raises TypeError on None.
            apply_change = prompt_user_about_tags(
                str(p.path),
                p.old_artist,
                p.old_title,
                {"artist": p.new_artist or "", "title": p.new_title or ""},
            )
            if apply_change:
                selected.append(p)
    else:
        selected = candidates

    updated = apply_tag_proposals(
        selected,
        fields=["artist", "title"],
        log_callback=log_callback,
    )

    db_conn = sqlite3.connect(db_path)
    try:
        selected_set = {rec.path for rec in selected}
        for rec in records:
            # NOTE(review): every selected record is marked "applied" whether
            # or not its tags were actually written - confirm this is intended.
            if rec.path in selected_set:
                status = "applied"
            elif rec.status == "no_diff":
                status = "no_diff"
            else:
                status = "skipped"
            rec.status = status
            db_conn.execute(
                "UPDATE files SET status=? WHERE path=?",
                (status, str(rec.path)),
            )
        db_conn.commit()
    finally:
        db_conn.close()

    result = {"processed": len(files), "updated": updated}
    record_event(
        f"tag_fixer: fix_tags finished processed={len(files)} updated={updated}"
    )
    return result
# ─── CLI Entry Point ─────────────────────────────────────────────────────
def main():
    """Command-line entry point: parse arguments, run ``fix_tags``, print a summary.

    A ``RuntimeError`` raised while fixing tags is printed and the process
    exits with status 1.
    """
    cli = argparse.ArgumentParser(description="Fill missing audio tags using AcoustID")
    cli.add_argument("target", help="file or folder to process")
    cli.add_argument(
        "--interactive",
        help="show and confirm tag changes for medium-confidence matches",
        action="store_true",
    )
    options = cli.parse_args()

    try:
        summary = fix_tags(options.target, interactive=options.interactive)
    except RuntimeError as e:
        print(e)
        sys.exit(1)
    else:
        print(f"\nProcessed {summary['processed']} files, updated {summary['updated']}.")


if __name__ == "__main__":
    main()