diff --git a/src/Illuminate/Bus/Batch.php b/src/Illuminate/Bus/Batch.php index 8eb916094c79..401ef5e74205 100644 --- a/src/Illuminate/Bus/Batch.php +++ b/src/Illuminate/Bus/Batch.php @@ -6,6 +6,7 @@ use Closure; use Illuminate\Contracts\Queue\Factory as QueueFactory; use Illuminate\Contracts\Support\Arrayable; +use Illuminate\Foundation\Bus\PendingChain; use Illuminate\Queue\CallQueuedClosure; use Illuminate\Queue\SerializableClosure; use Illuminate\Support\Arr; @@ -161,18 +162,39 @@ public function fresh() */ public function add($jobs) { - $jobs = Collection::wrap($jobs)->map(function ($job) { + $jobTotal = 0; + $jobs = Collection::wrap($jobs)->map(function ($job) use (&$jobTotal) { if ($job instanceof Closure) { $job = CallQueuedClosure::create($job); } - $job->withBatchId($this->id); + if ($job instanceof PendingChain) { + $batchId = $this->id; + + $job->job->withBatchId($batchId); + + $jobTotal++; + + array_walk($job->chain, function (&$job) use ($batchId) { + if ($job instanceof Closure) { + $job = CallQueuedClosure::create($job); + } + $job->withBatchId($batchId); + }); + + $jobTotal += count($job->chain); + + return $job->prepare(); + } else { + $job->withBatchId($this->id); + $jobTotal++; + } return $job; }); - $this->repository->transaction(function () use ($jobs) { - $this->repository->incrementTotalJobs($this->id, count($jobs)); + $this->repository->transaction(function () use ($jobs, $jobTotal) { + $this->repository->incrementTotalJobs($this->id, $jobTotal); $this->queue->connection($this->options['connection'] ?? null)->bulk( $jobs->all(), diff --git a/src/Illuminate/Foundation/Bus/PendingChain.php b/src/Illuminate/Foundation/Bus/PendingChain.php index 8965e9923fa1..568eb70f0598 100644 --- a/src/Illuminate/Foundation/Bus/PendingChain.php +++ b/src/Illuminate/Foundation/Bus/PendingChain.php @@ -109,11 +109,11 @@ public function catchCallbacks() } /** - * Dispatch the job with the given arguments. + * Prepare a job chain for dispatch with the given arguments. * - * @return \Illuminate\Foundation\Bus\PendingDispatch + * @return CallQueuedClosure|mixed */ - public function dispatch() + public function prepare() { if (is_string($this->job)) { $firstJob = new $this->job(...func_get_args()); @@ -128,6 +128,18 @@ public function dispatch() $firstJob->chain($this->chain); $firstJob->chainCatchCallbacks = $this->catchCallbacks(); - return app(Dispatcher::class)->dispatch($firstJob); + return $firstJob; + } + + /** + * Dispatch the job with the given arguments. + * + * @return \Illuminate\Foundation\Bus\PendingDispatch + */ + public function dispatch() + { + $preparedJob = $this->prepare(); + + return app(Dispatcher::class)->dispatch($preparedJob); } } diff --git a/tests/Bus/BusBatchTest.php b/tests/Bus/BusBatchTest.php index 3a7c7e8056de..f04b83e827d4 100644 --- a/tests/Bus/BusBatchTest.php +++ b/tests/Bus/BusBatchTest.php @@ -8,11 +8,16 @@ use Illuminate\Bus\BatchFactory; use Illuminate\Bus\DatabaseBatchRepository; use Illuminate\Bus\PendingBatch; +use Illuminate\Bus\Queueable; use Illuminate\Container\Container; use Illuminate\Contracts\Queue\Factory; +use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Database\Capsule\Manager as DB; use Illuminate\Database\Eloquent\Model; +use Illuminate\Foundation\Bus\Dispatchable; +use Illuminate\Foundation\Bus\PendingChain; use Illuminate\Queue\CallQueuedClosure; +use Illuminate\Support\Facades\Bus; use Mockery as m; use PHPUnit\Framework\TestCase; use RuntimeException; @@ -300,6 +305,54 @@ public function test_batch_state_can_be_inspected() $this->assertTrue(is_string(json_encode($batch))); } + public function test_chain_can_be_added_to_batch() + { + $queue = m::mock(Factory::class); + + $batch = $this->createTestBatch($queue); + + $firstJob = new FirstTestJob(); + + $secondJob = new SecondTestJob(); + + $thirdJob = new ThirdTestJob(); + + $fourthJob = function () { + }; + + $queue->shouldReceive('connection')->once() + ->with('test-connection') + ->andReturn($connection = m::mock(stdClass::class)); + + $connection->shouldReceive('bulk')->once()->with(\Mockery::on(function ($args) use ($firstJob, $secondJob, $thirdJob) { + return + $args[0] == $firstJob + && $args[1] == $secondJob + && unserialize($args[1]->chained[0]) == $thirdJob + && unserialize($args[1]->chained[1]) instanceof CallQueuedClosure + && is_string(unserialize($args[1]->chained[1])->batchId); + }), '', 'test-queue'); + + // Without "RuntimeException: A facade root has not been set" is thrown + Bus::partialMock()->shouldReceive('chain')->andReturn(new PendingChain($secondJob, [$thirdJob, $fourthJob])); + + $batch = $batch->add([ + $firstJob, + Bus::chain([ + $secondJob, + $thirdJob, + $fourthJob, + ]), + ]); + + $this->assertEquals(4, $batch->totalJobs); + $this->assertEquals(4, $batch->pendingJobs); + $this->assertTrue(is_string($firstJob->batchId)); + $this->assertTrue(is_string($secondJob->batchId)); + $this->assertTrue(is_string($thirdJob->batchId)); + $this->assertInstanceOf(CarbonImmutable::class, $batch->createdAt); + } + protected function createTestBatch($queue, $allowFailures = false) { $repository = new DatabaseBatchRepository(new BatchFactory($queue), DB::connection(), 'job_batches'); @@ -345,3 +398,18 @@ protected function schema() return $this->connection()->getSchemaBuilder(); } } + +class FirstTestJob implements ShouldQueue +{ + use Dispatchable, Queueable, Batchable; +} + +class SecondTestJob implements ShouldQueue +{ + use Dispatchable, Queueable, Batchable; +} + +class ThirdTestJob implements ShouldQueue +{ + use Dispatchable, Queueable, Batchable; +}