Skip to content

Commit

Permalink
Improve documentation and examples
Browse files Browse the repository at this point in the history
  • Loading branch information
yadaiio committed Apr 3, 2024
1 parent 772af44 commit 34a5a60
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 27 deletions.
24 changes: 19 additions & 5 deletions README.md
Expand Up @@ -16,7 +16,7 @@ operations, but keeping thousands of jobs in memory at once may easily take up
all resources on your side.
Instead, you can use this library to stream your arbitrarily large input list
as individual records to a non-blocking (async) transformation handler. It uses
[ReactPHP](https://reactphp.org) to enable you to concurrently process multiple
[ReactPHP](https://reactphp.org/) to enable you to concurrently process multiple
records at once. You can control the concurrency limit, so that by allowing
it to process 10 operations at the same time, you can thus process this large
input list around 10 times faster and at the same time you're no longer limited
Expand Down Expand Up @@ -72,21 +72,25 @@ Once [installed](#install), you can use the following code to process an example
user lists by sending a (RESTful) HTTP API request for each user record:

```php
<?php

require __DIR__ . '/vendor/autoload.php';

$browser = new React\Http\Browser();

$concurrency = isset($argv[1]) ? $argv[1] : 3;

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Transformer($concurrency, function ($user) use ($browser) {
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
// skip users that do not have an IP address listed
if (!isset($user['ip'])) {
return React\Promise\resolve($user);
}

// look up country for this IP
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
function (ResponseInterface $response) use ($user) {
function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// response successfully received
// add country to user array and return updated user
$user['country'] = (string)$response->getBody();
Expand Down Expand Up @@ -114,7 +118,9 @@ $transformer->on('data', function ($user) {
$transformer->on('end', function () {
echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', 'printf');
$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});

```

Expand Down Expand Up @@ -241,7 +247,7 @@ $transformer = new Transformer(10, function ($url) use ($browser) {
return json_decode($response->getBody());
},
function (Exception $error) {
var_dump('There was an error', $error->getMessage());
echo 'Error: ' . $e->getMessage() . PHP_EOL;

throw $error;
}
Expand Down Expand Up @@ -411,6 +417,10 @@ $transformer = new Transformer(10, function ($data) use ($http) {
});

$source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);

$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Keep in mind that the transformation handler may return a rejected promise.
Expand Down Expand Up @@ -456,6 +466,8 @@ $promise = Transformer::all($input, 3, function ($data) use ($browser, $url) {

$promise->then(function ($count) {
echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Expand Down Expand Up @@ -561,6 +573,8 @@ $promise = Transformer::any($input, 3, function ($data) use ($browser, $url) {

$promise->then(function (ResponseInterface $response) {
echo 'First successful job: ' . $response->getBody() . PHP_EOL;
}, function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
```

Expand Down
12 changes: 5 additions & 7 deletions examples/01-transform.php
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -11,7 +8,7 @@

// each job should use the browser to GET a certain URL
// limit number of concurrent jobs here
$transformer = new Transformer($concurrency, function ($user) use ($browser) {
$transformer = new Clue\React\Flux\Transformer($concurrency, function ($user) use ($browser) {
// skip users that do not have an IP address listed
if (!isset($user['ip'])) {
$user['country'] = 'n/a';
Expand All @@ -21,7 +18,7 @@

// look up country for this user's IP
return $browser->get("https://ipapi.co/$user[ip]/country_name/")->then(
function (ResponseInterface $response) use ($user) {
function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// response successfully received
// add country to user array and return updated user
$user['country'] = (string)$response->getBody();
Expand Down Expand Up @@ -49,5 +46,6 @@ function (ResponseInterface $response) use ($user) {
$transformer->on('end', function () {
echo '[DONE]' . PHP_EOL;
});
$transformer->on('error', 'printf');

$transformer->on('error', function (Exception $e) {
echo 'Error: ' . $e->getMessage() . PHP_EOL;
});
10 changes: 3 additions & 7 deletions examples/02-transform-all.php
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -21,12 +18,12 @@
// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
$promise = Clue\React\Flux\Transformer::all($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) {
)->then(function (Psr\Http\Message\ResponseInterface $response) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
Expand All @@ -40,10 +37,9 @@ function ($count) {
echo 'Successfully processed all ' . $count . ' user records' . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
echo 'Error: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

10 changes: 3 additions & 7 deletions examples/03-transform-any.php
@@ -1,8 +1,5 @@
<?php

use Clue\React\Flux\Transformer;
use Psr\Http\Message\ResponseInterface;

require __DIR__ . '/../vendor/autoload.php';

$browser = new React\Http\Browser();
Expand All @@ -21,12 +18,12 @@
// each job should use the browser to POST each user object to a certain URL
// process all users by processing all users through transformer
// limit number of concurrent jobs here
$promise = Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
$promise = Clue\React\Flux\Transformer::any($input, $concurrency, function ($user) use ($browser, $url) {
return $browser->post(
$url,
array('Content-Type' => 'application/json'),
json_encode($user)
)->then(function (ResponseInterface $response) use ($user) {
)->then(function (Psr\Http\Message\ResponseInterface $response) use ($user) {
// demo HTTP response validation
$body = json_decode($response->getBody());
if (!isset($body->json)) {
Expand All @@ -44,10 +41,9 @@ function ($user) {
echo 'Successfully processed user record:' . print_r($user, true) . PHP_EOL;
},
function (Exception $e) {
echo 'An error occurred: ' . $e->getMessage() . PHP_EOL;
echo 'Error: ' . $e->getMessage() . PHP_EOL;
if ($e->getPrevious()) {
echo 'Previous: ' . $e->getPrevious()->getMessage() . PHP_EOL;
}
}
);

10 changes: 9 additions & 1 deletion src/Transformer.php
Expand Up @@ -86,7 +86,7 @@
* return json_decode($response->getBody());
* },
* function (Exception $error) {
* var_dump('There was an error', $error->getMessage());
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
*
* throw $error;
* }
Expand Down Expand Up @@ -256,6 +256,10 @@
* });
*
* $source->pipe($gunzip)->pipe($ndjson)->pipe($transformer)->pipe($dest);
*
* $transformer->on('error', function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
* Keep in mind that the transformation handler may return a rejected promise.
Expand Down Expand Up @@ -314,6 +318,8 @@ final class Transformer extends EventEmitter implements DuplexStreamInterface
*
* $promise->then(function ($count) {
* echo 'All ' . $count . ' jobs successful!' . PHP_EOL;
* }, function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
Expand Down Expand Up @@ -466,6 +472,8 @@ public static function all(ReadableStreamInterface $input, $concurrency, $callba
*
* $promise->then(function (ResponseInterface $response) {
* echo 'First successful job: ' . $response->getBody() . PHP_EOL;
* }, function (Exception $e) {
* echo 'Error: ' . $e->getMessage() . PHP_EOL;
* });
* ```
*
Expand Down

0 comments on commit 34a5a60

Please sign in to comment.