-
Notifications
You must be signed in to change notification settings - Fork 3.3k
/
scheduler.py
51 lines (43 loc) · 1.79 KB
/
scheduler.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import threading
from datetime import datetime
from typing import Optional
from croniter import croniter
from deepdiff import Delta
from lightning_app.utilities.proxies import ComponentDelta
class SchedulerThread(threading.Thread):
    """Daemon thread that periodically activates the cron schedules registered on an app.

    Every ``_sleep_time`` seconds the thread walks ``app._schedules`` and, for each schedule
    whose next cron occurrence has already passed, puts a ``ComponentDelta`` on
    ``app.delta_queue`` that sets the matching scheduling call's ``running`` flag to ``True``.
    """

    # TODO (tchaton) Abstract this logic to a generic scheduling service.

    def __init__(self, app) -> None:
        """Prepare the thread; nothing runs until ``start()`` is called.

        Args:
            app: Object exposing ``_schedules`` (mapping of call hash to schedule metadata with
                the keys ``"name"``, ``"cron_pattern"`` and ``"start_time"``) and ``delta_queue``
                (an object with a ``put`` method).
        """
        super().__init__(daemon=True)
        self._exit_event = threading.Event()
        self._sleep_time = 1.0
        self._app = app

    def run(self) -> None:
        """Run one scheduling pass every ``_sleep_time`` seconds until ``join`` is called."""
        # ``Event.wait`` returns True as soon as the exit event is set, so a stop request
        # interrupts the sleep and no further scheduling pass is started afterwards.
        while not self._exit_event.wait(self._sleep_time):
            self.run_once()

    def run_once(self) -> None:
        """Check every registered schedule once and activate the ones that are due.

        A schedule is due when the current local time is past the first cron occurrence
        following its stored ``"start_time"``. For a due schedule a delta is queued and
        ``"start_time"`` is moved to that occurrence.
        """
        # Iterate over a snapshot so that entries added to or removed from the mapping during
        # the pass cannot break the iteration or the metadata lookup.
        for call_hash, metadata in list(self._app._schedules.items()):
            start_time = datetime.fromisoformat(metadata["start_time"])
            current_date = datetime.now()
            next_event = croniter(metadata["cron_pattern"], start_time).get_next(datetime)
            if current_date <= next_event:
                continue

            # When the event is reached, send a delta to activate scheduling.
            activation = Delta(
                {
                    "values_changed": {
                        f"root['calls']['scheduling']['{call_hash}']['running']": {"new_value": True}
                    }
                }
            )
            self._app.delta_queue.put(ComponentDelta(id=metadata["name"], delta=activation))
            # Advance the reference time so the next pass looks for the following occurrence.
            metadata["start_time"] = next_event.isoformat()

    def join(self, timeout: Optional[float] = None) -> None:
        """Ask the scheduling loop to stop, then wait for the thread to finish.

        Args:
            timeout: Maximum number of seconds to wait, forwarded to ``Thread.join``;
                ``None`` waits without a limit.
        """
        self._exit_event.set()
        super().join(timeout)