1010import traceback
1111import time
1212
13+
1314# Read environment variables.
1415QUEUE_NAME = os .getenv ("REDIS_QUEUE_NAME" , "task_queue" )
1516MINIO_URL = os .getenv ("MINIO_URL" , "localhost:9000" )
@@ -40,16 +41,11 @@ def download_file(run_id, file_name, extension):
4041 """Downloads a file from MinIO storage."""
4142 BUCKET_NAME = "code"
4243 try :
43- minio_client = Minio (
44- MINIO_URL ,
45- access_key = MINIO_ACCESS_KEY ,
46- secret_key = MINIO_SECRET_KEY ,
47- secure = False ,
48- )
44+ minio_client = Minio (endpoint = MINIO_URL , access_key = MINIO_ACCESS_KEY , secret_key = MINIO_SECRET_KEY , secure = False )
4945 object_name = f"{ run_id } /{ file_name } .{ extension } "
5046 download_path = f"code/{ run_id } /{ file_name } .{ extension } "
5147 os .makedirs (os .path .dirname (download_path ), exist_ok = True )
52- minio_client .fget_object (BUCKET_NAME , object_name , download_path )
48+ minio_client .fget_object (bucket_name = BUCKET_NAME , object_name = object_name , file_path = download_path )
5349 print (f"Successfully downloaded { object_name } to { download_path } " )
5450 return os .path .abspath (download_path )
5551 except S3Error as exc :
@@ -64,16 +60,10 @@ def upload_file(run_id, file_path):
6460 """Uploads a file to MinIO storage."""
6561 BUCKET_NAME = "code"
6662 try :
67- minio_client = Minio (
68- MINIO_URL ,
69- access_key = MINIO_ACCESS_KEY ,
70- secret_key = MINIO_SECRET_KEY ,
71- secure = False ,
72- )
73- # Extract filename from the full path.
63+ minio_client = Minio (endpoint = MINIO_URL , access_key = MINIO_ACCESS_KEY , secret_key = MINIO_SECRET_KEY , secure = False )
7464 file_name_with_ext = os .path .basename (file_path )
7565 object_name = f"{ run_id } /{ file_name_with_ext } "
76- minio_client .fput_object (BUCKET_NAME , object_name , file_path )
66+ minio_client .fput_object (bucket_name = BUCKET_NAME , object_name = object_name , file_path = file_path )
7767 print (f"Successfully uploaded { file_path } as { object_name } " )
7868 except S3Error as exc :
7969 print (f"Failed to upload { file_path } : { exc } " )
@@ -305,10 +295,11 @@ def process_message(body):
305295
306296 # Execute code.
307297 command = []
308- if runType == " ml" :
298+ if runType in [ "bo" , " ml"] :
309299 command = ["python" , local_file_path ]
310300 else :
311301 command = ["python" , "-m" , "scoop" , local_file_path ]
302+
312303 timeout_sec = 3600
313304 print (f"Running command: { ' ' .join (command )} in { file_parent_dir } " )
314305 print (f"Timeout: { timeout_sec } seconds" )
0 commit comments