-
Notifications
You must be signed in to change notification settings - Fork 1
/
api.py
107 lines (83 loc) · 3.42 KB
/
api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from copy import deepcopy
from functools import wraps
from os import getenv
from typing import TYPE_CHECKING, Any, Callable, List
from lbz.aws_boto3 import client
from lbz.events.event import Event
from lbz.misc import Singleton, get_logger
if TYPE_CHECKING:
from mypy_boto3_events.type_defs import PutEventsRequestEntryTypeDef
else:
PutEventsRequestEntryTypeDef = dict
logger = get_logger(__name__)
# https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_PutEvents.html
MAX_EVENTS_TO_SEND_AT_ONCE = 10
class EventAPI(metaclass=Singleton):
def __init__(self) -> None:
self._source = getenv("AWS_LAMBDA_FUNCTION_NAME") or "lbz-event-api"
self._resources: List[str] = []
self._pending_events: List[Event] = []
self._sent_events: List[Event] = []
self._failed_events: List[Event] = []
self._bus_name = getenv("EVENTS_BUS_NAME", f"{self._source}-event-bus")
def __repr__(self) -> str:
return (
f"<EventAPI bus: {self._bus_name} Events: pending={len(self._pending_events)} "
f"sent={len(self._sent_events)} failed={len(self._failed_events)}>"
)
def set_source(self, source: str) -> None:
self._source = source
def set_resources(self, resources: List[str]) -> None:
self._resources = resources
def set_bus_name(self, bus_name: str) -> None:
self._bus_name = bus_name
@property
def sent_events(self) -> List[Event]:
return deepcopy(self._sent_events)
@property
def pending_events(self) -> List[Event]:
return deepcopy(self._pending_events)
@property
def failed_events(self) -> List[Event]:
return deepcopy(self._failed_events)
def register(self, new_event: Event) -> None:
self._pending_events.append(new_event)
def send(self) -> None:
self._sent_events = []
self._failed_events = []
while self._pending_events:
events = self._pending_events[:MAX_EVENTS_TO_SEND_AT_ONCE]
try:
entries = [self._create_eb_entry(event) for event in events]
client.eventbridge.put_events(Entries=entries)
self._sent_events.extend(events)
except Exception as err: # pylint: disable=broad-except
self._failed_events.extend(events)
logger.exception(err)
self._pending_events = self._pending_events[MAX_EVENTS_TO_SEND_AT_ONCE:]
if self._failed_events:
raise RuntimeError("Sending events has failed. Check logs for more details!")
def clear(self) -> None:
self._sent_events = []
self._pending_events = []
self._failed_events = []
def _create_eb_entry(self, new_event: Event) -> PutEventsRequestEntryTypeDef:
return {
"Detail": new_event.serialized_data,
"DetailType": new_event.type,
"EventBusName": self._bus_name,
"Resources": self._resources,
"Source": self._source,
}
def event_emitter(function: Callable) -> Callable:
"""Decorator that makes function an emitter - automatically sends pending events on success"""
@wraps(function)
def wrapped(*args: Any, **kwargs: Any) -> Any:
try:
result = function(*args, **kwargs)
EventAPI().send()
return result
except Exception as error:
EventAPI().clear()
raise error
return wrapped