-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplaylist_generator.py
More file actions
299 lines (253 loc) · 10.5 KB
/
playlist_generator.py
File metadata and controls
299 lines (253 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
"""Playlist generation utilities with edge-case handling.
DEFAULT_EXTS defines which audio file extensions are included when building
playlists. Existing playlists are overwritten by default; pass
``overwrite=False`` to merge the new entries into a playlist that already
exists instead of replacing its contents.
"""
import os, hashlib, difflib, re
from collections import defaultdict
from crash_watcher import record_event
from crash_logger import watcher
# Default extensions for playlist generation.
# NOTE: DEFAULT_EXTS is assigned a second time further down in this module and
# that later binding is the one functions see at call time. This set is kept
# identical to it (including ".ogg") so the two definitions cannot disagree.
DEFAULT_EXTS = {".mp3", ".flac", ".wav", ".aac", ".m4a", ".ogg", ".opus"}
# Lower-case directory names that the library scan never descends into.
SKIP_SCAN_DIRS = {"trash", "docs", "not sorted", "playlists"}
# Trailing "copy" markers such as " - Copy", " copy" or " copy 2".
_COPY_SUFFIX_RE = re.compile(r"(?:\s*-\s*copy|\s+copy)(?:\s*\d+)?$", re.IGNORECASE)
# Trailing bracketed counters such as " (1)", "[2]" or "{3}".
_NUMBER_SUFFIX_RE = re.compile(r"\s*[\(\[\{]\s*\d+\s*[\)\]\}]\s*$")


def _normalize_track_name(path: str) -> str:
    """Return a lower-cased, de-duplicated form of the file name in ``path``.

    The directory and extension are dropped, underscores and dots become
    spaces, trailing copy markers (``" - Copy"``, ``" copy 2"``) and bracketed
    counters (``" (1)"``) are removed, and runs of whitespace are collapsed.

    The suffixes are stripped repeatedly until the name stops changing, so
    stacked markers in any order are removed completely: ``"Song - Copy (2)"``
    and ``"Song (1) - Copy"`` both normalize to ``"song"``.

    Returns an empty string when nothing remains after stripping.
    """
    base = os.path.splitext(os.path.basename(path))[0].lower()
    base = base.replace("_", " ").replace(".", " ")
    # A single pass would leave "song - copy" behind for "song - copy (2)",
    # because the copy marker only becomes a suffix once "(2)" is gone.
    while True:
        stripped = _NUMBER_SUFFIX_RE.sub("", _COPY_SUFFIX_RE.sub("", base))
        if stripped == base:
            break
        base = stripped
    return re.sub(r"\s+", " ", base).strip()
def _tokenize_name(name: str) -> list[str]:
    """Split ``name`` into its non-numeric alphanumeric tokens.

    The name is split on every run of characters outside ``a-z0-9``; empty
    fragments and tokens consisting only of digits are discarded.
    """
    return [
        piece
        for piece in re.split(r"[^a-z0-9]+", name)
        if piece and not piece.isdigit()
    ]
def _build_audio_index(root: str, valid_exts: set[str]) -> tuple[dict[str, list[str]], dict[str, set[str]]]:
    """Walk ``root`` and index every audio file by normalized name and token.

    Only files whose lower-cased extension is in ``valid_exts`` are indexed.
    Sub-directories whose lower-cased name is in ``SKIP_SCAN_DIRS`` are pruned
    from the walk.

    Returns
    -------
    tuple
        ``(name_index, token_index)`` where ``name_index`` maps a normalized
        track name to the list of matching file paths and ``token_index`` maps
        each token of a normalized name to the set of file paths containing it.
    """
    by_name: dict[str, list[str]] = defaultdict(list)
    by_token: dict[str, set[str]] = defaultdict(set)

    for current_dir, subdirs, files in os.walk(root):
        path_parts = set(os.path.relpath(current_dir, root).split(os.sep))
        if not path_parts.isdisjoint(SKIP_SCAN_DIRS):
            # Already inside a skipped directory: index nothing, descend no further.
            del subdirs[:]
            continue

        # In-place pruning so os.walk does not enter skipped directories.
        subdirs[:] = [sub for sub in subdirs if sub.lower() not in SKIP_SCAN_DIRS]

        for filename in files:
            if os.path.splitext(filename)[1].lower() not in valid_exts:
                continue
            track_path = os.path.join(current_dir, filename)
            key = _normalize_track_name(track_path)
            if key:
                by_name[key].append(track_path)
                for word in _tokenize_name(key):
                    by_token[word].add(track_path)

    return by_name, by_token
def _pick_best_match(
    missing_path: str,
    name_index: dict[str, list[str]],
    token_index: dict[str, set[str]],
) -> str | None:
    """Return the indexed file that most plausibly replaces ``missing_path``.

    Candidates are the files whose normalized name equals that of
    ``missing_path``; when there are none, every file sharing at least one
    token with it is considered instead.  Each candidate is scored as::

        token overlap + difflib similarity ratio
        + 0.2 if it is in the same directory as ``missing_path``
        + 0.1 if it has the same extension

    Candidates are evaluated in sorted order, so that equal scores always
    resolve to the same file regardless of set iteration order.

    Parameters
    ----------
    missing_path : str
        Path of the track that no longer exists.
    name_index : dict
        Normalized track name -> list of file paths.
    token_index : dict
        Name token -> set of file paths.

    Returns
    -------
    str or None
        The best-scoring candidate, or ``None`` when the name normalizes to
        nothing, there is no candidate, or the winner has a token overlap
        below 0.4 together with a total score below 1.0.
    """
    norm_missing = _normalize_track_name(missing_path)
    if not norm_missing:
        return None
    tokens = set(_tokenize_name(norm_missing))

    candidates = set(name_index.get(norm_missing, ()))
    if not candidates:
        # No exact name match: fall back to anything sharing a token.
        for token in tokens:
            candidates.update(token_index.get(token, ()))
    if not candidates:
        return None

    def dir_key(path: str) -> str:
        # Compare directories in normalized form; callers may pass a path
        # that has already been through normcase/normpath.
        return os.path.normcase(os.path.normpath(os.path.dirname(path)))

    orig_dir = dir_key(missing_path)
    orig_ext = os.path.splitext(missing_path)[1].lower()

    def score(path: str) -> tuple[float, float]:
        cand_norm = _normalize_track_name(path)
        cand_tokens = set(_tokenize_name(cand_norm))
        overlap = len(tokens & cand_tokens) / len(tokens) if tokens else 0.0
        ratio = difflib.SequenceMatcher(None, norm_missing, cand_norm).ratio()
        bonus = 0.0
        if dir_key(path) == orig_dir:
            bonus += 0.2
        if orig_ext and os.path.splitext(path)[1].lower() == orig_ext:
            bonus += 0.1
        return overlap + ratio + bonus, overlap

    best_path = None
    best_score = 0.0
    best_overlap = 0.0
    # sorted() makes tie-breaking deterministic (first path in sort order wins).
    for path in sorted(candidates):
        total_score, overlap = score(path)
        if total_score > best_score:
            best_score = total_score
            best_overlap = overlap
            best_path = path

    if best_path is None:
        return None
    # Reject weak matches: few shared tokens and a low combined score.
    if best_overlap < 0.4 and best_score < 1.0:
        return None
    return best_path
# NOTE: this re-assigns the DEFAULT_EXTS already defined near the top of the
# module. Being the later binding, it is the value that functions in this
# module read at call time.
DEFAULT_EXTS = {".mp3", ".flac", ".wav", ".aac", ".m4a", ".ogg", ".opus"}
def _sanitize_name(rel_path, existing):
    """Sanitize a relative path for use as a playlist filename.

    Path separators are replaced with underscores.  An empty path and the
    current-directory marker ``os.curdir`` (which ``os.path.relpath`` returns
    when the directory is the library root itself) both map to ``"root"``.

    If the sanitized name is already in ``existing``, the first six hex
    digits of the MD5 of ``rel_path`` are appended to disambiguate it.

    Parameters
    ----------
    rel_path : str
        Directory path relative to the library root.
    existing : container of str
        Names that have already been handed out.

    Returns
    -------
    str
        The playlist base name (without extension).
    """
    # relpath(root, root) is "." rather than "", so treat both as the root.
    base = "" if rel_path == os.curdir else rel_path.replace(os.sep, "_")
    base = base or "root"
    if base not in existing:
        return base
    # The digest is only a disambiguating suffix, not a security measure.
    digest = hashlib.md5(rel_path.encode("utf-8"), usedforsecurity=False).hexdigest()
    return f"{base}_{digest[:6]}"
@watcher.traced
def generate_playlists(
    moves,
    root_path,
    output_dir=None,
    valid_exts=None,
    overwrite=True,
    log_callback=None,
):
    """Generate M3U playlists based on move mapping.

    Tracks are grouped by the directory of their *original* path; one
    ``.m3u`` file is written per group, listing the *new* locations relative
    to the playlist directory, sorted and de-duplicated.

    Parameters
    ----------
    moves : dict
        Mapping of original file paths to their new locations.  Entries whose
        new location is ``None`` or empty are ignored.
    root_path : str
        Root of the music library.
    output_dir : str or None
        Directory to write playlists into. Defaults to ``root_path/Playlists``.
    valid_exts : set or None
        Extensions to include. Defaults to ``DEFAULT_EXTS``.
    overwrite : bool
        When true, an existing playlist file is replaced.  When false, the
        non-blank lines of an existing playlist are merged with the new
        entries before the file is rewritten.
    log_callback : callable
        Function for logging messages.
    """
    if log_callback is None:
        def log_callback(msg):
            return None

    valid_exts = {e.lower() for e in (valid_exts or DEFAULT_EXTS)}
    playlists_dir = output_dir or os.path.join(root_path, "Playlists")
    os.makedirs(playlists_dir, exist_ok=True)
    record_event(f"playlist_generator: generating playlists in {playlists_dir}")

    # original directory -> new locations of the audio files that lived there
    dir_map = defaultdict(list)
    for old, new in moves.items():
        if not new:
            # No destination (e.g. the track was removed): nothing to list.
            continue
        if os.path.splitext(new)[1].lower() in valid_exts:
            dir_map[os.path.dirname(old)].append(new)

    used = set()
    # Every dir_map entry is created by an append, so no group is ever empty.
    for old_dir, files in dir_map.items():
        rel = os.path.relpath(old_dir, root_path)
        name = _sanitize_name(rel, used)
        used.add(name)
        playlist_file = os.path.join(playlists_dir, f"{name}.m3u")
        rel_files = {os.path.relpath(p, playlists_dir) for p in files}

        if not overwrite and os.path.exists(playlist_file):
            # NOTE: existing lines are merged verbatim, including any "#"
            # directive lines the file may contain.
            try:
                with open(playlist_file, "r", encoding="utf-8") as f:
                    existing = {line.strip() for line in f if line.strip()}
            except (OSError, ValueError) as e:
                log_callback(f"\u2717 Failed to read {playlist_file}: {e}")
                existing = set()
            rel_files.update(existing)

        log_callback(f"\u2192 Writing playlist: {playlist_file}")
        try:
            with open(playlist_file, "w", encoding="utf-8") as f:
                f.writelines(p + "\n" for p in sorted(rel_files))
        except (OSError, ValueError) as e:
            log_callback(f"\u2717 Failed to write {playlist_file}: {e}")

    record_event("playlist_generator: playlist generation complete")
def write_playlist(tracks, outfile):
    """Write a simple M3U playlist from ``tracks`` to ``outfile``.

    Each track is written on its own line as a path relative to the directory
    containing ``outfile``.  That directory is created if necessary; when
    ``outfile`` has no directory component it is written to the current
    working directory.

    Parameters
    ----------
    tracks : iterable of str
        Paths of the tracks to list, in output order.
    outfile : str
        Path of the playlist file to create or replace.

    Raises
    ------
    RuntimeError
        If the playlist cannot be written; the underlying exception is
        chained as ``__cause__``.
    """
    out_dir = os.path.dirname(outfile)
    if out_dir:
        # os.makedirs("") raises, so only create a directory when one is named.
        os.makedirs(out_dir, exist_ok=True)
    base_dir = out_dir or os.curdir
    try:
        with open(outfile, "w", encoding="utf-8") as f:
            for p in tracks:
                f.write(os.path.relpath(p, base_dir) + "\n")
    except Exception as e:
        raise RuntimeError(f"Failed to write playlist {outfile}: {e}") from e
@watcher.traced
def update_playlists(changes):
    """Update ``.m3u`` playlists based on moved or deleted tracks.

    ``changes`` may be either an iterable of new track paths or a mapping of
    ``old_path -> new_path``. A ``None`` value means the old path was removed
    without a replacement.

    The library root is taken to be the common path of all old and new
    paths, and playlists are looked up in ``<root>/Playlists``.  For every
    ``.m3u``/``.m3u8`` file found there:

    * lines starting with ``#`` and URL entries (containing ``://``) are kept
      unchanged;
    * entries that refer to a moved track are rewritten to the new location,
      and entries for removed tracks are dropped;
    * other entries whose file no longer exists are repaired with the best
      fuzzy match from the library, when one is found.

    Finally, playlists for the new track locations are generated in merge
    mode (``overwrite=False``).

    Nothing is done when ``changes`` is empty, when no common root can be
    derived, or when the playlist directory does not exist.
    """
    if not changes:
        return
    if isinstance(changes, dict):
        move_map = dict(changes)
    else:
        move_map = {p: p for p in changes}

    normalized_map = {
        os.path.normcase(os.path.normpath(old)): new for old, new in move_map.items()
    }
    new_tracks = [p for p in move_map.values() if p]
    try:
        root = os.path.commonpath(list(move_map) + new_tracks)
    except ValueError:
        # Empty sequence, mixed absolute/relative paths or different drives:
        # no shared library root, hence no playlist directory to update.
        return
    if not root:
        return
    playlists_dir = os.path.join(root, "Playlists")
    if not os.path.isdir(playlists_dir):
        return

    record_event("playlist_generator: updating existing playlists")
    valid_exts = {e.lower() for e in DEFAULT_EXTS}
    # (name_index, token_index); built lazily because the library walk is
    # only needed once a broken entry is actually found.
    index = None

    for dirpath, _dirs, files in os.walk(playlists_dir):
        for fname in files:
            if not fname.lower().endswith((".m3u", ".m3u8")):
                continue
            pl_path = os.path.join(dirpath, fname)
            try:
                with open(pl_path, "r", encoding="utf-8") as f:
                    lines = [ln.rstrip("\n") for ln in f]
            except (OSError, ValueError) as e:
                record_event(f"playlist_generator: failed to read {pl_path}: {e}")
                continue

            changed = False
            new_lines = []
            for ln in lines:
                # Directives/comments and remote URLs are not local files and
                # must never be remapped or fuzzy-repaired.
                if ln.startswith("#") or "://" in ln:
                    new_lines.append(ln)
                    continue

                abs_line = os.path.normcase(os.path.normpath(os.path.join(dirpath, ln)))
                if abs_line in normalized_map:
                    new = normalized_map[abs_line]
                    if not new:
                        # Track removed without replacement: drop the entry.
                        changed = True
                        continue
                    rel_new = os.path.relpath(new, dirpath)
                    if rel_new != ln:
                        changed = True
                    new_lines.append(rel_new)
                    continue

                if not os.path.exists(abs_line):
                    if index is None:
                        index = _build_audio_index(root, valid_exts)
                    repaired = _pick_best_match(abs_line, index[0], index[1])
                    if repaired:
                        new_lines.append(os.path.relpath(repaired, dirpath))
                        changed = True
                        continue
                new_lines.append(ln)

            if changed:
                try:
                    with open(pl_path, "w", encoding="utf-8") as f:
                        f.writelines(line + "\n" for line in new_lines)
                except (OSError, ValueError) as e:
                    # Best effort: keep going with the remaining playlists.
                    record_event(f"playlist_generator: failed to write {pl_path}: {e}")

    if new_tracks:
        generate_playlists(
            {p: p for p in new_tracks},
            root,
            overwrite=False,
            log_callback=lambda m: None,
        )
    record_event("playlist_generator: playlist update complete")