From 892c52b8df66ca2341e826558b3a395712d85095 Mon Sep 17 00:00:00 2001 From: Karantechie Date: Thu, 17 Oct 2024 23:14:09 +0530 Subject: [PATCH] Update run_task.py --- app/tasks/run_task.py | 51 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 46 insertions(+), 5 deletions(-) diff --git a/app/tasks/run_task.py b/app/tasks/run_task.py index 266bc48..d55d818 100644 --- a/app/tasks/run_task.py +++ b/app/tasks/run_task.py @@ -10,13 +10,54 @@ @celery_app.task(bind=True, autoretry_for=()) def run_task(self, run_id: str, stream: bool = False): - logging.info(f"[run_task] [{run_id}] running at {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}") + """ + Celery task to run a job using ThreadRunner. + + Args: + run_id (str): The unique identifier for the run. + stream (bool): Flag to enable streaming of events. Default is False. + """ + current_time = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + logging.info(f"[run_task] [{run_id}] running at {current_time}") try: - ThreadRunner(run_id, session, stream).run() + # Execute the thread runner + execute_runner(run_id, stream) except Exception as e: - logging.exception(e) - StreamEventHandler(run_id=run_id, is_stream=True).pub_error(str(e)) - RunService.to_failed(session=session, run_id=run_id, last_error=e) + handle_exception(run_id, e) finally: + close_session() + + +def execute_runner(run_id: str, stream: bool): + """ + Initializes and runs the ThreadRunner. + + Args: + run_id (str): The unique identifier for the run. + stream (bool): Flag to enable streaming of events. + """ + ThreadRunner(run_id, session, stream).run() + + +def handle_exception(run_id: str, exception: Exception): + """ + Handles exceptions by logging and updating the run status. + + Args: + run_id (str): The unique identifier for the run. + exception (Exception): The exception to handle. + """ + logging.exception(f"[run_task] [{run_id}] Error: {exception}") + StreamEventHandler(run_id=run_id, is_stream=True).pub_error(str(exception)) + RunService.to_failed(session=session, run_id=run_id, last_error=exception) + + +def close_session(): + """ + Closes the database session. + """ + try: session.close() + except Exception as close_exception: + logging.warning(f"Failed to close session: {close_exception}")