Skip to content

Commit

Permalink
Confirm we can read a response that completed before RST_STREAM (#6293)…
Browse files Browse the repository at this point in the history
… (#6914)

The server can reject the request without breaking the response.

(cherry picked from commit 480c20e)

Co-authored-by: Jesse Wilson <jesse@swank.ca>
  • Loading branch information
oldergod and swankjesse committed Nov 18, 2021
1 parent b1a39f4 commit 97a8f6c
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 15 deletions.
28 changes: 13 additions & 15 deletions okhttp/src/main/kotlin/okhttp3/internal/http2/Http2Stream.kt
Expand Up @@ -15,11 +15,6 @@
*/
package okhttp3.internal.http2

import java.io.EOFException
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.util.ArrayDeque
import okhttp3.Headers
import okhttp3.internal.EMPTY_HEADERS
import okhttp3.internal.assertThreadDoesntHoldLock
Expand All @@ -32,6 +27,11 @@ import okio.BufferedSource
import okio.Sink
import okio.Source
import okio.Timeout
import java.io.EOFException
import java.io.IOException
import java.io.InterruptedIOException
import java.net.SocketTimeoutException
import java.util.ArrayDeque

/** A logical bidirectional stream. */
@Suppress("NAME_SHADOWING")
Expand Down Expand Up @@ -61,7 +61,7 @@ class Http2Stream internal constructor(
var writeBytesMaximum: Long = connection.peerSettings.initialWindowSize.toLong()
internal set

/** Received headers yet to be [taken][takeHeaders], or [read][FramingSource.read]. */
/** Received headers yet to be [taken][takeHeaders]. */
private val headersQueue = ArrayDeque<Headers>()

/** True if response headers have been sent or received. */
Expand Down Expand Up @@ -154,13 +154,13 @@ class Http2Stream internal constructor(
*/
@Synchronized @Throws(IOException::class)
fun trailers(): Headers {
if (source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) {
return source.trailers ?: EMPTY_HEADERS
}
if (errorCode != null) {
throw errorException ?: StreamResetException(errorCode!!)
}
check(source.finished && source.receiveBuffer.exhausted() && source.readBuffer.exhausted()) {
"too early; can't read the trailers yet"
}
return source.trailers ?: EMPTY_HEADERS
throw IllegalStateException("too early; can't read the trailers yet")
}

/**
Expand Down Expand Up @@ -276,10 +276,7 @@ class Http2Stream internal constructor(
this.source.receive(source, length.toLong())
}

/**
* Accept headers from the network and store them until the client calls [takeHeaders], or
* [FramingSource.read] them.
*/
/** Accept headers from the network and store them until the client calls [takeHeaders]. */
fun receiveHeaders(headers: Headers, inFinished: Boolean) {
this@Http2Stream.assertThreadDoesntHoldLock()

Expand Down Expand Up @@ -560,7 +557,7 @@ class Http2Stream internal constructor(
checkOutNotClosed() // Kick out if the stream was reset or closed while waiting.
toWrite = minOf(writeBytesMaximum - writeBytesTotal, sendBuffer.size)
writeBytesTotal += toWrite
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size && errorCode == null
outFinished = outFinishedOnLastFrame && toWrite == sendBuffer.size
}

writeTimeout.enter()
Expand All @@ -578,6 +575,7 @@ class Http2Stream internal constructor(
synchronized(this@Http2Stream) {
checkOutNotClosed()
}
// TODO(jwilson): flush the connection?!
while (sendBuffer.size > 0L) {
emitFrame(false)
connection.flush()
Expand Down
Expand Up @@ -610,6 +610,105 @@ public final class Http2ConnectionTest {
assertThat(synStream.headerBlock).isEqualTo(headerEntries("a", "artichaut"));
}

/** A server RST_STREAM shouldn't prevent the client from consuming the response body. */
@Test public void serverResponseBodyRstStream() throws Exception {
// write the mocking script
peer.sendFrame().settings(new Settings());
peer.acceptFrame(); // ACK
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().headers(false, 3, headerEntries("a", "android"));
peer.sendFrame().data(true, 3, new Buffer().writeUtf8("robot"), 5);
peer.sendFrame().rstStream(3, ErrorCode.NO_ERROR);
peer.sendFrame().ping(true, AWAIT_PING, 0); // PONG
peer.play();

// play it back
Http2Connection connection = connect(peer);
Http2Stream stream = connection.newStream(headerEntries(), false);
connection.writePingAndAwaitPong();
assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android"));
BufferedSource source = Okio.buffer(stream.getSource());
assertThat(source.readUtf8(5)).isEqualTo("robot");
stream.getSink().close();
assertThat(connection.openStreamCount()).isEqualTo(0);

// verify the peer received what was expected
InFrame synStream = peer.takeFrame();
assertThat(synStream.type).isEqualTo(Http2.TYPE_HEADERS);
InFrame ping = peer.takeFrame();
assertThat(ping.type).isEqualTo(Http2.TYPE_PING);
}

/** A server RST_STREAM shouldn't prevent the client from consuming trailers. */
@Test public void serverTrailersRstStream() throws Exception {
// write the mocking script
peer.sendFrame().settings(new Settings());
peer.acceptFrame(); // ACK
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().headers(false, 3, headerEntries("a", "android"));
peer.sendFrame().headers(true, 3, headerEntries("z", "zebra"));
peer.sendFrame().rstStream(3, ErrorCode.NO_ERROR);
peer.sendFrame().ping(true, AWAIT_PING, 0); // PONG
peer.play();

// play it back
Http2Connection connection = connect(peer);
Http2Stream stream = connection.newStream(headerEntries(), true);
connection.writePingAndAwaitPong();
assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android"));
stream.getSink().close();
assertThat(stream.trailers()).isEqualTo(Headers.of("z", "zebra"));
assertThat(connection.openStreamCount()).isEqualTo(0);

// verify the peer received what was expected
InFrame synStream = peer.takeFrame();
assertThat(synStream.type).isEqualTo(Http2.TYPE_HEADERS);
InFrame ping = peer.takeFrame();
assertThat(ping.type).isEqualTo(Http2.TYPE_PING);
}

/**
* A server RST_STREAM shouldn't prevent the client from consuming the response body, even if it
* follows a truncated request body.
*/
@Test public void clientRequestBodyServerResponseBodyRstStream() throws Exception {
// write the mocking script
peer.sendFrame().settings(new Settings());
peer.acceptFrame(); // ACK
peer.acceptFrame(); // SYN_STREAM
peer.acceptFrame(); // PING
peer.sendFrame().headers(false, 3, headerEntries("a", "android"));
peer.sendFrame().data(true, 3, new Buffer().writeUtf8("robot"), 5);
peer.sendFrame().rstStream(3, ErrorCode.NO_ERROR);
peer.sendFrame().ping(true, AWAIT_PING, 0); // PONG
peer.play();

// play it back
Http2Connection connection = connect(peer);
Http2Stream stream = connection.newStream(headerEntries(), true);
connection.writePingAndAwaitPong();
BufferedSink sink = Okio.buffer(stream.getSink());
sink.writeUtf8("abc");
try {
sink.close();
fail();
} catch (StreamResetException expected) {
assertThat(expected.errorCode).isEqualTo(ErrorCode.NO_ERROR);
}
assertThat(stream.takeHeaders()).isEqualTo(Headers.of("a", "android"));
BufferedSource source = Okio.buffer(stream.getSource());
assertThat(source.readUtf8(5)).isEqualTo("robot");
assertThat(connection.openStreamCount()).isEqualTo(0);

// verify the peer received what was expected
InFrame synStream = peer.takeFrame();
assertThat(synStream.type).isEqualTo(Http2.TYPE_HEADERS);
InFrame ping = peer.takeFrame();
assertThat(ping.type).isEqualTo(Http2.TYPE_PING);
}

@Test public void serverWritesTrailersWithData() throws Exception {
// We buffer some outbound data and headers and confirm that the END_STREAM flag comes with the
// headers (and not with the data).
Expand Down

0 comments on commit 97a8f6c

Please sign in to comment.