Skip to content

Commit

Permalink
POC: outbound protobuf zero-copy take 2
Browse files Browse the repository at this point in the history
Variation on #7350. Slightly bigger but I think the interface changes are cleaner, avoids "borrowing" a ByteBuffer and separately informing of bytes written. Not sure which approach is preferable.
  • Loading branch information
njhill committed Aug 28, 2020
1 parent e6ab167 commit 5eb0dc6
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 27 deletions.
36 changes: 36 additions & 0 deletions api/src/main/java/io/grpc/BufferDrainable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2020 The gRPC Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.grpc;

import java.io.IOException;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

/**
* Extension to an {@link java.io.InputStream} or alike by adding a method that
* allows transferring directly to an underlying {@link ByteBuffer}
*/
public interface BufferDrainable {

/**
* Transfers all content to a target {@link ByteBuffer}
*
* @param dest the destination buffer to receive the bytes.
* @throws BufferOverflowException if destination buffer has insufficient remaining bytes
*/
int drainTo(ByteBuffer dest) throws IOException;
}
7 changes: 4 additions & 3 deletions core/src/main/java/io/grpc/internal/MessageFramer.java
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,8 @@ private int writeKnownLengthUncompressed(InputStream message, int messageLength)
buffer = bufferAllocator.allocate(header.position() + messageLength);
}
writeRaw(headerScratch, 0, header.position());
return writeToOutputStream(message, outputStreamAdapter);
return buffer.writableBytes() >= messageLength ? buffer.write(message)
: writeToOutputStream(message, outputStreamAdapter);
}

/**
Expand Down Expand Up @@ -341,7 +342,7 @@ public void dispose() {

private void releaseBuffer() {
if (buffer != null) {
buffer.release();
buffer.close();
buffer = null;
}
}
Expand Down Expand Up @@ -403,7 +404,7 @@ public void write(int b) throws IOException {

@Override
public void write(byte[] b, int off, int len) {
if (current == null) {
if (current == null && len > 0) {
// Request len bytes initially from the allocator, it may give us more.
current = bufferAllocator.allocate(len);
bufferList.add(current);
Expand Down
14 changes: 12 additions & 2 deletions core/src/main/java/io/grpc/internal/WritableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.grpc.internal;

import java.io.IOException;
import java.io.InputStream;

/**
* An interface for a byte buffer that can only be written to.
* {@link WritableBuffer}s are a generic way to transfer bytes to
Expand All @@ -38,7 +41,14 @@ public interface WritableBuffer {
/**
* Appends a single byte to the buffer. This is slow so don't call it.
*/
void write(byte b);
void write(int b);

/**
* Write entire {@link InputStream} to the buffer.
*
* @return number of bytes written
*/
int write(InputStream stream) throws IOException;

/**
* Returns the number of bytes one can write to the buffer.
Expand All @@ -54,5 +64,5 @@ public interface WritableBuffer {
* Releases the buffer, indicating to the {@link WritableBufferAllocator} that
* this buffer is no longer used and its resources can be reused.
*/
void release();
void close();
}
20 changes: 17 additions & 3 deletions core/src/test/java/io/grpc/internal/MessageFramerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,15 @@
import static org.mockito.Mockito.verifyNoInteractions;
import static org.mockito.Mockito.verifyNoMoreInteractions;

import com.google.common.io.ByteStreams;
import io.grpc.Codec;
import io.grpc.StreamTracer;
import io.grpc.internal.testing.TestStreamTracer.TestBaseStreamTracer;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays;
Expand Down Expand Up @@ -408,8 +412,18 @@ public void write(byte[] bytes, int srcIndex, int length) {
}

@Override
public void write(byte b) {
data[writeIdx++] = b;
public void write(int b) {
data[writeIdx++] = (byte) b;
}

@Override
public int write(InputStream stream) throws IOException {
int written = ByteStreams.read(stream, data, writeIdx, writableBytes());
writeIdx += written;
if (writeIdx >= data.length && stream.available() != 0) {
throw new BufferOverflowException();
}
return written;
}

@Override
Expand All @@ -423,7 +437,7 @@ public int readableBytes() {
}

@Override
public void release() {
public void close() {
data = null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public void testBuffersAreDifferent() {

assertNotSame(buffer1, buffer2);

buffer1.release();
buffer2.release();
buffer1.close();
buffer2.close();
}
}
39 changes: 36 additions & 3 deletions netty/src/main/java/io/grpc/netty/NettyWritableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,20 @@

package io.grpc.netty;

import io.grpc.BufferDrainable;
import io.grpc.Drainable;
import io.grpc.internal.WritableBuffer;
import io.netty.buffer.ByteBuf;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;

/**
* The {@link WritableBuffer} used by the Netty transport.
*/
class NettyWritableBuffer implements WritableBuffer {
class NettyWritableBuffer extends OutputStream implements WritableBuffer {

private final ByteBuf bytebuf;

Expand All @@ -36,10 +43,36 @@ public void write(byte[] src, int srcIndex, int length) {
}

@Override
public void write(byte b) {
public void write(int b) {
bytebuf.writeByte(b);
}

@Override
public int write(InputStream stream) throws IOException {
if (!bytebuf.hasArray()) {
if (stream instanceof BufferDrainable && bytebuf.nioBufferCount() == 1) {
ByteBuffer bb = bytebuf.internalNioBuffer(bytebuf.writerIndex(), bytebuf.writableBytes());
int positionBefore = bb.position();
((BufferDrainable) stream).drainTo(bb);
int written = bb.position() - positionBefore;
bytebuf.writerIndex(bytebuf.writerIndex() + written);
return written;
// Could potentially also include composite here if stream is known length
// and less than first ByteBuf size. But shouldn't encounter those since
// output buffers come straight from the allocator
}
if (stream instanceof Drainable) {
return ((Drainable) stream).drainTo(this);
}
}
int writable = bytebuf.writableBytes();
int written = bytebuf.writeBytes(stream, writable);
if (written == writable && stream.available() != 0) {
throw new BufferOverflowException();
}
return written;
}

@Override
public int writableBytes() {
return bytebuf.writableBytes();
Expand All @@ -51,7 +84,7 @@ public int readableBytes() {
}

@Override
public void release() {
public void close() {
bytebuf.release();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public void setup() {

@After
public void teardown() {
buffer.release();
buffer.close();
}

@Override
Expand Down
16 changes: 14 additions & 2 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpWritableBuffer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,10 @@
package io.grpc.okhttp;

import io.grpc.internal.WritableBuffer;
import java.io.IOException;
import java.io.InputStream;
import okio.Buffer;
import okio.Okio;

class OkHttpWritableBuffer implements WritableBuffer {

Expand All @@ -38,12 +41,21 @@ public void write(byte[] src, int srcIndex, int length) {
}

@Override
public void write(byte b) {
public void write(int b) {
buffer.writeByte(b);
writableBytes -= 1;
readableBytes += 1;
}

@Override
public int write(InputStream stream) throws IOException {
//TODO ensure this doesn't write more than writableBytes
int written = (int) buffer.writeAll(Okio.source(stream));
writableBytes -= written;
readableBytes += written;
return written;
}

@Override
public int writableBytes() {
return writableBytes;
Expand All @@ -55,7 +67,7 @@ public int readableBytes() {
}

@Override
public void release() {
public void close() {
}

Buffer buffer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@
import com.google.protobuf.CodedOutputStream;
import com.google.protobuf.MessageLite;
import com.google.protobuf.Parser;

import io.grpc.BufferDrainable;
import io.grpc.Drainable;
import io.grpc.KnownLength;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.BufferOverflowException;
import java.nio.ByteBuffer;
import javax.annotation.Nullable;

/**
* An {@link InputStream} backed by a protobuf.
*/
final class ProtoInputStream extends InputStream implements Drainable, KnownLength {
final class ProtoInputStream extends InputStream implements Drainable, KnownLength, BufferDrainable {

// ProtoInputStream is first initialized with a *message*. *partial* is initially null.
// Once there has been a read operation on this stream, *message* is serialized to *partial* and
Expand Down Expand Up @@ -60,6 +64,32 @@ public int drainTo(OutputStream target) throws IOException {
return written;
}

@Override
public int drainTo(final ByteBuffer buffer) throws IOException {
if (message != null) {
int positionBefore = buffer.position();
try {
CodedOutputStream coded = CodedOutputStream.newInstance(buffer);
message.writeTo(coded);
coded.flush();
message = null;
return buffer.position() - positionBefore;
} catch (CodedOutputStream.OutOfSpaceException oos) {
throw new BufferOverflowException();
} finally {
if (message != null) {
buffer.position(positionBefore);
}
}
}
if (partial != null) {
int written = (int) ProtoLiteUtils.copy(partial, buffer);
partial = null;
return written;
}
return 0;
}

@Override
public int read() {
if (message != null) {
Expand All @@ -82,7 +112,6 @@ public int read(byte[] b, int off, int len) throws IOException {
return -1;
}
if (len >= size) {
// This is the only case that is zero-copy.
CodedOutputStream stream = CodedOutputStream.newInstance(b, off, size);
message.writeTo(stream);
stream.flush();
Expand Down

0 comments on commit 5eb0dc6

Please sign in to comment.