diff --git a/schedule/__init__.py b/schedule/__init__.py index 8e12eeb7..de1e19c7 100644 --- a/schedule/__init__.py +++ b/schedule/__init__.py @@ -37,7 +37,7 @@ [2] https://github.com/Rykian/clockwork [3] https://adam.herokuapp.com/past/2010/6/30/replace_cron_with_clockwork/ """ - +import asyncio from collections.abc import Hashable import datetime import functools @@ -100,6 +100,11 @@ def run_pending(self) -> None: for job in sorted(runnable_jobs): self._run_job(job) + async def arun_pending(self) -> None: + runnable_jobs = (job for job in self.jobs if job.should_run) + for job in sorted(runnable_jobs): + await self._arun_job(job) + def run_all(self, delay_seconds: int = 0) -> None: """ Run all jobs regardless if they are scheduled to run or not. @@ -119,6 +124,10 @@ def run_all(self, delay_seconds: int = 0) -> None: self._run_job(job) time.sleep(delay_seconds) + async def arun_all(self, delay_seconds: int = 0) -> None: + for job in self.jobs[:]: + await self._arun_job(job) + await asyncio.sleep(delay_seconds) def get_jobs(self, tag: Optional[Hashable] = None) -> List["Job"]: """ Gets scheduled jobs marked with the given tag, or all jobs @@ -173,6 +182,11 @@ def _run_job(self, job: "Job") -> None: ret = job.run() if isinstance(ret, CancelJob) or ret is CancelJob: self.cancel_job(job) + + async def _arun_job(self, job: "Job") -> None: + ret = await job.arun() + if isinstance(ret, CancelJob) or ret is CancelJob: + self.cancel_job(job) def get_next_run( self, tag: Optional[Hashable] = None @@ -697,6 +711,32 @@ def run(self): return CancelJob return ret + async def arun(self): + """ + Asynchronously run the job and immediately reschedule it. + If the job's deadline is reached (configured using .until()), the job is not + run and CancelJob is returned immediately. If the next scheduled run exceeds + the job's deadline, CancelJob is returned after the execution. In this latter + case CancelJob takes priority over any other returned value. + + :return: The return value returned by the `job_func`, or CancelJob if the job's + deadline is reached. + + """ + if self._is_overdue(datetime.datetime.now()): + logger.debug("Cancelling job %s", self) + return CancelJob + + logger.debug("Running job %s", self) + ret = await self.job_func() + self.last_run = datetime.datetime.now() + self._schedule_next_run() + + if self._is_overdue(self.next_run): + logger.debug("Cancelling job %s", self) + return CancelJob + return ret + def _schedule_next_run(self) -> None: """ Compute the instant when this job should run next. @@ -853,6 +893,11 @@ def run_pending() -> None: """ default_scheduler.run_pending() +def arun_pending() -> None: + """Calls :meth:`arun_pending ` on the + :data:`default scheduler instance `. + """ + return default_scheduler.arun_pending() def run_all(delay_seconds: int = 0) -> None: """Calls :meth:`run_all ` on the @@ -860,6 +905,11 @@ def run_all(delay_seconds: int = 0) -> None: """ default_scheduler.run_all(delay_seconds=delay_seconds) +async def arun_all(delay_seconds: int = 0) -> None: + """Calls :meth:`arun_all ` on the + :data:`default scheduler instance `. + """ + await default_scheduler.arun_all(delay_seconds=delay_seconds) def get_jobs(tag: Optional[Hashable] = None) -> List[Job]: """Calls :meth:`get_jobs ` on the