Skip to content

Commit

Permalink
Reuse single request parser for all requests
Browse files Browse the repository at this point in the history
This changeset is in preparation for upcoming refactorings to move
unrelated logic out of the parser class to prepare for persistent HTTP
connections in follow-up PR. This changeset does not affect the public
API and happens to improves performance slightly from around 9000 req/s
to 9200 req/s on my machine (best of 5).
  • Loading branch information
clue committed Jun 16, 2019
1 parent 2fe19c4 commit 34d576d
Show file tree
Hide file tree
Showing 3 changed files with 227 additions and 138 deletions.
91 changes: 56 additions & 35 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
use React\Socket\ConnectionInterface;
use RingCentral\Psr7 as g7;
use Exception;

Expand All @@ -20,50 +21,70 @@
*/
class RequestHeaderParser extends EventEmitter
{
private $buffer = '';
private $maxSize = 8192;

private $localSocketUri;
private $remoteSocketUri;

public function __construct($localSocketUri = null, $remoteSocketUri = null)
public function handle(ConnectionInterface $conn)
{
$this->localSocketUri = $localSocketUri;
$this->remoteSocketUri = $remoteSocketUri;
}
$buffer = '';
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn) {
// append chunk of data to buffer and look for end of request headers
$buffer .= $data;
$endOfHeader = \strpos($buffer, "\r\n\r\n");

// reject request if buffer size is exceeded
if ($endOfHeader > $this->maxSize || ($endOfHeader === false && isset($buffer[$this->maxSize]))) {
$conn->removeListener('data', $fn);
$fn = null;

$this->emit('error', array(
new \OverflowException("Maximum header size of {$this->maxSize} exceeded.", 431),
$conn
));
return;
}

public function feed($data)
{
$this->buffer .= $data;
$endOfHeader = \strpos($this->buffer, "\r\n\r\n");
// ignore incomplete requests
if ($endOfHeader === false) {
return;
}

if ($endOfHeader > $this->maxSize || ($endOfHeader === false && isset($this->buffer[$this->maxSize]))) {
$this->emit('error', array(new \OverflowException("Maximum header size of {$this->maxSize} exceeded.", 431), $this));
$this->removeAllListeners();
return;
}
// request headers received => try to parse request
$conn->removeListener('data', $fn);
$fn = null;

if (false !== $endOfHeader) {
try {
$request = $this->parseRequest((string)\substr($this->buffer, 0, $endOfHeader));
$request = $this->parseRequest(
(string)\substr($buffer, 0, $endOfHeader),
$conn->getRemoteAddress(),
$conn->getLocalAddress()
);
} catch (Exception $exception) {
$this->emit('error', array($exception));
$this->removeAllListeners();
$buffer = '';
$this->emit('error', array(
$exception,
$conn
));
return;
}

$bodyBuffer = isset($this->buffer[$endOfHeader + 4]) ? \substr($this->buffer, $endOfHeader + 4) : '';
$this->emit('headers', array($request, $bodyBuffer));
$this->removeAllListeners();
}
$bodyBuffer = isset($buffer[$endOfHeader + 4]) ? \substr($buffer, $endOfHeader + 4) : '';
$buffer = '';
$this->emit('headers', array($request, $bodyBuffer, $conn));
});

$conn->on('close', function () use (&$buffer, &$fn) {
$fn = $buffer = null;
});
}

/**
* @param string $headers buffer string containing request headers only
* @throws \InvalidArgumentException
* @param ?string $remoteSocketUri
* @param ?string $localSocketUri
* @return ServerRequestInterface
* @throws \InvalidArgumentException
*/
private function parseRequest($headers)
private function parseRequest($headers, $remoteSocketUri, $localSocketUri)
{
// additional, stricter safe-guard for request line
// because request parser doesn't properly cope with invalid ones
Expand Down Expand Up @@ -103,22 +124,22 @@ private function parseRequest($headers)

// apply REMOTE_ADDR and REMOTE_PORT if source address is known
// address should always be known, unless this is over Unix domain sockets (UDS)
if ($this->remoteSocketUri !== null) {
$remoteAddress = \parse_url($this->remoteSocketUri);
if ($remoteSocketUri !== null) {
$remoteAddress = \parse_url($remoteSocketUri);
$serverParams['REMOTE_ADDR'] = $remoteAddress['host'];
$serverParams['REMOTE_PORT'] = $remoteAddress['port'];
}

// apply SERVER_ADDR and SERVER_PORT if server address is known
// address should always be known, even for Unix domain sockets (UDS)
// but skip UDS as it doesn't have a concept of host/port.s
if ($this->localSocketUri !== null) {
$localAddress = \parse_url($this->localSocketUri);
if ($localSocketUri !== null) {
$localAddress = \parse_url($localSocketUri);
if (isset($localAddress['host'], $localAddress['port'])) {
$serverParams['SERVER_ADDR'] = $localAddress['host'];
$serverParams['SERVER_PORT'] = $localAddress['port'];
}
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'https') {
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'tls') {
$serverParams['HTTPS'] = 'on';
}
}
Expand Down Expand Up @@ -177,7 +198,7 @@ private function parseRequest($headers)

// set URI components from socket address if not already filled via Host header
if ($request->getUri()->getHost() === '') {
$parts = \parse_url($this->localSocketUri);
$parts = \parse_url($localSocketUri);
if (!isset($parts['host'], $parts['port'])) {
$parts = array('host' => '127.0.0.1', 'port' => 80);
}
Expand All @@ -198,8 +219,8 @@ private function parseRequest($headers)
}

// Update request URI to "https" scheme if the connection is encrypted
$parts = \parse_url($this->localSocketUri);
if (isset($parts['scheme']) && $parts['scheme'] === 'https') {
$parts = \parse_url($localSocketUri);
if (isset($parts['scheme']) && $parts['scheme'] === 'tls') {
// The request URI may omit default ports here, so try to parse port
// from Host header field (if possible)
$port = $request->getUri()->getPort();
Expand Down
64 changes: 23 additions & 41 deletions src/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
final class StreamingServer extends EventEmitter
{
private $callback;
private $parser;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
Expand All @@ -108,6 +109,27 @@ public function __construct($requestHandler)
}

$this->callback = $requestHandler;
$this->parser = new RequestHeaderParser();

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer, ConnectionInterface $conn) use ($that) {
$that->handleRequest($conn, $request);

if ($bodyBuffer !== '') {
$conn->emit('data', array($bodyBuffer));
}
});

$this->parser->on('error', function(\Exception $e, ConnectionInterface $conn) use ($that) {
$that->emit('error', array($e));

// parsing failed => assume dummy request and send appropriate error
$that->writeError(
$conn,
$e->getCode() !== 0 ? $e->getCode() : 400,
new ServerRequest('GET', '/')
);
});
}

/**
Expand Down Expand Up @@ -154,47 +176,7 @@ public function __construct($requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this, 'handleConnection'));
}

/** @internal */
public function handleConnection(ConnectionInterface $conn)
{
$uriLocal = $conn->getLocalAddress();
if ($uriLocal !== null) {
// local URI known, so translate transport scheme to application scheme
$uriLocal = \strtr($uriLocal, array('tcp://' => 'http://', 'tls://' => 'https://'));
}

$uriRemote = $conn->getRemoteAddress();

$that = $this;
$parser = new RequestHeaderParser($uriLocal, $uriRemote);

$listener = array($parser, 'feed');
$parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
// parsing request completed => stop feeding parser
$conn->removeListener('data', $listener);

$that->handleRequest($conn, $request);

if ($bodyBuffer !== '') {
$conn->emit('data', array($bodyBuffer));
}
});

$conn->on('data', $listener);
$parser->on('error', function(\Exception $e) use ($conn, $listener, $that) {
$conn->removeListener('data', $listener);
$that->emit('error', array($e));

// parsing failed => assume dummy request and send appropriate error
$that->writeError(
$conn,
$e->getCode() !== 0 ? $e->getCode() : 400,
new ServerRequest('GET', '/')
);
});
$socket->on('connection', array($this->parser, 'handle'));
}

/** @internal */
Expand Down

0 comments on commit 34d576d

Please sign in to comment.