Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 51 additions & 1 deletion schedule/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
[2] https://github.com/Rykian/clockwork
[3] https://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/
"""

import asyncio
from collections.abc import Hashable
import datetime
import functools
Expand Down Expand Up @@ -100,6 +100,11 @@ def run_pending(self) -> None:
for job in sorted(runnable_jobs):
self._run_job(job)

async def arun_pending(self) -> None:
runnable_jobs = (job for job in self.jobs if job.should_run)
for job in sorted(runnable_jobs):
await self._arun_job(job)

def run_all(self, delay_seconds: int = 0) -> None:
"""
Run all jobs regardless if they are scheduled to run or not.
Expand All @@ -119,6 +124,10 @@ def run_all(self, delay_seconds: int = 0) -> None:
self._run_job(job)
time.sleep(delay_seconds)

async def arun_all(self, delay_seconds: int = 0) -> None:
for job in self.jobs[:]:
await self._arun_job(job)
await asyncio.sleep(delay_seconds)
def get_jobs(self, tag: Optional[Hashable] = None) -> List["Job"]:
"""
Gets scheduled jobs marked with the given tag, or all jobs
Expand Down Expand Up @@ -173,6 +182,11 @@ def _run_job(self, job: "Job") -> None:
ret = job.run()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

async def _arun_job(self, job: "Job") -> None:
ret = await job.arun()
if isinstance(ret, CancelJob) or ret is CancelJob:
self.cancel_job(job)

def get_next_run(
self, tag: Optional[Hashable] = None
Expand Down Expand Up @@ -697,6 +711,32 @@ def run(self):
return CancelJob
return ret

async def arun(self):
"""
Asynchronously run the job and immediately reschedule it.
If the job's deadline is reached (configured using .until()), the job is not
run and CancelJob is returned immediately. If the next scheduled run exceeds
the job's deadline, CancelJob is returned after the execution. In this latter
case CancelJob takes priority over any other returned value.

:return: The return value returned by the `job_func`, or CancelJob if the job's
deadline is reached.

"""
if self._is_overdue(datetime.datetime.now()):
logger.debug("Cancelling job %s", self)
return CancelJob

logger.debug("Running job %s", self)
ret = await self.job_func()
self.last_run = datetime.datetime.now()
self._schedule_next_run()

if self._is_overdue(self.next_run):
logger.debug("Cancelling job %s", self)
return CancelJob
return ret

def _schedule_next_run(self) -> None:
"""
Compute the instant when this job should run next.
Expand Down Expand Up @@ -853,13 +893,23 @@ def run_pending() -> None:
"""
default_scheduler.run_pending()

def arun_pending() -> None:
"""Calls :meth:`arun_pending <Scheduler.arun_pending>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
return default_scheduler.arun_pending()

def run_all(delay_seconds: int = 0) -> None:
"""Calls :meth:`run_all <Scheduler.run_all>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
default_scheduler.run_all(delay_seconds=delay_seconds)

async def arun_all(delay_seconds: int = 0) -> None:
"""Calls :meth:`arun_all <Scheduler.arun_all>` on the
:data:`default scheduler instance <default_scheduler>`.
"""
await default_scheduler.arun_all(delay_seconds=delay_seconds)

def get_jobs(tag: Optional[Hashable] = None) -> List[Job]:
"""Calls :meth:`get_jobs <Scheduler.get_jobs>` on the
Expand Down