Skip to content

Commit

Permalink
Merge pull request #43 from clue-labs/queue-closure
Browse files Browse the repository at this point in the history
Support PHP 8.2, refactor queuing logic
  • Loading branch information
clue committed Jul 27, 2023
2 parents 74c1d0c + a40d6ac commit ae86562
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 25 deletions.
1 change: 1 addition & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ jobs:
strategy:
matrix:
php:
- 8.2
- 8.1
- 8.0
- 7.4
Expand Down
58 changes: 33 additions & 25 deletions src/Queue.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ class Queue implements \Countable
private $limit;
private $handler;

/** @var int<0,max> */
private $pending = 0;

/** @var array<int,\Closure():void> */
private $queue = array();

/**
Expand Down Expand Up @@ -373,24 +376,42 @@ public function __invoke()
$id = key($queue);
assert(is_int($id));

$deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$deferred) {
/** @var ?PromiseInterface<T> $pending */
$pending = null;

$deferred = new Deferred(function ($_, $reject) use (&$queue, $id, &$pending) {
// forward cancellation to pending operation if it is currently executing
if (isset($deferred->pending) && $deferred->pending instanceof PromiseInterface && \method_exists($deferred->pending, 'cancel')) {
$deferred->pending->cancel();
if ($pending instanceof PromiseInterface && \method_exists($pending, 'cancel')) {
$pending->cancel();
}
unset($deferred->pending);
$pending = null;

if (isset($deferred->args)) {
if (isset($queue[$id])) {
// queued promise cancelled before its handler is invoked
// remove from queue and reject explicitly
unset($queue[$id], $deferred->args);
unset($queue[$id]);
$reject(new \RuntimeException('Cancelled queued job before processing started'));
}
});

// queue job to process if number of pending jobs is below concurrency limit again
$deferred->args = func_get_args();
$queue[$id] = $deferred;
$handler = $this->handler; // PHP 5.4+
$args = func_get_args();
$that = $this; // PHP 5.4+
$queue[$id] = function () use ($handler, $args, $deferred, &$pending, $that) {
$pending = \call_user_func_array($handler, $args);

$that->await($pending)->then(
function ($result) use ($deferred, &$pending) {
$pending = null;
$deferred->resolve($result);
},
function ($e) use ($deferred, &$pending) {
$pending = null;
$deferred->reject($e);
}
);
};

return $deferred->promise();
}
Expand All @@ -407,7 +428,7 @@ public function count()
*/
public function await(PromiseInterface $promise)
{
$that = $this;
$that = $this; // PHP 5.4+

return $promise->then(function ($result) use ($that) {
$that->processQueue();
Expand All @@ -430,28 +451,15 @@ public function processQueue()
return;
}

$deferred = reset($this->queue);
assert($deferred instanceof Deferred);
$next = reset($this->queue);
assert($next instanceof \Closure);
unset($this->queue[key($this->queue)]);

// once number of pending jobs is below concurrency limit again:
// await this situation, invoke handler and await its resolution before invoking next queued job
++$this->pending;

$promise = call_user_func_array($this->handler, $deferred->args);
$deferred->pending = $promise;
unset($deferred->args);

// invoke handler and await its resolution before invoking next queued job
$this->await($promise)->then(
function ($result) use ($deferred) {
unset($deferred->pending);
$deferred->resolve($result);
},
function ($e) use ($deferred) {
unset($deferred->pending);
$deferred->reject($e);
}
);
$next();
}
}

0 comments on commit ae86562

Please sign in to comment.