Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[8.x] Handle concurrent asynchronous requests in the HTTP client #36948

Merged
merged 8 commits into from Apr 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/Illuminate/Http/Client/Factory.php
Expand Up @@ -34,6 +34,8 @@
* @method \Illuminate\Http\Client\PendingRequest withoutVerifying()
* @method \Illuminate\Http\Client\PendingRequest dump()
* @method \Illuminate\Http\Client\PendingRequest dd()
* @method \Illuminate\Http\Client\PendingRequest async()
* @method \Illuminate\Http\Client\Pool pool()
* @method \Illuminate\Http\Client\Response delete(string $url, array $data = [])
* @method \Illuminate\Http\Client\Response get(string $url, array $query = [])
* @method \Illuminate\Http\Client\Response head(string $url, array $query = [])
Expand Down
157 changes: 146 additions & 11 deletions src/Illuminate/Http/Client/PendingRequest.php
Expand Up @@ -5,10 +5,13 @@
use GuzzleHttp\Client;
use GuzzleHttp\Cookie\CookieJar;
use GuzzleHttp\Exception\ConnectException;
use GuzzleHttp\Exception\RequestException;
use GuzzleHttp\Exception\TransferException;
use GuzzleHttp\HandlerStack;
use Illuminate\Support\Collection;
use Illuminate\Support\Str;
use Illuminate\Support\Traits\Macroable;
use Psr\Http\Message\MessageInterface;
use Symfony\Component\VarDumper\VarDumper;

class PendingRequest
Expand All @@ -22,6 +25,13 @@ class PendingRequest
*/
protected $factory;

/**
* The client instance.
*
* @var \GuzzleHttp\Client
*/
protected $client;

/**
* The base URL for the request.
*
Expand Down Expand Up @@ -106,6 +116,20 @@ class PendingRequest
*/
protected $middleware;

/**
* Whether the requests should be asynchronous.
*
* @var bool
*/
protected $async = false;

/**
* The pending request promise.
*
* @var \GuzzleHttp\Promise\PromiseInterface
*/
protected $promise;

/**
* Create a new HTTP Client instance.
*
Expand Down Expand Up @@ -601,18 +625,14 @@ public function send(string $method, string $url, array $options = [])

[$this->pendingBody, $this->pendingFiles] = [null, []];

if ($this->async) {
return $this->makePromise($method, $url, $options);
}

return retry($this->tries ?? 1, function () use ($method, $url, $options) {
try {
$laravelData = $this->parseRequestData($method, $url, $options);

return tap(new Response($this->buildClient()->request($method, $url, $this->mergeOptions([
'laravel_data' => $laravelData,
'on_stats' => function ($transferStats) {
$this->transferStats = $transferStats;
},
], $options))), function ($response) {
$response->cookies = $this->cookies;
$response->transferStats = $this->transferStats;
return tap(new Response($this->sendRequest($method, $url, $options)), function ($response) {
$this->populateResponse($response);

if ($this->tries > 1 && ! $response->successful()) {
$response->throw();
Expand All @@ -637,6 +657,49 @@ protected function parseMultipartBodyFormat(array $data)
})->values()->all();
}

/**
* Send an asynchronous request to the given URL.
*
* @param string $method
* @param string $url
* @param array $options
* @return \GuzzleHttp\Promise\PromiseInterface
*/
protected function makePromise(string $method, string $url, array $options = [])
{
return $this->promise = $this->sendRequest($method, $url, $options)
->then(function (MessageInterface $message) {
return $this->populateResponse(new Response($message));
})
->otherwise(function (TransferException $e) {
return $e instanceof RequestException ? $this->populateResponse(new Response($e->getResponse())) : $e;
});
}

/**
* Send a request either synchronously or asynchronously.
*
* @param string $method
* @param string $url
* @param array $options
* @return \Psr\Http\Message\MessageInterface|\GuzzleHttp\Promise\PromiseInterface
*
* @throws \Exception
*/
protected function sendRequest(string $method, string $url, array $options = [])
{
$clientMethod = $this->async ? 'requestAsync' : 'request';

$laravelData = $this->parseRequestData($method, $url, $options);

return $this->buildClient()->$clientMethod($method, $url, $this->mergeOptions([
'laravel_data' => $laravelData,
'on_stats' => function ($transferStats) {
$this->transferStats = $transferStats;
},
], $options));
}

/**
* Get the request data as an array so that we can attach it to the request for convenient assertions.
*
Expand Down Expand Up @@ -664,14 +727,42 @@ protected function parseRequestData($method, $url, array $options)
return $laravelData;
}

/**
* Populate the given response with additional data.
*
* @param \Illuminate\Http\Client\Response $response
* @return \Illuminate\Http\Client\Response
*/
protected function populateResponse(Response $response)
{
$response->cookies = $this->cookies;

$response->transferStats = $this->transferStats;

return $response;
}

/**
* Set the client instance.
*
* @param \GuzzleHttp\Client $client
* @return $this
*/
public function setClient(Client $client)
{
$this->client = $client;

return $this;
}

/**
* Build the Guzzle client.
*
* @return \GuzzleHttp\Client
*/
public function buildClient()
{
return new Client([
return $this->client = $this->client ?: new Client([
'handler' => $this->buildHandlerStack(),
'cookies' => true,
]);
Expand Down Expand Up @@ -826,4 +917,48 @@ public function stub($callback)

return $this;
}

/**
* Toggle asynchronicity in requests.
*
* @param bool $async
* @return $this
*/
public function async(bool $async = true)
{
$this->async = $async;

return $this;
}

/**
* Send a pool of asynchronous requests concurrently.
*
* @param callable $callback
* @return array
*/
public function pool(callable $callback)
{
$results = [];

$requests = tap(new Pool($this->factory), $callback)->getRequests();

foreach ($requests as $key => $item) {
$results[$key] = $item instanceof static ? $item->getPromise()->wait() : $item->wait();
}

ksort($results);

return $results;
}

/**
* Retrieve the pending request promise.
*
* @return \GuzzleHttp\Promise\PromiseInterface|null
*/
public function getPromise()
{
return $this->promise;
}
}
84 changes: 84 additions & 0 deletions src/Illuminate/Http/Client/Pool.php
@@ -0,0 +1,84 @@
<?php

namespace Illuminate\Http\Client;

class Pool
{
/**
* The factory instance.
*
* @var \Illuminate\Http\Client\Factory
*/
protected $factory;

/**
* The client instance.
*
* @var \GuzzleHttp\Client
*/
protected $client;

/**
* The pool of requests.
*
* @var array
*/
protected $pool = [];

/**
* Create a new requests pool.
*
* @param \Illuminate\Http\Client\Factory|null $factory
* @return void
*/
public function __construct(Factory $factory = null)
{
$this->factory = $factory ?: new Factory();

$this->client = $this->factory->buildClient();
}

/**
* Add a request to the pool with a key.
*
* @param string $key
* @return \Illuminate\Http\Client\PendingRequest
*/
public function add(string $key)
{
return $this->pool[$key] = $this->asyncRequest();
}

/**
* Retrieve a new async pending request.
*
* @return \Illuminate\Http\Client\PendingRequest
*/
protected function asyncRequest()
{
// the same client instance needs to be shared across all async requests
return $this->factory->setClient($this->client)->async();
}

/**
* Retrieve the requests in the pool.
*
* @return array
*/
public function getRequests()
{
return $this->pool;
}

/**
* Add a request to the pool with a numeric index.
*
* @param string $method
* @param array $parameters
* @return \Illuminate\Http\Client\PendingRequest
*/
public function __call($method, $parameters)
{
return $this->pool[] = $this->asyncRequest()->$method(...$parameters);
}
}
2 changes: 2 additions & 0 deletions src/Illuminate/Support/Facades/Http.php
Expand Up @@ -32,6 +32,8 @@
* @method static \Illuminate\Http\Client\PendingRequest withoutVerifying()
* @method static \Illuminate\Http\Client\PendingRequest dump()
* @method static \Illuminate\Http\Client\PendingRequest dd()
* @method static \Illuminate\Http\Client\PendingRequest async()
* @method static \Illuminate\Http\Client\Pool pool()
* @method static \Illuminate\Http\Client\Response delete(string $url, array $data = [])
* @method static \Illuminate\Http\Client\Response get(string $url, array $query = [])
* @method static \Illuminate\Http\Client\Response head(string $url, array $query = [])
Expand Down