Address leak when using request stream interceptors (#25449) #27571
I see some builds are failing. Looking at other PRs, though (including the merged ones), it seems that this is "normal". Is there anything for me to do?
I have rebased on master and resolved the merge conflict that was preventing the merge.
The results of these pending tasks are not needed; leaving them on the queue grows the queue until the call completes. This fix slows the memory growth in the test example.
Cancelling unneeded Tasks is not sufficient, as this leaves behind cancelled Futures in the cygrpc layer, which still occupy memory. Instead, avoid creating unneeded tasks in the first place.
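The effect described in these two commit messages can be reproduced in isolation. The following is a minimal sketch, not gRPC's code: FakeCall, write_then_cancel and leaked_futures are hypothetical names, and FakeCall only assumes that each status request registers a new Future that is never removed on cancellation.

```python
import asyncio


class FakeCall:
    """Hypothetical stand-in for the Cython call object (not gRPC's API)."""

    def __init__(self):
        self.waiters = []  # one Future per status() request

    async def status(self):
        # Assumption: every request registers a fresh Future that is only
        # resolved when the call terminates and is never unregistered.
        fut = asyncio.get_running_loop().create_future()
        self.waiters.append(fut)
        return await fut


async def write_then_cancel(call, queue, request):
    # Leaky pattern: a new status Task (and Future) for every write.
    loop = asyncio.get_running_loop()
    status_task = loop.create_task(call.status())
    put_task = loop.create_task(queue.put(request))
    await asyncio.wait((put_task, status_task),
                       return_when=asyncio.FIRST_COMPLETED)
    # Cancelling the Task also cancels the Future it awaits, but the
    # cancelled Future is still referenced from call.waiters.
    status_task.cancel()


async def leaked_futures(n):
    call = FakeCall()
    queue = asyncio.Queue()  # unbounded, so every put completes at once
    for i in range(n):
        await write_then_cancel(call, queue, i)
    await asyncio.sleep(0)  # let the last cancellation be processed
    cancelled = sum(f.cancelled() for f in call.waiters)
    return len(call.waiters), cancelled


if __name__ == "__main__":
    print(asyncio.run(leaked_futures(1000)))
```

Under these assumptions the number of retained Futures grows linearly with the number of writes even though every status Task has been cancelled.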
@lidizheng Fixed the typo that was causing the build to fail.
@lidizheng I do not think the currently failing tests have anything to do with my changes. Can you please confirm?
Sorry for the late reply. This is a hard problem to investigate and solve. Thanks so much for fixing it. I think the fix and comment look good. Just a minor comment, and please check our Sanity Tests' output, as follows:
+ yapf_virtual_environment/bin/python -m yapf --diff --parallel --recursive --style=setup.cfg examples src test tools setup.py
--- src/python/grpcio/grpc/aio/_interceptor.py (original)
+++ src/python/grpcio/grpc/aio/_interceptor.py (reformatted)
@@ -505,8 +505,8 @@
                 break
             yield value

-    async def _write_to_iterator_queue_interruptible(
-            self, request: RequestType, call: InterceptedCall):
+    async def _write_to_iterator_queue_interruptible(self, request: RequestType,
+                                                     call: InterceptedCall):
         # Write the specified 'request' to the request iterator queue using the
         # specified 'call' to allow for interruption of the write in the case
         # of abrupt termination of the call.
@@ -516,8 +516,7 @@
         _, _ = await asyncio.wait(
             (self._loop.create_task(self._write_to_iterator_queue.put(request)),
              self._status_code_task),
-            return_when=asyncio.FIRST_COMPLETED
-        )
+            return_when=asyncio.FIRST_COMPLETED)

     async def write(self, request: RequestType) -> None:
         # If no queue was created it means that requests
1. Ignore unused return values 2. Fix formatting
Addressed the formatting fix and your nit.
Thanks for the update!
@lidizheng Thanks for accepting. Which version should I look out for this fix in?
We will have a branch cut tomorrow for |
lgtm
This addresses the leak demonstrated in #25449.
The root cause seems to be the creation of many unresolved asyncio Tasks because of how the call status code is used to write to the interceptor request stream in an interruptible manner.
The first attempt that (sort of) worked was simply to cancel any pending tasks that are not required: ae3f88e
However, while this slows the rate of leakage, it does not solve the problem completely.
It turns out that making many calls to the status code itself is a problem, as this creates many Futures in the Cython layer which persist even after the Tasks that created them are cancelled (see grpc/src/python/grpcio/grpc/_cython/_cygrpc/aio/call.pyx.pxi, lines 235 to 252 in fd3bd70).
While Future objects seem to be cheaper than Task objects, creating a lot of them still adds up over time.
For the purposes of the interceptor request stream, it seems sufficient to have a single status code task that can be "listened" on for the lifetime of the interceptor request stream. This way we avoid polluting the callback queue of the corresponding Cython call object: c8801bb
@lidizheng