-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathFileRenamer.py
More file actions
520 lines (408 loc) · 17.9 KB
/
FileRenamer.py
File metadata and controls
520 lines (408 loc) · 17.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
import argparse
import os
import re
import string
import json
import glob
import requests
import shutil
import logging
import sys
import FileRenamerConfig as config
class SkipDryRunFilter(logging.Filter):
    """Logging filter that suppresses records flagged as dry-run previews."""

    def filter(self, record):
        # Records are tagged via logging's `extra={'dryrun': True}`; anything
        # untagged passes through.
        is_dry_run = getattr(record, 'dryrun', False)
        return not is_dry_run
class Summary:
    """Accumulates per-run counters and emits a final report via logging."""

    def __init__(self):
        # Fresh run: all counters start at zero.
        self.total_files = 0
        self.renamed = 0
        self.skipped = 0
        self.errors = 0

    def report(self):
        """Log a human-readable summary of the completed run."""
        lines = (
            "=== Summary Report ===",
            f"Total files processed: {self.total_files}",
            f"Files renamed: {self.renamed}",
            f"Files skipped: {self.skipped}",
            f"Errors encountered: {self.errors}",
        )
        for line in lines:
            logging.info(line)
def parse_args():
    """Parse the CLI options controlling locations, masks, and run mode."""
    parser = argparse.ArgumentParser(
        description="Rename files from Stash metadata and create accompanying NFO files"
    )
    parser.add_argument("--indir", default="./",
                        help="Directory containing files to process")
    parser.add_argument("--outdir", default="./",
                        help="Base output directory for renamed files")
    parser.add_argument("--mask", default="*",
                        help="File mask to process")
    parser.add_argument("--extra", action="store_true",
                        help="Also write JPG and NFO files")
    parser.add_argument("--dryrun", action="store_true",
                        help="Preview changes without moving files")
    parser.add_argument("--verbose", action="store_true",
                        help="Enable verbose logging")
    # Defaults for the roots come from the project config module.
    parser.add_argument("--sceneroot", default=config.scene_root,
                        help="Root directory for scene files")
    parser.add_argument("--galleryroot", default=config.gallery_root,
                        help="Root directory for gallery files")
    return parser.parse_args()
def ensure_directories(args):
    """Create the scene and gallery root directories if they do not exist.

    Failures are logged rather than raised so a single bad path does not
    abort the run before per-file processing starts.
    """
    for path in [args.sceneroot, args.galleryroot]:
        if not os.path.exists(path):
            try:
                # exist_ok=True closes the race where the directory appears
                # between the exists() check above and this call.
                os.makedirs(path, exist_ok=True)
                logging.info(f"Created missing directory: {path}")
            except OSError as e:
                # Narrowed from bare Exception: only filesystem errors apply.
                logging.error(f"Failed to create directory {path}: {e}")
def setup_logging(verbose):
    """Configure console logging, plus optional file logging from config.

    Console output always goes to stdout. When config.logfile_path is set,
    a file handler records INFO+ messages but excludes dry-run previews.
    """
    level = logging.DEBUG if verbose else logging.INFO
    handlers = [logging.StreamHandler(sys.stdout)]
    if config.logfile_path:
        try:
            fh = logging.FileHandler(config.logfile_path, mode='a', encoding='utf-8')
            fh.setLevel(logging.INFO)
            fh.setFormatter(logging.Formatter('%(asctime)s - %(levelname)s: %(message)s'))
            # Dry-run previews are console-only; keep them out of the file.
            fh.addFilter(SkipDryRunFilter())
            handlers.append(fh)
        except Exception as e:
            # Logging is not configured yet, so fall back to print().
            print(f"⚠️ Failed to set up file logging: {e}")
    logging.basicConfig(level=level, format='%(levelname)s: %(message)s', handlers=handlers)
def validate_config():
    """Exit unless the server address is configured, then build the URL.

    Side effect: sets config.server, which every GraphQL call relies on.
    """
    if not (config.server_ip and config.server_port):
        logging.error("Missing required config values: server_ip or server_port")
        sys.exit(1)
    config.server = build_server_url()
def build_server_url():
    """Return the Stash base URL assembled from config host/port/TLS settings."""
    scheme = "https" if config.use_https else "http"
    return f"{scheme}://{config.server_ip}:{config.server_port}"
def set_auth(server):
    """Probe the server's /playground endpoint to detect the auth scheme.

    A redirect in the response history means JWT login is required; a plain
    200 means no auth; anything else falls back to HTTP basic. Exits the
    program if the server cannot be reached at all.
    """
    try:
        resp = requests.get(f"{server}/playground", verify=not config.ignore_ssl_warnings)
    except requests.RequestException as e:
        logging.error(f"Failed to connect to server: {e}")
        sys.exit(1)
    if resp.history and resp.history[-1].status_code == 302:
        config.auth = "jwt"
        jwt_auth(server)
    elif resp.status_code == 200:
        config.auth = "none"
    else:
        config.auth = "basic"
def jwt_auth(server):
    """Log in with config credentials and store the session JWT in headers.

    Exits the program if the login request fails or no session cookie is
    returned.
    """
    credentials = {'username': config.username, 'password': config.password}
    try:
        response = requests.post(f"{server}/login", data=credentials, verify=not config.ignore_ssl_warnings)
    except requests.RequestException as e:
        logging.error(f"JWT auth error: {e}")
        sys.exit(1)
    token = response.cookies.get('session')
    if not token:
        logging.error("JWT authentication failed")
        sys.exit(1)
    config.headers['Authorization'] = f"Bearer {token}"
def get_file_list(indir, mask):
    """Return paths of regular files in *indir* matching glob *mask*.

    Directories that happen to match the mask are excluded.
    """
    matches = glob.glob(os.path.join(indir, mask))
    return list(filter(os.path.isfile, matches))
def call_graphql(query):
    """POST a GraphQL query to the Stash server and return the parsed JSON.

    Returns an empty dict on any request/HTTP/decode error (logged, not
    raised) so callers can treat failures uniformly.
    """
    url = f"{config.server}/graphql"
    payload = {'query': query}
    try:
        response = requests.post(url, json=payload, headers=config.headers,
                                 verify=not config.ignore_ssl_warnings)
        response.raise_for_status()
        return response.json()
    except requests.RequestException as e:
        logging.error(f"GraphQL query failed: {e}")
        return {}
def fetch_metadata(basename):
    """Query Stash for scene metadata matching *basename*.

    Returns the raw GraphQL response dict, or {} when nothing usable came
    back from the server.
    """
    query = config.file_query.replace("<FILENAME>", basename)
    result = call_graphql(query)
    if not isinstance(result, dict) or not result:
        logging.error(f"No result returned for query: {basename}")
        return {}
    logging.debug(f"GraphQL response for {basename}:\n{json.dumps(result, indent=2)}")
    return result
def should_process(scene_data):
    """Return True when the scene exists and has studio metadata attached.

    Scenes without a studio cannot be routed into a studio directory, so
    callers skip them. Coerced to bool so the predicate returns True/False
    instead of leaking the studio dict (or None) to callers.
    """
    return bool(scene_data and scene_data.get("studio"))
def get_parental_path(studioid):
    """Walk the studio -> parent_studio chain and collect studio names.

    Returns a dict mapping depth to studio name, where key 0 is the scene's
    own studio and higher keys move toward the root studio. Falls back to
    {0: "Uncategorized"} when nothing could be resolved. Each hop is one
    GraphQL round-trip; any malformed response ends the trace early with
    whatever was collected so far.
    """
    basequery = """
    query {
      findStudio(id: "<STUDIONUM>") {
        id
        name
        parent_studio {
          id
        }
      }
    }
    """
    studiolist = {}
    counter = 0
    while studioid:
        query = basequery.replace("<STUDIONUM>", studioid)
        result = call_graphql(query)
        # Defensive shape checks: call_graphql returns {} on failure, and the
        # server may return null for missing studios.
        if not isinstance(result, dict):
            logging.error(f"Invalid response type for studio ID {studioid}: {type(result)}")
            break
        data = result.get('data')
        if not isinstance(data, dict):
            logging.error(f"No 'data' field in response for studio ID {studioid}")
            break
        studio = data.get('findStudio')
        if not isinstance(studio, dict):
            logging.warning(f"Studio ID {studioid} not found or returned null.")
            break
        name = studio.get('name', f"UnknownStudio_{studioid}")
        studiolist[counter] = name
        parent = studio.get('parent_studio', None)
        if parent is None:
            # Reached the root of the studio hierarchy — normal termination.
            logging.debug(f"Studio ID {studioid} has no parent. Ending path trace.")
            break
        if not isinstance(parent, dict):
            logging.warning(f"Unexpected parent_studio format for studio ID {studioid}: {type(parent)}")
            break
        studioid = parent.get('id')
        if not studioid:
            logging.debug("Parent studio has no ID. Ending path trace.")
            break
        # Only advance the depth counter once a parent hop is confirmed.
        counter += 1
    if not studiolist:
        studiolist[0] = "Uncategorized"
    return studiolist
def truncate_string(s, max_length=50):
    """Truncate *s* to at most *max_length* characters, preferring to cut at
    the last '-' or '_' separator so a token is not chopped mid-word.

    A separator at position 0 is ignored so the result is never empty for a
    non-empty input (fixes the case of a leading '-'/'_').
    """
    if len(s) <= max_length:
        return s
    for sep in ['-', '_']:
        pos = s[:max_length].rfind(sep)
        # pos > 0, not >= 0: cutting at a leading separator would return ''.
        if pos > 0:
            return s[:pos]
    return s[:max_length]
def build_output_path(filedata, args):
    """Build (and create) the destination directory for a processed file.

    Galleries (.zip) live under args.galleryroot, everything else under
    args.sceneroot, with one subdirectory per studio from the root-most
    studio down to the scene's own studio. Falls back to args.outdir if the
    directory cannot be created.
    """
    # Choose the root by file type. (The previous version first assigned
    # args.outdir here, a dead write that was always overwritten.)
    if ".zip" in filedata['extension'].lower():
        path = args.galleryroot
    else:
        path = args.sceneroot
    studiolist = filedata.get('studiolist')
    if not isinstance(studiolist, dict) or not studiolist:
        logging.warning(f"No valid studio path found for {filedata['filename']}. Using fallback.")
        studiolist = {0: "Uncategorized"}
    # Highest key is the root-most studio; descend from root toward leaf.
    for i in reversed(sorted(studiolist.keys())):
        studiopath = re.sub(r'[^-a-zA-Z0-9_.() ]+', '', studiolist[i]).strip()
        path = os.path.join(path, studiopath.title())
    try:
        os.makedirs(path, exist_ok=True)
    except Exception as e:
        logging.error(f"Failed to create directory {path}: {e}")
        path = args.outdir  # fallback to base output
    return path
def normalize_string(s):
    """Lowercase *s* and strip every non-alphanumeric character.

    Used to compare title and studio code irrespective of punctuation/case.
    """
    alphanumeric_only = re.sub(r'[^a-zA-Z0-9]+', '', s)
    return alphanumeric_only.lower()
def format_filename(filedata, args):
    """Build the sanitized target filename (without extension) for a scene.

    Substitutes metadata fields into config.name_format placeholders
    (<STUDIO>, <PARENT>, <TITLE>, <ID>, <DATE>, <PERFORMERS>, <TAGS>,
    <DIMENSIONS>, <STUDIOID>) and strips filesystem-unsafe characters.
    The four duplicated logging branches of the original are collapsed
    into a single message build.
    """
    data = filedata['jsondata']
    # First three performers, rendered as "(A, B, C)".
    performers = ", ".join([p['name'] for p in data.get('performers', [])[:3]])
    performer_str = f"({performers})" if performers else ""
    # All tag names, comma-separated.
    tags = ", ".join([t['name'] for t in data.get('tags', [])])
    # Video dimensions from the first file record, e.g. "[1920x1080]".
    file_info = data.get('files', [{}])[0]
    width = file_info.get('width')
    height = file_info.get('height')
    dimensions = f"[{width}x{height}]" if width and height else ""
    # Title and studio code, cleaned and length-limited.
    title = re.sub(r'[^-a-zA-Z0-9_.()\[\]\' ,]+', ' ', data.get('title', 'Untitled')).strip().title()
    title = truncate_string(title, 100)
    code = truncate_string(data.get('code', ''), 50)
    normalized_title = normalize_string(title)
    normalized_code = normalize_string(code)
    # Fallback numeric ID carried in the original filename, e.g. "[12345]".
    fallback_id_match = re.search(r'\[(\d+)\]', filedata['basename'])
    fallback_id = fallback_id_match.group(1) if fallback_id_match else ""
    # Determine name format.
    if normalized_title == normalized_code:
        # Title and code are the same text; drop the redundant STUDIOID.
        # Build the message once instead of four near-identical branches.
        message = f"STUDIOID removed from filename due to equality with TITLE in Stash data for: {filedata['filename']}"
        if fallback_id:
            message += f". Using Fallback ID of [{fallback_id}] Instead."
        if args.dryrun:
            logging.info(f"[DRY-RUN] {message}", extra={'dryrun': True})
        else:
            logging.info(message)
        if fallback_id:
            name = config.name_format.replace("<STUDIOID>", fallback_id)
        else:
            name = config.name_format.replace(" [<STUDIOID>]", "")
    else:
        name = config.name_format.replace("<STUDIOID>", code)
    # Studio and parent studio names (parent falls back to the studio itself).
    studio = data.get('studio')
    studio_name = studio.get('name', 'UnknownStudio') if studio else 'UnknownStudio'
    parent = studio.get('parent_studio') if studio else None
    parent_name = parent.get('name', studio_name) if isinstance(parent, dict) else studio_name
    # Substitute every placeholder supported by name_format.
    name = name.replace("<STUDIO>", studio_name.title())
    name = name.replace("<PARENT>", parent_name.title())
    name = name.replace("<TITLE>", string.capwords(title))
    name = name.replace("<ID>", str(data.get('id', '')))
    name = name.replace("<DATE>", data.get('date', ''))
    name = name.replace("<PERFORMERS>", performer_str)
    name = name.replace("<TAGS>", tags)
    name = name.replace("<DIMENSIONS>", dimensions)
    # Final cleanup: sanitize filename.
    name = re.sub(r'[<>:"/\\|?*\x00-\x1F]', '', name)  # Remove illegal filesystem characters
    name = re.sub(r'\s+', ' ', name).strip()  # Collapse whitespace
    name = re.sub(r'[^\w\-.,()\[\]\' ]+', '', name)  # Remove any remaining unsafe characters
    return name
def move_file(filedata, targetname, dry_run):
    """Move the source file into its output directory under *targetname*.

    Returns the destination path without extension, or None when the move
    was skipped because the target already exists. In dry-run mode nothing
    is moved, but the would-be destination is still returned.
    """
    destination_dir = filedata['output_path']
    target = os.path.join(destination_dir, targetname + filedata['extension'])
    if dry_run:
        logging.info(f"[DRY-RUN] Would move: {filedata['filename']} → {target}", extra={'dryrun': True})
        return os.path.join(destination_dir, targetname)
    if os.path.exists(target):
        logging.warning(f"Target file already exists: {target}. Skipping move.")
        return None  # caller treats None as "not moved"
    logging.info(f"Moving: {filedata['filename']} → {target}")
    shutil.move(filedata['filename'], target)
    return os.path.join(destination_dir, targetname)
def get_image(filedata):
    """Download the scene screenshot (if any) next to the renamed file.

    Writes <fullpathname>.jpg; download failures are logged, not raised.
    """
    url = filedata['jsondata'].get('paths', {}).get('screenshot')
    if not url:
        logging.info(f"No screenshot for {filedata['filename']}")
        return
    try:
        response = requests.get(url)
        response.raise_for_status()
        with open(filedata['fullpathname'] + ".jpg", "wb") as f:
            f.write(response.content)
    except requests.RequestException as e:
        logging.warning(f"Image download failed: {e}")
def generate_nfo(scene):
    """Render a Kodi-style .nfo XML document for *scene*.

    Fixes over the original: metadata values are XML-escaped so titles,
    plots, and names containing '&', '<' or '>' produce well-formed XML,
    and a null parent_studio no longer raises AttributeError.
    """
    from xml.sax.saxutils import escape  # stdlib; local import keeps module-level imports unchanged

    tags = ""
    if config.create_collection_tags:
        # parent_studio may be explicitly null in the API response; `or {}`
        # keeps .get() safe and falls back to the studio's own name.
        parent = (scene['studio'].get('parent_studio') or {}).get('name', scene['studio']['name'])
        tags += f"<tag>Site: {escape(scene['studio']['name'])}</tag>\n"
        tags += f"<tag>Studio: {escape(parent)}</tag>\n"
    genres = "\n".join([
        f"<genre>{escape(t['name'])}</genre>"
        for t in scene['tags']
        if t['id'] not in config.ignore_tags and "ambiguous" not in t['name'].lower()
    ])
    performers = "\n".join([
        f""" <actor>
        <name>{escape(p['name'])}</name>
        <role></role>
        <order>{i}</order>
        <thumb>{escape(p['image_path'] or '')}</thumb>
    </actor>""" for i, p in enumerate(scene['performers'])
    ])
    screenshot = escape(scene['paths']['screenshot'])
    thumbs = f"<thumb aspect='poster'>{screenshot}</thumb>"
    fanart = f"<fanart><thumb>{screenshot}</thumb></fanart>"
    # Stash ratings are 1-5; Kodi userrating is 1-10.
    rating = str(int(scene['rating']) * 2) if scene.get('rating') else ""
    date = scene.get('date') or ""
    studio = escape(scene['studio']['name'])
    title = escape(scene.get('title') or "Untitled")
    plot = escape(scene.get('details') or "")
    scene_id = scene['id']
    return f"""<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
<movie>
    <title>{title}</title>
    <userrating>{rating}</userrating>
    <plot>{plot}</plot>
    <uniqueid type="stash">{scene_id}</uniqueid>
    {tags}
    <premiered>{date}</premiered>
    <studio>{studio}</studio>
    {performers}
    {thumbs}
    {fanart}
    {genres}
</movie>
"""
def write_file(filename, content, use_utf=True):
    """Write *content* to *filename*, optionally with a UTF-8 BOM.

    use_utf=True writes utf-8-sig (BOM) so consumers reliably detect the
    encoding; otherwise the platform default encoding is used. Errors are
    logged rather than raised.
    """
    encoding = "utf-8-sig" if use_utf else None
    try:
        with open(filename, "w", encoding=encoding) as f:
            f.write(content)
        # Bug fix: log the actual path instead of the literal "(unknown)".
        logging.info(f"Wrote file: {filename}")
    except Exception as e:
        logging.error(f"Failed to write file {filename}: {e}")
def process_file(file, args):
    """Process one file end-to-end: look up Stash metadata, build the target
    path and name, move the file, and optionally emit JPG + NFO sidecars.

    Returns "renamed", "skipped", or "error" for summary counting.
    """
    # Strip the extension, then trailing punctuation — except ']', which may
    # close a fallback-ID marker like "[12345]" used by format_filename.
    safe = ']'
    unsafe = ''.join(c for c in string.punctuation if c not in safe)
    basename = os.path.splitext(os.path.basename(file))[0].strip().rstrip(unsafe)
    # Multi-part galleries ("name-1.zip", "name-2.zip") query by the stem.
    part_match = re.search(r'(.*)-\d+$', basename)
    if part_match and ".zip" in file:
        basename = part_match.group(1)
    logging.debug(f"Querying for: {basename}")
    metadata = fetch_metadata(basename)
    # Defensive shape checks on the GraphQL response, layer by layer.
    if not isinstance(metadata, dict):
        logging.error(f"Metadata is not a dictionary for {basename}")
        return "error"
    data = metadata.get('data')
    if not isinstance(data, dict):
        logging.error(f"No 'data' field in GraphQL response for {basename}")
        return "error"
    find_scenes = data.get('findScenes')
    if not isinstance(find_scenes, dict):
        logging.error(f"'findScenes' field missing or invalid for {basename}")
        return "error"
    scenes = find_scenes.get('scenes')
    if not isinstance(scenes, list) or not scenes:
        logging.warning(f"No scenes found for {basename}")
        return "skipped"
    # Only the first matching scene is considered.
    scene = scenes[0]
    if not should_process(scene):
        logging.warning(f"Scene data missing studio info: {basename}")
        return "skipped"
    # A scene linked to zero or several files is ambiguous — skip it.
    if not isinstance(scene.get('files'), list) or len(scene['files']) != 1:
        logging.warning(f"Multiple or missing files in Stash for {file}. Skipping.")
        return "skipped"
    studio = scene.get('studio')
    studio_id = studio.get('id') if isinstance(studio, dict) else None
    if not studio_id:
        logging.warning(f"No studio ID found for {basename}. Skipping.")
        return "skipped"
    try:
        filedata = {
            'jsondata': scene,
            'filename': file,
            'basename': basename,
            'extension': os.path.splitext(file)[-1],
            'studiolist': get_parental_path(studio_id)
        }
        filedata['output_path'] = build_output_path(filedata, args)
        targetname = format_filename(filedata, args)
        # move_file returns the destination path (without extension) or None
        # when the target already exists; in dry-run it returns the would-be path.
        filedata['fullpathname'] = move_file(filedata, targetname, args.dryrun)
        if not filedata['fullpathname']:
            if args.dryrun:
                logging.info(f"[DRY-RUN] File \'{filedata['filename']}\' not moved due to existing target: {targetname}", extra={'dryrun': True})
            else:
                logging.warning(f"File \'{filedata['filename']}\' not moved due to existing target: {targetname}")
            return "skipped"
        if args.extra:
            # NOTE(review): in dry-run mode fullpathname points at a path that
            # was never created, so --extra still writes sidecars there —
            # confirm this is intended.
            get_image(filedata)
            nfo = generate_nfo(scene)
            write_file(filedata['fullpathname'] + ".nfo", nfo, use_utf=True)
        return "renamed"
    except Exception as e:
        # Last-resort catch so one bad scene doesn't abort the whole run.
        logging.error(f"Unhandled error processing {file}: {e}")
        logging.debug(f"Scene data: {json.dumps(scene, indent=2)}")
        return "error"
def main():
    """Entry point: parse args, configure logging/config, process all files,
    and print a summary of outcomes.
    """
    args = parse_args()
    setup_logging(args.verbose)
    # Must run before anything touches config.server.
    validate_config()
    ensure_directories(args)
    summary = Summary()
    files = get_file_list(args.indir, args.mask)
    if not files:
        logging.warning("No files found to process.")
        return
    for file in files:
        summary.total_files += 1
        try:
            outcome = process_file(file, args)
        except Exception as e:
            # Belt-and-braces: process_file already catches most errors.
            logging.error(f"Unhandled error processing {file}: {e}")
            summary.errors += 1
            continue
        if outcome == "renamed":
            summary.renamed += 1
        elif outcome == "skipped":
            summary.skipped += 1
        else:
            summary.errors += 1
    summary.report()
# Run only when executed as a script, not when imported as a module.
if __name__ == "__main__":
    main()