
server-side streaming must call hasNext() before cancel #10490

Closed
misumisery opened this issue Aug 16, 2023 · 1 comment · Fixed by #10496
@misumisery

Hello, when I use server-side streaming I run into a problem.

I get one iterator per server from a list of servers and put them into a priority queue, then take only the top n entries from the queue. After being called many times, this produces an OutOfMemoryError.

Exception:

Caused by: java.lang.OutOfMemoryError: Direct buffer memory
        at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
        at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
        at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:649)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:624)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:203)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.tcacheAllocateNormal(PoolArena.java:187)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:136)
        at io.grpc.netty.shaded.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
        at io.grpc.netty.shaded.io.netty.buffer.PooledByteBufAllocator.newDirectBuffer(PooledByteBufAllocator.java:396)
        at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:188)
        at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:179)
        at io.grpc.netty.shaded.io.netty.buffer.AbstractByteBufAllocator.buffer(AbstractByteBufAllocator.java:116)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.expandCumulation(ByteToMessageDecoder.java:541)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder$1.cumulate(ByteToMessageDecoder.java:97)
        at io.grpc.netty.shaded.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:277)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
        at io.grpc.netty.shaded.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
        at io.grpc.netty.shaded.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
        at io.grpc.netty.shaded.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584)
        at io.grpc.netty.shaded.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496)
        at io.grpc.netty.shaded.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995)
        at io.grpc.netty.shaded.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
        at io.grpc.netty.shaded.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        ... 1 more

My client code looks like this:

try (Context.CancellableContext c = Context.current().withCancellation()) {
  Context toRestore = c.attach();
  try {
    PriorityBlockingQueue<PriorityQueueObject> queue = new PriorityBlockingQueue<>();
    for (int i = 0; i < num; i++) {
      Iterator<ListObjectResult> iterator = ObjectGrpc.newBlockingStub(channelList[i]).listObjects(listObjectRequest);
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
    while (!queue.isEmpty() && keyMap.size() < maxKeys) {
      PriorityQueueObject priorityQueueObject = queue.poll();
      ListObjectResult temp = priorityQueueObject.getListObjectResult();
      keyMap.put(temp.getKey(), temp);
      Iterator<ListObjectResult> iterator = priorityQueueObject.getIterator();
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
  }
  finally {
    c.detach(toRestore);
  }
}

But when I add the loop below, the exception no longer appears.

try (Context.CancellableContext c = Context.current().withCancellation()) {
  Context toRestore = c.attach();
  try {
    PriorityBlockingQueue<PriorityQueueObject> queue = new PriorityBlockingQueue<>();
    for (int i = 0; i < num; i++) {
      Iterator<ListObjectResult> iterator = ObjectGrpc.newBlockingStub(channelList[i]).listObjects(listObjectRequest);
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
    while (!queue.isEmpty() && keyMap.size() < maxKeys) {
      PriorityQueueObject priorityQueueObject = queue.poll();
      ListObjectResult temp = priorityQueueObject.getListObjectResult();
      keyMap.put(temp.getKey(), temp);
      Iterator<ListObjectResult> iterator = priorityQueueObject.getIterator();
      if (iterator.hasNext()) {
        ListObjectResult listObjectResult = iterator.next();
        queue.offer(new PriorityQueueObject(listObjectResult, iterator));
      }
    }
    // Draining the remaining iterators with hasNext() avoids the OutOfMemoryError
    while (!queue.isEmpty()) {
      PriorityQueueObject priorityQueueObject = queue.poll();
      priorityQueueObject.getIterator().hasNext();
    }
  }
  finally {
    c.detach(toRestore);
  }
}

I don't know why. Can someone explain?

@ejona86
Member

ejona86 commented Aug 16, 2023

This is a problem caused by an optimization called ThreadlessExecutor. I think you just happened to stumble on a workaround. You are doing a good thing to cancel the RPC using the Context, but a message is getting stuck in an executor and leaking. hasNext() processes that message enough so the GC can clean it up, and another message won't get stuck until you call next() again.

For the iterator, I'd suggest always draining it until it is empty. You could drain it after the CancellableContext is closed, but then it will end up throwing an exception, which is ugly.

This looks like enough of a problem that we should probably remove the ThreadlessExecutor optimization for it, though. @larry-safran, would you mind removing the custom executor for the iterator API?

The iterator-based API is broken/painful in a lot of ways. #10318 is a new API to replace it.
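A minimal sketch of the drain pattern suggested above, simulated with a plain in-memory iterator (the `drain` helper and class name are illustrative, not part of the gRPC API; in the real code the iterator would be the `Iterator<ListObjectResult>` returned by `listObjects()`, and the drain should happen before the CancellableContext is closed):

```java
import java.util.Iterator;
import java.util.List;

public class DrainExample {
    // Consume a (possibly server-streaming) iterator to completion so the
    // stub can process and release any response messages still buffered in
    // its internal executor.
    static <T> int drain(Iterator<T> iterator) {
        int consumed = 0;
        while (iterator.hasNext()) {
            iterator.next();
            consumed++;
        }
        return consumed;
    }

    public static void main(String[] args) {
        // Pretend the caller stopped early after reading one element,
        // as the priority-queue loop in the issue does.
        Iterator<String> remaining = List.of("a", "b", "c").iterator();
        remaining.next();
        System.out.println("drained " + drain(remaining) + " leftover messages");
    }
}
```

Draining fully (calling `next()` until `hasNext()` is false) is safer than the single `hasNext()` workaround in the issue, since it does not rely on the internals of how messages are queued.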

@ejona86 ejona86 added bug and removed question labels Aug 16, 2023
@ejona86 ejona86 added this to the Next milestone Aug 16, 2023
larry-safran added a commit to larry-safran/grpc-java that referenced this issue Aug 16, 2023
larry-safran added a commit that referenced this issue Aug 18, 2023
* Remove ThreadlessExecutor from BlockingServerStream

fixes #10490
@ejona86 ejona86 modified the milestones: Next, 1.58 Aug 18, 2023
larry-safran added a commit to larry-safran/grpc-java that referenced this issue Aug 29, 2023
* Remove ThreadlessExecutor from BlockingServerStream

fixes grpc#10490
@github-actions github-actions bot locked as resolved and limited conversation to collaborators Nov 17, 2023