From bafa2afaacc813ff2bef8a9e5066adf72876d617 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20L=C3=BCck?= Date: Sat, 3 Sep 2022 11:44:55 +0200 Subject: [PATCH] Include buffer logic to avoid dependency on reactphp/promise-stream --- composer.json | 2 +- src/Io/Transaction.php | 68 +++++--- .../RequestBodyBufferMiddleware.php | 75 ++++++-- tests/Io/TransactionTest.php | 94 +++++++++- .../RequestBodyBufferMiddlewareTest.php | 164 ++++++++++++++++-- 5 files changed, 346 insertions(+), 57 deletions(-) diff --git a/composer.json b/composer.json index aeee592b..59736ddd 100644 --- a/composer.json +++ b/composer.json @@ -32,7 +32,6 @@ "psr/http-message": "^1.0", "react/event-loop": "^1.2", "react/promise": "^3 || ^2.3 || ^1.2.1", - "react/promise-stream": "^1.4", "react/socket": "^1.12", "react/stream": "^1.2", "ringcentral/psr7": "^1.2" @@ -43,6 +42,7 @@ "clue/socks-react": "^1.4", "phpunit/phpunit": "^9.5 || ^5.7 || ^4.8.35", "react/async": "^4 || ^3 || ^2", + "react/promise-stream": "^1.4", "react/promise-timer": "^1.9" }, "autoload": { diff --git a/src/Io/Transaction.php b/src/Io/Transaction.php index cbf8f3eb..bfa42241 100644 --- a/src/Io/Transaction.php +++ b/src/Io/Transaction.php @@ -9,6 +9,7 @@ use React\Http\Message\Response; use React\Http\Message\ResponseException; use React\Promise\Deferred; +use React\Promise\Promise; use React\Promise\PromiseInterface; use React\Stream\ReadableStreamInterface; use RingCentral\Psr7\Uri; @@ -165,46 +166,67 @@ function (ResponseInterface $response) use ($request, $that, $deferred, $state) */ public function bufferResponse(ResponseInterface $response, Deferred $deferred, ClientRequestState $state) { - $stream = $response->getBody(); + $body = $response->getBody(); + $size = $body->getSize(); - $size = $stream->getSize(); if ($size !== null && $size > $this->maximumSize) { - $stream->close(); + $body->close(); return \React\Promise\reject(new \OverflowException( 'Response body size of ' . $size . ' bytes exceeds maximum of ' . $this->maximumSize . ' bytes', - \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 0 + \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90 )); } // body is not streaming => already buffered - if (!$stream instanceof ReadableStreamInterface) { + if (!$body instanceof ReadableStreamInterface) { return \React\Promise\resolve($response); } - // buffer stream and resolve with buffered body + /** @var ?\Closure $closer */ + $closer = null; $maximumSize = $this->maximumSize; - $promise = \React\Promise\Stream\buffer($stream, $maximumSize)->then( - function ($body) use ($response) { - return $response->withBody(new BufferedBody($body)); - }, - function ($e) use ($stream, $maximumSize) { - // try to close stream if buffering fails (or is cancelled) - $stream->close(); - if ($e instanceof \OverflowException) { - $e = new \OverflowException( + return $state->pending = new Promise(function ($resolve, $reject) use ($body, $maximumSize, $response, &$closer) { + // resolve with current buffer when stream closes successfully + $buffer = ''; + $body->on('close', $closer = function () use (&$buffer, $response, $maximumSize, $resolve, $reject) { + $resolve($response->withBody(new BufferedBody($buffer))); + }); + + // buffer response body data in memory + $body->on('data', function ($data) use (&$buffer, $maximumSize, $body, $closer, $reject) { + $buffer .= $data; + + // close stream and reject promise if limit is exceeded + if (isset($buffer[$maximumSize])) { + $buffer = ''; + assert($closer instanceof \Closure); + $body->removeListener('close', $closer); + $body->close(); + + $reject(new \OverflowException( 'Response body size exceeds maximum of ' . $maximumSize . ' bytes', - \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 0 - ); + \defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90 + )); } + }); - throw $e; - } - ); - - $state->pending = $promise; + // reject buffering if body emits error + $body->on('error', function (\Exception $e) use ($reject) { + $reject(new \RuntimeException( + 'Error while buffering response body: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + }, function () use ($body, &$closer) { + // cancelled buffering: remove close handler to avoid resolving, then close and reject + assert($closer instanceof \Closure); + $body->removeListener('close', $closer); + $body->close(); - return $promise; + throw new \RuntimeException('Cancelled buffering response body'); + }); } /** diff --git a/src/Middleware/RequestBodyBufferMiddleware.php b/src/Middleware/RequestBodyBufferMiddleware.php index c13a5dec..ddb39f5e 100644 --- a/src/Middleware/RequestBodyBufferMiddleware.php +++ b/src/Middleware/RequestBodyBufferMiddleware.php @@ -6,7 +6,7 @@ use Psr\Http\Message\ServerRequestInterface; use React\Http\Io\BufferedBody; use React\Http\Io\IniUtil; -use React\Promise\Stream; +use React\Promise\Promise; use React\Stream\ReadableStreamInterface; final class RequestBodyBufferMiddleware @@ -29,19 +29,19 @@ public function __construct($sizeLimit = null) $this->sizeLimit = IniUtil::iniSizeToBytes($sizeLimit); } - public function __invoke(ServerRequestInterface $request, $stack) + public function __invoke(ServerRequestInterface $request, $next) { $body = $request->getBody(); $size = $body->getSize(); // happy path: skip if body is known to be empty (or is already buffered) - if ($size === 0 || !$body instanceof ReadableStreamInterface) { + if ($size === 0 || !$body instanceof ReadableStreamInterface || !$body->isReadable()) { // replace with empty body if body is streaming (or buffered size exceeds limit) if ($body instanceof ReadableStreamInterface || $size > $this->sizeLimit) { $request = $request->withBody(new BufferedBody('')); } - return $stack($request); + return $next($request); } // request body of known size exceeding limit @@ -50,21 +50,60 @@ public function __invoke(ServerRequestInterface $request, $stack) $sizeLimit = 0; } - return Stream\buffer($body, $sizeLimit)->then(function ($buffer) use ($request, $stack) { - $request = $request->withBody(new BufferedBody($buffer)); - - return $stack($request); - }, function ($error) use ($stack, $request, $body) { - // On buffer overflow keep the request body stream in, - // but ignore the contents and wait for the close event - // before passing the request on to the next middleware. - if ($error instanceof OverflowException) { - return Stream\first($body, 'close')->then(function () use ($stack, $request) { - return $stack($request); - }); - } + /** @var ?\Closure $closer */ + $closer = null; + + return new Promise(function ($resolve, $reject) use ($body, &$closer, $sizeLimit, $request, $next) { + // buffer request body data in memory, discard but keep buffering if limit is reached + $buffer = ''; + $bufferer = null; + $body->on('data', $bufferer = function ($data) use (&$buffer, $sizeLimit, $body, &$bufferer) { + $buffer .= $data; + + // On buffer overflow keep the request body stream in, + // but ignore the contents and wait for the close event + // before passing the request on to the next middleware. + if (isset($buffer[$sizeLimit])) { + assert($bufferer instanceof \Closure); + $body->removeListener('data', $bufferer); + $bufferer = null; + $buffer = ''; + } + }); + + // call $next with current buffer and resolve or reject with its results + $body->on('close', $closer = function () use (&$buffer, $request, $resolve, $reject, $next) { + try { + // resolve with result of next handler + $resolve($next($request->withBody(new BufferedBody($buffer)))); + } catch (\Exception $e) { + $reject($e); + } catch (\Throwable $e) { // @codeCoverageIgnoreStart + // reject Errors just like Exceptions (PHP 7+) + $reject($e); // @codeCoverageIgnoreEnd + } + }); + + // reject buffering if body emits error + $body->on('error', function (\Exception $e) use ($reject, $body, $closer) { + // remove close handler to avoid resolving, then close and reject + assert($closer instanceof \Closure); + $body->removeListener('close', $closer); + $body->close(); + + $reject(new \RuntimeException( + 'Error while buffering request body: ' . $e->getMessage(), + $e->getCode(), + $e + )); + }); + }, function () use ($body, &$closer) { + // cancelled buffering: remove close handler to avoid resolving, then close and reject + assert($closer instanceof \Closure); + $body->removeListener('close', $closer); + $body->close(); - throw $error; + throw new \RuntimeException('Cancelled buffering request body'); }); } } diff --git a/tests/Io/TransactionTest.php b/tests/Io/TransactionTest.php index e0d04e39..140c53e0 100644 --- a/tests/Io/TransactionTest.php +++ b/tests/Io/TransactionTest.php @@ -406,7 +406,7 @@ public function testReceivingStreamingBodyWillResolveWithBufferedResponseByDefau $this->assertEquals('hello world', (string)$response->getBody()); } - public function testReceivingStreamingBodyWithSizeExceedingMaximumResponseBufferWillRejectAndCloseResponseStream() + public function testReceivingStreamingBodyWithContentLengthExceedingMaximumResponseBufferWillRejectAndCloseResponseStreamImmediately() { $stream = new ThroughStream(); $stream->on('close', $this->expectCallableOnce()); @@ -419,11 +419,87 @@ public function testReceivingStreamingBodyWithSizeExceedingMaximumResponseBuffer $sender = $this->makeSenderMock(); $sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response)); + $transaction = new Transaction($sender, Loop::get()); + + $promise = $transaction->send($request); + + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + $this->assertFalse($stream->isWritable()); + + assert($exception instanceof \OverflowException); + $this->assertInstanceOf('OverflowException', $exception); + $this->assertEquals('Response body size of 100000000 bytes exceeds maximum of 16777216 bytes', $exception->getMessage()); + $this->assertEquals(defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90, $exception->getCode()); + $this->assertNull($exception->getPrevious()); + } + + public function testReceivingStreamingBodyWithContentsExceedingMaximumResponseBufferWillRejectAndCloseResponseStreamWhenBufferExceedsLimit() + { + $stream = new ThroughStream(); + $stream->on('close', $this->expectCallableOnce()); + + $request = $this->getMockBuilder('Psr\Http\Message\RequestInterface')->getMock(); + + $response = new Response(200, array(), new ReadableBodyStream($stream)); + + // mock sender to resolve promise with the given $response in response to the given $request + $sender = $this->makeSenderMock(); + $sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response)); + + $transaction = new Transaction($sender, Loop::get()); + $transaction = $transaction->withOptions(array('maximumSize' => 10)); + $promise = $transaction->send($request); + + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + $this->assertTrue($stream->isWritable()); + $stream->write('hello wörld'); + $this->assertFalse($stream->isWritable()); + + assert($exception instanceof \OverflowException); + $this->assertInstanceOf('OverflowException', $exception); + $this->assertEquals('Response body size exceeds maximum of 10 bytes', $exception->getMessage()); + $this->assertEquals(defined('SOCKET_EMSGSIZE') ? \SOCKET_EMSGSIZE : 90, $exception->getCode()); + $this->assertNull($exception->getPrevious()); + } + + public function testReceivingStreamingBodyWillRejectWhenStreamEmitsError() + { + $stream = new ThroughStream(function ($data) { + throw new \UnexpectedValueException('Unexpected ' . $data, 42); + }); + + $request = $this->getMockBuilder('Psr\Http\Message\RequestInterface')->getMock(); + $response = new Response(200, array(), new ReadableBodyStream($stream)); + + // mock sender to resolve promise with the given $response in response to the given $request + $sender = $this->makeSenderMock(); + $sender->expects($this->once())->method('send')->with($this->equalTo($request))->willReturn(Promise\resolve($response)); + $transaction = new Transaction($sender, Loop::get()); $promise = $transaction->send($request); - $this->setExpectedException('OverflowException'); - \React\Async\await(\React\Promise\Timer\timeout($promise, 0.001)); + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + $this->assertTrue($stream->isWritable()); + $stream->write('Foo'); + $this->assertFalse($stream->isWritable()); + + assert($exception instanceof \RuntimeException); + $this->assertInstanceOf('RuntimeException', $exception); + $this->assertEquals('Error while buffering response body: Unexpected Foo', $exception->getMessage()); + $this->assertEquals(42, $exception->getCode()); + $this->assertInstanceOf('UnexpectedValueException', $exception->getPrevious()); } public function testCancelBufferingResponseWillCloseStreamAndReject() @@ -446,8 +522,16 @@ public function testCancelBufferingResponseWillCloseStreamAndReject() $deferred->resolve($response); $promise->cancel(); - $this->setExpectedException('RuntimeException'); - \React\Async\await(\React\Promise\Timer\timeout($promise, 0.001)); + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + assert($exception instanceof \RuntimeException); + $this->assertInstanceOf('RuntimeException', $exception); + $this->assertEquals('Cancelled buffering response body', $exception->getMessage()); + $this->assertEquals(0, $exception->getCode()); + $this->assertNull($exception->getPrevious()); } public function testReceivingStreamingBodyWillResolveWithStreamingResponseIfStreamingIsEnabled() diff --git a/tests/Middleware/RequestBodyBufferMiddlewareTest.php b/tests/Middleware/RequestBodyBufferMiddlewareTest.php index 0edec7da..fd818a8c 100644 --- a/tests/Middleware/RequestBodyBufferMiddlewareTest.php +++ b/tests/Middleware/RequestBodyBufferMiddlewareTest.php @@ -115,10 +115,11 @@ function (ServerRequestInterface $request) use (&$exposedRequest) { $this->assertSame($body, $exposedRequest->getBody()); } - public function testKnownExcessiveSizedBodyIsDisgardedTheRequestIsPassedDownToTheNextMiddleware() + public function testClosedStreamResolvesImmediatelyWithEmptyBody() { $stream = new ThroughStream(); - $stream->end('aa'); + $stream->close(); + $serverRequest = new ServerRequest( 'GET', 'https://example.com/', @@ -126,13 +127,41 @@ public function testKnownExcessiveSizedBodyIsDisgardedTheRequestIsPassedDownToTh new HttpBodyStream($stream, 2) ); + $exposedRequest = null; $buffer = new RequestBodyBufferMiddleware(1); - $response = \React\Async\await($buffer( + $buffer( + $serverRequest, + function (ServerRequestInterface $request) use (&$exposedRequest) { + $exposedRequest = $request; + } + ); + + $this->assertSame(0, $exposedRequest->getBody()->getSize()); + $this->assertSame('', $exposedRequest->getBody()->getContents()); + } + + public function testKnownExcessiveSizedBodyIsDiscardedAndRequestIsPassedDownToTheNextMiddleware() + { + $stream = new ThroughStream(); + $serverRequest = new ServerRequest( + 'GET', + 'https://example.com/', + array(), + new HttpBodyStream($stream, 2) + ); + + $buffer = new RequestBodyBufferMiddleware(1); + + $promise = $buffer( $serverRequest, function (ServerRequestInterface $request) { return new Response(200, array(), $request->getBody()->getContents()); } - )); + ); + + $stream->end('aa'); + + $response = \React\Async\await($promise); $this->assertSame(200, $response->getStatusCode()); $this->assertSame('', $response->getBody()->getContents()); @@ -214,9 +243,10 @@ function (ServerRequestInterface $request) { $this->assertSame('', $exposedResponse->getBody()->getContents()); } - public function testBufferingErrorThrows() + public function testBufferingRejectsWhenNextHandlerThrowsWhenStreamEnds() { $stream = new ThroughStream(); + $serverRequest = new ServerRequest( 'GET', 'https://example.com/', @@ -224,18 +254,101 @@ public function testBufferingErrorThrows() new HttpBodyStream($stream, null) ); - $buffer = new RequestBodyBufferMiddleware(1); + $buffer = new RequestBodyBufferMiddleware(100); $promise = $buffer( $serverRequest, function (ServerRequestInterface $request) { - return $request; + throw new \RuntimeException('Buffered ' . $request->getBody()->getSize(), 42); } ); - $stream->emit('error', array(new \RuntimeException())); + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + $this->assertTrue($stream->isWritable()); + $stream->end('Foo'); + $this->assertFalse($stream->isWritable()); - $this->setExpectedException('RuntimeException'); - \React\Async\await($promise); + assert($exception instanceof \RuntimeException); + $this->assertInstanceOf('RuntimeException', $exception); + $this->assertEquals('Buffered 3', $exception->getMessage()); + $this->assertEquals(42, $exception->getCode()); + $this->assertNull($exception->getPrevious()); + } + + /** + * @requires PHP 7 + */ + public function testBufferingRejectsWhenNextHandlerThrowsErrorWhenStreamEnds() + { + $stream = new ThroughStream(); + + $serverRequest = new ServerRequest( + 'GET', + 'https://example.com/', + array(), + new HttpBodyStream($stream, null) + ); + + $buffer = new RequestBodyBufferMiddleware(100); + $promise = $buffer( + $serverRequest, + function (ServerRequestInterface $request) { + throw new \Error('Buffered ' . $request->getBody()->getSize(), 42); + } + ); + + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + $this->assertTrue($stream->isWritable()); + $stream->end('Foo'); + $this->assertFalse($stream->isWritable()); + + assert($exception instanceof \Error); + $this->assertInstanceOf('Error', $exception); + $this->assertEquals('Buffered 3', $exception->getMessage()); + $this->assertEquals(42, $exception->getCode()); + $this->assertNull($exception->getPrevious()); + } + + public function testBufferingRejectsWhenStreamEmitsError() + { + $stream = new ThroughStream(function ($data) { + throw new \UnexpectedValueException('Unexpected ' . $data, 42); + }); + + $serverRequest = new ServerRequest( + 'GET', + 'https://example.com/', + array(), + new HttpBodyStream($stream, null) + ); + + $buffer = new RequestBodyBufferMiddleware(1); + $promise = $buffer( + $serverRequest, + $this->expectCallableNever() + ); + + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + $this->assertTrue($stream->isWritable()); + $stream->write('Foo'); + $this->assertFalse($stream->isWritable()); + + assert($exception instanceof \RuntimeException); + $this->assertInstanceOf('RuntimeException', $exception); + $this->assertEquals('Error while buffering request body: Unexpected Foo', $exception->getMessage()); + $this->assertEquals(42, $exception->getCode()); + $this->assertInstanceOf('UnexpectedValueException', $exception->getPrevious()); } public function testFullBodyStreamedBeforeCallingNextMiddleware() @@ -263,4 +376,35 @@ public function testFullBodyStreamedBeforeCallingNextMiddleware() $stream->end('aaa'); $this->assertTrue($promiseResolved); } + + public function testCancelBufferingClosesStreamAndRejectsPromise() + { + $stream = new ThroughStream(); + $stream->on('close', $this->expectCallableOnce()); + + $serverRequest = new ServerRequest( + 'GET', + 'https://example.com/', + array(), + new HttpBodyStream($stream, 2) + ); + + $buffer = new RequestBodyBufferMiddleware(2); + + $promise = $buffer($serverRequest, $this->expectCallableNever()); + $promise->cancel(); + + $this->assertFalse($stream->isReadable()); + + $exception = null; + $promise->then(null, function ($e) use (&$exception) { + $exception = $e; + }); + + assert($exception instanceof \RuntimeException); + $this->assertInstanceOf('RuntimeException', $exception); + $this->assertEquals('Cancelled buffering request body', $exception->getMessage()); + $this->assertEquals(0, $exception->getCode()); + $this->assertNull($exception->getPrevious()); + } }