Skip to content

Commit

Permalink
Close inactive connections and requests
Browse files Browse the repository at this point in the history
This new middleware introduces a timeout of closing inactive
connections between requests after a configured amount of seconds.

This builds on top of #405 and partially on #422
  • Loading branch information
WyriHaximus committed Mar 26, 2024
1 parent 5bd8210 commit fd7d4a6
Show file tree
Hide file tree
Showing 6 changed files with 387 additions and 128 deletions.
20 changes: 20 additions & 0 deletions README.md
Expand Up @@ -82,6 +82,7 @@ multiple concurrent HTTP requests without blocking.
* [Uri](#uri)
* [ResponseException](#responseexception)
* [React\Http\Middleware](#reacthttpmiddleware)
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
Expand Down Expand Up @@ -2692,6 +2693,25 @@ access its underlying response object.

### React\Http\Middleware

#### InactiveConnectionTimeoutMiddleware

The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly
open. The default is `60` seconds of inactivity and should only be changed if you know what you are doing.

The following example configures the `HttpServer` to close any inactive connections after one and a half second:

```php
$http = new React\Http\HttpServer(
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
$handler
);
```
> Internally, this class is used as a "value object" to override the default timeout of one minute.
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
This timeout is only in effect if we expect data from the client, not when we are writing data to
the client.

#### StreamingRequestMiddleware

The `React\Http\Middleware\StreamingRequestMiddleware` can be used to
Expand Down
10 changes: 7 additions & 3 deletions src/HttpServer.php
Expand Up @@ -8,6 +8,7 @@
use React\Http\Io\IniUtil;
use React\Http\Io\MiddlewareRunner;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
Expand Down Expand Up @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop)
}

$streaming = false;
$idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
foreach ((array) $requestHandlers as $handler) {
if ($handler instanceof StreamingRequestMiddleware) {
$streaming = true;
break;
}
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
$idleConnectionTimeout = $handler->getTimeout();
}
}

Expand Down Expand Up @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop)
* doing anything with the request.
*/
$middleware = \array_filter($middleware, function ($handler) {
return !($handler instanceof StreamingRequestMiddleware);
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
});

$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectionTimeout);

$that = $this;
$this->streamingServer->on('error', function ($error) use ($that) {
Expand Down
74 changes: 67 additions & 7 deletions src/Io/StreamingServer.php
Expand Up @@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
/** @var Clock */
private $clock;

/** @var LoopInterface */
private $loop;

/** @var int */
private $idleConnectionTimeout;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
*
Expand All @@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
* connections in order to then parse incoming data as HTTP.
* See also [listen()](#listen) for more details.
*
* @param LoopInterface $loop
* @param callable $requestHandler
* @param int $idleConnectionTimeout
* @see self::listen()
*/
public function __construct(LoopInterface $loop, $requestHandler)
public function __construct(LoopInterface $loop, $requestHandler, $idleConnectionTimeout)
{
if (!\is_callable($requestHandler)) {
throw new \InvalidArgumentException('Invalid request handler given');
}

$this->loop = $loop;
$this->callback = $requestHandler;
$this->clock = new Clock($loop);
$this->parser = new RequestHeaderParser($this->clock);
$this->idleConnectionTimeout = $idleConnectionTimeout;

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
Expand All @@ -134,7 +142,32 @@ public function __construct(LoopInterface $loop, $requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this->parser, 'handle'));
$socket->on('connection', array($this, 'handleConnection'));
}

/** @internal */
public function handleConnection(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$idleConnectionTimeoutHandler = function () use ($connection) {
$connection->close();
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$closeTimerHandler = function () use ($connection, &$closeTimerHandler, &$dataTimerHandler, $loop, &$timer) {
$connection->removeListener('close', $closeTimerHandler);
$connection->removeListener('data', $dataTimerHandler);

$loop->cancelTimer($timer);
};
$dataTimerHandler = function () use ($loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler, &$timer) {
$loop->cancelTimer($timer);
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
};
$connection->on('close', $closeTimerHandler);
$connection->on('data', $dataTimerHandler);

$this->parseRequest($connection);
}

/** @internal */
Expand Down Expand Up @@ -359,7 +392,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
$this->parseRequest($connection);
} else {
$connection->end();
}
Expand All @@ -380,13 +413,40 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$that = $this;
$body->on('end', function () use ($connection, $body, &$that) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
$that->parseRequest($connection);
});
} else {
$body->pipe($connection);
}
}

/**
* @internal
*/
public function parseRequest(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$parser = $this->parser;
$idleConnectionTimeoutHandler = function () use ($connection) {
$connection->close();
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$removeTimerHandler = function ($_, $conn) use ($loop, $timer, $parser, $connection, &$removeTimerHandler) {
if (\spl_object_hash($conn) !== \spl_object_hash($connection)) {
return;
}

$loop->cancelTimer($timer);
$parser->removeListener('headers', $removeTimerHandler);
$parser->removeListener('error', $removeTimerHandler);
};
$this->parser->on('headers', $removeTimerHandler);
$this->parser->on('error', $removeTimerHandler);

$this->parser->handle($connection);
}
}
62 changes: 62 additions & 0 deletions src/Middleware/InactiveConnectionTimeoutMiddleware.php
@@ -0,0 +1,62 @@
<?php

namespace React\Http\Middleware;

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\HttpBodyStream;
use React\Http\Io\PauseBufferStream;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Stream\ReadableStreamInterface;

/**
* Closes any inactive connection after the specified amount of seconds since last activity.
*
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
* thirteen and a half seconds:
*
* ```php
* $http = new React\Http\HttpServer(
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
* $handler
* );
*
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
*/
final class InactiveConnectionTimeoutMiddleware
{
/**
* @internal
*/
const DEFAULT_TIMEOUT = 60;

/**
* @var float
*/
private $timeout;

/**
* @param float $timeout
*/
public function __construct($timeout = self::DEFAULT_TIMEOUT)
{
$this->timeout = $timeout;
}

public function __invoke(ServerRequestInterface $request, $next)
{
return $next($request);
}

/**
* @return float
* @internal
*/
public function getTimeout()
{
return $this->timeout;
}
}
20 changes: 19 additions & 1 deletion tests/HttpServerTest.php
Expand Up @@ -6,6 +6,8 @@
use React\EventLoop\Loop;
use React\Http\HttpServer;
use React\Http\Io\IniUtil;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Promise;
use React\Promise\Deferred;
Expand Down Expand Up @@ -60,6 +62,10 @@ public function testConstructWithoutLoopAssignsLoopAutomatically()
$ref->setAccessible(true);
$clock = $ref->getValue($streamingServer);

$ref = new \ReflectionProperty($streamingServer, 'parser');
$ref->setAccessible(true);
$parser = $ref->getValue($streamingServer);

$ref = new \ReflectionProperty($clock, 'loop');
$ref->setAccessible(true);
$loop = $ref->getValue($clock);
Expand Down Expand Up @@ -257,6 +263,18 @@ function (ServerRequestInterface $request) use (&$streaming) {
$this->assertEquals(true, $streaming);
}

public function testIdleConnectionWillBeClosedAfterConfiguredTimeout()
{
$this->connection->expects($this->once())->method('close');

$http = new HttpServer(Loop::get(), new InactiveConnectionTimeoutMiddleware(0.1), $this->expectCallableNever());

$http->listen($this->socket);
$this->socket->emit('connection', array($this->connection));

Loop::run();
}

public function testForwardErrors()
{
$exception = new \Exception();
Expand Down Expand Up @@ -439,7 +457,7 @@ public function testConstructServerWithMemoryLimitDoesLimitConcurrency()

public function testConstructFiltersOutConfigurationMiddlewareBefore()
{
$http = new HttpServer(new StreamingRequestMiddleware(), function () { });
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0), new StreamingRequestMiddleware(), function () { });

$ref = new \ReflectionProperty($http, 'streamingServer');
$ref->setAccessible(true);
Expand Down

0 comments on commit fd7d4a6

Please sign in to comment.