Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

POC: outbound protobuf zero-copy take 2 #7369

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
36 changes: 36 additions & 0 deletions api/src/main/java/io/grpc/BufferDrainable.java
@@ -0,0 +1,36 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

/**
* Extension to an {@link java.io.InputStream} or alike by adding a method that
* allows transferring directly to an underlying {@link ByteBuffer}
*/
public interface BufferDrainable {

/**
* Transfers all content to a target {@link ByteBuffer}
*
* @param dest the destination buffer to receive the bytes.
* @throws BufferOverflowException if destination buffer has insufficient remaining bytes
*/
int drainTo(ByteBuffer dest) throws IOException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of calling the method "drain" would it be more apt to call it "read"? That would make the logic very similar to read(byte[] b, int off, int len).

I'm not wild about the BufferOverflowException and transferring of all bytes. I see little reason to limit the API to reading all the bytes and it means data in invalided if an exception is thrown. While having to call read() multiple times is annoying, it is that way for a reason.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't disagree and this is actually what I started with for exactly the reasons you described, but I was thinking that the target impl doesn't actually work that way so this has the side effect that it will only be used when beneficial. And at least it's kind of similar to both ReadableBuffer#readBytes(ByteBuffer) which requires the target buffer to be large enough to hold the entire source, and the Drainable interface which has to drain the entire InputStream.

Having ProtoInputStream fall back to use ByteArrayInputStream in the case that the provided ByteBuffer isn't big enough would be worse than the current path, and this would not be obvious from the interface. WDYT?

It's also why I changed the name to drainTo, agree read more appropriate if just filled the ByteBuffer.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that the target impl doesn't actually work that way so this has the side effect that it will only be used when beneficial

"Beneficial" is in the eye of the beholder. Yes, with the current code that is true. If this was in an API we could readily change, I'd agree. But since this is a public API that 1) will last a long time and 2) have multiple users I'd much rather favor the more generic method definition. I'm optimizing the API for longevity while making sure it allows optimizing the implementation.

As things change, my preference order would be:

  1. Any old code path is fast
  2. Any old code path works
  3. Any old code path fails (and this one is not actually acceptable)

Since drainTo(ByteBuffer) would require KnownLength, I'm not as fond of it. It could be useful to drain a compressed message to a ByteBuffer or some such.

Interfaces generally don't define guaranteed performance levels. List.get() may be fast or slow. We will consider performance when making changes, but at times we may need to hurt performance of one particular method for some other goal. So I'm okay with the varying level of performance. If you are benchmarking then you don't care about performance, and such a regression would show up on a benchmark.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this was in an API we could readily change, I'd agree.

How about just moving it to internal package then (alongside WritableBuffer)?

Probably this is clear but I meant that performance-wise the only implementation of this would actually involve more copying than less in the case that the stream didn't fit in the buffer i.e. worse than it is now and having the opposite than intended effect of this PR.

}
7 changes: 4 additions & 3 deletions core/src/main/java/io/grpc/internal/MessageFramer.java
Expand Up @@ -227,7 +227,8 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength)
buffer = bufferAllocator.allocate(header.position() + messageLength);
}
writeRaw(headerScratch, 0, header.position());
return writeToOutputStream(message, outputStreamAdapter);
return buffer.writableBytes() >= messageLength ? buffer.write(message)
: writeToOutputStream(message, outputStreamAdapter);
}

/**
Expand Down Expand Up @@ -341,7 +342,7 @@ public void dispose() {

private void releaseBuffer() {
if (buffer != null) {
buffer.release();
buffer.close();
buffer = null;
}
}
Expand Down Expand Up @@ -403,7 +404,7 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] b, int off, int len) {
if (current == null) {
if (current == null && len > 0) {
// Request len bytes initially from the allocator, it may give us more.
current = bufferAllocator.allocate(len);
bufferList.add(current);
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/io/grpc/internal/WritableBuffer.java
Expand Up @@ -16,6 +16,9 @@

package io.grpc.internal;

import java.io.IOException;
import java.io.InputStream;

/**
* An interface for a byte buffer that can only be written to.
* {@link WritableBuffer}s are a generic way to transfer bytes to
Expand All @@ -38,7 +41,14 @@ public interface WritableBuffer {
/**
* Appends a single byte to the buffer. This is slow so don't call it.
*/
void write(byte b);
void write(int b);

/**
* Write entire {@link InputStream} to the buffer.
*
* @return number of bytes written
*/
int write(InputStream stream) throws IOException;

/**
* Returns the number of bytes one can write to the buffer.
Expand All @@ -54,5 +64,5 @@ public interface WritableBuffer {
* Releases the buffer, indicating to the {@link WritableBufferAllocator} that
* this buffer is no longer used and its resources can be reused.
*/
void release();
void close();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changes in this interface and MessageFramer aren't directly related to the goal of this PR. Also, I'd vote for close() as the mirror of ReadableBuffer.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@voidzcy could you clarify which changes specifically you're talking about in each? Obviously write(InputStream) is part of it. The other two methods in this interface are to make it compatible with OutputStream so that impls can impl both if useful, which NettyWritableBuffer now does, used here.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I mean changes like s/release/close/g, casting int to byte as well as adding the len > 0 check to the existing write-to-array method in MessageFramer, are sort of excessive regarding to the goal of this PR. They are irrelevant to things intended to be added. Also, close() would be preferable compared to release() as it's more abstract and ReadableBuffer also uses close().

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

close() would be preferable compared to release() as it's more abstract and ReadableBuffer also uses close()

You seem to be arguing for both sides of this in the same paragraph? :)

changes like s/release/close/g, casting int to byte

I just explained these in the preceding comment, they are in fact linked to this PR.

adding the len > 0 check

Sure I can remove this if you prefer

}
20 changes: 17 additions & 3 deletions core/src/test/java/io/grpc/internal/MessageFramerTest.java
Expand Up @@ -26,11 +26,15 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.io.ByteStreams;
import io.grpc.Codec;
import io.grpc.StreamTracer;
import io.grpc.internal.testing.TestStreamTracer.TestBaseStreamTracer;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
Expand Down Expand Up @@ -408,8 +412,18 @@ public void write(byte[] bytes, int srcIndex, int length) {
}

@Override
public void write(byte b) {
data[writeIdx++] = b;
public void write(int b) {
data[writeIdx++] = (byte) b;
}

@Override
public int write(InputStream stream) throws IOException {
int written = ByteStreams.read(stream, data, writeIdx, writableBytes());
writeIdx += written;
if (writeIdx >= data.length && stream.available() != 0) {
throw new BufferOverflowException();
}
return written;
}

@Override
Expand All @@ -423,7 +437,7 @@ public int readableBytes() {
}

@Override
public void release() {
public void close() {
data = null;
}

Expand Down
Expand Up @@ -37,7 +37,7 @@ public void testBuffersAreDifferent() {

assertNotSame(buffer1, buffer2);

buffer1.release();
buffer2.release();
buffer1.close();
buffer2.close();
}
}
39 changes: 36 additions & 3 deletions netty/src/main/java/io/grpc/netty/NettyWritableBuffer.java
Expand Up @@ -16,13 +16,20 @@

package io.grpc.netty;

import io.grpc.BufferDrainable;
import io.grpc.Drainable;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

/**
* The {@link WritableBuffer} used by the Netty transport.
*/
class NettyWritableBuffer implements WritableBuffer {
class NettyWritableBuffer extends OutputStream implements WritableBuffer {

private final ByteBuf bytebuf;

Expand All @@ -36,10 +43,36 @@ public void write(byte[] src, int srcIndex, int length) {
}

@Override
public void write(byte b) {
public void write(int b) {
bytebuf.writeByte(b);
}

@Override
public int write(InputStream stream) throws IOException {
if (!bytebuf.hasArray()) {
if (stream instanceof BufferDrainable && bytebuf.nioBufferCount() == 1) {
ByteBuffer bb = bytebuf.internalNioBuffer(bytebuf.writerIndex(), bytebuf.writableBytes());
int positionBefore = bb.position();
((BufferDrainable) stream).drainTo(bb);
int written = bb.position() - positionBefore;
bytebuf.writerIndex(bytebuf.writerIndex() + written);
return written;
// Could potentially also include composite here if stream is known length
// and less than first ByteBuf size. But shouldn't encounter those since
// output buffers come straight from the allocator
}
if (stream instanceof Drainable) {
return ((Drainable) stream).drainTo(this);
}
}
int writable = bytebuf.writableBytes();
int written = bytebuf.writeBytes(stream, writable);
if (written == writable && stream.available() != 0) {
throw new BufferOverflowException();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we really need to check writable capacity at this level? Since the WritableBuffer is a wrapper backed by some real buffer and all the operations are delegated to the underlaying buffer, we should just let the underlaying buffer handle the case and such an exception should be thrown from the underneath.

}
return written;
}

@Override
public int writableBytes() {
return bytebuf.writableBytes();
Expand All @@ -51,7 +84,7 @@ public int readableBytes() {
}

@Override
public void release() {
public void close() {
bytebuf.release();
}

Expand Down
Expand Up @@ -40,7 +40,7 @@ public void setup() {

@After
public void teardown() {
buffer.release();
buffer.close();
}

@Override
Expand Down
16 changes: 14 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBuffer.java
Expand Up @@ -17,7 +17,10 @@
package io.grpc.okhttp;

import io.grpc.internal.WritableBuffer;
import java.io.IOException;
import java.io.InputStream;
import okio.Buffer;
import okio.Okio;

class OkHttpWritableBuffer implements WritableBuffer {

Expand All @@ -38,12 +41,21 @@ public void write(byte[] src, int srcIndex, int length) {
}

@Override
public void write(byte b) {
public void write(int b) {
buffer.writeByte(b);
writableBytes -= 1;
readableBytes += 1;
}

@Override
public int write(InputStream stream) throws IOException {
//TODO ensure this doesn't write more than writableBytes
int written = (int) buffer.writeAll(Okio.source(stream));
writableBytes -= written;
readableBytes += written;
return written;
}

@Override
public int writableBytes() {
return writableBytes;
Expand All @@ -55,7 +67,7 @@ public int readableBytes() {
}

@Override
public void release() {
public void close() {
}

Buffer buffer() {
Expand Down
Expand Up @@ -19,18 +19,23 @@
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

import io.grpc.BufferDrainable;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* An {@link InputStream} backed by a protobuf.
*/
final class ProtoInputStream extends InputStream implements Drainable, KnownLength {
final class ProtoInputStream extends InputStream
implements Drainable, KnownLength, BufferDrainable {

// ProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
Expand Down Expand Up @@ -60,6 +65,32 @@ public int drainTo(OutputStream target) throws IOException {
return written;
}

@Override
public int drainTo(final ByteBuffer buffer) throws IOException {
if (message != null) {
int positionBefore = buffer.position();
try {
CodedOutputStream coded = CodedOutputStream.newInstance(buffer);
message.writeTo(coded);
coded.flush();
message = null;
return buffer.position() - positionBefore;
} catch (CodedOutputStream.OutOfSpaceException oos) {
throw new BufferOverflowException();
} finally {
if (message != null) {
buffer.position(positionBefore);
}
}
}
if (partial != null) {
int written = (int) ProtoLiteUtils.copy(partial, buffer);
partial = null;
return written;
}
return 0;
}

@Override
public int read() {
if (message != null) {
Expand All @@ -82,7 +113,6 @@ public int read(byte[] b, int off, int len) throws IOException {
return -1;
}
if (len >= size) {
// This is the only case that is zero-copy.
CodedOutputStream stream = CodedOutputStream.newInstance(b, off, size);
message.writeTo(stream);
stream.flush();
Expand Down