Skip to content

Commit

Permalink
bug #32726 [Messenger] Fix redis last error not cleared between calls…
Browse files Browse the repository at this point in the history
… (chalasr)

This PR was merged into the 4.3 branch.

Discussion
----------

[Messenger] Fix redis last error not cleared between calls

| Q             | A
| ------------- | ---
| Branch?       | 4.3
| Bug fix?      | yes
| New feature?  | no
| BC breaks?    | no
| Deprecations? | no
| Tests pass?   | yes
| Fixed tickets | -
| License       | MIT
| Doc PR        | -

Not clearing it gives misleading errors coming from previous calls which makes debugging hard.
@alexander-schranz FYI

Commits
-------

9c263ff [Messenger] Fix redis last error not cleared between calls
  • Loading branch information
fabpot committed Jul 27, 2019
2 parents e2a2dd9 + 9c263ff commit 90c6482
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 15 deletions.
Expand Up @@ -16,7 +16,7 @@
use Symfony\Component\Messenger\Transport\RedisExt\Connection;

/**
* @requires extension redis
* @requires extension redis >= 4.3.0
*/
class ConnectionTest extends TestCase
{
Expand Down Expand Up @@ -119,7 +119,7 @@ public function testUnexpectedRedisError()
$redis->expects($this->once())->method('xreadgroup')->willReturn(false);
$redis->expects($this->once())->method('getLastError')->willReturn('Redis error happens');

$connection = Connection::fromDsn('redis://localhost/queue', [], $redis);
$connection = Connection::fromDsn('redis://localhost/queue', ['auto_setup' => false], $redis);
$connection->get();
}

Expand Down Expand Up @@ -152,4 +152,31 @@ public function testGetNonBlocking()
$connection->reject($message['id']);
$redis->del('messenger-getnonblocking');
}

public function testLastErrorGetsCleared()
{
$redis = $this->getMockBuilder(\Redis::class)->disableOriginalConstructor()->getMock();

$redis->expects($this->once())->method('xadd')->willReturn(0);
$redis->expects($this->once())->method('xack')->willReturn(0);

$redis->method('getLastError')->willReturnOnConsecutiveCalls('xadd error', 'xack error');
$redis->expects($this->exactly(2))->method('clearLastError');

$connection = Connection::fromDsn('redis://localhost/messenger-clearlasterror', ['auto_setup' => false], $redis);

try {
$connection->add('message', []);
} catch (TransportException $e) {
}

$this->assertSame('xadd error', $e->getMessage());

try {
$connection->ack('1');
} catch (TransportException $e) {
}

$this->assertSame('xack error', $e->getMessage());
}
}
43 changes: 30 additions & 13 deletions src/Symfony/Component/Messenger/Transport/RedisExt/Connection.php
Expand Up @@ -114,12 +114,15 @@ public function get(): ?array
1
);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

if ($e || false === $messages) {
throw new TransportException(
($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not read messages from the redis stream.'
);
if (false === $messages) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}

throw new TransportException($error ?? 'Could not read messages from the redis stream.');
}

if ($this->couldHavePendingMessages && empty($messages[$this->stream])) {
Expand All @@ -144,28 +147,34 @@ public function get(): ?array

public function ack(string $id): void
{
$e = null;
try {
$acknowledged = $this->connection->xack($this->stream, $this->group, [$id]);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

if ($e || !$acknowledged) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not acknowledge redis message "%s".', $id), 0, $e);
if (!$acknowledged) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not acknowledge redis message "%s".', $id));
}
}

public function reject(string $id): void
{
$e = null;
try {
$deleted = $this->connection->xack($this->stream, $this->group, [$id]);
$deleted = $this->connection->xdel($this->stream, [$id]) && $deleted;
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

if ($e || !$deleted) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? sprintf('Could not delete message "%s" from the redis stream.', $id), 0, $e);
if (!$deleted) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? sprintf('Could not delete message "%s" from the redis stream.', $id));
}
}

Expand All @@ -175,16 +184,19 @@ public function add(string $body, array $headers): void
$this->setup();
}

$e = null;
try {
$added = $this->connection->xadd($this->stream, '*', ['message' => json_encode(
['body' => $body, 'headers' => $headers]
)]);
} catch (\RedisException $e) {
throw new TransportException($e->getMessage(), 0, $e);
}

if ($e || !$added) {
throw new TransportException(($e ? $e->getMessage() : $this->connection->getLastError()) ?? 'Could not add a message to the redis stream.', 0, $e);
if (!$added) {
if ($error = $this->connection->getLastError() ?: null) {
$this->connection->clearLastError();
}
throw new TransportException($error ?? 'Could not add a message to the redis stream.');
}
}

Expand All @@ -196,6 +208,11 @@ public function setup(): void
throw new TransportException($e->getMessage(), 0, $e);
}

// group might already exist, ignore
if ($this->connection->getLastError()) {
$this->connection->clearLastError();
}

$this->autoSetup = false;
}
}

0 comments on commit 90c6482

Please sign in to comment.