-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcelery_app.py
More file actions
102 lines (80 loc) · 2.92 KB
/
celery_app.py
File metadata and controls
102 lines (80 loc) · 2.92 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
"""
Celery Application Factory
Creates Celery instance compatible with Flask app factory pattern.
Use this for background task processing (iCal sync, OAuth refresh, email notifications).
"""
from celery import Celery
from celery.schedules import crontab
import os
def make_celery(app=None):
    """
    Create and configure the Celery instance for background tasks.

    The broker and result backend URLs are read from the
    ``CELERY_BROKER_URL`` and ``CELERY_RESULT_BACKEND`` environment
    variables; each falls back to ``redis://localhost:6379/0`` when unset.

    Usage:
        from celery_app import celery

        @celery.task
        def my_background_task(arg):
            # Runs inside the Flask app context once init_celery()
            # has been called with the Flask app.
            from models import User
            user = User.query.get(arg)

    Args:
        app: Flask application instance (optional). When omitted, bind the
            app later by calling ``init_celery(celery, app)``.

    Returns:
        Celery instance; wrapped for Flask only if ``app`` was given.
    """
    celery = Celery(
        'bos_checklist',
        broker=os.getenv('CELERY_BROKER_URL', 'redis://localhost:6379/0'),
        backend=os.getenv('CELERY_RESULT_BACKEND', 'redis://localhost:6379/0'),
        include=['tasks.ical_sync', 'tasks.oauth_refresh'],  # Task modules
    )

    # Celery configuration
    celery.conf.update(
        # Task result settings
        result_expires=3600,  # Results expire after 1 hour

        # Task execution settings
        task_serializer='json',
        result_serializer='json',
        accept_content=['json'],
        timezone='UTC',
        enable_utc=True,

        # Worker settings
        worker_prefetch_multiplier=1,  # One task at a time per worker
        # Restart worker after 1000 tasks (prevent memory leaks)
        worker_max_tasks_per_child=1000,

        # Task routing (optional: separate queues for different task types)
        task_routes={
            'tasks.ical_sync.*': {'queue': 'ical'},
            'tasks.oauth_refresh.*': {'queue': 'oauth'},
        },

        # Periodic task schedule (Celery Beat)
        beat_schedule={
            'sync-ical-calendars': {
                'task': 'tasks.ical_sync.sync_all_ical_calendars',
                'schedule': crontab(minute='*/30'),  # Every 30 minutes
            },
            'refresh-oauth-tokens': {
                'task': 'tasks.oauth_refresh.refresh_expiring_tokens',
                'schedule': crontab(minute='0', hour='*/6'),  # Every 6 hours
            },
        },
    )

    # Explicit None check: only "no app supplied" should skip the Flask
    # binding, independent of how the app object evaluates as a boolean.
    if app is not None:
        init_celery(celery, app)

    return celery
def init_celery(celery, app):
    """
    Bind a Celery instance to a Flask application.

    Replaces ``celery.Task`` with a subclass whose ``__call__`` executes the
    task body inside ``app.app_context()``, so task code that needs the
    Flask application context can run. Afterwards every key of
    ``app.config`` is copied into ``celery.conf``.

    Args:
        celery: Celery instance (mutated in place: ``Task`` and ``conf``)
        app: Flask application instance

    Returns:
        None
    """
    class ContextTask(celery.Task):
        """Celery Task that runs within Flask app context"""

        def __call__(self, *args, **kwargs):
            # Push an application context for the duration of the task body.
            with app.app_context():
                return self.run(*args, **kwargs)

    celery.Task = ContextTask

    # NOTE(review): this copies the whole Flask config, so any Flask key
    # that is also a Celery setting name overrides the value set earlier.
    celery.conf.update(app.config)
# Default module-level Celery instance, created at import time without a
# Flask app, so tasks are not yet wrapped in an application context.
# The app is expected to be bound later via init_celery(celery, app)
# (per the original note, this happens in app.py).
celery = make_celery()