Skip to content

Commit

Permalink
bug #33298 [Messenger] Stop worker when it should stop (tienvx)
Browse files Browse the repository at this point in the history
This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Stop worker when it should stop

| Q             | A
| ------------- | ---
| Branch?       | 4.3 <!-- see below -->
| Bug fix?      | yes
| New feature?  | no <!-- please update src/**/CHANGELOG.md files -->
| BC breaks?    | no     <!-- see https://symfony.com/bc -->
| Deprecations? | no <!-- please update UPGRADE-*.md and src/**/CHANGELOG.md files -->
| Tests pass?   | yes    <!-- please add some, will be required by reviewers -->
| Fixed tickets | NA   <!-- #-prefixed issue number(s), if any -->
| License       | MIT
| Doc PR        | NA <!-- required for new features -->

<!--
Replace this notice by a short README for your feature/bugfix. This will help people
understand your PR and can be used as a start for the documentation.

Additionally (see https://symfony.com/roadmap):
 - Bug fixes must be submitted against the lowest maintained branch where they apply
   (lowest branches are regularly merged to upper ones so they get the fixes too).
 - Features and deprecations must be submitted against branch 4.4.
 - Legacy code removals go to the master branch.
-->

There are 2 things about this PR:
* This PR fix the bug when using `limit`, `memory-limit`, `time-limit` options with command `messenger:consume`, these options does not work if the receiver return multiple messages
* This PR is the continue work of #32783

Commits
-------

5c1f3a2 [Messenger] Stop worker when it should stop
  • Loading branch information
fabpot committed Aug 26, 2019
2 parents 861b483 + 5c1f3a2 commit 113fa0b
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
25 changes: 25 additions & 0 deletions src/Symfony/Component/Messenger/Tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
use Symfony\Component\Messenger\Tests\Fixtures\DummyMessage;
use Symfony\Component\Messenger\Transport\Receiver\ReceiverInterface;
use Symfony\Component\Messenger\Worker;
use Symfony\Component\Messenger\Worker\StopWhenMessageCountIsExceededWorker;
use Symfony\Contracts\EventDispatcher\EventDispatcherInterface;

/**
Expand Down Expand Up @@ -361,6 +362,30 @@ public function testWorkerWithMultipleReceivers()
// make sure they were processed in the correct order
$this->assertSame([$envelope1, $envelope2, $envelope3, $envelope4, $envelope5, $envelope6], $processedEnvelopes);
}

public function testWorkerWithDecorator()
{
$envelope1 = new Envelope(new DummyMessage('message1'));
$envelope2 = new Envelope(new DummyMessage('message2'));
$envelope3 = new Envelope(new DummyMessage('message3'));

$receiver = new DummyReceiver([
[$envelope1, $envelope2, $envelope3],
]);

$bus = $this->getMockBuilder(MessageBusInterface::class)->getMock();

$worker = new Worker([$receiver], $bus);
$workerWithDecorator = new StopWhenMessageCountIsExceededWorker($worker, 2);
$processedEnvelopes = [];
$workerWithDecorator->run([], function (?Envelope $envelope) use ($worker, &$processedEnvelopes) {
if (null !== $envelope) {
$processedEnvelopes[] = $envelope;
}
});

$this->assertSame([$envelope1, $envelope2], $processedEnvelopes);
}
}

class DummyReceiver implements ReceiverInterface
Expand Down
4 changes: 4 additions & 0 deletions src/Symfony/Component/Messenger/Worker.php
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ public function run(array $options = [], callable $onHandledCallback = null): vo

$this->handleMessage($envelope, $receiver, $transportName, $this->retryStrategies[$transportName] ?? null);
$onHandled($envelope);

if ($this->shouldStop) {
break 2;
}
}

// after handling a single receiver, quit and start the loop again
Expand Down

0 comments on commit 113fa0b

Please sign in to comment.