/
_callbackdatacache.py
459 lines (364 loc) · 17.7 KB
/
_callbackdatacache.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2022
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
"""This module contains the CallbackDataCache class."""
import logging
import time
from datetime import datetime
from typing import TYPE_CHECKING, Dict, MutableMapping, Optional, Tuple, Union, cast
from uuid import uuid4
try:
from cachetools import LRUCache
CACHE_TOOLS_AVAILABLE = True
except ImportError:
CACHE_TOOLS_AVAILABLE = False
from telegram import CallbackQuery, InlineKeyboardButton, InlineKeyboardMarkup, Message, User
from telegram._utils.datetime import to_float_timestamp
from telegram.error import TelegramError
from telegram.ext._utils.types import CDCData
if TYPE_CHECKING:
from telegram.ext import ExtBot
class InvalidCallbackData(TelegramError):
"""
Raised when the received callback data has been tempered with or deleted from cache.
Examples:
:any:`Arbitrary Callback Data Bot <examples.arbitrarycallbackdatabot>`
.. seealso:: :wiki:`Arbitrary callback_data <Arbitrary-callback_data>`
.. versionadded:: 13.6
Args:
callback_data (:obj:`int`, optional): The button data of which the callback data could not
be found.
Attributes:
callback_data (:obj:`int`): Optional. The button data of which the callback data could not
be found.
"""
__slots__ = ("callback_data",)
def __init__(self, callback_data: str = None) -> None:
super().__init__(
"The object belonging to this callback_data was deleted or the callback_data was "
"manipulated."
)
self.callback_data = callback_data
def __reduce__(self) -> Tuple[type, Tuple[Optional[str]]]: # type: ignore[override]
return self.__class__, (self.callback_data,)
class _KeyboardData:
__slots__ = ("keyboard_uuid", "button_data", "access_time")
def __init__(
self, keyboard_uuid: str, access_time: float = None, button_data: Dict[str, object] = None
):
self.keyboard_uuid = keyboard_uuid
self.button_data = button_data or {}
self.access_time = access_time or time.time()
def update_access_time(self) -> None:
"""Updates the access time with the current time."""
self.access_time = time.time()
def to_tuple(self) -> Tuple[str, float, Dict[str, object]]:
"""Gives a tuple representation consisting of the keyboard uuid, the access time and the
button data.
"""
return self.keyboard_uuid, self.access_time, self.button_data
class CallbackDataCache:
"""A custom cache for storing the callback data of a :class:`telegram.ext.ExtBot`. Internally,
it keeps two mappings with fixed maximum size:
* One for mapping the data received in callback queries to the cached objects
* One for mapping the IDs of received callback queries to the cached objects
The second mapping allows to manually drop data that has been cached for keyboards of messages
sent via inline mode.
If necessary, will drop the least recently used items.
Important:
If you want to use this class, you must install PTB with the optional requirement
``callback-data``, i.e.
.. code-block:: bash
pip install python-telegram-bot[callback-data]
Examples:
:any:`Arbitrary Callback Data Bot <examples.arbitrarycallbackdatabot>`
.. seealso:: :attr:`telegram.ext.ExtBot.callback_data_cache`,
:wiki:`Architecture Overview <Architecture>`,
:wiki:`Arbitrary callback_data <Arbitrary-callback_data>`
.. versionadded:: 13.6
.. versionchanged:: 20.0
To use this class, PTB must be installed via
``pip install python-telegram-bot[callback-data]``.
Args:
bot (:class:`telegram.ext.ExtBot`): The bot this cache is for.
maxsize (:obj:`int`, optional): Maximum number of items in each of the internal mappings.
Defaults to ``1024``.
persistent_data (Tuple[List[Tuple[:obj:`str`, :obj:`float`, \
Dict[:obj:`str`, :class:`object`]]], Dict[:obj:`str`, :obj:`str`]], optional): \
Data to initialize the cache with, as returned by \
:meth:`telegram.ext.BasePersistence.get_callback_data`.
Attributes:
bot (:class:`telegram.ext.ExtBot`): The bot this cache is for.
"""
__slots__ = ("bot", "_maxsize", "_keyboard_data", "_callback_queries", "logger")
def __init__(
self,
bot: "ExtBot",
maxsize: int = 1024,
persistent_data: CDCData = None,
):
if not CACHE_TOOLS_AVAILABLE:
raise RuntimeError(
"To use `CallbackDataCache`, PTB must be installed via `pip install "
"python-telegram-bot[callback-data]`."
)
self.logger = logging.getLogger(__name__)
self.bot = bot
self._maxsize = maxsize
self._keyboard_data: MutableMapping[str, _KeyboardData] = LRUCache(maxsize=maxsize)
self._callback_queries: MutableMapping[str, str] = LRUCache(maxsize=maxsize)
if persistent_data:
self.load_persistence_data(persistent_data)
def load_persistence_data(self, persistent_data: CDCData) -> None:
"""Loads data into the cache.
Warning:
This method is not intended to be called by users directly.
.. versionadded:: 20.0
Args:
persistent_data (Tuple[List[Tuple[:obj:`str`, :obj:`float`, \
Dict[:obj:`str`, :class:`object`]]], Dict[:obj:`str`, :obj:`str`]], optional): \
Data to load, as returned by \
:meth:`telegram.ext.BasePersistence.get_callback_data`.
"""
keyboard_data, callback_queries = persistent_data
for key, value in callback_queries.items():
self._callback_queries[key] = value
for uuid, access_time, data in keyboard_data:
self._keyboard_data[uuid] = _KeyboardData(
keyboard_uuid=uuid, access_time=access_time, button_data=data
)
@property
def maxsize(self) -> int:
""":obj:`int`: The maximum size of the cache.
.. versionchanged:: 20.0
This property is now read-only.
"""
return self._maxsize
@property
def persistence_data(self) -> CDCData:
"""Tuple[List[Tuple[:obj:`str`, :obj:`float`, Dict[:obj:`str`, :class:`object`]]],
Dict[:obj:`str`, :obj:`str`]]: The data that needs to be persisted to allow
caching callback data across bot reboots.
"""
# While building a list/dict from the LRUCaches has linear runtime (in the number of
# entries), the runtime is bounded by maxsize and it has the big upside of not throwing a
# highly customized data structure at users trying to implement a custom persistence class
return [data.to_tuple() for data in self._keyboard_data.values()], dict(
self._callback_queries.items()
)
def process_keyboard(self, reply_markup: InlineKeyboardMarkup) -> InlineKeyboardMarkup:
"""Registers the reply markup to the cache. If any of the buttons have
:attr:`~telegram.InlineKeyboardButton.callback_data`, stores that data and builds a new
keyboard with the correspondingly replaced buttons. Otherwise, does nothing and returns
the original reply markup.
Args:
reply_markup (:class:`telegram.InlineKeyboardMarkup`): The keyboard.
Returns:
:class:`telegram.InlineKeyboardMarkup`: The keyboard to be passed to Telegram.
"""
keyboard_uuid = uuid4().hex
keyboard_data = _KeyboardData(keyboard_uuid)
# Built a new nested list of buttons by replacing the callback data if needed
buttons = [
[
# We create a new button instead of replacing callback_data in case the
# same object is used elsewhere
InlineKeyboardButton(
btn.text,
callback_data=self.__put_button(btn.callback_data, keyboard_data),
)
if btn.callback_data
else btn
for btn in column
]
for column in reply_markup.inline_keyboard
]
if not keyboard_data.button_data:
# If we arrive here, no data had to be replaced and we can return the input
return reply_markup
self._keyboard_data[keyboard_uuid] = keyboard_data
return InlineKeyboardMarkup(buttons)
@staticmethod
def __put_button(callback_data: object, keyboard_data: _KeyboardData) -> str:
"""Stores the data for a single button in :attr:`keyboard_data`.
Returns the string that should be passed instead of the callback_data, which is
``keyboard_uuid + button_uuids``.
"""
uuid = uuid4().hex
keyboard_data.button_data[uuid] = callback_data
return f"{keyboard_data.keyboard_uuid}{uuid}"
def __get_keyboard_uuid_and_button_data(
self, callback_data: str
) -> Union[Tuple[str, object], Tuple[None, InvalidCallbackData]]:
keyboard, button = self.extract_uuids(callback_data)
try:
# we get the values before calling update() in case KeyErrors are raised
# we don't want to update in that case
keyboard_data = self._keyboard_data[keyboard]
button_data = keyboard_data.button_data[button]
# Update the timestamp for the LRU
keyboard_data.update_access_time()
return keyboard, button_data
except KeyError:
return None, InvalidCallbackData(callback_data)
@staticmethod
def extract_uuids(callback_data: str) -> Tuple[str, str]:
"""Extracts the keyboard uuid and the button uuid from the given :paramref:`callback_data`.
Args:
callback_data (:obj:`str`): The
:paramref:`~telegram.InlineKeyboardButton.callback_data` as present in the button.
Returns:
(:obj:`str`, :obj:`str`): Tuple of keyboard and button uuid
"""
# Extract the uuids as put in __put_button
return callback_data[:32], callback_data[32:]
def process_message(self, message: Message) -> None:
"""Replaces the data in the inline keyboard attached to the message with the cached
objects, if necessary. If the data could not be found,
:class:`telegram.ext.InvalidCallbackData` will be inserted.
Note:
Checks :attr:`telegram.Message.via_bot` and :attr:`telegram.Message.from_user` to check
if the reply markup (if any) was actually sent by this cache's bot. If it was not, the
message will be returned unchanged.
Note that this will fail for channel posts, as :attr:`telegram.Message.from_user` is
:obj:`None` for those! In the corresponding reply markups the callback data will be
replaced by :class:`telegram.ext.InvalidCallbackData`.
Warning:
* Does *not* consider :attr:`telegram.Message.reply_to_message` and
:attr:`telegram.Message.pinned_message`. Pass them to this method separately.
* *In place*, i.e. the passed :class:`telegram.Message` will be changed!
Args:
message (:class:`telegram.Message`): The message.
"""
self.__process_message(message)
def __process_message(self, message: Message) -> Optional[str]:
"""As documented in process_message, but returns the uuid of the attached keyboard, if any,
which is relevant for process_callback_query.
**IN PLACE**
"""
if not message.reply_markup:
return None
if message.via_bot:
sender: Optional[User] = message.via_bot
elif message.from_user:
sender = message.from_user
else:
sender = None
if sender is not None and sender != self.bot.bot:
return None
keyboard_uuid = None
for row in message.reply_markup.inline_keyboard:
for button in row:
if button.callback_data:
button_data = cast(str, button.callback_data)
keyboard_id, callback_data = self.__get_keyboard_uuid_and_button_data(
button_data
)
# update_callback_data makes sure that the _id_attrs are updated
button.update_callback_data(callback_data)
# This is lazy loaded. The firsts time we find a button
# we load the associated keyboard - afterwards, there is
if not keyboard_uuid and not isinstance(callback_data, InvalidCallbackData):
keyboard_uuid = keyboard_id
return keyboard_uuid
def process_callback_query(self, callback_query: CallbackQuery) -> None:
"""Replaces the data in the callback query and the attached messages keyboard with the
cached objects, if necessary. If the data could not be found,
:class:`telegram.ext.InvalidCallbackData` will be inserted.
If :attr:`telegram.CallbackQuery.data` or :attr:`telegram.CallbackQuery.message` is
present, this also saves the callback queries ID in order to be able to resolve it to the
stored data.
Note:
Also considers inserts data into the buttons of
:attr:`telegram.Message.reply_to_message` and :attr:`telegram.Message.pinned_message`
if necessary.
Warning:
*In place*, i.e. the passed :class:`telegram.CallbackQuery` will be changed!
Args:
callback_query (:class:`telegram.CallbackQuery`): The callback query.
"""
mapped = False
if callback_query.data:
data = callback_query.data
# Get the cached callback data for the CallbackQuery
keyboard_uuid, button_data = self.__get_keyboard_uuid_and_button_data(data)
with callback_query._unfrozen():
callback_query.data = button_data # type: ignore[assignment]
# Map the callback queries ID to the keyboards UUID for later use
if not mapped and not isinstance(button_data, InvalidCallbackData):
self._callback_queries[callback_query.id] = keyboard_uuid # type: ignore
mapped = True
# Get the cached callback data for the inline keyboard attached to the
# CallbackQuery.
if callback_query.message:
self.__process_message(callback_query.message)
for message in (
callback_query.message.pinned_message,
callback_query.message.reply_to_message,
):
if message:
self.__process_message(message)
def drop_data(self, callback_query: CallbackQuery) -> None:
"""Deletes the data for the specified callback query.
Note:
Will *not* raise exceptions in case the callback data is not found in the cache.
*Will* raise :exc:`KeyError` in case the callback query can not be found in the
cache.
Args:
callback_query (:class:`telegram.CallbackQuery`): The callback query.
Raises:
KeyError: If the callback query can not be found in the cache
"""
try:
keyboard_uuid = self._callback_queries.pop(callback_query.id)
self.__drop_keyboard(keyboard_uuid)
except KeyError as exc:
raise KeyError("CallbackQuery was not found in cache.") from exc
def __drop_keyboard(self, keyboard_uuid: str) -> None:
try:
self._keyboard_data.pop(keyboard_uuid)
except KeyError:
return
def clear_callback_data(self, time_cutoff: Union[float, datetime] = None) -> None:
"""Clears the stored callback data.
Args:
time_cutoff (:obj:`float` | :obj:`datetime.datetime`, optional): Pass a UNIX timestamp
or a :obj:`datetime.datetime` to clear only entries which are older.
For timezone naive :obj:`datetime.datetime` objects, the default timezone of the
bot will be used, which is UTC unless :attr:`telegram.ext.Defaults.tzinfo` is
used.
"""
self.__clear(self._keyboard_data, time_cutoff=time_cutoff)
def clear_callback_queries(self) -> None:
"""Clears the stored callback query IDs."""
self.__clear(self._callback_queries)
def __clear(self, mapping: MutableMapping, time_cutoff: Union[float, datetime] = None) -> None:
if not time_cutoff:
mapping.clear()
return
if isinstance(time_cutoff, datetime):
effective_cutoff = to_float_timestamp(
time_cutoff, tzinfo=self.bot.defaults.tzinfo if self.bot.defaults else None
)
else:
effective_cutoff = time_cutoff
# We need a list instead of a generator here, as the list doesn't change it's size
# during the iteration
to_drop = [key for key, data in mapping.items() if data.access_time < effective_cutoff]
for key in to_drop:
mapping.pop(key)