Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test PR for #4157 #4224

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/source/conf.py
Expand Up @@ -67,7 +67,9 @@
master_doc = "index"

# Global substitutions
rst_prolog = (Path.cwd() / "../substitutions/global.rst").read_text(encoding="utf-8")
rst_prolog = ""
for file in Path.cwd().glob("../substitutions/*.rst"):
rst_prolog += "\n" + file.read_text(encoding="utf-8")

# -- Extension settings ------------------------------------------------
napoleon_use_admonition_for_examples = True
Expand Down
1 change: 1 addition & 0 deletions docs/substitutions/application.rst
@@ -0,0 +1 @@
.. |app_run_shutdown| replace:: The app will shut down when :exc:`KeyboardInterrupt` or :exc:`SystemExit` is raised. This also works from within handlers, error handlers and jobs. However, using :meth:`~telegram.ext.Application.stop_running` will give a somewhat cleaner shutdown behavior than manually raising those exceptions. On unix, the app will also shut down on receiving the signals specified by
2 changes: 1 addition & 1 deletion pyproject.toml
Expand Up @@ -48,7 +48,7 @@ exclude-protected = ["_unfrozen"]
# PYTEST:
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "--no-success-flaky-report -rsxX"
addopts = "--no-success-flaky-report -rX"
filterwarnings = [
"error",
"ignore::DeprecationWarning",
Expand Down
2 changes: 1 addition & 1 deletion requirements-dev.txt
@@ -1,7 +1,7 @@
pre-commit # needed for pre-commit hooks in the git commit command

# For the test suite
pytest==7.4.4
pytest==8.1.1
pytest-asyncio==0.21.1 # needed because pytest doesn't come with native support for coroutines as tests
pytest-xdist==3.6.0 # xdist runs tests in parallel
flaky # Used for flaky tests (flaky decorator)
Expand Down
2 changes: 1 addition & 1 deletion telegram/_utils/enum.py
Expand Up @@ -60,7 +60,7 @@ def __str__(self) -> str:


# Apply the __repr__ modification and __str__ fix to IntEnum
class IntEnum(_enum.IntEnum):
class IntEnum(_enum.IntEnum): # pylint: disable=invalid-slots
"""Helper class for int enums where ``str(member)`` prints the value, but ``repr(member)``
gives ``EnumName.MEMBER_NAME``.
"""
Expand Down
2 changes: 1 addition & 1 deletion telegram/constants.py
Expand Up @@ -29,7 +29,7 @@
* Most of the constants in this module are grouped into enums.
"""
# TODO: Remove this when https://github.com/PyCQA/pylint/issues/6887 is resolved.
# pylint: disable=invalid-enum-extension
# pylint: disable=invalid-enum-extension,invalid-slots

__all__ = [
"BOT_API_VERSION",
Expand Down
127 changes: 80 additions & 47 deletions telegram/ext/_application.py
Expand Up @@ -365,6 +365,7 @@ def __init__(
self.__update_persistence_event = asyncio.Event()
self.__update_persistence_lock = asyncio.Lock()
self.__create_task_tasks: Set[asyncio.Task] = set() # Used for awaiting tasks upon exit
self.__stop_running_marker = asyncio.Event()

async def __aenter__(self: _AppType) -> _AppType: # noqa: PYI019
"""|async_context_manager| :meth:`initializes <initialize>` the App.
Expand Down Expand Up @@ -516,6 +517,7 @@ async def initialize(self) -> None:
await self._add_ch_to_persistence(handler)

self._initialized = True
self.__stop_running_marker.clear()

async def _add_ch_to_persistence(self, handler: "ConversationHandler") -> None:
self._conversation_handler_conversations.update(
Expand Down Expand Up @@ -670,14 +672,26 @@ async def stop(self) -> None:
raise RuntimeError("This Application is not running!")

self._running = False
self.__stop_running_marker.clear()
_LOGGER.info("Application is stopping. This might take a moment.")

# Stop listening for new updates and handle all pending ones
await self.update_queue.put(_STOP_SIGNAL)
_LOGGER.debug("Waiting for update_queue to join")
await self.update_queue.join()
if self.__update_fetcher_task:
await self.__update_fetcher_task
if self.__update_fetcher_task.done():
try:
self.__update_fetcher_task.result()
except BaseException as exc:
_LOGGER.critical(
"Fetching updates was aborted due to %r. Suppressing "
"exception to ensure graceful shutdown.",
exc,
exc_info=True,
)
else:
await self.update_queue.put(_STOP_SIGNAL)
_LOGGER.debug("Waiting for update_queue to join")
await self.update_queue.join()
await self.__update_fetcher_task
_LOGGER.debug("Application stopped fetching of updates.")

if self._job_queue:
Expand All @@ -703,17 +717,36 @@ def stop_running(self) -> None:
shutdown of the application, i.e. the methods listed in :attr:`run_polling` and
:attr:`run_webhook` will still be executed.

This method can also be called within :meth:`post_init`. This allows for a graceful,
early shutdown of the application if some condition is met (e.g., a database connection
could not be established).

Note:
If the application is not running, this method does nothing.
If the application is not running and this method is not called within
:meth:`post_init`, this method does nothing.

Warning:
This method is designed to for use in combination with :meth:`run_polling` or
:meth:`run_webhook`. Using this method in combination with a custom logic for starting
and stopping the application is not guaranteed to work as expected. Use at your own
risk.

.. versionadded:: 20.5

.. versionchanged:: NEXT.VERSION
Added support for calling within :meth:`post_init`.
"""
if self.running:
# This works because `__run` is using `loop.run_forever()`. If that changes, this
# method needs to be adapted.
asyncio.get_running_loop().stop()
else:
_LOGGER.debug("Application is not running, stop_running() does nothing.")
self.__stop_running_marker.set()
if not self._initialized:
_LOGGER.debug(
"Application is not running and not initialized. `stop_running()` likely has "
"no effect."
)

def run_polling(
self,
Expand All @@ -733,9 +766,7 @@ def run_polling(
polling updates from Telegram using :meth:`telegram.ext.Updater.start_polling` and
a graceful shutdown of the app on exit.

The app will shut down when :exc:`KeyboardInterrupt` or :exc:`SystemExit` is raised.
On unix, the app will also shut down on receiving the signals specified by
:paramref:`stop_signals`.
|app_run_shutdown| :paramref:`stop_signals`.

The order of execution by :meth:`run_polling` is roughly as follows:

Expand Down Expand Up @@ -874,9 +905,7 @@ def run_webhook(
listening for updates from Telegram using :meth:`telegram.ext.Updater.start_webhook` and
a graceful shutdown of the app on exit.

The app will shut down when :exc:`KeyboardInterrupt` or :exc:`SystemExit` is raised.
On unix, the app will also shut down on receiving the signals specified by
:paramref:`stop_signals`.
|app_run_shutdown| :paramref:`stop_signals`.

If :paramref:`cert`
and :paramref:`key` are not provided, the webhook will be started directly on
Expand Down Expand Up @@ -1038,18 +1067,23 @@ def __run(
loop.run_until_complete(self.initialize())
if self.post_init:
loop.run_until_complete(self.post_init(self))
if self.__stop_running_marker.is_set():
_LOGGER.info("Application received stop signal via `stop_running`. Shutting down.")
return
loop.run_until_complete(updater_coroutine) # one of updater.start_webhook/polling
loop.run_until_complete(self.start())
loop.run_forever()
except (KeyboardInterrupt, SystemExit):
_LOGGER.debug("Application received stop signal. Shutting down.")
except Exception as exc:
# In case the coroutine wasn't awaited, we don't need to bother the user with a warning
updater_coroutine.close()
raise exc
finally:
# We arrive here either by catching the exceptions above or if the loop gets stopped
# In case the coroutine wasn't awaited, we don't need to bother the user with a warning
updater_coroutine.close()

try:
# In we never got to await the coroutine, it's cleaner to close it here
# This can be the case e.g. if post_init raises SystemExit
updater_coroutine.close()
# Mypy doesn't know that we already check if updater is None
if self.updater.running: # type: ignore[union-attr]
loop.run_until_complete(self.updater.stop()) # type: ignore[union-attr]
Expand Down Expand Up @@ -1185,45 +1219,44 @@ async def __create_task_callback(
finally:
self._mark_for_persistence_update(update=update)

async def _update_fetcher(self) -> None:
async def __update_fetcher(self) -> None:
# Continuously fetch updates from the queue. Exit only once the signal object is found.
while True:
try:
update = await self.update_queue.get()
update = await self.update_queue.get()

if update is _STOP_SIGNAL:
_LOGGER.debug("Dropping pending updates")
while not self.update_queue.empty():
self.update_queue.task_done()

# For the _STOP_SIGNAL
self.update_queue.task_done()
return
if update is _STOP_SIGNAL:
# For the _STOP_SIGNAL
self.update_queue.task_done()
return

_LOGGER.debug("Processing update %s", update)
_LOGGER.debug("Processing update %s", update)

if self._update_processor.max_concurrent_updates > 1:
# We don't await the below because it has to be run concurrently
self.create_task(
self.__process_update_wrapper(update),
update=update,
name=f"Application:{self.bot.id}:process_concurrent_update",
)
else:
await self.__process_update_wrapper(update)

except asyncio.CancelledError:
# This may happen if the application is manually run via application.start() and
# then a KeyboardInterrupt is sent. We must prevent this loop to die since
# application.stop() will wait for it's clean shutdown.
_LOGGER.warning(
"Fetching updates got a asyncio.CancelledError. Ignoring as this task may only"
"be closed via `Application.stop`."
if self._update_processor.max_concurrent_updates > 1:
# We don't await the below because it has to be run concurrently
self.create_task(
self.__process_update_wrapper(update),
update=update,
name=f"Application:{self.bot.id}:process_concurrent_update",
)
else:
await self.__process_update_wrapper(update)

async def _update_fetcher(self) -> None:
try:
await self.__update_fetcher()
finally:
while not self.update_queue.empty():
_LOGGER.debug("Dropping pending update: %s", self.update_queue.get_nowait())
with contextlib.suppress(ValueError):
# Since we're shutting down here, it's not too bad if we call task_done
# on an empty queue
self.update_queue.task_done()

async def __process_update_wrapper(self, update: object) -> None:
await self._update_processor.process_update(update, self.process_update(update))
self.update_queue.task_done()
try:
await self._update_processor.process_update(update, self.process_update(update))
finally:
self.update_queue.task_done()

async def process_update(self, update: object) -> None:
"""Processes a single update and marks the update to be updated by the persistence later.
Expand Down
1 change: 0 additions & 1 deletion telegram/ext/_handlers/commandhandler.py
Expand Up @@ -152,7 +152,6 @@ def _check_correct_args(self, args: List[str]) -> Optional[bool]:
Returns:
:obj:`bool`: Whether the args are valid for this handler.
"""
# pylint: disable=too-many-boolean-expressions
return bool(
(self.has_args is None)
or (self.has_args is True and args)
Expand Down
2 changes: 1 addition & 1 deletion telegram/ext/filters.py
Expand Up @@ -289,7 +289,7 @@ def check_update(self, update: Update) -> Optional[Union[bool, FilterDataDict]]:
:attr:`telegram.Update.edited_business_message`, or :obj:`False` otherwise.
"""
return bool( # Only message updates should be handled.
update.channel_post # pylint: disable=too-many-boolean-expressions
update.channel_post
or update.message
or update.edited_channel_post
or update.edited_message
Expand Down