-
-
Notifications
You must be signed in to change notification settings - Fork 6.1k
/
concurrency.py
39 lines (32 loc) · 951 Bytes
/
concurrency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import sys
from typing import AsyncGenerator, ContextManager, TypeVar
__all__ = [
"iterate_in_threadpool",
"run_in_threadpool",
"run_until_first_complete",
"contextmanager_in_threadpool",
"asynccontextmanager",
"AsyncExitStack",
]
from starlette.concurrency import (
iterate_in_threadpool,
run_in_threadpool,
run_until_first_complete,
)
if sys.version_info >= (3, 7):
from contextlib import AsyncExitStack, asynccontextmanager
else:
from contextlib2 import AsyncExitStack, asynccontextmanager
_T = TypeVar("_T")
@asynccontextmanager
async def contextmanager_in_threadpool(
cm: ContextManager[_T],
) -> AsyncGenerator[_T, None]:
try:
yield await run_in_threadpool(cm.__enter__)
except Exception as e:
ok = await run_in_threadpool(cm.__exit__, type(e), e, None)
if not ok:
raise e
else:
await run_in_threadpool(cm.__exit__, None, None, None)