Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions src/database/repositories/content_pipelines.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,13 @@ async def get_by_id(self, pipeline_id: int) -> ContentPipeline | None:
row = await cur.fetchone()
return self._to_pipeline(row) if row else None

async def update_generate_interval(self, pipeline_id: int, minutes: int) -> None:
    """Persist a new generation interval (in minutes) for a single pipeline.

    Writes directly to ``content_pipelines.generate_interval_minutes`` and
    commits immediately so the scheduler can pick the value up on next sync.
    """
    query = "UPDATE content_pipelines SET generate_interval_minutes = ? WHERE id = ?"
    params = (minutes, pipeline_id)
    await self._db.execute(query, params)
    await self._db.commit()

async def update(
self,
pipeline_id: int,
Expand Down
150 changes: 106 additions & 44 deletions src/scheduler/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,49 @@ def is_running(self) -> bool:
def interval_minutes(self) -> int:
return self._current_interval_minutes

async def is_job_enabled(self, job_id: str) -> bool:
    """Return True if the job is not explicitly disabled in settings.

    Jobs default to enabled; only a stored value of "1" under the
    ``scheduler_job_disabled:<job_id>`` key disables one. With no settings
    bundle configured, every job is treated as enabled.
    """
    bundle = self._scheduler_bundle
    if not bundle:
        return True
    flag = await bundle.get_setting(f"scheduler_job_disabled:{job_id}")
    return flag != "1"

async def sync_job_state(self, job_id: str, enabled: bool) -> None:
    """Live add or remove a job from the running scheduler.

    No-op when the scheduler is not running — the persisted disabled flag
    is applied on the next start() in that case. Disabling an already
    absent job is also a no-op.
    """
    if not self._scheduler or not self._scheduler.running:
        return
    if not enabled:
        # Remove only when the job is actually scheduled. The previous
        # bare `except Exception: pass` swallowed every error, not just
        # the expected missing-job case; an existence check keeps the
        # best-effort semantics without hiding real failures.
        if self._scheduler.get_job(job_id) is not None:
            self._scheduler.remove_job(job_id)
        return
    if job_id == self._job_id:
        self._scheduler.add_job(
            self._run_collection,
            IntervalTrigger(minutes=self._current_interval_minutes),
            id=job_id,
            replace_existing=True,
        )
    elif job_id == self._photo_due_job_id and self._task_enqueuer:
        # Photo jobs run on a fixed 1-minute cadence.
        self._scheduler.add_job(
            self._run_photo_due,
            IntervalTrigger(minutes=1),
            id=job_id,
            replace_existing=True,
        )
    elif job_id == self._photo_auto_job_id and self._task_enqueuer:
        self._scheduler.add_job(
            self._run_photo_auto,
            IntervalTrigger(minutes=1),
            id=job_id,
            replace_existing=True,
        )
    elif job_id.startswith("sq_"):
        # Search-query jobs are rebuilt wholesale from DB state.
        await self.sync_search_query_jobs()
    elif job_id.startswith(("pipeline_run_", "content_generate_")):
        await self.sync_pipeline_jobs()

async def load_settings(self) -> None:
"""Load persisted settings from DB without starting the scheduler."""
if not self._scheduler_bundle:
Expand Down Expand Up @@ -87,13 +130,16 @@ async def start(self) -> None:
logger=logger,
)
self._current_interval_minutes = collect_interval
self._scheduler.add_job(
self._run_collection,
IntervalTrigger(minutes=collect_interval),
id=self._job_id,
replace_existing=True,
)
logger.info("Registered job %s (every %d min)", self._job_id, collect_interval)
if await self.is_job_enabled(self._job_id):
self._scheduler.add_job(
self._run_collection,
IntervalTrigger(minutes=collect_interval),
id=self._job_id,
replace_existing=True,
)
logger.info("Registered job %s (every %d min)", self._job_id, collect_interval)
else:
logger.info("Job %s is disabled, skipping registration", self._job_id)

if self._sq_bundle:
await self.sync_search_query_jobs()
Expand All @@ -102,20 +148,26 @@ async def start(self) -> None:
await self.sync_pipeline_jobs()

if self._task_enqueuer is not None:
self._scheduler.add_job(
self._run_photo_due,
IntervalTrigger(minutes=1),
id=self._photo_due_job_id,
replace_existing=True,
)
logger.info("Registered job %s (every 1 min)", self._photo_due_job_id)
self._scheduler.add_job(
self._run_photo_auto,
IntervalTrigger(minutes=1),
id=self._photo_auto_job_id,
replace_existing=True,
)
logger.info("Registered job %s (every 1 min)", self._photo_auto_job_id)
if await self.is_job_enabled(self._photo_due_job_id):
self._scheduler.add_job(
self._run_photo_due,
IntervalTrigger(minutes=1),
id=self._photo_due_job_id,
replace_existing=True,
)
logger.info("Registered job %s (every 1 min)", self._photo_due_job_id)
else:
logger.info("Job %s is disabled, skipping registration", self._photo_due_job_id)
if await self.is_job_enabled(self._photo_auto_job_id):
self._scheduler.add_job(
self._run_photo_auto,
IntervalTrigger(minutes=1),
id=self._photo_auto_job_id,
replace_existing=True,
)
logger.info("Registered job %s (every 1 min)", self._photo_auto_job_id)
else:
logger.info("Job %s is disabled, skipping registration", self._photo_auto_job_id)

self._scheduler.start()
total_jobs = len(self._scheduler.get_jobs())
Expand All @@ -138,12 +190,13 @@ async def stop(self) -> None:
logger.info("Scheduler stopped, %d jobs removed", job_count)

def update_interval(self, minutes: int) -> None:
self._current_interval_minutes = minutes
if self._scheduler and self._scheduler.running:
self._scheduler.reschedule_job(self._job_id, trigger=IntervalTrigger(minutes=minutes))
self._current_interval_minutes = minutes
logger.info("Collection interval updated to %d minutes", minutes)
else:
self._current_interval_minutes = minutes
if self._scheduler.get_job(self._job_id) is not None:
self._scheduler.reschedule_job(self._job_id, trigger=IntervalTrigger(minutes=minutes))
logger.info("Collection interval updated to %d minutes", minutes)
else:
logger.info("Collection interval stored as %d min (job disabled, will apply on re-enable)", minutes)

def get_job_next_run(self, job_id: str):
"""Return next_run_time for a scheduled job, or None if missing."""
Expand Down Expand Up @@ -218,12 +271,16 @@ async def sync_search_query_jobs(self) -> None:

existing_jobs = self._scheduler.get_jobs()
for job in existing_jobs:
if job.id.startswith("sq_") and job.id not in active_ids:
if job.id.startswith("sq_") and (
job.id not in active_ids or not await self.is_job_enabled(job.id)
):
self._scheduler.remove_job(job.id)
logger.info("Removed search query job %s", job.id)

for sq in active_queries:
job_id = f"sq_{sq.id}"
if not await self.is_job_enabled(job_id):
continue
self._scheduler.add_job(
self._run_search_query,
IntervalTrigger(minutes=sq.interval_minutes),
Expand All @@ -249,33 +306,38 @@ async def sync_pipeline_jobs(self) -> None:

existing_jobs = self._scheduler.get_jobs()
for job in existing_jobs:
if job.id.startswith("pipeline_run_") and job.id not in active_ids:
if job.id.startswith("pipeline_run_") and (
job.id not in active_ids or not await self.is_job_enabled(job.id)
):
self._scheduler.remove_job(job.id)
logger.info("Removed pipeline job %s", job.id)
if job.id.startswith("content_generate_") and job.id not in active_gen_ids:
if job.id.startswith("content_generate_") and (
job.id not in active_gen_ids or not await self.is_job_enabled(job.id)
):
self._scheduler.remove_job(job.id)
logger.info("Removed content_generate job %s", job.id)

for p in active_pipelines:
if p.id is None:
continue
job_id = f"pipeline_run_{p.id}"
self._scheduler.add_job(
self._run_pipeline_job,
IntervalTrigger(minutes=p.generate_interval_minutes),
id=job_id,
replace_existing=True,
args=[p.id],
)
# Also add content_generate job
if await self.is_job_enabled(job_id):
self._scheduler.add_job(
self._run_pipeline_job,
IntervalTrigger(minutes=p.generate_interval_minutes),
id=job_id,
replace_existing=True,
args=[p.id],
)
gen_job_id = f"content_generate_{p.id}"
self._scheduler.add_job(
self._run_content_generate_job,
IntervalTrigger(minutes=p.generate_interval_minutes),
id=gen_job_id,
replace_existing=True,
args=[p.id],
)
if await self.is_job_enabled(gen_job_id):
self._scheduler.add_job(
self._run_content_generate_job,
IntervalTrigger(minutes=p.generate_interval_minutes),
id=gen_job_id,
replace_existing=True,
args=[p.id],
)
logger.info("Synced %d pipeline jobs", len(active_pipelines))

async def _run_pipeline_job(self, pipeline_id: int) -> None:
Expand Down
108 changes: 93 additions & 15 deletions src/web/routes/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import logging
import re
from datetime import timezone

from fastapi import APIRouter, Query, Request
Expand Down Expand Up @@ -52,31 +53,54 @@ async def clear_pending_collect_tasks(request: Request):

VALID_STATUS_FILTERS = {"all", "active", "completed"}

# Whitelist of job-id shapes accepted by the toggle / set-interval routes.
# Anything else is rejected before touching the settings store or scheduler.
_VALID_JOB_ID_RE = re.compile(
    r"^(collect_all|photo_due|photo_auto|sq_\d+|pipeline_run_\d+|content_generate_\d+)$"
)

async def _build_jobs_context(sched) -> list[dict]:

async def _build_jobs_context(sched, db) -> list[dict]:
"""Build a list of job dicts for the scheduler page template."""
# Always fetch potential jobs to get DB-sourced interval_minutes
potential = await sched.get_potential_jobs()
potential_map = {j["job_id"]: j for j in potential}

if sched.is_running:
raw = sched.get_all_jobs_next_run()
jobs = []
for job_id, next_run in raw.items():
db_interval = potential_map.get(job_id, {}).get("interval_minutes")
jobs.append({
"job_id": job_id,
"label": _job_label(job_id),
"next_run": next_run,
"interval_minutes": None,
"interval_minutes": db_interval,
})
return jobs

potential = await sched.get_potential_jobs()
return [
{
"job_id": j["job_id"],
"label": _job_label(j["job_id"]),
"next_run": None,
"interval_minutes": j["interval_minutes"],
}
for j in potential
]
# Also include disabled jobs (not in APScheduler but exist in potential_map)
running_ids = set(raw.keys())
for j in potential:
if j["job_id"] not in running_ids:
jobs.append({
"job_id": j["job_id"],
"label": _job_label(j["job_id"]),
"next_run": None,
"interval_minutes": j["interval_minutes"],
})
else:
jobs = [
{
"job_id": j["job_id"],
"label": _job_label(j["job_id"]),
"next_run": None,
"interval_minutes": j["interval_minutes"],
}
for j in potential
]

for j in jobs:
val = await db.repos.settings.get_setting(f"scheduler_job_disabled:{j['job_id']}")
j["enabled"] = val != "1"
j["interval_editable"] = j["job_id"] not in ("photo_due", "photo_auto")
return jobs


@router.get("/", response_class=HTMLResponse)
Expand Down Expand Up @@ -128,7 +152,7 @@ async def scheduler_page(
notifier is not None and notifier.admin_chat_id is not None
) or bot is not None

scheduler_jobs = await _build_jobs_context(sched)
scheduler_jobs = await _build_jobs_context(sched, db)

return deps.get_templates(request).TemplateResponse(
request,
Expand All @@ -154,6 +178,60 @@ async def scheduler_page(
)


@router.post("/jobs/{job_id}/toggle")
async def toggle_scheduler_job(request: Request, job_id: str):
    """Flip the persisted enabled/disabled flag for one scheduler job.

    Persists the new flag in settings, then mirrors the change into the
    running scheduler (if any) so it takes effect without a restart.
    """
    if getattr(request.app.state, "shutting_down", False):
        return RedirectResponse(url="/scheduler?error=shutting_down", status_code=303)
    if _VALID_JOB_ID_RE.match(job_id) is None:
        return RedirectResponse(url="/scheduler?error=invalid_job", status_code=303)
    db = deps.get_db(request)
    key = f"scheduler_job_disabled:{job_id}"
    currently_disabled = (await db.repos.settings.get_setting(key)) == "1"
    will_disable = not currently_disabled
    await db.repos.settings.set_setting(key, "1" if will_disable else "0")
    sched = deps.get_scheduler(request)
    if sched.is_running:
        await sched.sync_job_state(job_id, enabled=not will_disable)
    return RedirectResponse(url="/scheduler?msg=job_toggled", status_code=303)


@router.post("/jobs/{job_id}/set-interval")
async def set_job_interval(request: Request, job_id: str):
    """Persist a new run interval for a scheduler job and apply it live.

    The submitted interval is clamped to the 1..1440 minute range. The
    photo_due/photo_auto jobs run on a fixed 1-minute cadence and are
    rejected here.
    """
    if getattr(request.app.state, "shutting_down", False):
        return RedirectResponse(url="/scheduler?error=shutting_down", status_code=303)
    if not _VALID_JOB_ID_RE.match(job_id):
        return RedirectResponse(url="/scheduler?error=invalid_job", status_code=303)
    if job_id in ("photo_due", "photo_auto"):
        return RedirectResponse(url="/scheduler?error=invalid_job", status_code=303)
    form = await request.form()
    try:
        # TypeError included: if the client submits this field as a file
        # upload, int() raises TypeError, which previously escaped as a 500.
        minutes = int(form["interval_minutes"])
        minutes = max(1, min(minutes, 1440))
    except (KeyError, TypeError, ValueError):
        return RedirectResponse(url="/scheduler?error=invalid_interval", status_code=303)
    db = deps.get_db(request)
    sched = deps.get_scheduler(request)
    if job_id == "collect_all":
        await db.repos.settings.set_setting("collect_interval_minutes", str(minutes))
        sched.update_interval(minutes)
    elif job_id.startswith("sq_"):
        sq_id = int(job_id.removeprefix("sq_"))
        sq = await db.repos.search_queries.get_by_id(sq_id)
        if sq:
            await db.repos.search_queries.update(sq_id, sq.model_copy(update={"interval_minutes": minutes}))
            if sched.is_running:
                await sched.sync_search_query_jobs()
    elif job_id.startswith(("pipeline_run_", "content_generate_")):
        # Both job flavors share the pipeline's single interval setting.
        pid = int(job_id.removeprefix("pipeline_run_").removeprefix("content_generate_"))
        pipeline = await db.repos.content_pipelines.get_by_id(pid)
        if pipeline:
            await db.repos.content_pipelines.update_generate_interval(pid, minutes)
            if sched.is_running:
                await sched.sync_pipeline_jobs()
    return RedirectResponse(url="/scheduler?msg=interval_updated", status_code=303)


@router.post("/start")
async def start_scheduler(request: Request):
if getattr(request.app.state, "shutting_down", False):
Expand Down
Loading
Loading