-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathservices.py
More file actions
311 lines (259 loc) · 12.1 KB
/
services.py
File metadata and controls
311 lines (259 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
import os
import gc
import uuid
import time
import threading
import torch
from flask import url_for
from utils import save_uploaded_file, cleanup_file
import utils as utils_module
# Seconds a converted file is kept before its scheduled deletion.
# Taken from the FILE_EXPIRY_SECONDS environment variable when it is set,
# otherwise 900; the value is read once, when this module is imported.
CLEANUP_DELAY_SECONDS = int(os.environ.get('FILE_EXPIRY_SECONDS', '900'))
class AudioConversionService:
    """Convert uploaded audio files and publish the result for download.

    Uploads are stored in ``upload_folder`` only while they are processed.
    Converted files are written to ``converted_folder`` and scheduled for
    deletion after ``CLEANUP_DELAY_SECONDS``.
    """

    # Output formats accepted by ``convert_file``.
    SUPPORTED_FORMATS = ('mp3', 'wav', 'flac')

    def __init__(self, upload_folder, converted_folder):
        """Remember the directories used for uploads and converted output."""
        self.upload_folder = upload_folder
        self.converted_folder = converted_folder

    @staticmethod
    def _safe_stem(filename):
        """Return the extension-less base name of a client-supplied filename.

        Directory components (written with forward or backward slashes) are
        dropped so the name can never point outside the output folder.
        Falls back to ``'audio'`` when nothing usable remains.
        """
        base = os.path.basename(str(filename).replace('\\', '/'))
        stem = os.path.splitext(base)[0].strip()
        return stem if stem not in ('', '.', '..') else 'audio'

    def convert_file(self, audio_file, target_format):
        """Convert an uploaded audio file to ``target_format``.

        Args:
            audio_file: Uploaded file object exposing a ``filename``
                attribute; it is passed to ``save_uploaded_file`` for storage.
            target_format: One of ``SUPPORTED_FORMATS``.

        Returns:
            dict: ``{'success': True, 'download_url': ..., 'filename': ...}``
            on success, otherwise ``{'success': False, 'error': ...}``.
            Exceptions raised while processing are caught and reported in
            the result dict rather than propagated.
        """
        import traceback
        from urllib.parse import quote

        input_path = None
        try:
            print("=== CONVERT_FILE METHOD CALLED ===")

            # Validate input
            if not audio_file or not getattr(audio_file, 'filename', None):
                print("Invalid file: audio_file is None or has no filename")
                return {'success': False, 'error': 'Invalid file'}

            # Validate target format
            if target_format not in self.SUPPORTED_FORMATS:
                print(f"Invalid format: {target_format}")
                return {
                    'success': False,
                    'error': f'Invalid format: {target_format}'
                }

            # Save the uploaded file
            print(f"Saving uploaded file: {audio_file.filename}")
            file_uuid, input_path = save_uploaded_file(audio_file, self.upload_folder)
            print(f"File saved to: {input_path}")

            # Verify the file was saved
            if not os.path.exists(input_path):
                print(f"File not saved: {input_path}")
                return {
                    'success': False,
                    'error': 'Failed to save uploaded file'
                }

            # The client controls the upload's filename, so only its sanitised
            # stem may take part in the server-side path and the URL.
            output_filename = f"{self._safe_stem(audio_file.filename)}.{target_format}"  # User-friendly name
            server_output_filename = f"{file_uuid}_{output_filename}"  # Server storage name
            output_path = os.path.join(self.converted_folder, server_output_filename)
            print(f"Output path: {output_path}")

            # Ensure the output directory exists
            print(f"Ensuring output directory exists: {self.converted_folder}")
            os.makedirs(self.converted_folder, exist_ok=True)

            # Convert the file using the utils conversion helper
            print(f"Converting file from {input_path} to {output_path}")
            if not utils_module.convert_audio(input_path, output_path, target_format):
                print("Conversion failed")
                return {'success': False, 'error': 'Conversion failed'}

            # Verify the output file was created
            if not os.path.exists(output_path):
                print(f"Output file not created: {output_path}")
                return {
                    'success': False,
                    'error': 'Conversion completed but output file not found'
                }
            print(f"File converted successfully: {output_path}")

            # The URL is built by hand (no url_for), so percent-encode the
            # name: spaces, '#' or '?' would otherwise break the link.
            download_url = "/static/converted/" + quote(server_output_filename)
            print(f"Generated URL: {download_url}")

            # Schedule cleanup (configurable delay)
            delay = CLEANUP_DELAY_SECONDS
            print(f"Scheduling cleanup for: {output_path} in {delay} seconds")
            self._schedule_file_cleanup(output_path, delay)

            return {
                'success': True,
                'download_url': download_url,
                'filename': output_filename
            }
        except Exception as e:
            print(f"Conversion error: {str(e)}")
            traceback.print_exc()
            return {
                'success': False,
                'error': f'Conversion error: {str(e)}'
            }
        finally:
            # The upload is only needed for the conversion itself.
            if input_path and os.path.exists(input_path):
                print(f"Cleaning up input file: {input_path}")
                cleanup_file(input_path)

    def _schedule_file_cleanup(self, file_path, delay_seconds):
        """Delete ``file_path`` via ``cleanup_file`` after ``delay_seconds``.

        The timer runs on a daemon thread, so a pending deletion does not
        keep the process alive.
        """
        timer = threading.Timer(delay_seconds, cleanup_file, args=(file_path,))
        timer.daemon = True
        timer.start()
class StemSeparationService:
    """Split uploaded audio into stems with Demucs and manage the results.

    Uploads are stored in ``upload_folder`` while they are processed; Demucs
    writes its output below ``converted_folder``.
    """

    # Demucs model to run. The code looks for results in a sub-directory of
    # the output folder that carries the same name.
    MODEL_NAME = 'htdemucs'

    # Demucs stem name -> name reported to the client.
    STEM_DISPLAY_NAMES = {
        'drums': 'drums',
        'bass': 'bass',
        'vocals': 'vocals',
        'other': 'melody',
    }

    def __init__(self, upload_folder, converted_folder):
        """Remember the directories used for uploads and separated stems."""
        self.upload_folder = upload_folder
        self.converted_folder = converted_folder

    def separate_stems(self, audio_file):
        """Separate an uploaded audio file into stems.

        Args:
            audio_file: Uploaded file object accepted by ``save_uploaded_file``.

        Returns:
            dict: ``{'success': True, 'stems': {name: url}, 'session_id': ...}``
            on success, otherwise ``{'success': False, 'error': ...}``.
            ``session_id`` is the output directory name and is what
            ``cleanup_session`` expects. Exceptions (including a missing
            Demucs installation) are reported in the result dict.
        """
        import traceback

        input_path = None
        try:
            # Imported lazily, and inside the try so that a missing or broken
            # installation yields a failure result instead of an exception.
            import demucs.separate

            # Save the uploaded file
            file_uuid, input_path = save_uploaded_file(audio_file, self.upload_folder)
            track_name = os.path.splitext(os.path.basename(input_path))[0]

            # Drop our reference to the upload before the model runs.
            del audio_file
            gc.collect()

            # Configure demucs for separation
            demucs.separate.main([
                "--mp3",
                "-n", self.MODEL_NAME,
                "--segment", "7",
                "-d", "cpu",
                "--overlap", "0.1",
                "--out", self.converted_folder,
                input_path
            ])

            # Find the output directory; the first existing candidate wins.
            model_dir = os.path.join(self.converted_folder, self.MODEL_NAME)
            candidates = (f"{file_uuid}_{track_name}", file_uuid, track_name)
            found_dir = next(
                (name for name in candidates
                 if os.path.exists(os.path.join(model_dir, name))),
                None,
            )
            if not found_dir:
                raise FileNotFoundError("Output directory not found")

            # Build URLs for the stems that were actually produced. URL paths
            # always use '/', so they are not assembled with os.path.join.
            stem_paths = {}
            for source_stem, display_stem in self.STEM_DISPLAY_NAMES.items():
                stem_filename = f"{source_stem}.mp3"
                if os.path.exists(os.path.join(model_dir, found_dir, stem_filename)):
                    stem_paths[display_stem] = url_for(
                        'static',
                        filename=f'converted/{self.MODEL_NAME}/{found_dir}/{stem_filename}',
                    )

            # Force garbage collection
            gc.collect()
            try:
                if hasattr(torch, 'cuda') and torch.cuda.is_available():
                    torch.cuda.empty_cache()
            except Exception as e:
                # Best effort only: failing to free the cache must not turn a
                # finished separation into an error.
                print(f"Warning: Could not clear CUDA cache: {str(e)}")

            return {
                'success': True,
                'stems': stem_paths,
                'session_id': found_dir
            }
        except Exception as e:
            print(f"Separation error: {str(e)}")
            print(f"Full error details: {traceback.format_exc()}")
            return {
                'success': False,
                'error': 'Failed to process audio file. Please try again with a different file.'
            }
        finally:
            # Clean up input file regardless of success or failure
            if input_path and os.path.exists(input_path):
                print(f"Cleaning up input file: {input_path}")
                cleanup_file(input_path)

    def cleanup_session(self, session_id):
        """Delete the stem directory that belongs to ``session_id``.

        Only a direct child of the model output directory may be removed.
        Anything else (empty value, ``..``, nested or absolute paths) is
        rejected, because ``session_id`` is used to build the path that gets
        deleted recursively.

        Returns:
            dict: ``{'success': True}`` when the directory was removed,
            ``{'success': True, 'message': ...}`` when it did not exist, or
            ``{'success': False, 'error': ...}`` on invalid input or failure.
        """
        import shutil
        import traceback

        try:
            print(f"=== CLEANUP_SESSION METHOD CALLED for session {session_id} ===")
            if not isinstance(session_id, str) or not session_id:
                print(f"Rejected invalid session id: {session_id!r}")
                return {'success': False, 'error': 'Invalid session id'}

            base_dir = os.path.realpath(os.path.join(self.converted_folder, self.MODEL_NAME))
            output_dir = os.path.realpath(os.path.join(base_dir, session_id))
            # After normalisation the target must sit directly inside base_dir.
            if os.path.dirname(output_dir) != base_dir:
                print(f"Rejected invalid session id: {session_id!r}")
                return {'success': False, 'error': 'Invalid session id'}

            print(f"Checking if directory exists: {output_dir}")
            if os.path.exists(output_dir):
                print(f"Directory exists, removing: {output_dir}")
                shutil.rmtree(output_dir)
                print(f"Successfully removed directory: {output_dir}")
                return {'success': True}

            print(f"Directory does not exist: {output_dir}")
            return {'success': True, 'message': 'Directory already cleaned'}
        except Exception as e:
            print(f"Cleanup error: {str(e)}")
            traceback.print_exc()
            return {'success': False, 'error': str(e)}
# --- Convenience functional wrappers for minimal API ---
def analyze_audio(input_path):
    """Run ``utils.analyze_audio_file`` on ``input_path`` and return its result."""
    from utils import analyze_audio_file as _analyze

    analysis = _analyze(input_path)
    return analysis
def convert_audio(input_path, target_format, out_dir):
    """Convert ``input_path`` to ``target_format`` and store it in ``out_dir``.

    The output file is named ``<input stem>_<random hex>.<target_format>`` so
    repeated conversions of the same input do not overwrite each other.
    ``out_dir`` is created when it does not exist yet.

    Returns:
        str: Path of the converted file.

    Raises:
        RuntimeError: If the ``utils.convert_audio`` helper reports failure.
    """
    from utils import convert_audio as _convert

    # An empty out_dir means "current directory"; there is nothing to create.
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    base = os.path.splitext(os.path.basename(input_path))[0]
    out_name = f"{base}_{uuid.uuid4().hex}.{target_format}"
    out_path = os.path.join(out_dir, out_name)

    if not _convert(input_path, out_path, target_format):
        raise RuntimeError(f"Conversion to {target_format} failed")
    return out_path
def separate_audio(input_path, output_dir, model='htdemucs'):
    """Run the demucs CLI to separate stems and zip the resulting audio files.

    Requires ``demucs`` to be available in PATH and the requested model to be
    installed.

    Args:
        input_path: Audio file to separate.
        output_dir: Directory handed to demucs as its output folder; created
            if missing. The archive ``stems.zip`` is written here as well.
        model: Demucs model name; an empty value selects ``'htdemucs'``.

    Returns:
        str: Path to the zip file. It contains every ``.wav``/``.mp3`` file
        found below ``output_dir``, stored with paths relative to it.

    Raises:
        RuntimeError: If the demucs executable cannot be found or exits with
            a non-zero status.
    """
    import subprocess
    import zipfile

    # Normalize model (treat empty string as default)
    model = model or 'htdemucs'

    # Ensure output directory exists
    os.makedirs(output_dir, exist_ok=True)

    cmd = ['demucs', '-n', model, '-o', output_dir, '--segment', '7', '--overlap', '0.1', input_path]
    try:
        proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)
    except FileNotFoundError as exc:
        # Raised when the executable itself is missing; keep the cause attached.
        raise RuntimeError(
            "Demucs CLI not found. Please install demucs and ensure it's in PATH."
        ) from exc
    if proc.returncode != 0:
        raise RuntimeError(f"Demucs failed (code {proc.returncode}): {proc.stderr.strip()}")

    # Zip any .wav/.mp3 files under output_dir. The archive lives in the same
    # directory but is skipped by the extension filter.
    zip_path = os.path.join(output_dir, 'stems.zip')
    with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zf:
        for root, dirs, files in os.walk(output_dir):
            dirs.sort()  # fixed traversal order -> reproducible member order
            for name in sorted(files):
                if name.lower().endswith(('.wav', '.mp3')):
                    full = os.path.join(root, name)
                    zf.write(full, os.path.relpath(full, output_dir))
    return zip_path