-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathProdScheduler.py
More file actions
38 lines (32 loc) · 1.11 KB
/
ProdScheduler.py
File metadata and controls
38 lines (32 loc) · 1.11 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from datetime import datetime, timedelta
class Scheduler:
    """
    Time-ordered list of deferred tasks.

    Every entry of ``self.Schedule`` is a ``(func, param, due)`` tuple, where
    ``due`` is the ``datetime`` at which the entry becomes ready. The list is
    re-sorted by ``due`` on every insertion, so the earliest entry is always
    at index 0. The scheduler never calls ``func`` itself; it only hands
    ready entries back to the caller through ``nextTask``.
    """

    def __init__(self):
        # Sorted by due time (third tuple element), earliest first.
        self.Schedule = []

    def add(self, func, param, time):
        """
        Schedule a Method

        func  -- the task to schedule; stored as-is, not called here.
        param -- value stored next to ``func`` and returned together with it.
        time  -- delay from now, indexable as (hours, minutes, seconds).
        """
        # Time => (Hours, Minutes, Seconds)
        delay = timedelta(hours=time[0], minutes=time[1], seconds=time[2])
        self.Schedule.append((func, param, datetime.now() + delay))
        # Sort on the due time only: comparing whole tuples would try to
        # order ``func``/``param`` on ties. list.sort is stable, so entries
        # with equal due times keep their insertion order.
        self.Schedule.sort(key=lambda entry: entry[2])

    def nextTask(self):
        """
        Return next scheduled task and remove it from schedule.

        Returns a ``(func, param)`` tuple when the earliest entry is due.
        Returns ``None`` when nothing is scheduled or the earliest entry is
        not due yet; in that case the schedule is left untouched.
        """
        # The list is sorted, so if the head is not due nothing else is.
        if not self.Schedule or datetime.now() < self.Schedule[0][2]:
            return None
        func, param, _due = self.Schedule.pop(0)
        return (func, param)

    def isScheduled(self, *tasks):
        """
        Checks if a task is currently scheduled.
        It is meant to help control to schedule tasks

        Returns True when at least one of ``tasks`` is the ``func`` of a
        pending entry, and False otherwise (including when called with no
        arguments).
        """
        scheduled = [entry[0] for entry in self.Schedule]
        return any(task in scheduled for task in tasks)