Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle SystemExit raised in Handlers #4157

Merged
merged 18 commits into from May 20, 2024
Merged
Show file tree
Hide file tree
Changes from 16 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion docs/source/conf.py
Expand Up @@ -67,7 +67,9 @@
master_doc = "index"

# Global substitutions
rst_prolog = (Path.cwd() / "../substitutions/global.rst").read_text(encoding="utf-8")
rst_prolog = ""
for file in Path.cwd().glob("../substitutions/*.rst"):
rst_prolog += "\n" + file.read_text(encoding="utf-8")

# -- Extension settings ------------------------------------------------
napoleon_use_admonition_for_examples = True
Expand Down
1 change: 1 addition & 0 deletions docs/substitutions/application.rst
@@ -0,0 +1 @@
.. |app_run_shutdown| replace:: The app will shut down when :exc:`KeyboardInterrupt` or :exc:`SystemExit` is raised. This also works from within handlers, error handlers and jobs. However, using :meth:`~telegram.ext.Application.stop_running` will give a somewhat cleaner shutdown behavior than manually raising those exceptions. On unix, the app will also shut down on receiving the signals specified by
117 changes: 74 additions & 43 deletions telegram/ext/_application.py
Expand Up @@ -365,6 +365,7 @@ def __init__(
self.__update_persistence_event = asyncio.Event()
self.__update_persistence_lock = asyncio.Lock()
self.__create_task_tasks: Set[asyncio.Task] = set() # Used for awaiting tasks upon exit
self.__stop_running_marker = asyncio.Event()

async def __aenter__(self: _AppType) -> _AppType: # noqa: PYI019
"""|async_context_manager| :meth:`initializes <initialize>` the App.
Expand Down Expand Up @@ -516,6 +517,7 @@ async def initialize(self) -> None:
await self._add_ch_to_persistence(handler)

self._initialized = True
self.__stop_running_marker.clear()

async def _add_ch_to_persistence(self, handler: "ConversationHandler") -> None:
self._conversation_handler_conversations.update(
Expand Down Expand Up @@ -670,14 +672,26 @@ async def stop(self) -> None:
raise RuntimeError("This Application is not running!")

self._running = False
self.__stop_running_marker.clear()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we need this again here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case the app is stopped & restartet during the lifetime of the python script, we need to reset the marker. otherwise the second startup will immediately shut down as well

_LOGGER.info("Application is stopping. This might take a moment.")

# Stop listening for new updates and handle all pending ones
await self.update_queue.put(_STOP_SIGNAL)
_LOGGER.debug("Waiting for update_queue to join")
await self.update_queue.join()
if self.__update_fetcher_task:
await self.__update_fetcher_task
if self.__update_fetcher_task.done():
try:
self.__update_fetcher_task.result()
except BaseException as exc:
_LOGGER.critical(
"Fetching updates was aborted due to %r. Suppressing "
"exception to ensure graceful shutdown.",
exc,
exc_info=True,
)
else:
await self.update_queue.put(_STOP_SIGNAL)
_LOGGER.debug("Waiting for update_queue to join")
await self.update_queue.join()
await self.__update_fetcher_task
_LOGGER.debug("Application stopped fetching of updates.")

if self._job_queue:
Expand All @@ -703,17 +717,36 @@ def stop_running(self) -> None:
shutdown of the application, i.e. the methods listed in :attr:`run_polling` and
:attr:`run_webhook` will still be executed.

This method can also be called within :meth:`post_init`. This allows for a graceful,
early shutdown of the application if some condition is met (e.g., a database connection
could not be established).

Note:
If the application is not running, this method does nothing.
If the application is not running and this method is not called within
:meth:`post_init`, this method does nothing.

Warning:
This method is designed to for use in combination with :meth:`run_polling` or
:meth:`run_webhook`. Using this method in combination with a custom logic for starting
and stopping the application is not guaranteed to work as expected. Use at your own
risk.

.. versionadded:: 20.5

.. versionchanged:: NEXT.VERSION
Added support for calling within :meth:`post_init`.
"""
if self.running:
# This works because `__run` is using `loop.run_forever()`. If that changes, this
# method needs to be adapted.
asyncio.get_running_loop().stop()
else:
_LOGGER.debug("Application is not running, stop_running() does nothing.")
self.__stop_running_marker.set()
if not self._initialized:
_LOGGER.debug(
"Application is not running and not initialized. `stop_running()` likely has "
"no effect."
)

def run_polling(
self,
Expand All @@ -733,9 +766,7 @@ def run_polling(
polling updates from Telegram using :meth:`telegram.ext.Updater.start_polling` and
a graceful shutdown of the app on exit.

The app will shut down when :exc:`KeyboardInterrupt` or :exc:`SystemExit` is raised.
On unix, the app will also shut down on receiving the signals specified by
:paramref:`stop_signals`.
|app_run_shutdown| :paramref:`stop_signals`.
Bibo-Joshi marked this conversation as resolved.
Show resolved Hide resolved

The order of execution by :meth:`run_polling` is roughly as follows:

Expand Down Expand Up @@ -874,9 +905,7 @@ def run_webhook(
listening for updates from Telegram using :meth:`telegram.ext.Updater.start_webhook` and
a graceful shutdown of the app on exit.

The app will shut down when :exc:`KeyboardInterrupt` or :exc:`SystemExit` is raised.
On unix, the app will also shut down on receiving the signals specified by
:paramref:`stop_signals`.
|app_run_shutdown| :paramref:`stop_signals`.

If :paramref:`cert`
and :paramref:`key` are not provided, the webhook will be started directly on
Expand Down Expand Up @@ -1038,6 +1067,9 @@ def __run(
loop.run_until_complete(self.initialize())
if self.post_init:
loop.run_until_complete(self.post_init(self))
if self.__stop_running_marker.is_set():
_LOGGER.info("Application received stop signal via `stop_running`. Shutting down.")
return
loop.run_until_complete(updater_coroutine) # one of updater.start_webhook/polling
loop.run_until_complete(self.start())
loop.run_forever()
Expand Down Expand Up @@ -1184,45 +1216,44 @@ async def __create_task_callback(
finally:
self._mark_for_persistence_update(update=update)

async def _update_fetcher(self) -> None:
async def __update_fetcher(self) -> None:
# Continuously fetch updates from the queue. Exit only once the signal object is found.
while True:
try:
update = await self.update_queue.get()
update = await self.update_queue.get()

if update is _STOP_SIGNAL:
_LOGGER.debug("Dropping pending updates")
while not self.update_queue.empty():
self.update_queue.task_done()

# For the _STOP_SIGNAL
self.update_queue.task_done()
return
if update is _STOP_SIGNAL:
# For the _STOP_SIGNAL
self.update_queue.task_done()
return

_LOGGER.debug("Processing update %s", update)
_LOGGER.debug("Processing update %s", update)

if self._update_processor.max_concurrent_updates > 1:
# We don't await the below because it has to be run concurrently
self.create_task(
self.__process_update_wrapper(update),
update=update,
name=f"Application:{self.bot.id}:process_concurrent_update",
)
else:
await self.__process_update_wrapper(update)

except asyncio.CancelledError:
# This may happen if the application is manually run via application.start() and
# then a KeyboardInterrupt is sent. We must prevent this loop to die since
# application.stop() will wait for it's clean shutdown.
_LOGGER.warning(
"Fetching updates got a asyncio.CancelledError. Ignoring as this task may only"
"be closed via `Application.stop`."
if self._update_processor.max_concurrent_updates > 1:
# We don't await the below because it has to be run concurrently
self.create_task(
self.__process_update_wrapper(update),
update=update,
name=f"Application:{self.bot.id}:process_concurrent_update",
)
else:
await self.__process_update_wrapper(update)

async def _update_fetcher(self) -> None:
try:
await self.__update_fetcher()
finally:
while not self.update_queue.empty():
_LOGGER.debug("Dropping pending update: %s", self.update_queue.get_nowait())
with contextlib.suppress(ValueError):
# Since we're shutting down here, it's not too bad if we call task_done
# on an empty queue
self.update_queue.task_done()

async def __process_update_wrapper(self, update: object) -> None:
await self._update_processor.process_update(update, self.process_update(update))
self.update_queue.task_done()
try:
await self._update_processor.process_update(update, self.process_update(update))
finally:
self.update_queue.task_done()

async def process_update(self, update: object) -> None:
"""Processes a single update and marks the update to be updated by the persistence later.
Expand Down