Skip to content

Commit

Permalink
Fixes for scan/sscan/zscan/hscan & transaction/multi handling (#181)
Browse files Browse the repository at this point in the history
* Fixes type hinting on for scan/sscan/zscan/hscan 
* Aligns behavior of scan/sscan/zscan/hscan with phpredis of how an empty/falsy iterator is handled.
* Fix errors in exec() could cause the redis connection and credis internal state to become out of sync
* Prevent scan/sscan/zscan/hscan from being used in pipeline/multi mode, as phpredis would cause trigger a fatal php error instead of throwing an exception in this case
* Fix calling mget([]) in standalone mode didn't match phpredis mode
* Fix multi mode failing when commands where issued between multi() and pipeline(). Recommended usage is however pipeline()-> multi()->...->exec()

---------

Co-authored-by: Xon <635541+Xon@users.noreply.github.com>
  • Loading branch information
Xon and Xon committed Apr 18, 2023
1 parent 9956928 commit 2881043
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 56 deletions.
6 changes: 6 additions & 0 deletions .editorconfig
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[*.php]
charset=utf-8
end_of_line=lf
insert_final_newline=true
indent_style=space
indent_size=4
3 changes: 3 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@
/.travis.yml export-ignore
/composer.lock export-ignore
/phpunit.xml export-ignore
*.php text eol=lf
*.js text eol=lf
*.md text eol=lf
156 changes: 113 additions & 43 deletions Client.php
Original file line number Diff line number Diff line change
Expand Up @@ -758,6 +758,20 @@ public function select($index)
return $response;
}

/**
* @param string $caller
* @return void
* @throws CredisException
*/
protected function assertNotPipelineOrMulti($caller)
{
if ($this->standalone && ($this->isMulti || $this->usePipeline) ||
// phpredis triggers a php fatal error, so do the check before
!$this->standalone && ($this->redis->getMode() === Redis::MULTI || $this->redis->getMode() === Redis::PIPELINE)) {
throw new CredisException('multi()/pipeline() mode can not be used with '.$caller);
}
}

/**
* @param string|array $pattern
* @return array
Expand All @@ -770,49 +784,57 @@ public function pUnsubscribe()
}

/**
* @param int $Iterator
* @param ?int $Iterator
* @param string $pattern
* @param int $count
* @return bool|array
* @throws CredisException
*/
public function scan(&$Iterator, $pattern = null, $count = null)
{
$this->assertNotPipelineOrMulti(__METHOD__);
return $this->__call('scan', array(&$Iterator, $pattern, $count));
}

/**
* @param int $Iterator
* @param ?int $Iterator
* @param string $field
* @param string $pattern
* @param int $count
* @return bool|array
* @throws CredisException
*/
public function hscan(&$Iterator, $field, $pattern = null, $count = null)
{
$this->assertNotPipelineOrMulti(__METHOD__);
return $this->__call('hscan', array($field, &$Iterator, $pattern, $count));
}

/**
* @param int $Iterator
* @param ?int $Iterator
* @param string $field
* @param string $pattern
* @param int $Iterator
* @return bool|array
* @throws CredisException
*/
public function sscan(&$Iterator, $field, $pattern = null, $count = null)
{
$this->assertNotPipelineOrMulti(__METHOD__);
return $this->__call('sscan', array($field, &$Iterator, $pattern, $count));
}

/**
* @param int $Iterator
* @param ?int $Iterator
* @param string $field
* @param string $pattern
* @param int $Iterator
* @return bool|array
* @throws CredisException
*/
public function zscan(&$Iterator, $field, $pattern = null, $count = null)
{
$this->assertNotPipelineOrMulti(__METHOD__);
return $this->__call('zscan', array($field, &$Iterator, $pattern, $count));
}

Expand Down Expand Up @@ -931,6 +953,7 @@ public function __call($name, $args)

// Send request via native PHP
if ($this->standalone) {
// Early returns should verify how phpredis behaves!
$trackedArgs = array();
switch ($name) {
case 'eval':
Expand Down Expand Up @@ -974,8 +997,10 @@ public function __call($name, $args)
break;
case 'scan':
$trackedArgs = array(&$args[0]);
if (empty($trackedArgs[0])) {
if ($trackedArgs[0] === null) {
$trackedArgs[0] = 0;
} elseif ($trackedArgs[0] === 0) {
return false;
}
$eArgs = array($trackedArgs[0]);
if (!empty($args[1])) {
Expand All @@ -992,8 +1017,10 @@ public function __call($name, $args)
case 'zscan':
case 'hscan':
$trackedArgs = array(&$args[1]);
if (empty($trackedArgs[0])) {
if ($trackedArgs[0] === null) {
$trackedArgs[0] = 0;
} elseif ($trackedArgs[0] === 0) {
return false;
}
$eArgs = array($args[0], $trackedArgs[0]);
if (!empty($args[2])) {
Expand Down Expand Up @@ -1028,7 +1055,7 @@ public function __call($name, $args)
$args = array_values($args[0]);
}
if (is_array($args) && count($args) === 0) {
return false;
return ($this->isMulti || $this->usePipeline) ? $this : false;
}
break;
case 'hmset':
Expand All @@ -1053,6 +1080,12 @@ public function __call($name, $args)
$trackedArgs = $args[1];
}
break;
case 'multi':
// calling multi() multiple times is a no-op
if ($this->isMulti) {
return $this;
}
break;
}
// Flatten arguments
$args = self::_flattenArguments($args);
Expand All @@ -1063,40 +1096,52 @@ public function __call($name, $args)
throw new CredisException('A pipeline is already in use and only one pipeline is supported.');
} elseif ($name === 'exec') {
if ($this->isMulti) {
$this->commandNames[] = array($name, $trackedArgs);
$this->commandNames[] = array($name, $trackedArgs, true);
$this->commands .= self::_prepare_command(array($this->getRenamedCommand($name)));
}

// Write request
if ($this->commands) {
$this->write_command($this->commands);
}
$this->commands = null;
try {
// Write request
if ($this->commands) {
$this->write_command($this->commands);
}

// Read response
$queuedResponses = array();
$response = array();
foreach ($this->commandNames as $command) {
list($name, $arguments) = $command;
$result = $this->read_reply($name, true);
if ($result !== null) {
$result = $this->decode_reply($name, $result, $arguments);
} else {
$queuedResponses[] = $command;
// Read response
$queuedResponses = array();
$response = array();
foreach ($this->commandNames as $command) {
list($name, $arguments, $requireDispatch) = $command;
if (!$requireDispatch) {
$queuedResponses[] = $command;
continue;
}
$result = $this->read_reply($name, true);
if ($result !== null) {
if ($name === 'multi') {
continue;
}
$result = $this->decode_reply($name, $result, $arguments);
$response[] = $result;
} else {
$queuedResponses[] = $command;
}
}
$response[] = $result;
}

if ($this->isMulti) {
$response = array_pop($response);
foreach ($queuedResponses as $key => $command) {
list($name, $arguments) = $command;
$response[$key] = $this->decode_reply($name, $response[$key], $arguments);
if ($this->isMulti) {
$execResponse = array_pop($response);
foreach ($queuedResponses as $key => $command) {
list($name, $arguments) = $command;
$response[] = $this->decode_reply($name, $execResponse[$key], $arguments);
}
}
} catch (CredisException $e) {
// the connection on redis's side is likely in a bad state, force it closed to abort the pipeline/transaction
$this->close(true);
throw $e;
} finally {
$this->commands = $this->commandNames = null;
$this->isMulti = $this->usePipeline = false;
}

$this->commandNames = null;
$this->usePipeline = $this->isMulti = false;
return $response;
} elseif ($name === 'discard') {
$this->commands = null;
Expand All @@ -1107,7 +1152,7 @@ public function __call($name, $args)
$this->isMulti = true;
}
array_unshift($args, $this->getRenamedCommand($name));
$this->commandNames[] = array($name, $trackedArgs);
$this->commandNames[] = array($name, $trackedArgs, true);
$this->commands .= self::_prepare_command($args);
return $this;
}
Expand All @@ -1116,7 +1161,9 @@ public function __call($name, $args)
// Start pipeline mode
if ($name === 'pipeline') {
$this->usePipeline = true;
$this->commandNames = array();
if (!$this->isMulti) {
$this->commandNames = [];
}
$this->commands = '';
return $this;
}
Expand All @@ -1129,18 +1176,41 @@ public function __call($name, $args)
// Non-pipeline mode
array_unshift($args, $this->getRenamedCommand($name));
$command = self::_prepare_command($args);
$this->write_command($command);
$response = $this->read_reply($name);
$response = $this->decode_reply($name, $response, $trackedArgs);
// transaction mode needs to track commands
if ($this->isMulti) {
try {
if ($name === 'exec' || $name === 'discard') {
try {
$this->write_command($command);
$response = $this->read_reply($name);
$response = $this->decode_reply($name, $response, $trackedArgs);
} finally {
$this->isMulti = false;
$this->commandNames = [];
}
} else {
$this->commandNames[] = array($name, $trackedArgs, false);
$this->write_command($command);
$response = $this->read_reply($name);
}
} catch (CredisException $e) {
// the connection on redis's side is likely in a bad state, force it closed to abort the transaction
$this->isMulti = false;
$this->commandNames = [];
$this->close(true);
throw $e;
}
} else {
$this->write_command($command);
$response = $this->read_reply($name);
$response = $this->decode_reply($name, $response, $trackedArgs);
}

// Watch mode disables reconnect so error is thrown
if ($name == 'watch') {
if ($name === 'watch') {
$this->isWatching = true;
} // Transaction mode
elseif ($this->isMulti && ($name == 'exec' || $name == 'discard')) {
$this->isMulti = false;
} // Started transaction
elseif ($this->isMulti || $name == 'multi') {
elseif ($this->isMulti || $name === 'multi') {
$this->isMulti = true;
$response = $this;
}
Expand Down

0 comments on commit 2881043

Please sign in to comment.