diff --git a/.gitignore b/.gitignore index d098259..3452648 100644 --- a/.gitignore +++ b/.gitignore @@ -1,48 +1,3 @@ -# Cache and logs (Symfony2) -/app/cache/* -/app/logs/* -!app/cache/.gitkeep -!app/logs/.gitkeep - -# Email spool folder -/app/spool/* - -# Cache, session files and logs (Symfony3) -/var/cache/* -/var/logs/* -/var/sessions/* -!var/cache/.gitkeep -!var/logs/.gitkeep -!var/sessions/.gitkeep - -# Parameters -/app/config/parameters.yml -/app/config/parameters.ini - -# Managed by Composer -/app/bootstrap.php.cache -/var/bootstrap.php.cache -/bin/* -!bin/console -!bin/symfony_requirements /vendor/ - -# Assets and user uploads -/web/bundles/ -/web/uploads/ - -# PHPUnit -/app/phpunit.xml -/phpunit.xml - -# Build data -/build/ - -# Composer PHAR -/composer.phar - -# Backup entities generated with doctrine:generate:entities command -**/Entity/*~ - -# Embedded web-server pid file -/.web-server-pid +composer.lock +.php_cs.cache diff --git a/.php_cs b/.php_cs index e14de40..dc3efb7 100644 --- a/.php_cs +++ b/.php_cs @@ -29,6 +29,7 @@ return PhpCsFixer\Config::create() ->setRiskyAllowed(true) ->setFinder( PhpCsFixer\Finder::create() + ->exclude('lib/Symfony') ->in(__DIR__ . '/src') ->files()->name('*.php') ) diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..e8861c7 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,54 @@ +dist: xenial +sudo: required + +language: php + +cache: + directories: + - "$HOME/.composer/cache" + +git: + depth: 2 + +matrix: + fast_finish: true + include: + - php: 7.1 + - php: 7.2 + env: ENABLE_IGBINARY=true + - php: 7.3 + env: CHECK_CS=true + +services: + - redis-server +# - memcached + - docker + +# test only master + stable (+ Pull requests) +branches: + only: + - master + - /^\d.\d+$/ + +before_install: + - phpenv config-rm xdebug.ini + - | + # Start Redis cluster + docker pull grokzen/redis-cluster:4.0.12 + docker run -d -p 7000:7000 -p 7001:7001 -p 7002:7002 -p 7003:7003 -p 7004:7004 -p 7005:7005 --name redis-cluster grokzen/redis-cluster:4.0.8 + export REDIS_CLUSTER_HOSTS='localhost:7000 localhost:7001 localhost:7002 localhost:7003 localhost:7004 localhost:7005' + +install: +# - echo "extension = memcached.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini + - echo "extension = redis.so" >> ~/.phpenv/versions/$(phpenv version-name)/etc/conf.d/travis.ini + - if [ "$ENABLE_IGBINARY" != "" ] ; then pecl install igbinary ; fi + - if [ "$ENABLE_LZF" != "" ] ; then printf "no\n" | pecl install lzf ; fi + - travis_retry composer install --prefer-dist --no-interaction + +script: + - composer test + - if [ "$CHECK_CS" != "" ]; then ./vendor/bin/php-cs-fixer fix -v --dry-run --diff --show-progress=estimating; fi + + +notifications: + email: false diff --git a/LICENSE b/LICENSE index c96fad8..bd855f9 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) 2018 eZ Systems +Copyright (c) 2019 eZ Systems Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal diff --git a/README.md b/README.md index d12478a..69ccfea 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,8 @@ # ezsystems/symfony-tools -Collection of polyfill (backport) and incubator features for Symfony. +Collection of polyfill (backported) and incubator (proposed) features for Symfony. Backports Symfony features so they can be used in earlier versions of Symfony, and -aims to serve as incubator for ideas to improve Symfony in the future. +proposed features improving Symfony further. This bundle is first and foremost aiming to cover needs of [eZ Platform](https://ezplatform.com), but is placed in own bundle under MIT as we think others can benefit and help collaborate, and @@ -10,37 +10,45 @@ to simplify forward and backport ports to and from Symfony itself. ### Requriments -- Symfony 3.4 _(4.3+ planned spring 2019, 2.8 support might happen if we need it)_ -- PHP 7.1+ 3.x branch for Symfony3 _(PHP 5.6+ for 2.x for Symfony2 if we ever add that)_ +- Symfony 3.4 +- PHP 7.1+ _(due to backported Symfony 4 code being written for PHP 7.1+)_ #### Semantic Versioning exception Bundle follows [SemVer](https://semver.org/) with one exception: -- Incubator features are allowed to break BC also in Minor versions (x.Y.z), __when__ needed in order to align with changes to the feature when it gets contributed to Symfony. +- Incubator features are allowed to break BC also in Minor versions (x.Y.z), __when__ needed in order to align with +changes to the feature when it gets accepted to Symfony. -!! Tip: As such if you rely on incubator features, make sure to require specific minor versions in composer, like `~1.1.0` or `~1.1.2 || ~1.2.0` +!! Tip: As such if you rely on incubator features, make sure to require specific minor versions in composer, like +`~1.1.0` or `~1.1.2 || ~1.2.0` ### Features **Polyfill (backport) features:** - [Redis session handler](doc/RedisSessionHandler.md) _(for Symfony3, native in Symfony4)_ -**Incubator features** -- +**Incubator (proposed) features** +- [NativeTagAwareAdapters](doc/NativeTagAwareAdapters.md) ### Contributing -Make sure as much as possible the feature is forward compatible for users, so when they upgrade to Symfony version where it's included, they should ideally not need to adapt their code/config. _(see `Semantic Versioning exception` for how this works for incubators)_ +Make sure as much as possible the feature is forward compatible for users, so when they upgrade to Symfony version where +it's included, they should ideally not need to adapt their code/config. _(see `Semantic Versioning exception` for how +this works for incubators)_ **Polyfill (Backports)** -When contributing Symfony backports to this bundle, be aware you commit to help maintain that feature in case there are bug fixes or improvements to that feature in Symfony itself. +When contributing Symfony backports to this bundle, be aware you commit to help maintain that feature in case there are +bug fixes or improvements to that feature in Symfony itself. -**Incubator** -Incubator features should only be proposed here if you intend to contribute this to Symfony itself, and there is at least some certainty it will be accepted. And you also commit to adapt the feature here, if changes are requested once proposed to Symfony. Essentially aiming for the feature here becoming a polyfill/backport feature in the end. +**Incubator (Proposed)** +Incubator features should only be proposed here if also proposed against Symfony itself, and there is at least some +certainty it will be accepted. And you also commit to adapt the feature here, if changes are requested once proposed to +Symfony. Essentially aiming for the feature here becoming a polyfill/backport feature in the end. -As such it's only applicable for smaller features _(e.g. new cache adapter(s))_, not a complete new component or larger changes across Symfony itself for instance. +As such it's only applicable for smaller features _(e.g. new cache adapter(s))_, not a complete new component or larger +changes across Symfony itself for instance. ### License diff --git a/composer.json b/composer.json index e7530f6..dba95d9 100644 --- a/composer.json +++ b/composer.json @@ -23,13 +23,26 @@ }, "require": { "php": "^7.1", - "symfony/symfony": "^3.4.17" + "symfony/symfony": "^3.4.23" }, "require-dev": { + "cache/integration-tests": "dev-master", + "friendsofphp/php-cs-fixer": "v2.14.0", + "phpdocumentor/reflection-docblock": "^3.0|^4.0", "phpunit/phpunit": "^7.3", - "friendsofphp/php-cs-fixer": "^2.7" + "predis/predis": "^1.1.0", + "symfony/phpunit-bridge": "~3.4|~4.0" + + }, + "conflict": { + "ezsystems/ezpublish-kernel": "7.0 - 7.3.4 | 7.4.0 - 7.4.2" + }, + "suggest": { + "ext-redis": "For use with RedisSessionHandler & RedisTagAwareAdapter, usage of native redis v3.1.3+ extension is recommended", + "ext-igbinary": "To improve serialization size and speed for cache and sessions, install igbinary extension " }, "scripts": { - "fix-cs": "@php ./vendor/bin/php-cs-fixer fix -v --show-progress=estimating" + "fix-cs": "@php ./vendor/bin/php-cs-fixer fix -v --show-progress=estimating", + "test": "@php ./vendor/bin/phpunit" } } diff --git a/doc/NativeTagAwareAdapters.md b/doc/NativeTagAwareAdapters.md new file mode 100644 index 0000000..3a97c2c --- /dev/null +++ b/doc/NativeTagAwareAdapters.md @@ -0,0 +1,92 @@ +# NativeTagAwareAdapters + +This feature is an Incubator, and as such might change from minor release to the next. + +Contains a set of optimized TagAwareAdapters that cuts number of cache lookups down by half +compared to usage of Symfony's TagAwareAdapter. In short, for Filesystem symlinks for tags are used, +and for Redis a Set is used to keep track of ids connected to a tag, and instead of tag lookups to +find expiry info on each request this info is used to do it on-demand when calling invalidation buy tags. +_See Adapters for further details._ + +It also backports `Marshaller` feature from Symfony 4 in order to support serialization with igbinary. +The `MarshallerInterface` and `DefaultMarshaller` class is taken from the following revision: d2098d7 +See: https://github.com/symfony/symfony/commits/master/src/Symfony/Component/Cache/Marshaller + +## Requirements +- Symfony 3.4, PHP 7.1+ +- For usage eZ Platform v2: `ezsystems/ezpublish-kernel` v7.3.5, v7.4.3 or higher. +- For `RedisTagAwareAdapter` usage: + - [PHP Redis](https://pecl.php.net/package/redis) extension v3.1.3 or higher, _or_ [Predis](https://packagist.org/packages/predis/predis) + - Redis 3.2 or higher, configured with `noeviction` or any `volatile-*` eviction policy + +## Configuration +After installing the bundle, you have to configure proper services in order to use this. + +**Here is an example on how to do that with eZ Platform:** + + +### File system cache + +In `app/config/cache_pool/app.cache.tagaware.filesystem.yml`, place the following: +```yaml +services: + app.cache.tagaware.filesystem: + class: Symfony\Component\Cache\Adapter\TagAware\FilesystemTagAwareAdapter + parent: cache.adapter.filesystem + tags: + - name: cache.pool + clearer: cache.app_clearer + # Cache namespace prefix overriding the one used by Symfony by default + # This makes sure cache is reliably shared across whole cluster and all Symfony env's + # Can be used for blue/green deployment strategies when changes affect content cache. + # For multi db setup adapt this to be unique per pool (one pool per database) + # If you prefer default behaviour set this to null or comment out, and consider for instance: + # https://symfony.com/doc/current/reference/configuration/framework.html#prefix-seed + namespace: '%cache_namespace%' +``` + +Once that is done you can enable the handler, for instance by setting the following environment variable for PHP: +```bash +export CACHE_POOL="app.cache.tagaware.filesystem" +``` + +_Then clear cache and restart web server, you'll be able to verify it's in use on Symfony's web debug toolbar._ + + +### Redis cache + +In `app/config/cache_pool/app.cache.tagaware.redis.yml`, place the following: +```yaml +services: + app.cache.tagaware.redis: + class: Symfony\Component\Cache\Adapter\TagAware\RedisTagAwareAdapter + parent: cache.adapter.redis + tags: + - name: cache.pool + clearer: cache.app_clearer + # Examples from vendor/symfony/symfony/src/Symfony/Component/Cache/Traits/RedisTrait.php: + # redis://localhost:6379 + # redis://secret@example.com:1234/13 + # redis://secret@/var/run/redis.sock/13?persistent_id=4&class=Redis&timeout=3&retry_interval=3 + # Example using Predis: redis://%cache_dsn%?class=\Predis\Client + provider: 'redis://%cache_dsn%' + # Cache namespace prefix overriding the one used by Symfony by default + # This makes sure cache is reliably shared across whole cluster and all Symfony env's + # Can be used for blue/green deployment strategies when changes affect content cache. + # For multi db setup adapt this to be unique per pool (one pool per database) + # If you prefer default behaviour set this to null or comment out, and consider for instance: + # https://symfony.com/doc/current/reference/configuration/framework.html#prefix-seed + namespace: '%cache_namespace%' +``` + +Once that is done you can enable the handler, for instance by setting the following environment variable for PHP: +```bash +export CACHE_POOL="app.cache.tagaware.redis" +``` +If you don't have redis, for testing you can use: +- Run: `docker run --name my-redis -p 6379:6379 -d redis`. +- Stop + Remove: `docker rm -f my-redis`. +- Debug: `printf "PING\r\n" | nc localhost 6379`, should return `+PONG`. + + +_Then clear cache and restart web server, you'll be able to verify it's in use on Symfony's web debug toolbar._ diff --git a/phpunit.xml.dist b/phpunit.xml.dist new file mode 100644 index 0000000..b9607f4 --- /dev/null +++ b/phpunit.xml.dist @@ -0,0 +1,55 @@ + + + + + + + + + + + + tests + + + ./src/lib/Symfony/Components/*/Tests/ + + + + + + ./ + + ./tests + ./vendor + ./src/lib/Symfony/Components/*/Tests/ + + + + + + + + + + + Cache\IntegrationTests + Symfony\Component\Cache + Symfony\Component\Cache\Tests\Fixtures + Symfony\Component\Cache\Traits + Symfony\Component\Cache\Tests\Adapter + Symfony\Component\Cache\Tests\Traits + + + + + + + diff --git a/src/lib/Symfony/Components/Cache/Adapter/TagAware/AbstractTagAwareAdapter.php b/src/lib/Symfony/Components/Cache/Adapter/TagAware/AbstractTagAwareAdapter.php new file mode 100644 index 0000000..ca4e371 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Adapter/TagAware/AbstractTagAwareAdapter.php @@ -0,0 +1,377 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +declare(strict_types=1); + +namespace Symfony\Component\Cache\Adapter\TagAware; + +use Psr\Cache\CacheItemInterface; +use Psr\Log\LoggerAwareInterface; +use Symfony\Component\Cache\Adapter\AdapterInterface; +use Symfony\Component\Cache\Marshaller\DefaultMarshaller; +use Symfony\Component\Cache\Marshaller\MarshallerInterface; +use Symfony\Component\Cache\CacheItem; +use Symfony\Component\Cache\Exception\InvalidArgumentException; +use Symfony\Component\Cache\ResettableInterface; +use Symfony\Component\Cache\Traits\AbstractTrait; + +/** + * Re-implements Symfony's AbstractAdapter (as it uses private properties). + * + * In order to be able to store tags with values to avoid 2x lookups for tags, and to be able to use backported Marshaller + * for serialization. + */ +abstract class AbstractTagAwareAdapter implements AdapterInterface, LoggerAwareInterface, ResettableInterface +{ + use AbstractTrait; + + private const TAG_PREFIX = 'tag-'; + + private $createCacheItem; + private $mergeByLifetime; + + /** + * @var \Symfony\Component\Cache\Marshaller\MarshallerInterface + * NOTE: Not relevant in this way in Symfony 4+ where Abstract trait already uses this + */ + protected static $marshaller; + + /** + * @param string $namespace + * @param int $defaultLifetime + * @param MarshallerInterface|null $marshaller + * + * @throws \Symfony\Component\Cache\Exception\CacheException + */ + protected function __construct(string $namespace = '', int $defaultLifetime = 0, MarshallerInterface $marshaller = null) + { + self::$marshaller = $marshaller ?? new DefaultMarshaller(); + + $this->namespace = '' === $namespace ? '' : CacheItem::validateKey($namespace) . ':'; + if (null !== $this->maxIdLength && \strlen($namespace) > $this->maxIdLength - 24) { + throw new InvalidArgumentException(sprintf('Namespace must be %d chars max, %d given ("%s")', $this->maxIdLength - 24, \strlen($namespace), $namespace)); + } + $this->createCacheItem = \Closure::bind( + static function ($key, $value, $isHit) use ($defaultLifetime) { + $item = new CacheItem(); + $item->key = $key; + $item->isHit = $isHit; + $item->defaultLifetime = $defaultLifetime; + // extract Value and Tags from the cache value + $item->value = $value['value']; + $item->prevTags = $value['tags'] ?? []; + // + + return $item; + }, + null, + CacheItem::class + ); + $getId = \Closure::fromCallable([$this, 'getId']); + $tagPrefix = self::TAG_PREFIX; + $this->mergeByLifetime = \Closure::bind( + static function ($deferred, $namespace, &$expiredIds) use ($getId, $tagPrefix) { + $byLifetime = []; + $now = time(); + $expiredIds = []; + + foreach ($deferred as $key => $item) { + // store Value and Tags on the cache value + $key = (string) $key; + $id = $getId($key); + $value = ['value' => $item->value, 'tags' => $item->tags]; + + // Extract tag changes, these should be removed from values in doSave() + $value['tag-operations'] = ['add' => [], 'remove' => []]; + $oldTags = $item->prevTags ?? []; + foreach (array_diff($value['tags'], $oldTags) as $addedTag) { + $value['tag-operations']['add'][] = $getId($tagPrefix.$addedTag); + } + foreach (array_diff($oldTags, $value['tags']) as $removedTag) { + $value['tag-operations']['remove'][] = $getId($tagPrefix.$removedTag); + } + + if (null === $item->expiry) { + $byLifetime[0 < $item->defaultLifetime ? $item->defaultLifetime : 0][$id] = $value; + } elseif ($item->expiry > $now) { + $byLifetime[$item->expiry - $now][$id] = $value; + } else { + $expiredIds[] = $id; + } + // + } + + return $byLifetime; + }, + null, + CacheItem::class + ); + } + + /** + * {@inheritdoc} + */ + public function getItem($key) + { + if ($this->deferred) { + $this->commit(); + } + $id = $this->getId($key); + + $f = $this->createCacheItem; + $isHit = false; + $value = null; + + try { + foreach ($this->doFetch([$id]) as $value) { + $isHit = true; + } + } catch (\Exception $e) { + CacheItem::log($this->logger, 'Failed to fetch key "{key}"', ['key' => $key, 'exception' => $e]); + } + + return $f($key, $value, $isHit); + } + + /** + * {@inheritdoc} + */ + public function getItems(array $keys = []) + { + if ($this->deferred) { + $this->commit(); + } + $ids = []; + + foreach ($keys as $key) { + $ids[] = $this->getId($key); + } + try { + $items = $this->doFetch($ids); + } catch (\Exception $e) { + CacheItem::log($this->logger, 'Failed to fetch requested items', ['keys' => $keys, 'exception' => $e]); + $items = []; + } + $ids = array_combine($ids, $keys); + + return $this->generateItems($items, $ids); + } + + /** + * {@inheritdoc} + */ + public function save(CacheItemInterface $item) + { + if (!$item instanceof CacheItem) { + return false; + } + $this->deferred[$item->getKey()] = $item; + + return $this->commit(); + } + + /** + * {@inheritdoc} + */ + public function saveDeferred(CacheItemInterface $item) + { + if (!$item instanceof CacheItem) { + return false; + } + $this->deferred[$item->getKey()] = $item; + + return true; + } + + /** + * {@inheritdoc} + */ + public function commit() + { + $ok = true; + $byLifetime = $this->mergeByLifetime; + $byLifetime = $byLifetime($this->deferred, $this->namespace, $expiredIds); + $retry = $this->deferred = []; + + if ($expiredIds) { + $this->doDelete($expiredIds); + } + foreach ($byLifetime as $lifetime => $values) { + try { + $e = $this->doSave($values, $lifetime); + } catch (\Exception $e) { + } + if (true === $e || [] === $e) { + continue; + } + if (\is_array($e) || 1 === \count($values)) { + foreach (\is_array($e) ? $e : array_keys($values) as $id) { + $ok = false; + $v = $values[$id]; + $type = \is_object($v) ? \get_class($v) : \gettype($v); + CacheItem::log($this->logger, 'Failed to save key "{key}" ({type})', ['key' => substr($id, \strlen($this->namespace)), 'type' => $type, 'exception' => $e instanceof \Exception ? $e : null]); + } + } else { + foreach ($values as $id => $v) { + $retry[$lifetime][] = $id; + } + } + } + + // When bulk-save failed, retry each item individually + foreach ($retry as $lifetime => $ids) { + foreach ($ids as $id) { + try { + $v = $byLifetime[$lifetime][$id]; + $e = $this->doSave([$id => $v], $lifetime); + } catch (\Exception $e) { + } + if (true === $e || [] === $e) { + continue; + } + $ok = false; + $type = \is_object($v) ? \get_class($v) : \gettype($v); + CacheItem::log($this->logger, 'Failed to save key "{key}" ({type})', ['key' => substr($id, \strlen($this->namespace)), 'type' => $type, 'exception' => $e instanceof \Exception ? $e : null]); + } + } + + return $ok; + } + + public function __destruct() + { + if ($this->deferred) { + $this->commit(); + } + } + + private function generateItems($items, &$keys) + { + $f = $this->createCacheItem; + + try { + foreach ($items as $id => $value) { + if (!isset($keys[$id])) { + $id = key($keys); + } + $key = $keys[$id]; + unset($keys[$id]); + yield $key => $f($key, $value, true); + } + } catch (\Exception $e) { + CacheItem::log($this->logger, 'Failed to fetch requested items', ['keys' => array_values($keys), 'exception' => $e]); + } + + foreach ($keys as $key) { + yield $key => $f($key, null, false); + } + } + + /** + * {@inheritdoc} + * + * Overloaded in order to deal with tags for adjusted doDelete() signature. + */ + public function deleteItems(array $keys) + { + if (!$keys) { + return true; + } + + $ids = []; + $tagData = []; + + foreach ($keys as $key) { + $ids[$key] = $this->getId($key); + unset($this->deferred[$key]); + } + + foreach ($this->doFetch($ids) as $id => $value) { + foreach ($value['tags'] ?? [] as $tag) { + $tagData[$this->getId(self::TAG_PREFIX.$tag)][] = $id; + } + } + + try { + if ($this->doDelete(array_values($ids), $tagData)) { + return true; + } + } catch (\Exception $e) { + } + + $ok = true; + + // When bulk-delete failed, retry each item individually + foreach ($ids as $key => $id) { + try { + $e = null; + if ($this->doDelete([$id])) { + continue; + } + } catch (\Exception $e) { + } + CacheItem::log($this->logger, 'Failed to delete key "{key}"', ['key' => $key, 'exception' => $e]); + $ok = false; + } + + return $ok; + } + + /** + * Removes multiple items from the pool and their corresponding tags. + * + * @param array $ids An array of identifiers that should be removed from the pool + * @param array $tagData Optional array of tag identifiers => key identifiers that should be removed from the pool + * + * @return bool True if the items were successfully removed, false otherwise + */ + abstract protected function doDelete(array $ids, array $tagData = []); + + /** + * {@inheritdoc} + */ + public function invalidateTags(array $tags) + { + if (empty($tags)) { + return false; + } + + $tagIds = []; + foreach (array_unique($tags) as $tag) { + $tagIds[] = $this->getId(self::TAG_PREFIX.$tag); + } + + if ($this->doInvalidate($tagIds)) { + return true; + } + + return false; + } + + /** + * Invalidates cached items using tags. + * + * @param string[] $tagIds an array of tags to invalidate, key is tag and value is tag id + * + * @return bool True on success + */ + abstract protected function doInvalidate(array $tagIds): bool; + + /** + * Overload unserialize() in order to use marshaller. + */ + protected static function unserialize($value) + { + return self::$marshaller->unmarshall($value); + } +} diff --git a/src/lib/Symfony/Components/Cache/Adapter/TagAware/FilesystemTagAwareAdapter.php b/src/lib/Symfony/Components/Cache/Adapter/TagAware/FilesystemTagAwareAdapter.php new file mode 100644 index 0000000..e109204 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Adapter/TagAware/FilesystemTagAwareAdapter.php @@ -0,0 +1,208 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +declare(strict_types=1); + +namespace Symfony\Component\Cache\Adapter\TagAware; + +use Symfony\Component\Cache\Adapter\TagAwareAdapterInterface; +use Symfony\Component\Cache\PruneableInterface; +use Symfony\Component\Cache\Traits\FilesystemTrait; +use Symfony\Component\Cache\Exception\CacheException; +use Symfony\Component\Filesystem\Filesystem; + +/** + * Class FilesystemTagAwareAdapter, stores tag <> id relationship as a symlink so we don't need to fetch tags on get*(). + * + * Stores tag/id information as symlinks to the cache files they refer to, in order to: + * - skip loading tags info on reads + * - be able to iterate cache to clear on-demand when invalidation by tags + */ +final class FilesystemTagAwareAdapter extends AbstractTagAwareAdapter implements TagAwareAdapterInterface, PruneableInterface +{ + use FilesystemTrait { + doDelete as deleteCache; + } + + /** + * Folder used for tag symlinks. + */ + private const TAG_FOLDER = 'tags'; + + /** + * @var \Symfony\Component\Filesystem\Filesystem|null + */ + private $fs; + + public function __construct(string $namespace = '', int $defaultLifetime = 0, string $directory = null) + { + parent::__construct('', $defaultLifetime); + $this->init($namespace, $directory); + } + + /** + * This method overrides {@see \Symfony\Component\Cache\Traits\FilesystemTrait::doSave}. + * + * It needs to be overridden due to: + * - usage of `serialize()` in the original method + * - need to store tag information on save + * + * {@inheritdoc} + */ + protected function doSave(array $values, $lifetime) + { + // Extract and remove tag operations form values + $tagOperations = ['add' => [], 'remove' => []]; + foreach ($values as $id => $value) { + $tagOperations['add'][$id] = $value['tag-operations']['add']; + $tagOperations['remove'][$id] = $value['tag-operations']['remove']; + unset($value['tag-operations']); + } + + $failed = []; + $serialized = self::$marshaller->marshall($values, $failed); + if (empty($serialized)) { + return $failed; + } + + $expiresAt = $lifetime ? (time() + $lifetime) : 0; + foreach ($serialized as $id => $value) { + $file = $this->getFile($id, true); + if (!$this->write($file, $expiresAt . "\n" . rawurlencode($id) . "\n" . $value, $expiresAt)) { + $failed[] = $id; + continue; + } + } + + if (!empty($failed) && !is_writable($this->directory)) { + throw new CacheException(sprintf('Cache directory is not writable (%s)', $this->directory)); + } + + $fs = $this->getFilesystem(); + // Add Tags as symlinks + foreach ($tagOperations['add'] as $id => $tagIds) { + if (!empty($failed) && \in_array($id, $failed)) { + continue; + } + + $file = $this->getFile($id); + $itemFileName = $this->getItemLinkFileName($id); + foreach ($tagIds as $tagId) { + $fs->symlink($file, $this->getTagFolder($tagId).$itemFileName); + } + } + + // Unlink removed Tags + $files = []; + foreach ($tagOperations['remove'] as $id => $tagIds) { + if (!empty($failed) && \in_array($id, $failed)) { + continue; + } + + $itemFileName = $this->getItemLinkFileName($id); + foreach ($tagIds as $tagId) { + $files[] = $this->getTagFolder($tagId).$itemFileName; + } + } + $fs->remove($files); + + return $failed; + } + + /** + * {@inheritdoc} + */ + protected function doDelete(array $ids, array $tagData = []) + { + $ok = $this->deleteCache($ids); + + // Remove tags + $files = []; + $fs = $this->getFilesystem(); + foreach ($tagData as $tagId => $idMap) { + $tagFolder = $this->getTagFolder($tagId); + foreach ($idMap as $id) { + $files[] = $tagFolder.$this->getItemLinkFileName($id); + } + } + $fs->remove($files); + + return $ok; + } + + /** + * {@inheritdoc} + */ + public function doInvalidate(array $tagIds): bool + { + foreach ($tagIds as $tagId) { + $tagsFolder = $this->getTagFolder($tagId); + if (!is_dir($tagsFolder)) { + continue; + } + + foreach (new \RecursiveIteratorIterator(new \RecursiveDirectoryIterator($tagsFolder, \FilesystemIterator::SKIP_DOTS)) as $itemLink) { + if (!$itemLink->isLink()) { + throw new \Exception('Tag link is not a link: '.$itemLink); + } + + $valueFile = $itemLink->getRealPath(); + if ($valueFile && file_exists($valueFile)) { + @unlink($valueFile); + } + + @unlink((string) $itemLink); + } + } + + return true; + } + + private function getFilesystem(): Filesystem + { + return $this->fs ?? $this->fs = new Filesystem(); + } + + private function getTagFolder(string $tagId): string + { + return $this->directory.self::TAG_FOLDER.\DIRECTORY_SEPARATOR.str_replace('/', '-', $tagId).\DIRECTORY_SEPARATOR; + } + + private function getItemLinkFileName(string $keyId): string + { + // Use MD5 to favor speed over security, which is not an issue here + $hash = str_replace('/', '-', base64_encode(hash('md5', static::class.$keyId, true))); + + return substr($hash, 0, 20); + } + + /** + * This method overrides {@see \Symfony\Component\Cache\Traits\FilesystemCommonTrait::getFile}. + * + * Backports Symfony 4 optimization of using md5 instead of sha1, given this is used on reads. + * + * {@inheritdoc} + */ + private function getFile($id, $mkdir = false) + { + // Use MD5 to favor speed over security, which is not an issue here + $hash = str_replace('/', '-', base64_encode(hash('md5', static::class . $id, true))); + $dir = $this->directory.strtoupper($hash[0].\DIRECTORY_SEPARATOR.$hash[1].\DIRECTORY_SEPARATOR); + + if ($mkdir && !file_exists($dir)) { + @mkdir($dir, 0777, true); + } + + return $dir.substr($hash, 2, 20); + } +} diff --git a/src/lib/Symfony/Components/Cache/Adapter/TagAware/RedisTagAwareAdapter.php b/src/lib/Symfony/Components/Cache/Adapter/TagAware/RedisTagAwareAdapter.php new file mode 100644 index 0000000..4210405 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Adapter/TagAware/RedisTagAwareAdapter.php @@ -0,0 +1,186 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +declare(strict_types=1); + +namespace Symfony\Component\Cache\Adapter\TagAware; + +use Predis; +use Predis\Connection\Aggregate\ClusterInterface; +use Predis\Response\Status; +use Symfony\Component\Cache\Adapter\TagAwareAdapterInterface; +use Symfony\Component\Cache\Traits\RedisTrait; + +/** + * Class RedisTagAwareAdapter, stores tag <> id relationship as a Set so we don't need to fetch tags on get* operations. + * + * Requirements/Limitations: + * - Redis 3.2+ (sPOP) + * - PHP Redis 3.1.3+ (sPOP) or Predis + * - Redis configured with any `volatile-*` eviction policy, or `noeviction` if you are sure to never fill up memory + * - This is to guarantee that tags ("relations") survives cache items so we can reliably invalidate on them, + * which is archived by always storing cache with a expiry, while Set is without expiry (non-volatile). + * - Max 2 billion keys per tag, so if you use a "all" items tag for expiry, that limits you to 2 billion items + */ +final class RedisTagAwareAdapter extends AbstractTagAwareAdapter implements TagAwareAdapterInterface +{ + use RedisTrait; + + /** + * Redis "Set" can hold more than 4 billion members, here we limit ourselves to PHP's > 2 billion max int (32Bit). + */ + private const POP_MAX_LIMIT = 2147483647 - 1; + + /** + * Limits for how many keys are deleted in batch. + */ + private const BULK_DELETE_LIMIT = 10000; + + /** + * On cache items without a lifetime set, we force it to 10 days. + * This is to make sure tags are *never* cleared before cache items are cleared (risking in-consistent cache). + */ + private const FORCED_ITEM_TTL = 864000; + + /** + * @param \Redis|\RedisArray|\RedisCluster|\Predis\Client $redisClient The redis client + * @param string $namespace The default namespace + * @param int $defaultLifetime The default lifetime + * + * @throws \Exception If phpredis is in use but with version lower then 3.1.3. + */ + public function __construct($redisClient, string $namespace = '', int $defaultLifetime = 0) + { + $this->init($redisClient, $namespace, $defaultLifetime); + + // Make sure php-redis is 3.1.3 or higher configured for Redis classes + if (!$this->redis instanceof Predis\Client && version_compare(phpversion('redis'), '3.1.3', '<')) { + throw new \Exception('RedisTagAwareAdapter requries php-redis 3.1.3 or higher, alternatively use predis/predis'); + } + } + + /** + * This method overrides @see \Symfony\Component\Cache\Traits\RedisTrait::doSave. + * + * It needs to be overridden due to: + * - usage of native `serialize` method in the original method. + * - Need to store tags separately also, for invalidateTags() use. + * + * {@inheritdoc} + */ + protected function doSave(array $values, $lifetime) + { + // Extract tag operations + $tagOperations = ['sAdd' => [], 'sRem' => []]; + foreach ($values as $id => $value) { + foreach ($value['tag-operations']['add'] as $tag => $tagId) { + $tagOperations['sAdd'][$tagId][] = $id; + } + + foreach ($value['tag-operations']['remove'] as $tag => $tagId) { + $tagOperations['sRem'][$tagId][] = $id; + } + + unset($value['tag-operations']); + } + + // serilize values + if (!$serialized = self::$marshaller->marshall($values, $failed)) { + return $failed; + } + + // While pipeline isn't supported on RedisCluster, other setups will at least benefit from doing this in one op + $results = $this->pipeline(static function () use ($serialized, $lifetime, $tagOperations) { + // Store cache items, force a ttl if none is set, as there is no MSETEX we need to set each one + foreach ($serialized as $id => $value) { + yield 'setEx' => [ + $id, + 0 >= $lifetime ? self::FORCED_ITEM_TTL : $lifetime, + $value, + ]; + } + + // Add and Remove Tags + foreach ($tagOperations as $command => $tagSet) { + foreach ($tagSet as $tagId => $ids) { + yield $command => array_merge([$tagId], $ids); + } + } + }); + + foreach ($results as $id => $result) { + // Skip results of SADD/SREM operations, they'll be 1 or 0 depending on if set value already existed or not + if (is_numeric($result)) { + continue; + } + // setEx results + if (true !== $result && (!$result instanceof Status || $result !== Status::get('OK'))) { + $failed[] = $id; + } + } + + return $failed; + } + + /** + * {@inheritdoc} + */ + protected function doDelete(array $ids, array $tagData = []) + { + if (!$ids) { + return true; + } + + $predisCluster = $this->redis instanceof \Predis\Client && $this->redis->getConnection() instanceof ClusterInterface; + $this->pipeline(static function () use ($ids, $tagData, $predisCluster) { + if ($predisCluster) { + foreach ($ids as $id) { + yield 'del' => [$id]; + } + } else { + yield 'del' => $ids; + } + + foreach ($tagData as $tagId => $idMap) { + yield 'sRem' => array_merge([$tagId], $idMap); + } + })->rewind(); + + return true; + } + + /** + * {@inheritdoc} + */ + public function doInvalidate(array $tagIds): bool + { + // Pop all tag info at once to avoid race conditions + $tagIdSets = $this->pipeline(static function () use ($tagIds) { + foreach ($tagIds as $tagId) { + // Client: Predis or PHP Redis 3.1.3+ (https://github.com/phpredis/phpredis/commit/d2e203a6) + // Server: Redis 3.2 or higher (https://redis.io/commands/spop) + yield 'sPop' => [$tagId, self::POP_MAX_LIMIT]; + } + }); + + // Flatten generator result from pipleline, ignore keys (tag ids) + $ids = array_unique(array_merge(...iterator_to_array($tagIdSets, false))); + + // Delete cache in chunks to avoid overloading the connection + foreach (\array_chunk($ids, self::BULK_DELETE_LIMIT) as $chunkIds) { + $this->doDelete($chunkIds); + } + + return true; + } +} diff --git a/src/lib/Symfony/Components/Cache/Marshaller/DefaultMarshaller.php b/src/lib/Symfony/Components/Cache/Marshaller/DefaultMarshaller.php new file mode 100644 index 0000000..f3284d4 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Marshaller/DefaultMarshaller.php @@ -0,0 +1,103 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: + * https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Cache/Marshaller/DefaultMarshaller.php + * Last revision: https://github.com/symfony/symfony/commit/3cfdc9e9d739dda0bc8d222d458e6bbb34c10c72 (+ CS commit: d2098d7) + */ + +namespace Symfony\Component\Cache\Marshaller; + +use Symfony\Component\Cache\Exception\CacheException; + +/** + * Serializes/unserializes values using igbinary_serialize() if available, serialize() otherwise. + * + * @author Nicolas Grekas + */ +class DefaultMarshaller implements MarshallerInterface +{ + private $useIgbinarySerialize = true; + + public function __construct(bool $useIgbinarySerialize = null) + { + if (null === $useIgbinarySerialize) { + $useIgbinarySerialize = \extension_loaded('igbinary'); + } elseif ($useIgbinarySerialize && !\extension_loaded('igbinary')) { + throw new CacheException('The "igbinary" PHP extension is not loaded.'); + } + $this->useIgbinarySerialize = $useIgbinarySerialize; + } + + /** + * {@inheritdoc} + */ + public function marshall(array $values, ?array &$failed): array + { + $serialized = $failed = []; + + foreach ($values as $id => $value) { + try { + if ($this->useIgbinarySerialize) { + $serialized[$id] = igbinary_serialize($value); + } else { + $serialized[$id] = serialize($value); + } + } catch (\Exception $e) { + $failed[] = $id; + } + } + + return $serialized; + } + + /** + * {@inheritdoc} + */ + public function unmarshall(string $value) + { + if ('b:0;' === $value) { + return false; + } + if ('N;' === $value) { + return null; + } + static $igbinaryNull; + if ($value === ($igbinaryNull ?? $igbinaryNull = \extension_loaded('igbinary') ? igbinary_serialize(null) : false)) { + return null; + } + $unserializeCallbackHandler = ini_set('unserialize_callback_func', __CLASS__.'::handleUnserializeCallback'); + try { + if (':' === ($value[1] ?? ':')) { + if (false !== $value = unserialize($value)) { + return $value; + } + } elseif (false === $igbinaryNull) { + throw new \RuntimeException('Failed to unserialize values, did you forget to install the "igbinary" extension?'); + } elseif (null !== $value = igbinary_unserialize($value)) { + return $value; + } + + throw new \DomainException(error_get_last() ? error_get_last()['message'] : 'Failed to unserialize values.'); + } catch (\Error $e) { + throw new \ErrorException($e->getMessage(), $e->getCode(), E_ERROR, $e->getFile(), $e->getLine()); + } finally { + ini_set('unserialize_callback_func', $unserializeCallbackHandler); + } + } + + /** + * @internal + */ + public static function handleUnserializeCallback($class) + { + throw new \DomainException('Class not found: '.$class); + } +} diff --git a/src/lib/Symfony/Components/Cache/Marshaller/MarshallerInterface.php b/src/lib/Symfony/Components/Cache/Marshaller/MarshallerInterface.php new file mode 100644 index 0000000..bf85040 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Marshaller/MarshallerInterface.php @@ -0,0 +1,44 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: + * https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Cache/Marshaller/MarshallerInterface.php + * Last revision: https://github.com/symfony/symfony/commit/3cfdc9e9d739dda0bc8d222d458e6bbb34c10c72 + */ + +namespace Symfony\Component\Cache\Marshaller; + +/** + * Serializes/unserializes PHP values. + * + * Implementations of this interface MUST deal with errors carefully. They MUST + * also deal with forward and backward compatibility at the storage format level. + * + * @author Nicolas Grekas + */ +interface MarshallerInterface +{ + /** + * Serializes a list of values. + * + * When serialization fails for a specific value, no exception should be + * thrown. Instead, its key should be listed in $failed. + */ + public function marshall(array $values, ?array &$failed): array; + + /** + * Unserializes a single value and throws an exception if anything goes wrong. + * + * @return mixed + * + * @throws \Exception Whenever unserialization fails + */ + public function unmarshall(string $value); +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/FilesystemTagAwareAdapterTest.php b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/FilesystemTagAwareAdapterTest.php new file mode 100644 index 0000000..7a7a22e --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/FilesystemTagAwareAdapterTest.php @@ -0,0 +1,31 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Adapter\TagAware; + +use Symfony\Component\Cache\Adapter\TagAware\FilesystemTagAwareAdapter; +use Symfony\Component\Cache\Tests\Adapter\FilesystemAdapterTest; +use Symfony\Component\Cache\Tests\Traits\TagAwareTestTrait; + +/** + * @group time-sensitive + */ +class FilesystemTagAwareAdapterTest extends FilesystemAdapterTest +{ + use TagAwareTestTrait; + + public function createCachePool($defaultLifetime = 0) + { + return new FilesystemTagAwareAdapter('', $defaultLifetime); + } +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/PredisTagAwareAdapterTest.php b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/PredisTagAwareAdapterTest.php new file mode 100644 index 0000000..1c83655 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/PredisTagAwareAdapterTest.php @@ -0,0 +1,65 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Adapter\TagAware; + +use Predis\Connection\StreamConnection; +use Symfony\Component\Cache\Adapter\TagAware\RedisTagAwareAdapter; +use Symfony\Component\Cache\Adapter\RedisAdapter; +use Symfony\Component\Cache\Tests\Adapter\PredisAdapterTest; +use Symfony\Component\Cache\Tests\Traits\TagAwareTestTrait; + +class PredisTagAwareAdapterTest extends PredisAdapterTest +{ + use TagAwareTestTrait; + + protected function setUp() + { + parent::setUp(); + $this->skippedTests['testTagItemExpiry'] = 'Testing expiration slows down the test suite'; + } + + public function createCachePool($defaultLifetime = 0) + { + $this->assertInstanceOf(\Predis\Client::class, self::$redis); + $adapter = new RedisTagAwareAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime); + + return $adapter; + } + + /** + * @todo Drop this overloading when RedisTrait is removedin the future (IF cluster improvments are backported to 3.4) + */ + public function testCreateConnection() + { + $redisHost = getenv('REDIS_HOST'); + + $redis = RedisAdapter::createConnection('redis://'.$redisHost.'/1', ['class' => \Predis\Client::class, 'timeout' => 3]); + $this->assertInstanceOf(\Predis\Client::class, $redis); + + $connection = $redis->getConnection(); + $this->assertInstanceOf(StreamConnection::class, $connection); + + $params = [ + 'scheme' => 'tcp', + 'host' => 'localhost', + 'port' => 6379, + 'persistent' => 0, + 'timeout' => 3, + 'read_write_timeout' => 0, + 'tcp_nodelay' => true, + 'database' => '1', + ]; + $this->assertSame($params, $connection->getParameters()->toArray()); + } +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/PredisTagAwareClusterAdapterTest.php b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/PredisTagAwareClusterAdapterTest.php new file mode 100644 index 0000000..0b0817c --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/PredisTagAwareClusterAdapterTest.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Adapter\TagAware; + +use Symfony\Component\Cache\Adapter\TagAware\RedisTagAwareAdapter; +use Symfony\Component\Cache\Tests\Adapter\PredisClusterAdapterTest; +use Symfony\Component\Cache\Tests\Traits\TagAwareTestTrait; + +class PredisTagAwareClusterAdapterTest extends PredisClusterAdapterTest +{ + use TagAwareTestTrait; + + protected function setUp() + { + parent::setUp(); + $this->skippedTests['testTagItemExpiry'] = 'Testing expiration slows down the test suite'; + } + + public function createCachePool($defaultLifetime = 0) + { + $this->assertInstanceOf(\Predis\Client::class, self::$redis); + $adapter = new RedisTagAwareAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime); + + return $adapter; + } +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareAdapterTest.php b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareAdapterTest.php new file mode 100644 index 0000000..8a1c8e9 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareAdapterTest.php @@ -0,0 +1,38 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Adapter\TagAware; + +use Symfony\Component\Cache\Adapter\TagAware\RedisTagAwareAdapter; +use Symfony\Component\Cache\Tests\Adapter\RedisAdapterTest; +use Symfony\Component\Cache\Tests\Traits\TagAwareTestTrait; +use Symfony\Component\Cache\Traits\RedisProxy; + +class RedisTagAwareAdapterTest extends RedisAdapterTest +{ + use TagAwareTestTrait; + + protected function setUp() + { + parent::setUp(); + $this->skippedTests['testTagItemExpiry'] = 'Testing expiration slows down the test suite'; + } + + public function createCachePool($defaultLifetime = 0) + { + $this->assertInstanceOf(RedisProxy::class, self::$redis); + $adapter = new RedisTagAwareAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime); + + return $adapter; + } +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareArrayAdapterTest.php b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareArrayAdapterTest.php new file mode 100644 index 0000000..84f3837 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareArrayAdapterTest.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Adapter\TagAware; + +use Symfony\Component\Cache\Adapter\TagAware\RedisTagAwareAdapter; +use Symfony\Component\Cache\Tests\Adapter\RedisArrayAdapterTest; +use Symfony\Component\Cache\Tests\Traits\TagAwareTestTrait; + +class RedisTagAwareArrayAdapterTest extends RedisArrayAdapterTest +{ + use TagAwareTestTrait; + + protected function setUp() + { + parent::setUp(); + $this->skippedTests['testTagItemExpiry'] = 'Testing expiration slows down the test suite'; + } + + public function createCachePool($defaultLifetime = 0) + { + $this->assertInstanceOf(\RedisArray::class, self::$redis); + $adapter = new RedisTagAwareAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime); + + return $adapter; + } +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareClusterAdapterTest.php b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareClusterAdapterTest.php new file mode 100644 index 0000000..3be6c63 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Adapter/TagAware/RedisTagAwareClusterAdapterTest.php @@ -0,0 +1,37 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Adapter\TagAware; + +use Symfony\Component\Cache\Adapter\TagAware\RedisTagAwareAdapter; +use Symfony\Component\Cache\Tests\Adapter\RedisClusterAdapterTest; +use Symfony\Component\Cache\Tests\Traits\TagAwareTestTrait; + +class RedisTagAwareClusterAdapterTest extends RedisClusterAdapterTest +{ + use TagAwareTestTrait; + + protected function setUp() + { + parent::setUp(); + $this->skippedTests['testTagItemExpiry'] = 'Testing expiration slows down the test suite'; + } + + public function createCachePool($defaultLifetime = 0) + { + $this->assertInstanceOf(\RedisCluster::class, self::$redis); + $adapter = new RedisTagAwareAdapter(self::$redis, str_replace('\\', '.', __CLASS__), $defaultLifetime); + + return $adapter; + } +} diff --git a/src/lib/Symfony/Components/Cache/Tests/Traits/TagAwareTestTrait.php b/src/lib/Symfony/Components/Cache/Tests/Traits/TagAwareTestTrait.php new file mode 100644 index 0000000..fd81e73 --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Tests/Traits/TagAwareTestTrait.php @@ -0,0 +1,146 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: https://github.com/symfony/symfony/pull/30370 + */ + +namespace Symfony\Component\Cache\Tests\Traits; + +/** + * Common assertions for TagAware adapters. + * + * @method \Symfony\Component\Cache\Adapter\TagAwareAdapterInterface createCachePool() Must be implemented by TestCase + */ +trait TagAwareTestTrait +{ + /** + * @expectedException \Psr\Cache\InvalidArgumentException + */ + public function testInvalidTag() + { + $pool = $this->createCachePool(); + $item = $pool->getItem('foo'); + $item->tag(':'); + } + + public function testInvalidateTags() + { + $pool = $this->createCachePool(); + + $i0 = $pool->getItem('i0'); + $i1 = $pool->getItem('i1'); + $i2 = $pool->getItem('i2'); + $i3 = $pool->getItem('i3'); + $foo = $pool->getItem('foo'); + + $pool->save($i0->tag('bar')); + $pool->save($i1->tag('foo')); + $pool->save($i2->tag('foo')->tag('bar')); + $pool->save($i3->tag('foo')->tag('baz')); + $pool->save($foo); + + $pool->invalidateTags(['bar']); + + $this->assertFalse($pool->getItem('i0')->isHit()); + $this->assertTrue($pool->getItem('i1')->isHit()); + $this->assertFalse($pool->getItem('i2')->isHit()); + $this->assertTrue($pool->getItem('i3')->isHit()); + $this->assertTrue($pool->getItem('foo')->isHit()); + + $pool->invalidateTags(['foo']); + + $this->assertFalse($pool->getItem('i1')->isHit()); + $this->assertFalse($pool->getItem('i3')->isHit()); + $this->assertTrue($pool->getItem('foo')->isHit()); + + $anotherPoolInstance = $this->createCachePool(); + + $this->assertFalse($anotherPoolInstance->getItem('i1')->isHit()); + $this->assertFalse($anotherPoolInstance->getItem('i3')->isHit()); + $this->assertTrue($anotherPoolInstance->getItem('foo')->isHit()); + } + + public function testInvalidateCommits() + { + $pool = $this->createCachePool(); + + $foo = $pool->getItem('foo'); + $foo->tag('tag'); + + $pool->saveDeferred($foo->set('foo')); + $pool->invalidateTags(['tag']); + + // ??: This seems to contradict a bit logic in deleteItems, where it does unset($this->deferred[$key]); on key matches + + $foo = $pool->getItem('foo'); + + $this->assertTrue($foo->isHit()); + } + + public function testTagsAreCleanedOnSave() + { + $pool = $this->createCachePool(); + + $i = $pool->getItem('k'); + $pool->save($i->tag('foo')); + + $i = $pool->getItem('k'); + $pool->save($i->tag('bar')); + + $pool->invalidateTags(['foo']); + $this->assertTrue($pool->getItem('k')->isHit()); + } + + public function testTagsAreCleanedOnDelete() + { + $pool = $this->createCachePool(); + + $i = $pool->getItem('k'); + $pool->save($i->tag('foo')); + $pool->deleteItem('k'); + + $pool->save($pool->getItem('k')); + $pool->invalidateTags(['foo']); + + $this->assertTrue($pool->getItem('k')->isHit()); + } + + public function testTagItemExpiry() + { + if (isset($this->skippedTests[__FUNCTION__])) { + $this->markTestSkipped($this->skippedTests[__FUNCTION__]); + } + + $pool = $this->createCachePool(10); + + $item = $pool->getItem('foo'); + $item->tag(['baz']); + $item->expiresAfter(100); + + $pool->save($item); + $pool->invalidateTags(['baz']); + $this->assertFalse($pool->getItem('foo')->isHit()); + + sleep(20); + + $this->assertFalse($pool->getItem('foo')->isHit()); + } + + public function testGetPreviousTags() + { + $pool = $this->createCachePool(); + + $i = $pool->getItem('k'); + $pool->save($i->tag('foo')); + + $i = $pool->getItem('k'); + $this->assertSame(['foo' => 'foo'], $i->getPreviousTags()); + } +} diff --git a/src/lib/Symfony/Components/Cache/Traits/RedisClusterProxy.php b/src/lib/Symfony/Components/Cache/Traits/RedisClusterProxy.php new file mode 100644 index 0000000..de3e66e --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Traits/RedisClusterProxy.php @@ -0,0 +1,67 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: + * https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Cache/Traits/RedisClusterProxy.php + * Last revision: https://github.com/symfony/symfony/commit/239a022cc01cca52c3f6ddde3231199369cf34c2 + */ + +namespace Symfony\Component\Cache\Traits; + +/** + * @author Alessandro Chitolina + * + * @internal + */ +class RedisClusterProxy +{ + private $redis; + private $initializer; + + public function __construct(\Closure $initializer) + { + $this->initializer = $initializer; + } + + public function __call($method, array $args) + { + $this->redis ?: $this->redis = $this->initializer->__invoke(); + + return $this->redis->{$method}(...$args); + } + + public function hscan($strKey, &$iIterator, $strPattern = null, $iCount = null) + { + $this->redis ?: $this->redis = $this->initializer->__invoke(); + + return $this->redis->hscan($strKey, $iIterator, $strPattern, $iCount); + } + + public function scan(&$iIterator, $strPattern = null, $iCount = null) + { + $this->redis ?: $this->redis = $this->initializer->__invoke(); + + return $this->redis->scan($iIterator, $strPattern, $iCount); + } + + public function sscan($strKey, &$iIterator, $strPattern = null, $iCount = null) + { + $this->redis ?: $this->redis = $this->initializer->__invoke(); + + return $this->redis->sscan($strKey, $iIterator, $strPattern, $iCount); + } + + public function zscan($strKey, &$iIterator, $strPattern = null, $iCount = null) + { + $this->redis ?: $this->redis = $this->initializer->__invoke(); + + return $this->redis->zscan($strKey, $iIterator, $strPattern, $iCount); + } +} diff --git a/src/lib/Symfony/Components/Cache/Traits/RedisTrait.php b/src/lib/Symfony/Components/Cache/Traits/RedisTrait.php new file mode 100644 index 0000000..3ba545c --- /dev/null +++ b/src/lib/Symfony/Components/Cache/Traits/RedisTrait.php @@ -0,0 +1,478 @@ + + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + * + * Original source: + * https://github.com/symfony/symfony/blob/master/src/Symfony/Component/Cache/Traits/RedisTrait.php + * Last revision: https://github.com/symfony/symfony/commit/178506e72ce57c030fca3a06043666525c41bff3 (+ merge commit: 87f3c36) + */ + +namespace Symfony\Component\Cache\Traits; + +use Predis\Connection\Aggregate\ClusterInterface; +use Predis\Connection\Aggregate\RedisCluster; +use Predis\Response\Status; +use Symfony\Component\Cache\Exception\CacheException; +use Symfony\Component\Cache\Exception\InvalidArgumentException; + +/** + * @author Aurimas Niekis + * @author Nicolas Grekas + * + * @internal + */ +trait RedisTrait +{ + private static $defaultConnectionOptions = [ + 'class' => null, + 'persistent' => 0, + 'persistent_id' => null, + 'timeout' => 30, + 'read_timeout' => 0, + 'retry_interval' => 0, + 'compression' => true, + 'tcp_keepalive' => 0, + 'lazy' => null, + 'redis_cluster' => false, + 'dbindex' => 0, + 'failover' => 'none', + ]; + private $redis; + + /** + * @param \Redis|\RedisArray|\RedisCluster|\Predis\Client $redisClient + */ + private function init($redisClient, $namespace, $defaultLifetime) + { + parent::__construct($namespace, $defaultLifetime); + + if (preg_match('#[^-+_.A-Za-z0-9]#', $namespace, $match)) { + throw new InvalidArgumentException(sprintf('RedisAdapter namespace contains "%s" but only characters in [-+_.A-Za-z0-9] are allowed.', $match[0])); + } + if (!$redisClient instanceof \Redis && !$redisClient instanceof \RedisArray && !$redisClient instanceof \RedisCluster && !$redisClient instanceof \Predis\Client && !$redisClient instanceof RedisProxy && !$redisClient instanceof RedisClusterProxy) { + throw new InvalidArgumentException(sprintf('%s() expects parameter 1 to be Redis, RedisArray, RedisCluster or Predis\Client, %s given.', __METHOD__, \is_object($redisClient) ? \get_class($redisClient) : \gettype($redisClient))); + } + $this->redis = $redisClient; + } + + /** + * Creates a Redis connection using a DSN configuration. + * + * Example DSN: + * - redis://localhost + * - redis://example.com:1234 + * - redis://secret@example.com/13 + * - redis:///var/run/redis.sock + * - redis://secret@/var/run/redis.sock/13 + * + * @param string $dsn + * @param array $options See self::$defaultConnectionOptions + * + * @throws InvalidArgumentException when the DSN is invalid + * + * @return \Redis|\RedisCluster|\Predis\Client According to the "class" option + */ + public static function createConnection($dsn, array $options = []) + { + if (0 !== strpos($dsn, 'redis:')) { + throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s does not start with "redis:".', $dsn)); + } + + if (!\extension_loaded('redis') && !class_exists(\Predis\Client::class)) { + throw new CacheException(sprintf('Cannot find the "redis" extension nor the "predis/predis" package: %s', $dsn)); + } + + $params = preg_replace_callback('#^redis:(//)?(?:(?:[^:@]*+:)?([^@]*+)@)?#', function ($m) use (&$auth) { + if (isset($m[2])) { + $auth = $m[2]; + } + + return 'file:'.($m[1] ?? ''); + }, $dsn); + + if (false === $params = parse_url($params)) { + throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn)); + } + + $query = $hosts = []; + + if (isset($params['query'])) { + parse_str($params['query'], $query); + + if (isset($query['host'])) { + if (!\is_array($hosts = $query['host'])) { + throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn)); + } + foreach ($hosts as $host => $parameters) { + if (\is_string($parameters)) { + parse_str($parameters, $parameters); + } + if (false === $i = strrpos($host, ':')) { + $hosts[$host] = ['scheme' => 'tcp', 'host' => $host, 'port' => 6379] + $parameters; + } elseif ($port = (int) substr($host, 1 + $i)) { + $hosts[$host] = ['scheme' => 'tcp', 'host' => substr($host, 0, $i), 'port' => $port] + $parameters; + } else { + $hosts[$host] = ['scheme' => 'unix', 'path' => substr($host, 0, $i)] + $parameters; + } + } + $hosts = array_values($hosts); + } + } + + if (isset($params['host']) || isset($params['path'])) { + if (!isset($params['dbindex']) && isset($params['path']) && preg_match('#/(\d+)$#', $params['path'], $m)) { + $params['dbindex'] = $m[1]; + $params['path'] = substr($params['path'], 0, -\strlen($m[0])); + } + + if (isset($params['host'])) { + array_unshift($hosts, ['scheme' => 'tcp', 'host' => $params['host'], 'port' => $params['port'] ?? 6379]); + } else { + array_unshift($hosts, ['scheme' => 'unix', 'path' => $params['path']]); + } + } + + if (!$hosts) { + throw new InvalidArgumentException(sprintf('Invalid Redis DSN: %s', $dsn)); + } + + $params += $query + $options + self::$defaultConnectionOptions; + + if (null === $params['class'] && \extension_loaded('redis')) { + $class = $params['redis_cluster'] ? \RedisCluster::class : (1 < \count($hosts) ? \RedisArray::class : \Redis::class); + } else { + $class = null === $params['class'] ? \Predis\Client::class : $params['class']; + } + + if (is_a($class, \Redis::class, true)) { + $connect = $params['persistent'] || $params['persistent_id'] ? 'pconnect' : 'connect'; + $redis = new $class(); + + $initializer = function ($redis) use ($connect, $params, $dsn, $auth, $hosts) { + try { + @$redis->{$connect}($hosts[0]['host'] ?? $hosts[0]['path'], $hosts[0]['port'] ?? null, $params['timeout'], (string) $params['persistent_id'], $params['retry_interval']); + } catch (\RedisException $e) { + throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn)); + } + + set_error_handler(function ($type, $msg) use (&$error) { $error = $msg; }); + $isConnected = $redis->isConnected(); + restore_error_handler(); + if (!$isConnected) { + $error = preg_match('/^Redis::p?connect\(\): (.*)/', $error, $error) ? sprintf(' (%s)', $error[1]) : ''; + throw new InvalidArgumentException(sprintf('Redis connection failed%s: %s', $error, $dsn)); + } + + if ((null !== $auth && !$redis->auth($auth)) + || ($params['dbindex'] && !$redis->select($params['dbindex'])) + || ($params['read_timeout'] && !$redis->setOption(\Redis::OPT_READ_TIMEOUT, $params['read_timeout'])) + ) { + $e = preg_replace('/^ERR /', '', $redis->getLastError()); + throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e, $dsn)); + } + + if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) { + $redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']); + } + if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) { + $redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF); + } + + return true; + }; + + if ($params['lazy']) { + $redis = new RedisProxy($redis, $initializer); + } else { + $initializer($redis); + } + } elseif (is_a($class, \RedisArray::class, true)) { + foreach ($hosts as $i => $host) { + $hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path']; + } + $params['lazy_connect'] = $params['lazy'] ?? true; + $params['connect_timeout'] = $params['timeout']; + + try { + $redis = new $class($hosts, $params); + } catch (\RedisClusterException $e) { + throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn)); + } + + if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) { + $redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']); + } + if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) { + $redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF); + } + } elseif (is_a($class, \RedisCluster::class, true)) { + $initializer = function () use ($class, $params, $dsn, $hosts) { + foreach ($hosts as $i => $host) { + $hosts[$i] = 'tcp' === $host['scheme'] ? $host['host'].':'.$host['port'] : $host['path']; + } + + try { + $redis = new $class(null, $hosts, $params['timeout'], $params['read_timeout'], (bool) $params['persistent']); + } catch (\RedisClusterException $e) { + throw new InvalidArgumentException(sprintf('Redis connection failed (%s): %s', $e->getMessage(), $dsn)); + } + + if (0 < $params['tcp_keepalive'] && \defined('Redis::OPT_TCP_KEEPALIVE')) { + $redis->setOption(\Redis::OPT_TCP_KEEPALIVE, $params['tcp_keepalive']); + } + if ($params['compression'] && \defined('Redis::COMPRESSION_LZF')) { + $redis->setOption(\Redis::OPT_COMPRESSION, \Redis::COMPRESSION_LZF); + } + switch ($params['failover']) { + case 'error': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_ERROR); break; + case 'distribute': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE); break; + case 'slaves': $redis->setOption(\RedisCluster::OPT_SLAVE_FAILOVER, \RedisCluster::FAILOVER_DISTRIBUTE_SLAVES); break; + } + + return $redis; + }; + + $redis = $params['lazy'] ? new RedisClusterProxy($initializer) : $initializer(); + } elseif (is_a($class, \Predis\Client::class, true)) { + if ($params['redis_cluster']) { + $params['cluster'] = 'redis'; + } + $params += ['parameters' => []]; + $params['parameters'] += [ + 'persistent' => $params['persistent'], + 'timeout' => $params['timeout'], + 'read_write_timeout' => $params['read_timeout'], + 'tcp_nodelay' => true, + ]; + if ($params['dbindex']) { + $params['parameters']['database'] = $params['dbindex']; + } + if (null !== $auth) { + $params['parameters']['password'] = $auth; + } + if (1 === \count($hosts) && !$params['redis_cluster']) { + $hosts = $hosts[0]; + } elseif (\in_array($params['failover'], ['slaves', 'distribute'], true) && !isset($params['replication'])) { + $params['replication'] = true; + $hosts[0] += ['alias' => 'master']; + } + + $redis = new $class($hosts, array_diff_key($params, self::$defaultConnectionOptions)); + } elseif (class_exists($class, false)) { + throw new InvalidArgumentException(sprintf('"%s" is not a subclass of "Redis", "RedisArray", "RedisCluster" nor "Predis\Client".', $class)); + } else { + throw new InvalidArgumentException(sprintf('Class "%s" does not exist.', $class)); + } + + return $redis; + } + + /** + * {@inheritdoc} + */ + protected function doFetch(array $ids) + { + if (!$ids) { + return []; + } + + $result = []; + + if ($this->redis instanceof \Predis\Client && $this->redis->getConnection() instanceof ClusterInterface) { + $values = $this->pipeline(function () use ($ids) { + foreach ($ids as $id) { + yield 'get' => [$id]; + } + }); + } else { + $values = array_combine($ids, $this->redis->mget($ids)); + } + + foreach ($values as $id => $v) { + if ($v) { + $result[$id] = self::$marshaller->unmarshall($v); + } + } + + return $result; + } + + /** + * {@inheritdoc} + */ + protected function doHave($id) + { + return (bool) $this->redis->exists($id); + } + + /** + * {@inheritdoc} + */ + protected function doClear($namespace) + { + $cleared = true; + $hosts = [$this->redis]; + $evalArgs = [[$namespace], 0]; + + if ($this->redis instanceof \Predis\Client) { + $evalArgs = [0, $namespace]; + + $connection = $this->redis->getConnection(); + if ($connection instanceof ClusterInterface && $connection instanceof \Traversable) { + $hosts = []; + foreach ($connection as $c) { + $hosts[] = new \Predis\Client($c); + } + } + } elseif ($this->redis instanceof \RedisArray) { + $hosts = []; + foreach ($this->redis->_hosts() as $host) { + $hosts[] = $this->redis->_instance($host); + } + } elseif ($this->redis instanceof RedisClusterProxy || $this->redis instanceof \RedisCluster) { + $hosts = []; + foreach ($this->redis->_masters() as $host) { + $hosts[] = $h = new \Redis(); + $h->connect($host[0], $host[1]); + } + } + + foreach ($hosts as $host) { + if (!isset($namespace[0])) { + $cleared = $host->flushDb() && $cleared; + continue; + } + + $info = $host->info('Server'); + $info = isset($info['Server']) ? $info['Server'] : $info; + + if (!version_compare($info['redis_version'], '2.8', '>=')) { + // As documented in Redis documentation (http://redis.io/commands/keys) using KEYS + // can hang your server when it is executed against large databases (millions of items). + // Whenever you hit this scale, you should really consider upgrading to Redis 2.8 or above. + $cleared = $host->eval("local keys=redis.call('KEYS',ARGV[1]..'*') for i=1,#keys,5000 do redis.call('DEL',unpack(keys,i,math.min(i+4999,#keys))) end return 1", $evalArgs[0], $evalArgs[1]) && $cleared; + continue; + } + + $cursor = null; + do { + $keys = $host instanceof \Predis\Client ? $host->scan($cursor, 'MATCH', $namespace.'*', 'COUNT', 1000) : $host->scan($cursor, $namespace.'*', 1000); + if (isset($keys[1]) && \is_array($keys[1])) { + $cursor = $keys[0]; + $keys = $keys[1]; + } + if ($keys) { + $this->doDelete($keys); + } + } while ($cursor = (int) $cursor); + } + + return $cleared; + } + + /** + * {@inheritdoc} + */ + protected function doDelete(array $ids) + { + if (!$ids) { + return true; + } + + if ($this->redis instanceof \Predis\Client) { + $this->pipeline(function () use ($ids) { + foreach ($ids as $id) { + yield 'del' => [$id]; + } + })->rewind(); + } else { + $this->redis->del($ids); + } + + return true; + } + + /** + * {@inheritdoc} + */ + protected function doSave(array $values, $lifetime) + { + if (!$values = self::$marshaller->marshall($values, $failed)) { + return $failed; + } + + $results = $this->pipeline(function () use ($values, $lifetime) { + foreach ($values as $id => $value) { + if (0 >= $lifetime) { + yield 'set' => [$id, $value]; + } else { + yield 'setEx' => [$id, $lifetime, $value]; + } + } + }); + foreach ($results as $id => $result) { + if (true !== $result && (!$result instanceof Status || $result !== Status::get('OK'))) { + $failed[] = $id; + } + } + + return $failed; + } + + private function pipeline(\Closure $generator) + { + $ids = []; + + if ($this->redis instanceof RedisClusterProxy || $this->redis instanceof \RedisCluster || ($this->redis instanceof \Predis\Client && $this->redis->getConnection() instanceof RedisCluster)) { + // phpredis & predis don't support pipelining with RedisCluster + // see https://github.com/phpredis/phpredis/blob/develop/cluster.markdown#pipelining + // see https://github.com/nrk/predis/issues/267#issuecomment-123781423 + $results = []; + foreach ($generator() as $command => $args) { + $results[] = $this->redis->{$command}(...$args); + $ids[] = $args[0]; + } + } elseif ($this->redis instanceof \Predis\Client) { + $results = $this->redis->pipeline(function ($redis) use ($generator, &$ids) { + foreach ($generator() as $command => $args) { + $redis->{$command}(...$args); + $ids[] = $args[0]; + } + }); + } elseif ($this->redis instanceof \RedisArray) { + $connections = $results = $ids = []; + foreach ($generator() as $command => $args) { + if (!isset($connections[$h = $this->redis->_target($args[0])])) { + $connections[$h] = [$this->redis->_instance($h), -1]; + $connections[$h][0]->multi(\Redis::PIPELINE); + } + $connections[$h][0]->{$command}(...$args); + $results[] = [$h, ++$connections[$h][1]]; + $ids[] = $args[0]; + } + foreach ($connections as $h => $c) { + $connections[$h] = $c[0]->exec(); + } + foreach ($results as $k => list($h, $c)) { + $results[$k] = $connections[$h][$c]; + } + } else { + $this->redis->multi(\Redis::PIPELINE); + foreach ($generator() as $command => $args) { + $this->redis->{$command}(...$args); + $ids[] = $args[0]; + } + $results = $this->redis->exec(); + } + + foreach ($ids as $k => $id) { + yield $id => $results[$k]; + } + } +}