From c8513e8eff66005a3f8ee2d79cc62989f99f0246 Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Wed, 10 Dec 2025 20:40:54 -0800 Subject: [PATCH 1/2] trying to fix beat schedule --- materializationengine/celery_worker.py | 40 ++++++++++++++++++++++++-- materializationengine/config.py | 10 ++++++- 2 files changed, 46 insertions(+), 4 deletions(-) diff --git a/materializationengine/celery_worker.py b/materializationengine/celery_worker.py index c5ea9590..4b595228 100644 --- a/materializationengine/celery_worker.py +++ b/materializationengine/celery_worker.py @@ -52,6 +52,11 @@ def create_celery(app=None): for logger_name in celery_loggers: logging.getLogger(logger_name).setLevel(log_level) + # Debug: Check if BEAT_SCHEDULES is in app.config + beat_schedules = app.config.get("BEAT_SCHEDULES", []) + celery_logger.info(f"BEAT_SCHEDULES from app.config: {beat_schedules}") + celery_logger.info(f"BEAT_SCHEDULES type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") + celery.conf.update( { "task_routes": ("materializationengine.task_router.TaskRouter"), @@ -70,11 +75,20 @@ def create_celery(app=None): "socket_timeout": 20, "socket_connect_timeout": 20, }, # timeout (s) for tasks to be sent back to broker queue - "beat_schedules": app.config["BEAT_SCHEDULES"], + "beat_schedules": beat_schedules, } ) celery.conf.update(app.config) + # Ensure beat_schedules is set correctly after update (in case app.config overwrote it) + # Use BEAT_SCHEDULES from app.config if beat_schedules is empty or missing + if not celery.conf.get("beat_schedules") and app.config.get("BEAT_SCHEDULES"): + celery.conf.beat_schedules = app.config["BEAT_SCHEDULES"] + celery_logger.info(f"Restored beat_schedules from BEAT_SCHEDULES: {len(app.config['BEAT_SCHEDULES'])} schedules") + + # Debug: Verify beat_schedules is in celery.conf after update + celery_logger.info(f"beat_schedules in celery.conf after update: {celery.conf.get('beat_schedules', 'NOT FOUND')}") + celery_logger.info(f"BEAT_SCHEDULES in celery.conf after update: {celery.conf.get('BEAT_SCHEDULES', 'NOT FOUND')}") TaskBase = celery.Task class ContextTask(TaskBase): @@ -87,6 +101,16 @@ def __call__(self, *args, **kwargs): celery.Task = ContextTask if os.environ.get("SLACK_WEBHOOK"): celery.Task.on_failure = post_to_slack_on_task_failure + + # Manually trigger setup_periodic_tasks to ensure it runs with the correct configuration + # This is a fallback in case the signal handler doesn't fire or fires before beat_schedules is set + try: + setup_periodic_tasks(celery) + celery_logger.info("Manually triggered setup_periodic_tasks after create_celery") + except Exception as e: + celery_logger.warning(f"Error manually triggering setup_periodic_tasks: {e}") + # Don't fail if this doesn't work - the signal handler should still work + return celery @@ -129,8 +153,18 @@ def setup_periodic_tasks(sender, **kwargs): name="Clean up back end results", ) - beat_schedules = celery.conf.get("beat_schedules", []) - celery_logger.info(beat_schedules) + # Try to get beat_schedules from celery.conf, fallback to BEAT_SCHEDULES if not found + beat_schedules = celery.conf.get("beat_schedules") + if not beat_schedules: + # Fallback: try to get from BEAT_SCHEDULES (uppercase) in celery.conf + beat_schedules = celery.conf.get("BEAT_SCHEDULES", []) + if beat_schedules: + celery_logger.info(f"Found BEAT_SCHEDULES (uppercase), converting to beat_schedules") + celery.conf.beat_schedules = beat_schedules + + celery_logger.info(f"beat_schedules from celery.conf: {beat_schedules}") + celery_logger.info(f"beat_schedules type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") + if not beat_schedules: celery_logger.info("No periodic tasks configured.") return diff --git a/materializationengine/config.py b/materializationengine/config.py index 6a939dab..c8c4ae3f 100644 --- a/materializationengine/config.py +++ b/materializationengine/config.py @@ -189,14 +189,22 @@ def configure_app(app: Flask) -> Flask: if "MATERIALIZATION_ENGINE_SETTINGS" in os.environ.keys(): app.config.from_envvar("MATERIALIZATION_ENGINE_SETTINGS") # instance-folders configuration + # Store BEAT_SCHEDULES before loading config file to see if it gets overwritten + beat_schedules_before = app.config.get("BEAT_SCHEDULES", "NOT_SET") app.config.from_pyfile("config.cfg", silent=True) - + beat_schedules_after = app.config.get("BEAT_SCHEDULES", "NOT_SET") + handler = logging.StreamHandler(sys.stdout) handler.setLevel(app.config["LOGGING_LEVEL"]) app.logger.removeHandler(default_handler) app.logger.addHandler(handler) app.logger.setLevel(app.config["LOGGING_LEVEL"]) app.logger.propagate = False + + # Log BEAT_SCHEDULES loading status + app.logger.info(f"BEAT_SCHEDULES before config.cfg: {beat_schedules_before}") + app.logger.info(f"BEAT_SCHEDULES after config.cfg: {beat_schedules_after}") + app.logger.info(f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}") app.logger.debug(app.config) app.app_context().push() return app From 47d04d2744dd34a8da846818517b63ddbe8fdd3c Mon Sep 17 00:00:00 2001 From: Forrest Collman Date: Wed, 10 Dec 2025 20:51:55 -0800 Subject: [PATCH 2/2] switching to debug for print statements --- materializationengine/celery_worker.py | 25 ++++++++----------------- materializationengine/config.py | 8 ++++---- 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/materializationengine/celery_worker.py b/materializationengine/celery_worker.py index 4b595228..e8ce9bf4 100644 --- a/materializationengine/celery_worker.py +++ b/materializationengine/celery_worker.py @@ -54,8 +54,8 @@ def create_celery(app=None): # Debug: Check if BEAT_SCHEDULES is in app.config beat_schedules = app.config.get("BEAT_SCHEDULES", []) - celery_logger.info(f"BEAT_SCHEDULES from app.config: {beat_schedules}") - celery_logger.info(f"BEAT_SCHEDULES type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") + celery_logger.debug(f"BEAT_SCHEDULES from app.config: {beat_schedules}") + celery_logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") celery.conf.update( { @@ -84,11 +84,11 @@ def create_celery(app=None): # Use BEAT_SCHEDULES from app.config if beat_schedules is empty or missing if not celery.conf.get("beat_schedules") and app.config.get("BEAT_SCHEDULES"): celery.conf.beat_schedules = app.config["BEAT_SCHEDULES"] - celery_logger.info(f"Restored beat_schedules from BEAT_SCHEDULES: {len(app.config['BEAT_SCHEDULES'])} schedules") + celery_logger.debug(f"Restored beat_schedules from BEAT_SCHEDULES: {len(app.config['BEAT_SCHEDULES'])} schedules") # Debug: Verify beat_schedules is in celery.conf after update - celery_logger.info(f"beat_schedules in celery.conf after update: {celery.conf.get('beat_schedules', 'NOT FOUND')}") - celery_logger.info(f"BEAT_SCHEDULES in celery.conf after update: {celery.conf.get('BEAT_SCHEDULES', 'NOT FOUND')}") + celery_logger.debug(f"beat_schedules in celery.conf after update: {celery.conf.get('beat_schedules', 'NOT FOUND')}") + celery_logger.debug(f"BEAT_SCHEDULES in celery.conf after update: {celery.conf.get('BEAT_SCHEDULES', 'NOT FOUND')}") TaskBase = celery.Task class ContextTask(TaskBase): @@ -102,15 +102,6 @@ def __call__(self, *args, **kwargs): if os.environ.get("SLACK_WEBHOOK"): celery.Task.on_failure = post_to_slack_on_task_failure - # Manually trigger setup_periodic_tasks to ensure it runs with the correct configuration - # This is a fallback in case the signal handler doesn't fire or fires before beat_schedules is set - try: - setup_periodic_tasks(celery) - celery_logger.info("Manually triggered setup_periodic_tasks after create_celery") - except Exception as e: - celery_logger.warning(f"Error manually triggering setup_periodic_tasks: {e}") - # Don't fail if this doesn't work - the signal handler should still work - return celery @@ -159,11 +150,11 @@ def setup_periodic_tasks(sender, **kwargs): # Fallback: try to get from BEAT_SCHEDULES (uppercase) in celery.conf beat_schedules = celery.conf.get("BEAT_SCHEDULES", []) if beat_schedules: - celery_logger.info(f"Found BEAT_SCHEDULES (uppercase), converting to beat_schedules") + celery_logger.debug(f"Found BEAT_SCHEDULES (uppercase), converting to beat_schedules") celery.conf.beat_schedules = beat_schedules - celery_logger.info(f"beat_schedules from celery.conf: {beat_schedules}") - celery_logger.info(f"beat_schedules type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") + celery_logger.debug(f"beat_schedules from celery.conf: {beat_schedules}") + celery_logger.debug(f"beat_schedules type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") if not beat_schedules: celery_logger.info("No periodic tasks configured.") diff --git a/materializationengine/config.py b/materializationengine/config.py index c8c4ae3f..4706a51a 100644 --- a/materializationengine/config.py +++ b/materializationengine/config.py @@ -201,10 +201,10 @@ def configure_app(app: Flask) -> Flask: app.logger.setLevel(app.config["LOGGING_LEVEL"]) app.logger.propagate = False - # Log BEAT_SCHEDULES loading status - app.logger.info(f"BEAT_SCHEDULES before config.cfg: {beat_schedules_before}") - app.logger.info(f"BEAT_SCHEDULES after config.cfg: {beat_schedules_after}") - app.logger.info(f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}") + # Log BEAT_SCHEDULES loading status (debug level) + app.logger.debug(f"BEAT_SCHEDULES before config.cfg: {beat_schedules_before}") + app.logger.debug(f"BEAT_SCHEDULES after config.cfg: {beat_schedules_after}") + app.logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}") app.logger.debug(app.config) app.app_context().push() return app