diff --git a/materializationengine/celery_worker.py b/materializationengine/celery_worker.py index c5ea9590..e8ce9bf4 100644 --- a/materializationengine/celery_worker.py +++ b/materializationengine/celery_worker.py @@ -52,6 +52,11 @@ def create_celery(app=None): for logger_name in celery_loggers: logging.getLogger(logger_name).setLevel(log_level) + # Debug: Check if BEAT_SCHEDULES is in app.config + beat_schedules = app.config.get("BEAT_SCHEDULES", []) + celery_logger.debug(f"BEAT_SCHEDULES from app.config: {beat_schedules}") + celery_logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") + celery.conf.update( { "task_routes": ("materializationengine.task_router.TaskRouter"), @@ -70,11 +75,20 @@ def create_celery(app=None): "socket_timeout": 20, "socket_connect_timeout": 20, }, # timeout (s) for tasks to be sent back to broker queue - "beat_schedules": app.config["BEAT_SCHEDULES"], + "beat_schedules": beat_schedules, } ) celery.conf.update(app.config) + # Ensure beat_schedules is set correctly after update (in case app.config overwrote it) + # Use BEAT_SCHEDULES from app.config if beat_schedules is empty or missing + if not celery.conf.get("beat_schedules") and app.config.get("BEAT_SCHEDULES"): + celery.conf.beat_schedules = app.config["BEAT_SCHEDULES"] + celery_logger.debug(f"Restored beat_schedules from BEAT_SCHEDULES: {len(app.config['BEAT_SCHEDULES'])} schedules") + + # Debug: Verify beat_schedules is in celery.conf after update + celery_logger.debug(f"beat_schedules in celery.conf after update: {celery.conf.get('beat_schedules', 'NOT FOUND')}") + celery_logger.debug(f"BEAT_SCHEDULES in celery.conf after update: {celery.conf.get('BEAT_SCHEDULES', 'NOT FOUND')}") TaskBase = celery.Task class ContextTask(TaskBase): @@ -87,6 +101,7 @@ def __call__(self, *args, **kwargs): celery.Task = ContextTask if os.environ.get("SLACK_WEBHOOK"): celery.Task.on_failure = post_to_slack_on_task_failure + return celery @@ -129,8 +144,18 @@ def setup_periodic_tasks(sender, **kwargs): name="Clean up back end results", ) - beat_schedules = celery.conf.get("beat_schedules", []) - celery_logger.info(beat_schedules) + # Try to get beat_schedules from celery.conf, fallback to BEAT_SCHEDULES if not found + beat_schedules = celery.conf.get("beat_schedules") + if not beat_schedules: + # Fallback: try to get from BEAT_SCHEDULES (uppercase) in celery.conf + beat_schedules = celery.conf.get("BEAT_SCHEDULES", []) + if beat_schedules: + celery_logger.debug(f"Found BEAT_SCHEDULES (uppercase), converting to beat_schedules") + celery.conf.beat_schedules = beat_schedules + + celery_logger.debug(f"beat_schedules from celery.conf: {beat_schedules}") + celery_logger.debug(f"beat_schedules type: {type(beat_schedules)}, length: {len(beat_schedules) if isinstance(beat_schedules, (list, dict)) else 'N/A'}") + if not beat_schedules: celery_logger.info("No periodic tasks configured.") return diff --git a/materializationengine/config.py b/materializationengine/config.py index 6a939dab..4706a51a 100644 --- a/materializationengine/config.py +++ b/materializationengine/config.py @@ -189,14 +189,22 @@ def configure_app(app: Flask) -> Flask: if "MATERIALIZATION_ENGINE_SETTINGS" in os.environ.keys(): app.config.from_envvar("MATERIALIZATION_ENGINE_SETTINGS") # instance-folders configuration + # Store BEAT_SCHEDULES before loading config file to see if it gets overwritten + beat_schedules_before = app.config.get("BEAT_SCHEDULES", "NOT_SET") app.config.from_pyfile("config.cfg", silent=True) - + beat_schedules_after = app.config.get("BEAT_SCHEDULES", "NOT_SET") + handler = logging.StreamHandler(sys.stdout) handler.setLevel(app.config["LOGGING_LEVEL"]) app.logger.removeHandler(default_handler) app.logger.addHandler(handler) app.logger.setLevel(app.config["LOGGING_LEVEL"]) app.logger.propagate = False + + # Log BEAT_SCHEDULES loading status (debug level) + app.logger.debug(f"BEAT_SCHEDULES before config.cfg: {beat_schedules_before}") + app.logger.debug(f"BEAT_SCHEDULES after config.cfg: {beat_schedules_after}") + app.logger.debug(f"BEAT_SCHEDULES type: {type(beat_schedules_after)}, length: {len(beat_schedules_after) if isinstance(beat_schedules_after, (list, dict)) else 'N/A'}") app.logger.debug(app.config) app.app_context().push() return app