diff --git a/src/Io/ClientRequestState.php b/src/Io/ClientRequestState.php new file mode 100644 index 00000000..73a63a14 --- /dev/null +++ b/src/Io/ClientRequestState.php @@ -0,0 +1,16 @@ +pending)) { - $deferred->pending->cancel(); - unset($deferred->pending); + $state = new ClientRequestState(); + $deferred = new Deferred(function () use ($state) { + if ($state->pending !== null) { + $state->pending->cancel(); + $state->pending = null; } }); - $deferred->numRequests = 0; - // use timeout from options or default to PHP's default_socket_timeout (60) $timeout = (float)($this->timeout !== null ? $this->timeout : ini_get("default_socket_timeout")); $loop = $this->loop; - $this->next($request, $deferred)->then( - function (ResponseInterface $response) use ($deferred, $loop, &$timeout) { - if (isset($deferred->timeout)) { - $loop->cancelTimer($deferred->timeout); - unset($deferred->timeout); + $this->next($request, $deferred, $state)->then( + function (ResponseInterface $response) use ($state, $deferred, $loop, &$timeout) { + if ($state->timeout !== null) { + $loop->cancelTimer($state->timeout); + $state->timeout = null; } $timeout = -1; $deferred->resolve($response); }, - function ($e) use ($deferred, $loop, &$timeout) { - if (isset($deferred->timeout)) { - $loop->cancelTimer($deferred->timeout); - unset($deferred->timeout); + function ($e) use ($state, $deferred, $loop, &$timeout) { + if ($state->timeout !== null) { + $loop->cancelTimer($state->timeout); + $state->timeout = null; } $timeout = -1; $deferred->reject($e); @@ -106,13 +105,13 @@ function ($e) use ($deferred, $loop, &$timeout) { $body = $request->getBody(); if ($body instanceof ReadableStreamInterface && $body->isReadable()) { $that = $this; - $body->on('close', function () use ($that, $deferred, &$timeout) { + $body->on('close', function () use ($that, $deferred, $state, &$timeout) { if ($timeout >= 0) { - $that->applyTimeout($deferred, $timeout); + $that->applyTimeout($deferred, $state, $timeout); } }); } else { - $this->applyTimeout($deferred, $timeout); + $this->applyTimeout($deferred, $state, $timeout); } return $deferred->promise(); @@ -120,53 +119,51 @@ function ($e) use ($deferred, $loop, &$timeout) { /** * @internal - * @param Deferred $deferred - * @param number $timeout + * @param number $timeout * @return void */ - public function applyTimeout(Deferred $deferred, $timeout) + public function applyTimeout(Deferred $deferred, ClientRequestState $state, $timeout) { - $deferred->timeout = $this->loop->addTimer($timeout, function () use ($timeout, $deferred) { + $state->timeout = $this->loop->addTimer($timeout, function () use ($timeout, $deferred, $state) { $deferred->reject(new \RuntimeException( 'Request timed out after ' . $timeout . ' seconds' )); - if (isset($deferred->pending)) { - $deferred->pending->cancel(); - unset($deferred->pending); + if ($state->pending !== null) { + $state->pending->cancel(); + $state->pending = null; } }); } - private function next(RequestInterface $request, Deferred $deferred) + private function next(RequestInterface $request, Deferred $deferred, ClientRequestState $state) { $this->progress('request', array($request)); $that = $this; - ++$deferred->numRequests; + ++$state->numRequests; $promise = $this->sender->send($request); if (!$this->streaming) { - $promise = $promise->then(function ($response) use ($deferred, $that) { - return $that->bufferResponse($response, $deferred); + $promise = $promise->then(function ($response) use ($deferred, $state, $that) { + return $that->bufferResponse($response, $deferred, $state); }); } - $deferred->pending = $promise; + $state->pending = $promise; return $promise->then( - function (ResponseInterface $response) use ($request, $that, $deferred) { - return $that->onResponse($response, $request, $deferred); + function (ResponseInterface $response) use ($request, $that, $deferred, $state) { + return $that->onResponse($response, $request, $deferred, $state); } ); } /** * @internal - * @param ResponseInterface $response * @return PromiseInterface Promise */ - public function bufferResponse(ResponseInterface $response, $deferred) + public function bufferResponse(ResponseInterface $response, Deferred $deferred, ClientRequestState $state) { $stream = $response->getBody(); @@ -205,26 +202,24 @@ function ($e) use ($stream, $maximumSize) { } ); - $deferred->pending = $promise; + $state->pending = $promise; return $promise; } /** * @internal - * @param ResponseInterface $response - * @param RequestInterface $request * @throws ResponseException * @return ResponseInterface|PromiseInterface */ - public function onResponse(ResponseInterface $response, RequestInterface $request, $deferred) + public function onResponse(ResponseInterface $response, RequestInterface $request, Deferred $deferred, ClientRequestState $state) { $this->progress('response', array($response, $request)); // follow 3xx (Redirection) response status codes if Location header is present and not explicitly disabled // @link https://tools.ietf.org/html/rfc7231#section-6.4 if ($this->followRedirects && ($response->getStatusCode() >= 300 && $response->getStatusCode() < 400) && $response->hasHeader('Location')) { - return $this->onResponseRedirect($response, $request, $deferred); + return $this->onResponseRedirect($response, $request, $deferred, $state); } // only status codes 200-399 are considered to be valid, reject otherwise @@ -242,7 +237,7 @@ public function onResponse(ResponseInterface $response, RequestInterface $reques * @return PromiseInterface * @throws \RuntimeException */ - private function onResponseRedirect(ResponseInterface $response, RequestInterface $request, Deferred $deferred) + private function onResponseRedirect(ResponseInterface $response, RequestInterface $request, Deferred $deferred, ClientRequestState $state) { // resolve location relative to last request URI $location = Uri::resolve($request->getUri(), $response->getHeaderLine('Location')); @@ -250,11 +245,11 @@ private function onResponseRedirect(ResponseInterface $response, RequestInterfac $request = $this->makeRedirectRequest($request, $location); $this->progress('redirect', array($request)); - if ($deferred->numRequests >= $this->maxRedirects) { + if ($state->numRequests >= $this->maxRedirects) { throw new \RuntimeException('Maximum number of redirects (' . $this->maxRedirects . ') exceeded'); } - return $this->next($request, $deferred); + return $this->next($request, $deferred, $state); } /** diff --git a/tests/HttpServerTest.php b/tests/HttpServerTest.php index 4d00fcef..31bc32ee 100644 --- a/tests/HttpServerTest.php +++ b/tests/HttpServerTest.php @@ -17,6 +17,9 @@ final class HttpServerTest extends TestCase private $connection; private $socket; + /** @var ?int */ + private $called = null; + /** * @before */ diff --git a/tests/Io/StreamingServerTest.php b/tests/Io/StreamingServerTest.php index 2703362a..a2700b86 100644 --- a/tests/Io/StreamingServerTest.php +++ b/tests/Io/StreamingServerTest.php @@ -17,6 +17,9 @@ class StreamingServerTest extends TestCase private $connection; private $socket; + /** @var ?int */ + private $called = null; + /** * @before */