Skip to content

Commit

Permalink
Close inactive connections and requests
Browse files Browse the repository at this point in the history
This new middleware introduces a timeout of closing inactive
connections between requests after a configured amount of seconds.

This builds on top of #405 and partially on #422
  • Loading branch information
WyriHaximus committed Mar 25, 2024
1 parent 05e170d commit 7f39e07
Show file tree
Hide file tree
Showing 8 changed files with 253 additions and 12 deletions.
17 changes: 17 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ multiple concurrent HTTP requests without blocking.
* [ServerRequest](#serverrequest)
* [ResponseException](#responseexception)
* [React\Http\Middleware](#reacthttpmiddleware)
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
Expand Down Expand Up @@ -2679,6 +2680,22 @@ access its underlying response object.

### React\Http\Middleware

#### InactiveConnectionTimeoutMiddleware

The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly open.

The following example configures the `HttpServer` to close any inactive connections after one and a half second:

```php
$http = new React\Http\HttpServer(
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
$handler
);
```
> Internally, this class is used as a "value object" to override the default timeout of one minute.
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".

#### StreamingRequestMiddleware

The `React\Http\Middleware\StreamingRequestMiddleware` can be used to
Expand Down
13 changes: 10 additions & 3 deletions src/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,12 @@
use Evenement\EventEmitter;
use React\EventLoop\Loop;
use React\EventLoop\LoopInterface;
use React\Http\Io\Clock;
use React\Http\Io\IniUtil;
use React\Http\Io\MiddlewareRunner;
use React\Http\Io\RequestHeaderParser;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
Expand Down Expand Up @@ -219,10 +222,13 @@ public function __construct($requestHandlerOrLoop)
}

$streaming = false;
$idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
foreach ((array) $requestHandlers as $handler) {
if ($handler instanceof StreamingRequestMiddleware) {
$streaming = true;
break;
}
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
$idleConnectionTimeout = $handler->getTimeout();
}
}

Expand Down Expand Up @@ -252,10 +258,11 @@ public function __construct($requestHandlerOrLoop)
* doing anything with the request.
*/
$middleware = \array_filter($middleware, function ($handler) {
return !($handler instanceof StreamingRequestMiddleware);
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
});

$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
$clock = new Clock($loop);
$this->streamingServer = new StreamingServer(new MiddlewareRunner($middleware), new RequestHeaderParser($loop, $clock, $idleConnectionTimeout), $clock);

$that = $this;
$this->streamingServer->on('error', function ($error) use ($that) {
Expand Down
16 changes: 16 additions & 0 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,25 @@ public function __construct(Clock $clock)

public function handle(ConnectionInterface $conn)
{
$loop = $this->loop;
$idleConnectionTimeout = $this->idleConnectionTimeout;
$that = $this;
$idleConnectionTimeoutHandler = function () use ($that, $conn) {
$that->emit('error', array(
new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT),
$conn
));
$conn->close();
};
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$conn->on('close', function () use ($loop, &$timer) {
$loop->cancelTimer($timer);
});
$buffer = '';
$maxSize = $this->maxSize;
$that = $this;
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
// $loop->cancelTimer($timer);
// append chunk of data to buffer and look for end of request headers
$buffer .= $data;
$endOfHeader = \strpos($buffer, "\r\n\r\n");
Expand All @@ -59,6 +74,7 @@ public function handle(ConnectionInterface $conn)

// ignore incomplete requests
if ($endOfHeader === false) {
// $timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
return;
}

Expand Down
34 changes: 28 additions & 6 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ final class StreamingServer extends EventEmitter

/** @var Clock */
private $clock;
private $loop;
private $idleConnectionTimeout;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
Expand All @@ -95,7 +97,6 @@ final class StreamingServer extends EventEmitter
* connections in order to then parse incoming data as HTTP.
* See also [listen()](#listen) for more details.
*
* @param LoopInterface $loop
* @param callable $requestHandler
* @see self::listen()
*/
Expand Down Expand Up @@ -134,7 +135,7 @@ public function __construct(LoopInterface $loop, $requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this->parser, 'handle'));
$socket->on('connection', array($this, 'parseRequest'));
}

/** @internal */
Expand Down Expand Up @@ -359,7 +360,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
$this->parseRequest($connection);
} else {
$connection->end();
}
Expand All @@ -380,13 +381,34 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$that = $this;
$body->on('end', function () use ($connection, $body, &$that) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
$that->parseRequest($connection);
});
} else {
$body->pipe($connection);
}
}

/**
* @internal
*/
public function parseRequest(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$idleConnectionTimeoutHandler = function () use ($connection) {
$connection->close();
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$connection->once('close', function () use ($loop, &$timer) {
$loop->cancelTimer($timer);
});
$connection->once('data', function () use ($loop, &$timer) {
$loop->cancelTimer($timer);
});

$this->parser->handle($connection);
}
}
62 changes: 62 additions & 0 deletions src/Middleware/InactiveConnectionTimeoutMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace React\Http\Middleware;

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\HttpBodyStream;
use React\Http\Io\PauseBufferStream;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Stream\ReadableStreamInterface;

/**
* Closes any inactive connection after the specified amount of seconds since last activity.
*
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
* thirteen and a half seconds:
*
* ```php
* $http = new React\Http\HttpServer(
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
* $handler
* );
*
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
*/
final class InactiveConnectionTimeoutMiddleware
{
/**
* @internal
*/
const DEFAULT_TIMEOUT = 60;

/**
* @var float
*/
private $timeout;

/**
* @param float $timeout
*/
public function __construct($timeout = self::DEFAULT_TIMEOUT)
{
$this->timeout = $timeout;
}

public function __invoke(ServerRequestInterface $request, $next)
{
return $next($request);
}

/**
* @return float
* @internal
*/
public function getTimeout()
{
return $this->timeout;
}
}
24 changes: 23 additions & 1 deletion tests/HttpServerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
use React\EventLoop\Loop;
use React\Http\HttpServer;
use React\Http\Io\IniUtil;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Promise;
use React\Promise\Deferred;
Expand Down Expand Up @@ -60,10 +62,18 @@ public function testConstructWithoutLoopAssignsLoopAutomatically()
$ref->setAccessible(true);
$clock = $ref->getValue($streamingServer);

$ref = new \ReflectionProperty($streamingServer, 'parser');
$ref->setAccessible(true);
$parser = $ref->getValue($streamingServer);

$ref = new \ReflectionProperty($clock, 'loop');
$ref->setAccessible(true);
$loop = $ref->getValue($clock);

$ref = new \ReflectionProperty($parser, 'loop');
$ref->setAccessible(true);
$loop = $ref->getValue($parser);

$this->assertInstanceOf('React\EventLoop\LoopInterface', $loop);
}

Expand Down Expand Up @@ -257,6 +267,18 @@ function (ServerRequestInterface $request) use (&$streaming) {
$this->assertEquals(true, $streaming);
}

public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
{
$this->connection->expects($this->once())->method('close');

$http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());

$http->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

Loop::run();
}

public function testForwardErrors()
{
$exception = new \Exception();
Expand Down Expand Up @@ -439,7 +461,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency()

public function testConstructFiltersOutConfigurationMiddlewareBefore()
{
$http = new HttpServer(new StreamingRequestMiddleware(), function () { });
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { });

$ref = new \ReflectionProperty($http, 'streamingServer');
$ref->setAccessible(true);
Expand Down
47 changes: 45 additions & 2 deletions tests/Io/RequestHeaderParserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -813,7 +813,7 @@ public function testServerParamsWillBeReusedForMultipleRequestsFromSameConnectio
$clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock();
$clock->expects($this->exactly(2))->method('now')->willReturn(1652972091.3958);

$parser = new RequestHeaderParser($clock);
$parser = $this->createRequestHeaderParser($clock);

$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock();
$connection->expects($this->once())->method('getLocalAddress')->willReturn('tcp://127.1.1.1:8000');
Expand Down Expand Up @@ -848,7 +848,7 @@ public function testServerParamsWillBeRememberedUntilConnectionIsClosed()
{
$clock = $this->getMockBuilder('React\Http\Io\Clock')->disableOriginalConstructor()->getMock();

$parser = new RequestHeaderParser($clock);
$parser = $this->createRequestHeaderParser($clock);

$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('getLocalAddress', 'getRemoteAddress'))->getMock();

Expand Down Expand Up @@ -887,6 +887,49 @@ public function testQueryParametersWillBeSet()
$this->assertEquals('this', $queryParams['test']);
}

public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
{
$callback = null;
$caughtError = null;
$timer = $this->getMockBuilder('React\EventLoop\TimerInterface')->getMock();
$loop = $this->getMockBuilder('React\EventLoop\LoopInterface')->getMock();
$loop->expects($this->exactly(2))->method('addTimer')->with(0.1, $this->callback(function ($cb) use (&$tick, &$callback) {
$callback = $cb;
return true;
}))->willReturn($timer);
$loop->expects($this->any())->method('cancelTimer')->with($timer);

$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
$connection->expects($this->once())->method('close');

$parser = $this->createRequestHeaderParser(new Clock($loop), 0.1, $loop);
$parser->on('error', function ($error) use (&$caughtError) {
$caughtError = $error;
});

$parser->handle($connection);

$connection->emit('data', array("GET /foo.php?hello=world&test=this HTTP/"));

self::assertTrue(is_callable($callback));
$callback();

self::assertInstanceOf('\RuntimeException', $caughtError);
self::assertSame('Request timed out', $caughtError->getMessage());
}

public function testIdleConnectionWillNotBeClosedAfterConfiguredTimeoutOnFullRequest()
{
$connection = $this->getMockBuilder('React\Socket\Connection')->disableOriginalConstructor()->setMethods(array('close'))->getMock();
$connection->expects($this->never())->method('close');

$parser = $this->createRequestHeaderParser(new Clock($this->getMockBuilder('React\EventLoop\LoopInterface')->getMock()), 0.1);

$parser->handle($connection);

$connection->emit('data', array($this->createGetRequest()));
}

private function createGetRequest()
{
$data = "GET / HTTP/1.1\r\n";
Expand Down

0 comments on commit 7f39e07

Please sign in to comment.