-
Notifications
You must be signed in to change notification settings - Fork 391
/
_asyncio_compat.py
94 lines (72 loc) · 2.48 KB
/
_asyncio_compat.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
# Backports from Python/Lib/asyncio for older Pythons
#
# Copyright (c) 2001-2023 Python Software Foundation; All Rights Reserved
#
# SPDX-License-Identifier: PSF-2.0
from __future__ import annotations
import asyncio
import functools
import sys
import typing
if typing.TYPE_CHECKING:
from . import compat
if sys.version_info < (3, 11):
from async_timeout import timeout as timeout_ctx
else:
from asyncio import timeout as timeout_ctx
_T = typing.TypeVar('_T')
async def wait_for(fut: compat.Awaitable[_T], timeout: float | None) -> _T:
"""Wait for the single Future or coroutine to complete, with timeout.
Coroutine will be wrapped in Task.
Returns result of the Future or coroutine. When a timeout occurs,
it cancels the task and raises TimeoutError. To avoid the task
cancellation, wrap it in shield().
If the wait is cancelled, the task is also cancelled.
If the task supresses the cancellation and returns a value instead,
that value is returned.
This function is a coroutine.
"""
# The special case for timeout <= 0 is for the following case:
#
# async def test_waitfor():
# func_started = False
#
# async def func():
# nonlocal func_started
# func_started = True
#
# try:
# await asyncio.wait_for(func(), 0)
# except asyncio.TimeoutError:
# assert not func_started
# else:
# assert False
#
# asyncio.run(test_waitfor())
if timeout is not None and timeout <= 0:
fut = asyncio.ensure_future(fut)
if fut.done():
return fut.result()
await _cancel_and_wait(fut)
try:
return fut.result()
except asyncio.CancelledError as exc:
raise TimeoutError from exc
async with timeout_ctx(timeout):
return await fut
async def _cancel_and_wait(fut: asyncio.Future[_T]) -> None:
"""Cancel the *fut* future or task and wait until it completes."""
loop = asyncio.get_running_loop()
waiter = loop.create_future()
cb = functools.partial(_release_waiter, waiter)
fut.add_done_callback(cb)
try:
fut.cancel()
# We cannot wait on *fut* directly to make
# sure _cancel_and_wait itself is reliably cancellable.
await waiter
finally:
fut.remove_done_callback(cb)
def _release_waiter(waiter: asyncio.Future[typing.Any], *args: object) -> None:
if not waiter.done():
waiter.set_result(None)