-
Notifications
You must be signed in to change notification settings - Fork 5.1k
/
bot_method_checks.py
418 lines (370 loc) · 16.8 KB
/
bot_method_checks.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
#!/usr/bin/env python
#
# A library that provides a Python interface to the Telegram Bot API
# Copyright (C) 2015-2022
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser Public License for more details.
#
# You should have received a copy of the GNU Lesser Public License
# along with this program. If not, see [http://www.gnu.org/licenses/].
import datetime
import functools
import inspect
import os
from typing import Any, Callable, Dict, Iterable, List
import pytest
from telegram import (
Bot,
ChatPermissions,
File,
InlineQueryResultArticle,
InlineQueryResultCachedPhoto,
InputMediaPhoto,
InputTextMessageContent,
TelegramObject,
)
from telegram._utils.defaultvalue import DEFAULT_NONE, DefaultValue
from telegram.constants import InputMediaType
from telegram.ext import Defaults, ExtBot
from telegram.request import RequestData
from tests.auxil.object_conversions import env_var_2_bool
TEST_WITH_OPT_DEPS = env_var_2_bool(os.getenv("TEST_WITH_OPT_DEPS", True))
if TEST_WITH_OPT_DEPS:
import pytz
def check_shortcut_signature(
shortcut: Callable,
bot_method: Callable,
shortcut_kwargs: List[str],
additional_kwargs: List[str],
) -> bool:
"""
Checks that the signature of a shortcut matches the signature of the underlying bot method.
Args:
shortcut: The shortcut, e.g. :meth:`telegram.Message.reply_text`
bot_method: The bot method, e.g. :meth:`telegram.Bot.send_message`
shortcut_kwargs: The kwargs passed by the shortcut directly, e.g. ``chat_id``
additional_kwargs: Additional kwargs of the shortcut that the bot method doesn't have, e.g.
``quote``.
Returns:
:obj:`bool`: Whether or not the signature matches.
"""
shortcut_sig = inspect.signature(shortcut)
effective_shortcut_args = set(shortcut_sig.parameters.keys()).difference(additional_kwargs)
effective_shortcut_args.discard("self")
bot_sig = inspect.signature(bot_method)
expected_args = set(bot_sig.parameters.keys()).difference(shortcut_kwargs)
expected_args.discard("self")
args_check = expected_args == effective_shortcut_args
if not args_check:
raise Exception(f"Expected arguments {expected_args}, got {effective_shortcut_args}")
# TODO: Also check annotation of return type. Would currently be a hassle b/c typing doesn't
# resolve `ForwardRef('Type')` to `Type`. For now we rely on MyPy, which probably allows the
# shortcuts to return more specific types than the bot method, but it's only annotations after
# all
for kwarg in effective_shortcut_args:
expected_kind = bot_sig.parameters[kwarg].kind
if shortcut_sig.parameters[kwarg].kind != expected_kind:
raise Exception(f"Argument {kwarg} must be of kind {expected_kind}.")
if bot_sig.parameters[kwarg].annotation != shortcut_sig.parameters[kwarg].annotation:
if isinstance(bot_sig.parameters[kwarg].annotation, type):
if bot_sig.parameters[kwarg].annotation.__name__ != str(
shortcut_sig.parameters[kwarg].annotation
):
raise Exception(
f"For argument {kwarg} I expected {bot_sig.parameters[kwarg].annotation}, "
f"but got {shortcut_sig.parameters[kwarg].annotation}"
)
else:
raise Exception(
f"For argument {kwarg} I expected {bot_sig.parameters[kwarg].annotation}, but "
f"got {shortcut_sig.parameters[kwarg].annotation}"
)
bot_method_sig = inspect.signature(bot_method)
shortcut_sig = inspect.signature(shortcut)
for arg in expected_args:
if not shortcut_sig.parameters[arg].default == bot_method_sig.parameters[arg].default:
raise Exception(
f"Default for argument {arg} does not match the default of the Bot method."
)
for kwarg in additional_kwargs:
if not shortcut_sig.parameters[kwarg].kind == inspect.Parameter.KEYWORD_ONLY:
raise Exception(f"Argument {kwarg} must be a positional-only argument!")
return True
async def check_shortcut_call(
shortcut_method: Callable,
bot: ExtBot,
bot_method_name: str,
skip_params: Iterable[str] = None,
shortcut_kwargs: Iterable[str] = None,
) -> bool:
"""
Checks that a shortcut passes all the existing arguments to the underlying bot method. Use as::
assert await check_shortcut_call(message.reply_text, message.bot, 'send_message')
Args:
shortcut_method: The shortcut method, e.g. `message.reply_text`
bot: The bot
bot_method_name: The bot methods name, e.g. `'send_message'`
skip_params: Parameters that are allowed to be missing, e.g. `['inline_message_id']`
`rate_limit_args` will be skipped by default
shortcut_kwargs: The kwargs passed by the shortcut directly, e.g. ``chat_id``
Returns:
:obj:`bool`
"""
if not skip_params:
skip_params = set()
else:
skip_params = set(skip_params)
skip_params.add("rate_limit_args")
if not shortcut_kwargs:
shortcut_kwargs = set()
else:
shortcut_kwargs = set(shortcut_kwargs)
orig_bot_method = getattr(bot, bot_method_name)
bot_signature = inspect.signature(orig_bot_method)
expected_args = set(bot_signature.parameters.keys()) - {"self"} - set(skip_params)
positional_args = {
name for name, param in bot_signature.parameters.items() if param.default == param.empty
}
ignored_args = positional_args | set(shortcut_kwargs)
shortcut_signature = inspect.signature(shortcut_method)
# auto_pagination: Special casing for InlineQuery.answer
kwargs = {name: name for name in shortcut_signature.parameters if name != "auto_pagination"}
async def make_assertion(**kw):
# name == value makes sure that
# a) we receive non-None input for all parameters
# b) we receive the correct input for each kwarg
received_kwargs = {
name for name, value in kw.items() if name in ignored_args or value == name
}
if not received_kwargs == expected_args:
raise Exception(
f"{orig_bot_method.__name__} did not receive correct value for the parameters "
f"{expected_args - received_kwargs}"
)
if bot_method_name == "get_file":
# This is here mainly for PassportFile.get_file, which calls .set_credentials on the
# return value
return File(file_id="result", file_unique_id="result")
return True
setattr(bot, bot_method_name, make_assertion)
try:
await shortcut_method(**kwargs)
except Exception as exc:
raise exc
finally:
setattr(bot, bot_method_name, orig_bot_method)
return True
def build_kwargs(signature: inspect.Signature, default_kwargs, dfv: Any = DEFAULT_NONE):
kws = {}
for name, param in signature.parameters.items():
# For required params we need to pass something
if param.default is inspect.Parameter.empty:
# Some special casing
if name == "permissions":
kws[name] = ChatPermissions()
elif name in ["prices", "commands", "errors"]:
kws[name] = []
elif name == "media":
media = InputMediaPhoto("media", parse_mode=dfv)
if "list" in str(param.annotation).lower():
kws[name] = [media]
else:
kws[name] = media
elif name == "results":
itmc = InputTextMessageContent(
"text", parse_mode=dfv, disable_web_page_preview=dfv
)
kws[name] = [
InlineQueryResultArticle("id", "title", input_message_content=itmc),
InlineQueryResultCachedPhoto(
"id", "photo_file_id", parse_mode=dfv, input_message_content=itmc
),
]
elif name == "ok":
kws["ok"] = False
kws["error_message"] = "error"
else:
kws[name] = True
# pass values for params that can have defaults only if we don't want to use the
# standard default
elif name in default_kwargs:
if dfv != DEFAULT_NONE:
kws[name] = dfv
# Some special casing for methods that have "exactly one of the optionals" type args
elif name in ["location", "contact", "venue", "inline_message_id"]:
kws[name] = True
elif name == "until_date":
if dfv == "non-None-value":
# Europe/Berlin
kws[name] = pytz.timezone("Europe/Berlin").localize(
datetime.datetime(2000, 1, 1, 0)
)
else:
# UTC
kws[name] = datetime.datetime(2000, 1, 1, 0)
return kws
async def check_defaults_handling(
method: Callable,
bot: Bot,
return_value=None,
) -> bool:
"""
Checks that tg.ext.Defaults are handled correctly.
Args:
method: The shortcut/bot_method
bot: The bot. May be a telegram.Bot or a telegram.ext.ExtBot. In the former case, all
default values will be converted to None.
return_value: Optional. The return value of Bot._post that the method expects. Defaults to
None. get_file is automatically handled. If this is a `TelegramObject`, Bot._post will
return the `to_dict` representation of it.
"""
raw_bot = not isinstance(bot, ExtBot)
shortcut_signature = inspect.signature(method)
kwargs_need_default = [
kwarg
for kwarg, value in shortcut_signature.parameters.items()
if isinstance(value.default, DefaultValue) and not kwarg.endswith("_timeout")
]
if method.__name__.endswith("_media_group"):
# the parse_mode is applied to the first media item, and we test this elsewhere
kwargs_need_default.remove("parse_mode")
defaults_no_custom_defaults = Defaults()
kwargs = {kwarg: "custom_default" for kwarg in inspect.signature(Defaults).parameters.keys()}
kwargs["tzinfo"] = pytz.timezone("America/New_York")
defaults_custom_defaults = Defaults(**kwargs)
expected_return_values = [None, []] if return_value is None else [return_value]
async def make_assertion(
url, request_data: RequestData, df_value=DEFAULT_NONE, *args, **kwargs
):
data = request_data.parameters
# Check regular arguments that need defaults
for arg in kwargs_need_default:
# 'None' should not be passed along to Telegram
if df_value in [None, DEFAULT_NONE]:
if arg in data:
pytest.fail(
f"Got value {data[arg]} for argument {arg}, expected it to be absent"
)
else:
value = data.get(arg, "`not passed at all`")
if value != df_value:
pytest.fail(f"Got value {value} for argument {arg} instead of {df_value}")
# Check InputMedia (parse_mode can have a default)
def check_input_media(m: Dict):
parse_mode = m.get("parse_mode", None)
if df_value is DEFAULT_NONE:
if parse_mode is not None:
pytest.fail("InputMedia has non-None parse_mode")
elif parse_mode != df_value:
pytest.fail(
f"Got value {parse_mode} for InputMedia.parse_mode instead of {df_value}"
)
media = data.pop("media", None)
if media:
if isinstance(media, dict) and isinstance(media.get("type", None), InputMediaType):
check_input_media(media)
else:
for m in media:
check_input_media(m)
# Check InlineQueryResults
results = data.pop("results", [])
for result in results:
if df_value in [DEFAULT_NONE, None]:
if "parse_mode" in result:
pytest.fail("ILQR has a parse mode, expected it to be absent")
# Here we explicitly use that we only pass ILQRPhoto and ILQRArticle for testing
# so ILQRPhoto is expected to have parse_mode if df_value is not in [DF_NONE, NONE]
elif "photo" in result and result.get("parse_mode") != df_value:
pytest.fail(
f'Got value {result.get("parse_mode")} for '
f"ILQR.parse_mode instead of {df_value}"
)
imc = result.get("input_message_content")
if not imc:
continue
for attr in ["parse_mode", "disable_web_page_preview"]:
if df_value in [DEFAULT_NONE, None]:
if attr in imc:
pytest.fail(f"ILQR.i_m_c has a {attr}, expected it to be absent")
# Here we explicitly use that we only pass InputTextMessageContent for testing
# which has both attributes
elif imc.get(attr) != df_value:
pytest.fail(
f"Got value {imc.get(attr)} for ILQR.i_m_c.{attr} instead of {df_value}"
)
# Check datetime conversion
until_date = data.pop("until_date", None)
if until_date:
if df_value == "non-None-value":
if until_date != 946681200:
pytest.fail("Non-naive until_date was interpreted as Europe/Berlin.")
if df_value is DEFAULT_NONE:
if until_date != 946684800:
pytest.fail("Naive until_date was not interpreted as UTC")
if df_value == "custom_default":
if until_date != 946702800:
pytest.fail("Naive until_date was not interpreted as America/New_York")
if method.__name__ in ["get_file", "get_small_file", "get_big_file"]:
# This is here mainly for PassportFile.get_file, which calls .set_credentials on the
# return value
out = File(file_id="result", file_unique_id="result")
nonlocal expected_return_values
expected_return_values = [out]
return out.to_dict()
# Otherwise return None by default, as TGObject.de_json/list(None) in [None, []]
# That way we can check what gets passed to Request.post without having to actually
# make a request
# Some methods expect specific output, so we allow to customize that
if isinstance(return_value, TelegramObject):
return return_value.to_dict()
return return_value
orig_post = bot.request.post
try:
if raw_bot:
combinations = [(DEFAULT_NONE, None)]
else:
combinations = [
(DEFAULT_NONE, defaults_no_custom_defaults),
("custom_default", defaults_custom_defaults),
]
for default_value, defaults in combinations:
if not raw_bot:
bot._defaults = defaults
# 1: test that we get the correct default value, if we don't specify anything
kwargs = build_kwargs(
shortcut_signature,
kwargs_need_default,
)
assertion_callback = functools.partial(make_assertion, df_value=default_value)
setattr(bot.request, "post", assertion_callback)
assert await method(**kwargs) in expected_return_values
# 2: test that we get the manually passed non-None value
kwargs = build_kwargs(shortcut_signature, kwargs_need_default, dfv="non-None-value")
assertion_callback = functools.partial(make_assertion, df_value="non-None-value")
setattr(bot.request, "post", assertion_callback)
assert await method(**kwargs) in expected_return_values
# 3: test that we get the manually passed None value
kwargs = build_kwargs(
shortcut_signature,
kwargs_need_default,
dfv=None,
)
assertion_callback = functools.partial(make_assertion, df_value=None)
setattr(bot.request, "post", assertion_callback)
assert await method(**kwargs) in expected_return_values
except Exception as exc:
raise exc
finally:
setattr(bot.request, "post", orig_post)
if not raw_bot:
bot._defaults = None
return True