diff --git a/src/database/repositories/content_pipelines.py b/src/database/repositories/content_pipelines.py index fa9b174..87514c9 100644 --- a/src/database/repositories/content_pipelines.py +++ b/src/database/repositories/content_pipelines.py @@ -114,6 +114,13 @@ async def get_by_id(self, pipeline_id: int) -> ContentPipeline | None: row = await cur.fetchone() return self._to_pipeline(row) if row else None + async def update_generate_interval(self, pipeline_id: int, minutes: int) -> None: + await self._db.execute( + "UPDATE content_pipelines SET generate_interval_minutes = ? WHERE id = ?", + (minutes, pipeline_id), + ) + await self._db.commit() + async def update( self, pipeline_id: int, diff --git a/src/scheduler/manager.py b/src/scheduler/manager.py index e665231..2caf106 100644 --- a/src/scheduler/manager.py +++ b/src/scheduler/manager.py @@ -52,6 +52,49 @@ def is_running(self) -> bool: def interval_minutes(self) -> int: return self._current_interval_minutes + async def is_job_enabled(self, job_id: str) -> bool: + """Return True if the job is not explicitly disabled in settings.""" + if not self._scheduler_bundle: + return True + val = await self._scheduler_bundle.get_setting(f"scheduler_job_disabled:{job_id}") + return val != "1" + + async def sync_job_state(self, job_id: str, enabled: bool) -> None: + """Live add or remove a job from the running scheduler.""" + if not self._scheduler or not self._scheduler.running: + return + if not enabled: + try: + self._scheduler.remove_job(job_id) + except Exception: + pass + return + if job_id == self._job_id: + self._scheduler.add_job( + self._run_collection, + IntervalTrigger(minutes=self._current_interval_minutes), + id=job_id, + replace_existing=True, + ) + elif job_id == self._photo_due_job_id and self._task_enqueuer: + self._scheduler.add_job( + self._run_photo_due, + IntervalTrigger(minutes=1), + id=job_id, + replace_existing=True, + ) + elif job_id == self._photo_auto_job_id and self._task_enqueuer: + self._scheduler.add_job( + self._run_photo_auto, + IntervalTrigger(minutes=1), + id=job_id, + replace_existing=True, + ) + elif job_id.startswith("sq_"): + await self.sync_search_query_jobs() + elif job_id.startswith(("pipeline_run_", "content_generate_")): + await self.sync_pipeline_jobs() + async def load_settings(self) -> None: """Load persisted settings from DB without starting the scheduler.""" if not self._scheduler_bundle: @@ -87,13 +130,16 @@ async def start(self) -> None: logger=logger, ) self._current_interval_minutes = collect_interval - self._scheduler.add_job( - self._run_collection, - IntervalTrigger(minutes=collect_interval), - id=self._job_id, - replace_existing=True, - ) - logger.info("Registered job %s (every %d min)", self._job_id, collect_interval) + if await self.is_job_enabled(self._job_id): + self._scheduler.add_job( + self._run_collection, + IntervalTrigger(minutes=collect_interval), + id=self._job_id, + replace_existing=True, + ) + logger.info("Registered job %s (every %d min)", self._job_id, collect_interval) + else: + logger.info("Job %s is disabled, skipping registration", self._job_id) if self._sq_bundle: await self.sync_search_query_jobs() @@ -102,20 +148,26 @@ async def start(self) -> None: await self.sync_pipeline_jobs() if self._task_enqueuer is not None: - self._scheduler.add_job( - self._run_photo_due, - IntervalTrigger(minutes=1), - id=self._photo_due_job_id, - replace_existing=True, - ) - logger.info("Registered job %s (every 1 min)", self._photo_due_job_id) - self._scheduler.add_job( - self._run_photo_auto, - IntervalTrigger(minutes=1), - id=self._photo_auto_job_id, - replace_existing=True, - ) - logger.info("Registered job %s (every 1 min)", self._photo_auto_job_id) + if await self.is_job_enabled(self._photo_due_job_id): + self._scheduler.add_job( + self._run_photo_due, + IntervalTrigger(minutes=1), + id=self._photo_due_job_id, + replace_existing=True, + ) + logger.info("Registered job %s (every 1 min)", self._photo_due_job_id) + else: + logger.info("Job %s is disabled, skipping registration", self._photo_due_job_id) + if await self.is_job_enabled(self._photo_auto_job_id): + self._scheduler.add_job( + self._run_photo_auto, + IntervalTrigger(minutes=1), + id=self._photo_auto_job_id, + replace_existing=True, + ) + logger.info("Registered job %s (every 1 min)", self._photo_auto_job_id) + else: + logger.info("Job %s is disabled, skipping registration", self._photo_auto_job_id) self._scheduler.start() total_jobs = len(self._scheduler.get_jobs()) @@ -138,12 +190,13 @@ async def stop(self) -> None: logger.info("Scheduler stopped, %d jobs removed", job_count) def update_interval(self, minutes: int) -> None: + self._current_interval_minutes = minutes if self._scheduler and self._scheduler.running: - self._scheduler.reschedule_job(self._job_id, trigger=IntervalTrigger(minutes=minutes)) - self._current_interval_minutes = minutes - logger.info("Collection interval updated to %d minutes", minutes) - else: - self._current_interval_minutes = minutes + if self._scheduler.get_job(self._job_id) is not None: + self._scheduler.reschedule_job(self._job_id, trigger=IntervalTrigger(minutes=minutes)) + logger.info("Collection interval updated to %d minutes", minutes) + else: + logger.info("Collection interval stored as %d min (job disabled, will apply on re-enable)", minutes) def get_job_next_run(self, job_id: str): """Return next_run_time for a scheduled job, or None if missing.""" @@ -218,12 +271,16 @@ async def sync_search_query_jobs(self) -> None: existing_jobs = self._scheduler.get_jobs() for job in existing_jobs: - if job.id.startswith("sq_") and job.id not in active_ids: + if job.id.startswith("sq_") and ( + job.id not in active_ids or not await self.is_job_enabled(job.id) + ): self._scheduler.remove_job(job.id) logger.info("Removed search query job %s", job.id) for sq in active_queries: job_id = f"sq_{sq.id}" + if not await self.is_job_enabled(job_id): + continue self._scheduler.add_job( self._run_search_query, IntervalTrigger(minutes=sq.interval_minutes), @@ -249,10 +306,14 @@ async def sync_pipeline_jobs(self) -> None: existing_jobs = self._scheduler.get_jobs() for job in existing_jobs: - if job.id.startswith("pipeline_run_") and job.id not in active_ids: + if job.id.startswith("pipeline_run_") and ( + job.id not in active_ids or not await self.is_job_enabled(job.id) + ): self._scheduler.remove_job(job.id) logger.info("Removed pipeline job %s", job.id) - if job.id.startswith("content_generate_") and job.id not in active_gen_ids: + if job.id.startswith("content_generate_") and ( + job.id not in active_gen_ids or not await self.is_job_enabled(job.id) + ): self._scheduler.remove_job(job.id) logger.info("Removed content_generate job %s", job.id) @@ -260,22 +321,23 @@ async def sync_pipeline_jobs(self) -> None: if p.id is None: continue job_id = f"pipeline_run_{p.id}" - self._scheduler.add_job( - self._run_pipeline_job, - IntervalTrigger(minutes=p.generate_interval_minutes), - id=job_id, - replace_existing=True, - args=[p.id], - ) - # Also add content_generate job + if await self.is_job_enabled(job_id): + self._scheduler.add_job( + self._run_pipeline_job, + IntervalTrigger(minutes=p.generate_interval_minutes), + id=job_id, + replace_existing=True, + args=[p.id], + ) gen_job_id = f"content_generate_{p.id}" - self._scheduler.add_job( - self._run_content_generate_job, - IntervalTrigger(minutes=p.generate_interval_minutes), - id=gen_job_id, - replace_existing=True, - args=[p.id], - ) + if await self.is_job_enabled(gen_job_id): + self._scheduler.add_job( + self._run_content_generate_job, + IntervalTrigger(minutes=p.generate_interval_minutes), + id=gen_job_id, + replace_existing=True, + args=[p.id], + ) logger.info("Synced %d pipeline jobs", len(active_pipelines)) async def _run_pipeline_job(self, pipeline_id: int) -> None: diff --git a/src/web/routes/scheduler.py b/src/web/routes/scheduler.py index 0c055e6..6d3d17f 100644 --- a/src/web/routes/scheduler.py +++ b/src/web/routes/scheduler.py @@ -1,4 +1,5 @@ import logging +import re from datetime import timezone from fastapi import APIRouter, Query, Request @@ -52,31 +53,54 @@ async def clear_pending_collect_tasks(request: Request): VALID_STATUS_FILTERS = {"all", "active", "completed"} +_VALID_JOB_ID_RE = re.compile( + r"^(collect_all|photo_due|photo_auto|sq_\d+|pipeline_run_\d+|content_generate_\d+)$" +) -async def _build_jobs_context(sched) -> list[dict]: + +async def _build_jobs_context(sched, db) -> list[dict]: """Build a list of job dicts for the scheduler page template.""" + # Always fetch potential jobs to get DB-sourced interval_minutes + potential = await sched.get_potential_jobs() + potential_map = {j["job_id"]: j for j in potential} + if sched.is_running: raw = sched.get_all_jobs_next_run() jobs = [] for job_id, next_run in raw.items(): + db_interval = potential_map.get(job_id, {}).get("interval_minutes") jobs.append({ "job_id": job_id, "label": _job_label(job_id), "next_run": next_run, - "interval_minutes": None, + "interval_minutes": db_interval, }) - return jobs - - potential = await sched.get_potential_jobs() - return [ - { - "job_id": j["job_id"], - "label": _job_label(j["job_id"]), - "next_run": None, - "interval_minutes": j["interval_minutes"], - } - for j in potential - ] + # Also include disabled jobs (not in APScheduler but exist in potential_map) + running_ids = set(raw.keys()) + for j in potential: + if j["job_id"] not in running_ids: + jobs.append({ + "job_id": j["job_id"], + "label": _job_label(j["job_id"]), + "next_run": None, + "interval_minutes": j["interval_minutes"], + }) + else: + jobs = [ + { + "job_id": j["job_id"], + "label": _job_label(j["job_id"]), + "next_run": None, + "interval_minutes": j["interval_minutes"], + } + for j in potential + ] + + for j in jobs: + val = await db.repos.settings.get_setting(f"scheduler_job_disabled:{j['job_id']}") + j["enabled"] = val != "1" + j["interval_editable"] = j["job_id"] not in ("photo_due", "photo_auto") + return jobs @router.get("/", response_class=HTMLResponse) @@ -128,7 +152,7 @@ async def scheduler_page( notifier is not None and notifier.admin_chat_id is not None ) or bot is not None - scheduler_jobs = await _build_jobs_context(sched) + scheduler_jobs = await _build_jobs_context(sched, db) return deps.get_templates(request).TemplateResponse( request, @@ -154,6 +178,60 @@ async def scheduler_page( ) +@router.post("/jobs/{job_id}/toggle") +async def toggle_scheduler_job(request: Request, job_id: str): + if getattr(request.app.state, "shutting_down", False): + return RedirectResponse(url="/scheduler?error=shutting_down", status_code=303) + if not _VALID_JOB_ID_RE.match(job_id): + return RedirectResponse(url="/scheduler?error=invalid_job", status_code=303) + db = deps.get_db(request) + key = f"scheduler_job_disabled:{job_id}" + current = await db.repos.settings.get_setting(key) + new_disabled = current != "1" + await db.repos.settings.set_setting(key, "1" if new_disabled else "0") + sched = deps.get_scheduler(request) + if sched.is_running: + await sched.sync_job_state(job_id, enabled=not new_disabled) + return RedirectResponse(url="/scheduler?msg=job_toggled", status_code=303) + + +@router.post("/jobs/{job_id}/set-interval") +async def set_job_interval(request: Request, job_id: str): + if getattr(request.app.state, "shutting_down", False): + return RedirectResponse(url="/scheduler?error=shutting_down", status_code=303) + if not _VALID_JOB_ID_RE.match(job_id): + return RedirectResponse(url="/scheduler?error=invalid_job", status_code=303) + if job_id in ("photo_due", "photo_auto"): + return RedirectResponse(url="/scheduler?error=invalid_job", status_code=303) + form = await request.form() + try: + minutes = int(form["interval_minutes"]) + minutes = max(1, min(minutes, 1440)) + except (KeyError, ValueError): + return RedirectResponse(url="/scheduler?error=invalid_interval", status_code=303) + db = deps.get_db(request) + sched = deps.get_scheduler(request) + if job_id == "collect_all": + await db.repos.settings.set_setting("collect_interval_minutes", str(minutes)) + sched.update_interval(minutes) + elif job_id.startswith("sq_"): + sq_id = int(job_id.removeprefix("sq_")) + sq = await db.repos.search_queries.get_by_id(sq_id) + if sq: + await db.repos.search_queries.update(sq_id, sq.model_copy(update={"interval_minutes": minutes})) + if sched.is_running: + await sched.sync_search_query_jobs() + elif job_id.startswith(("pipeline_run_", "content_generate_")): + pid_str = job_id.removeprefix("pipeline_run_").removeprefix("content_generate_") + pid = int(pid_str) + pipeline = await db.repos.content_pipelines.get_by_id(pid) + if pipeline: + await db.repos.content_pipelines.update_generate_interval(pid, minutes) + if sched.is_running: + await sched.sync_pipeline_jobs() + return RedirectResponse(url="/scheduler?msg=interval_updated", status_code=303) + + @router.post("/start") async def start_scheduler(request: Request): if getattr(request.app.state, "shutting_down", False): diff --git a/src/web/templates/scheduler.html b/src/web/templates/scheduler.html index 117c9c5..c44223e 100644 --- a/src/web/templates/scheduler.html +++ b/src/web/templates/scheduler.html @@ -72,6 +72,7 @@
| Вкл. | Джоб | Интервал | Следующий запуск | @@ -80,9 +81,23 @@|
|---|---|---|---|---|
| {{ j.label }} | - {% if j.interval_minutes %} + + | +{{ j.label }} | +
+ {% if j.interval_editable and j.interval_minutes is not none %}
+
+ {% elif j.interval_minutes %}
{{ j.interval_minutes }} мин.
{% else %}
—
@@ -91,8 +106,10 @@ Планировщик | {% if j.next_run %} {{ j.next_run.strftime('%H:%M:%S') }} - {% elif is_running %} + {% elif is_running and j.enabled %} — + {% elif not j.enabled %} + отключён {% else %} остановлен {% endif %} diff --git a/tests/test_web.py b/tests/test_web.py index 735c66b..7158cd8 100644 --- a/tests/test_web.py +++ b/tests/test_web.py @@ -3183,6 +3183,87 @@ async def test_unhandled_exception_logged_to_logbuffer(error_client): assert any("kaboom" in r["message"] for r in new_records) +@pytest.mark.asyncio +async def test_scheduler_job_toggle_invalid_id(client): + """POST /scheduler/jobs/{job_id}/toggle with invalid job_id redirects to error.""" + resp = await client.post("/scheduler/jobs/bogus_job/toggle", follow_redirects=False) + assert resp.status_code == 303 + assert "error=invalid_job" in resp.headers["location"] + + +@pytest.mark.asyncio +async def test_scheduler_job_toggle_enables_and_disables(client): + """Toggling collect_all job persists disabled flag to settings.""" + db = client._transport.app.state.db + + # First toggle: should disable (default is enabled) + resp = await client.post("/scheduler/jobs/collect_all/toggle", follow_redirects=False) + assert resp.status_code == 303 + assert await db.repos.settings.get_setting("scheduler_job_disabled:collect_all") == "1" + + # Second toggle: should re-enable + resp = await client.post("/scheduler/jobs/collect_all/toggle", follow_redirects=False) + assert resp.status_code == 303 + assert await db.repos.settings.get_setting("scheduler_job_disabled:collect_all") == "0" + + +@pytest.mark.asyncio +async def test_scheduler_job_set_interval_invalid_id(client): + """POST /scheduler/jobs/{job_id}/set-interval with invalid job_id redirects to error.""" + resp = await client.post( + "/scheduler/jobs/malicious_id/set-interval", + data={"interval_minutes": "10"}, + follow_redirects=False, + ) + assert resp.status_code == 303 + assert "error=invalid_job" in resp.headers["location"] + + +@pytest.mark.asyncio +async def test_scheduler_job_set_interval_collect_all(client): + """Setting interval for collect_all persists to settings.""" + db = client._transport.app.state.db + resp = await client.post( + "/scheduler/jobs/collect_all/set-interval", + data={"interval_minutes": "45"}, + follow_redirects=False, + ) + assert resp.status_code == 303 + assert "msg=interval_updated" in resp.headers["location"] + assert await db.repos.settings.get_setting("collect_interval_minutes") == "45" + + +@pytest.mark.asyncio +async def test_scheduler_job_set_interval_clamps_values(client): + """Interval is clamped to 1–1440 range.""" + db = client._transport.app.state.db + await client.post( + "/scheduler/jobs/collect_all/set-interval", + data={"interval_minutes": "0"}, + follow_redirects=False, + ) + assert await db.repos.settings.get_setting("collect_interval_minutes") == "1" + + await client.post( + "/scheduler/jobs/collect_all/set-interval", + data={"interval_minutes": "9999"}, + follow_redirects=False, + ) + assert await db.repos.settings.get_setting("collect_interval_minutes") == "1440" + + +@pytest.mark.asyncio +async def test_scheduler_page_shows_disabled_job(client): + """Disabled job shows unchecked checkbox on scheduler page.""" + db = client._transport.app.state.db + await db.repos.settings.set_setting("scheduler_job_disabled:collect_all", "1") + resp = await client.get("/scheduler/") + assert resp.status_code == 200 + # The disabled job row should not have 'checked' for collect_all checkbox + # and the label should have text-muted class + assert "collect_all" in resp.text + + @pytest.mark.asyncio async def test_analytics_page_empty(client): """Analytics page renders without error when no messages exist.""" |