Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close inactive connections and requests #425

Open
wants to merge 2 commits into
base: 1.x
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ jobs:
name: PHPUnit (PHP ${{ matrix.php }})
runs-on: ubuntu-22.04
strategy:
fail-fast: false
matrix:
php:
- 8.3
Expand Down
20 changes: 20 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ multiple concurrent HTTP requests without blocking.
* [Uri](#uri)
* [ResponseException](#responseexception)
* [React\Http\Middleware](#reacthttpmiddleware)
* [InactiveConnectionTimeoutMiddleware](#inactiveconnectiontimeoutmiddleware)
* [StreamingRequestMiddleware](#streamingrequestmiddleware)
* [LimitConcurrentRequestsMiddleware](#limitconcurrentrequestsmiddleware)
* [RequestBodyBufferMiddleware](#requestbodybuffermiddleware)
Expand Down Expand Up @@ -2692,6 +2693,25 @@ access its underlying response object.

### React\Http\Middleware

#### InactiveConnectionTimeoutMiddleware

The `React\Http\Middleware\InactiveConnectionTimeoutMiddleware` is purely a configuration middleware to configure the
`HttpServer` to close any inactive connections between requests to close the connection and not leave them needlessly
open. The default is `60` seconds of inactivity and should only be changed if you know what you are doing.

The following example configures the `HttpServer` to close any inactive connections after one and a half second:

```php
$http = new React\Http\HttpServer(
new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(1.5),
$handler
);
```
> Internally, this class is used as a "value object" to override the default timeout of one minute.
As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
This timeout is only in effect if we expect data from the client, not when we are writing data to
the client.

#### StreamingRequestMiddleware

The `React\Http\Middleware\StreamingRequestMiddleware` can be used to
Expand Down
10 changes: 7 additions & 3 deletions src/HttpServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
use React\Http\Io\IniUtil;
use React\Http\Io\MiddlewareRunner;
use React\Http\Io\StreamingServer;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\LimitConcurrentRequestsMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Http\Middleware\RequestBodyBufferMiddleware;
Expand Down Expand Up @@ -219,10 +220,13 @@ public function __construct($requestHandlerOrLoop)
}

$streaming = false;
$idleConnectionTimeout = InactiveConnectionTimeoutMiddleware::DEFAULT_TIMEOUT;
foreach ((array) $requestHandlers as $handler) {
if ($handler instanceof StreamingRequestMiddleware) {
$streaming = true;
break;
}
if ($handler instanceof InactiveConnectionTimeoutMiddleware) {
$idleConnectionTimeout = $handler->getTimeout();
}
}

Expand Down Expand Up @@ -252,10 +256,10 @@ public function __construct($requestHandlerOrLoop)
* doing anything with the request.
*/
$middleware = \array_filter($middleware, function ($handler) {
return !($handler instanceof StreamingRequestMiddleware);
return !($handler instanceof StreamingRequestMiddleware) && !($handler instanceof InactiveConnectionTimeoutMiddleware);
});

$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware));
$this->streamingServer = new StreamingServer($loop, new MiddlewareRunner($middleware), $idleConnectionTimeout);

$that = $this;
$this->streamingServer->on('error', function ($error) use ($that) {
Expand Down
104 changes: 97 additions & 7 deletions src/Io/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ final class StreamingServer extends EventEmitter
/** @var Clock */
private $clock;

/** @var LoopInterface */
private $loop;

/** @var int */
private $idleConnectionTimeout;
WyriHaximus marked this conversation as resolved.
Show resolved Hide resolved

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
*
Expand All @@ -95,19 +101,21 @@ final class StreamingServer extends EventEmitter
* connections in order to then parse incoming data as HTTP.
* See also [listen()](#listen) for more details.
*
* @param LoopInterface $loop
* @param callable $requestHandler
* @param int $idleConnectionTimeout
* @see self::listen()
*/
public function __construct(LoopInterface $loop, $requestHandler)
public function __construct(LoopInterface $loop, $requestHandler, $idleConnectionTimeout)
{
if (!\is_callable($requestHandler)) {
throw new \InvalidArgumentException('Invalid request handler given');
}

$this->loop = $loop;
$this->callback = $requestHandler;
$this->clock = new Clock($loop);
$this->parser = new RequestHeaderParser($this->clock);
$this->idleConnectionTimeout = $idleConnectionTimeout;

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, ConnectionInterface $conn) use ($that) {
Expand All @@ -134,7 +142,35 @@ public function __construct(LoopInterface $loop, $requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this->parser, 'handle'));
$socket->on('connection', array($this, 'handleConnection'));
}

/** @internal */
public function handleConnection(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$idleConnectionTimeoutHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler) {
$connection->removeListener('close', $closeEventHandler);
$connection->removeListener('data', $dataEventHandler);

$connection->close();
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$closeEventHandler = function () use ($connection, &$closeEventHandler, &$dataEventHandler, $loop, &$timer) {
$connection->removeListener('close', $closeEventHandler);
$connection->removeListener('data', $dataEventHandler);

$loop->cancelTimer($timer);
};
$dataEventHandler = function () use ($loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler, &$timer) {
$loop->cancelTimer($timer);
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
};
$connection->on('close', $closeEventHandler);
$connection->on('data', $dataEventHandler);

$this->parseRequest($connection);
}

/** @internal */
Expand Down Expand Up @@ -372,7 +408,7 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt

// either wait for next request over persistent connection or end connection
if ($persist) {
$this->parser->handle($connection);
$this->parseRequest($connection);
} else {
$connection->end();
}
Expand All @@ -393,13 +429,67 @@ public function handleResponse(ConnectionInterface $connection, ServerRequestInt
// write streaming body and then wait for next request over persistent connection
if ($persist) {
$body->pipe($connection, array('end' => false));
$parser = $this->parser;
$body->on('end', function () use ($connection, $parser, $body) {
$that = $this;
$body->on('end', function () use ($connection, $body, &$that) {
$connection->removeListener('close', array($body, 'close'));
$parser->handle($connection);
$that->parseRequest($connection);
});
} else {
$body->pipe($connection);
}
}

/**
* @internal
*/
public function parseRequest(ConnectionInterface $connection)
{
$idleConnectionTimeout = $this->idleConnectionTimeout;
$loop = $this->loop;
$parser = $this->parser;
$idleConnectionTimeoutHandler = function () use ($connection, $parser, &$removeTimerHandler) {
$parser->removeListener('headers', $removeTimerHandler);
$parser->removeListener('error', $removeTimerHandler);

$parser->emit('error', array(
new \RuntimeException('Request timed out', Response::STATUS_REQUEST_TIMEOUT),
$connection
));
};
$timer = $this->loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$removeTimerHandler = function ($requestOrError, $conn) use ($loop, &$timer, $parser, $connection, &$removeTimerHandler, $idleConnectionTimeout, $idleConnectionTimeoutHandler) {
if ($conn !== $connection) {
return;
}

$loop->cancelTimer($timer);
$parser->removeListener('headers', $removeTimerHandler);
$parser->removeListener('error', $removeTimerHandler);

if (!($requestOrError instanceof ServerRequestInterface)) {
return;
}

$requestBody = $requestOrError->getBody();
if (!($requestBody instanceof HttpBodyStream)) {
return;
}

$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
$requestBody->on('data', function () use (&$timer, $loop, $idleConnectionTimeout, $idleConnectionTimeoutHandler) {
$loop->cancelTimer($timer);
$timer = $loop->addTimer($idleConnectionTimeout, $idleConnectionTimeoutHandler);
});
$requestBody->on('end', function () use (&$timer, $loop) {
$loop->cancelTimer($timer);
});
$requestBody->on('close', function () use (&$timer, $loop) {
$loop->cancelTimer($timer);
});
};
$this->parser->on('headers', $removeTimerHandler);
$this->parser->on('error', $removeTimerHandler);

$this->parser->handle($connection);
}
}
62 changes: 62 additions & 0 deletions src/Middleware/InactiveConnectionTimeoutMiddleware.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<?php

namespace React\Http\Middleware;

use Psr\Http\Message\ResponseInterface;
use Psr\Http\Message\ServerRequestInterface;
use React\Http\Io\HttpBodyStream;
use React\Http\Io\PauseBufferStream;
use React\Promise;
use React\Promise\PromiseInterface;
use React\Promise\Deferred;
use React\Stream\ReadableStreamInterface;

/**
* Closes any inactive connection after the specified amount of seconds since last activity.
*
* This allows you to set an alternative timeout to the default one minute (60 seconds). For example
* thirteen and a half seconds:
*
* ```php
* $http = new React\Http\HttpServer(
* new React\Http\Middleware\InactiveConnectionTimeoutMiddleware(13.5),
* $handler
* );
*
* > Internally, this class is used as a "value object" to override the default timeout of one minute.
* As such it doesn't have any behavior internally, that is all in the internal "StreamingServer".
*/
final class InactiveConnectionTimeoutMiddleware
{
/**
* @internal
*/
const DEFAULT_TIMEOUT = 60;
WyriHaximus marked this conversation as resolved.
Show resolved Hide resolved

/**
* @var float
*/
private $timeout;

/**
* @param float $timeout
*/
public function __construct($timeout = self::DEFAULT_TIMEOUT)
{
$this->timeout = $timeout;
}

public function __invoke(ServerRequestInterface $request, $next)
{
return $next($request);
}

/**
* @return float
* @internal
*/
public function getTimeout()
WyriHaximus marked this conversation as resolved.
Show resolved Hide resolved
{
return $this->timeout;
}
}
9 changes: 5 additions & 4 deletions tests/FunctionalBrowserTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
use React\Http\HttpServer;
use React\Http\Message\Response;
use React\Http\Message\ResponseException;
use React\Http\Middleware\InactiveConnectionTimeoutMiddleware;
use React\Http\Middleware\StreamingRequestMiddleware;
use React\Promise\Promise;
use React\Promise\Stream;
Expand All @@ -32,7 +33,7 @@ public function setUpBrowserAndServer()
{
$this->browser = new Browser();

$http = new HttpServer(new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.2), new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
$path = $request->getUri()->getPath();

$headers = array();
Expand Down Expand Up @@ -687,7 +688,7 @@ public function testPostStreamKnownLength()
*/
public function testPostStreamWillStartSendingRequestEvenWhenBodyDoesNotEmitData()
{
$http = new HttpServer(new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), new StreamingRequestMiddleware(), function (ServerRequestInterface $request) {
return new Response(200);
});
$socket = new SocketServer('127.0.0.1:0');
Expand All @@ -714,7 +715,7 @@ public function testPostStreamClosed()

public function testSendsHttp11ByDefault()
{
$http = new HttpServer(function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), function (ServerRequestInterface $request) {
return new Response(
200,
array(),
Expand All @@ -734,7 +735,7 @@ public function testSendsHttp11ByDefault()

public function testSendsExplicitHttp10Request()
{
$http = new HttpServer(function (ServerRequestInterface $request) {
$http = new HttpServer(new InactiveConnectionTimeoutMiddleware(0.1), function (ServerRequestInterface $request) {
return new Response(
200,
array(),
Expand Down