Skip to content

Commit

Permalink
Merge pull request #346 from clue-labs/reuse-parser
Browse files Browse the repository at this point in the history
Internal refactoring to reuse single request parser for all requests
  • Loading branch information
jsor committed Jun 17, 2019
2 parents 809a514 + 6e7dd66 commit f30fb12
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 140 deletions.
102 changes: 65 additions & 37 deletions src/Io/RequestHeaderParser.php
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
namespace React\Http\Io;

use Evenement\EventEmitter;
use Psr\Http\Message\ServerRequestInterface;
use React\Socket\ConnectionInterface;
use RingCentral\Psr7 as g7;
use Exception;

Expand All @@ -19,47 +21,73 @@
*/
class RequestHeaderParser extends EventEmitter
{
private $buffer = '';
private $maxSize = 8192;

private $localSocketUri;
private $remoteSocketUri;

public function __construct($localSocketUri = null, $remoteSocketUri = null)
public function handle(ConnectionInterface $conn)
{
$this->localSocketUri = $localSocketUri;
$this->remoteSocketUri = $remoteSocketUri;
}
$buffer = '';
$maxSize = $this->maxSize;
$that = $this;
$conn->on('data', $fn = function ($data) use (&$buffer, &$fn, $conn, $maxSize, $that) {
// append chunk of data to buffer and look for end of request headers
$buffer .= $data;
$endOfHeader = \strpos($buffer, "\r\n\r\n");

// reject request if buffer size is exceeded
if ($endOfHeader > $maxSize || ($endOfHeader === false && isset($buffer[$maxSize]))) {
$conn->removeListener('data', $fn);
$fn = null;

$that->emit('error', array(
new \OverflowException("Maximum header size of {$maxSize} exceeded.", 431),
$conn
));
return;
}

public function feed($data)
{
$this->buffer .= $data;
$endOfHeader = \strpos($this->buffer, "\r\n\r\n");
// ignore incomplete requests
if ($endOfHeader === false) {
return;
}

if ($endOfHeader > $this->maxSize || ($endOfHeader === false && isset($this->buffer[$this->maxSize]))) {
$this->emit('error', array(new \OverflowException("Maximum header size of {$this->maxSize} exceeded.", 431), $this));
$this->removeAllListeners();
return;
}
// request headers received => try to parse request
$conn->removeListener('data', $fn);
$fn = null;

if (false !== $endOfHeader) {
try {
$this->parseAndEmitRequest($endOfHeader);
$request = $that->parseRequest(
(string)\substr($buffer, 0, $endOfHeader),
$conn->getRemoteAddress(),
$conn->getLocalAddress()
);
} catch (Exception $exception) {
$this->emit('error', array($exception));
$buffer = '';
$that->emit('error', array(
$exception,
$conn
));
return;
}
$this->removeAllListeners();
}
}

private function parseAndEmitRequest($endOfHeader)
{
$request = $this->parseRequest((string)\substr($this->buffer, 0, $endOfHeader));
$bodyBuffer = isset($this->buffer[$endOfHeader + 4]) ? \substr($this->buffer, $endOfHeader + 4) : '';
$this->emit('headers', array($request, $bodyBuffer));
$bodyBuffer = isset($buffer[$endOfHeader + 4]) ? \substr($buffer, $endOfHeader + 4) : '';
$buffer = '';
$that->emit('headers', array($request, $bodyBuffer, $conn));
});

$conn->on('close', function () use (&$buffer, &$fn) {
$fn = $buffer = null;
});
}

private function parseRequest($headers)
/**
* @param string $headers buffer string containing request headers only
* @param ?string $remoteSocketUri
* @param ?string $localSocketUri
* @return ServerRequestInterface
* @throws \InvalidArgumentException
* @internal
*/
public function parseRequest($headers, $remoteSocketUri, $localSocketUri)
{
// additional, stricter safe-guard for request line
// because request parser doesn't properly cope with invalid ones
Expand Down Expand Up @@ -99,22 +127,22 @@ private function parseRequest($headers)

// apply REMOTE_ADDR and REMOTE_PORT if source address is known
// address should always be known, unless this is over Unix domain sockets (UDS)
if ($this->remoteSocketUri !== null) {
$remoteAddress = \parse_url($this->remoteSocketUri);
if ($remoteSocketUri !== null) {
$remoteAddress = \parse_url($remoteSocketUri);
$serverParams['REMOTE_ADDR'] = $remoteAddress['host'];
$serverParams['REMOTE_PORT'] = $remoteAddress['port'];
}

// apply SERVER_ADDR and SERVER_PORT if server address is known
// address should always be known, even for Unix domain sockets (UDS)
// but skip UDS as it doesn't have a concept of host/port.s
if ($this->localSocketUri !== null) {
$localAddress = \parse_url($this->localSocketUri);
if ($localSocketUri !== null) {
$localAddress = \parse_url($localSocketUri);
if (isset($localAddress['host'], $localAddress['port'])) {
$serverParams['SERVER_ADDR'] = $localAddress['host'];
$serverParams['SERVER_PORT'] = $localAddress['port'];
}
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'https') {
if (isset($localAddress['scheme']) && $localAddress['scheme'] === 'tls') {
$serverParams['HTTPS'] = 'on';
}
}
Expand Down Expand Up @@ -173,7 +201,7 @@ private function parseRequest($headers)

// set URI components from socket address if not already filled via Host header
if ($request->getUri()->getHost() === '') {
$parts = \parse_url($this->localSocketUri);
$parts = \parse_url($localSocketUri);
if (!isset($parts['host'], $parts['port'])) {
$parts = array('host' => '127.0.0.1', 'port' => 80);
}
Expand All @@ -194,8 +222,8 @@ private function parseRequest($headers)
}

// Update request URI to "https" scheme if the connection is encrypted
$parts = \parse_url($this->localSocketUri);
if (isset($parts['scheme']) && $parts['scheme'] === 'https') {
$parts = \parse_url($localSocketUri);
if (isset($parts['scheme']) && $parts['scheme'] === 'tls') {
// The request URI may omit default ports here, so try to parse port
// from Host header field (if possible)
$port = $request->getUri()->getPort();
Expand Down
64 changes: 23 additions & 41 deletions src/StreamingServer.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
final class StreamingServer extends EventEmitter
{
private $callback;
private $parser;

/**
* Creates an HTTP server that invokes the given callback for each incoming HTTP request
Expand All @@ -108,6 +109,27 @@ public function __construct($requestHandler)
}

$this->callback = $requestHandler;
$this->parser = new RequestHeaderParser();

$that = $this;
$this->parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer, ConnectionInterface $conn) use ($that) {
$that->handleRequest($conn, $request);

if ($bodyBuffer !== '') {
$conn->emit('data', array($bodyBuffer));
}
});

$this->parser->on('error', function(\Exception $e, ConnectionInterface $conn) use ($that) {
$that->emit('error', array($e));

// parsing failed => assume dummy request and send appropriate error
$that->writeError(
$conn,
$e->getCode() !== 0 ? $e->getCode() : 400,
new ServerRequest('GET', '/')
);
});
}

/**
Expand Down Expand Up @@ -154,47 +176,7 @@ public function __construct($requestHandler)
*/
public function listen(ServerInterface $socket)
{
$socket->on('connection', array($this, 'handleConnection'));
}

/** @internal */
public function handleConnection(ConnectionInterface $conn)
{
$uriLocal = $conn->getLocalAddress();
if ($uriLocal !== null) {
// local URI known, so translate transport scheme to application scheme
$uriLocal = \strtr($uriLocal, array('tcp://' => 'http://', 'tls://' => 'https://'));
}

$uriRemote = $conn->getRemoteAddress();

$that = $this;
$parser = new RequestHeaderParser($uriLocal, $uriRemote);

$listener = array($parser, 'feed');
$parser->on('headers', function (ServerRequestInterface $request, $bodyBuffer) use ($conn, $listener, $that) {
// parsing request completed => stop feeding parser
$conn->removeListener('data', $listener);

$that->handleRequest($conn, $request);

if ($bodyBuffer !== '') {
$conn->emit('data', array($bodyBuffer));
}
});

$conn->on('data', $listener);
$parser->on('error', function(\Exception $e) use ($conn, $listener, $that) {
$conn->removeListener('data', $listener);
$that->emit('error', array($e));

// parsing failed => assume dummy request and send appropriate error
$that->writeError(
$conn,
$e->getCode() !== 0 ? $e->getCode() : 400,
new ServerRequest('GET', '/')
);
});
$socket->on('connection', array($this->parser, 'handle'));
}

/** @internal */
Expand Down

0 comments on commit f30fb12

Please sign in to comment.