diff --git a/redisson/src/main/java/org/redisson/RedissonKeys.java b/redisson/src/main/java/org/redisson/RedissonKeys.java index f83324f662f..941ff87f544 100644 --- a/redisson/src/main/java/org/redisson/RedissonKeys.java +++ b/redisson/src/main/java/org/redisson/RedissonKeys.java @@ -22,8 +22,15 @@ import org.redisson.client.RedisClient; import org.redisson.client.RedisException; import org.redisson.client.codec.StringCodec; +import org.redisson.client.handler.State; import org.redisson.client.protocol.RedisCommand; import org.redisson.client.protocol.RedisCommands; +import org.redisson.client.protocol.RedisStrictCommand; +import org.redisson.client.protocol.convertor.Convertor; +import org.redisson.client.protocol.decoder.ListMultiDecoder2; +import org.redisson.client.protocol.decoder.ListScanResult; +import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder; +import org.redisson.client.protocol.decoder.ObjectListReplayDecoder; import org.redisson.command.CommandAsyncExecutor; import org.redisson.command.CommandBatchService; import org.redisson.connection.ConnectionManager; @@ -71,7 +78,7 @@ public RType getType(String key) { @Override public RFuture getTypeAsync(String key) { - return commandExecutor.readAsync(key, RedisCommands.TYPE, key); + return commandExecutor.readAsync(map(key), RedisCommands.TYPE, map(key)); } @Override @@ -81,7 +88,7 @@ public int getSlot(String key) { @Override public RFuture getSlotAsync(String key) { - return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, key); + return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, map(key)); } @Override @@ -89,9 +96,17 @@ public Iterable getKeysByPattern(String pattern) { return getKeysByPattern(pattern, 10); } + private final RedisCommand> scan = new RedisCommand>("SCAN", new ListMultiDecoder2( + new ListScanResultReplayDecoder() { + @Override + public ListScanResult decode(List parts, State state) { + return new ListScanResult<>((Long) parts.get(0), (List) (Object) unmap((List) parts.get(1))); + } + }, new ObjectListReplayDecoder())); + @Override public Iterable getKeysByPattern(String pattern, int count) { - return getKeysByPattern(RedisCommands.SCAN, pattern, 0, count); + return getKeysByPattern(scan, pattern, 0, count); } public Iterable getKeysByPattern(RedisCommand command, String pattern, int limit, int count) { @@ -115,7 +130,7 @@ public Iterable getKeysWithLimit(int limit) { @Override public Iterable getKeysWithLimit(String pattern, int limit) { - return getKeysByPattern(RedisCommands.SCAN, pattern, limit, limit); + return getKeysByPattern(scan, pattern, limit, limit); } @Override @@ -128,7 +143,7 @@ public Iterable getKeys(int count) { return getKeysByPattern(null, count); } - public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand command, long startPos, + private RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand command, long startPos, String pattern, int count) { if (pattern == null) { return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "COUNT", @@ -140,7 +155,7 @@ public RFuture> scanIteratorAsync(RedisClient client, MasterS public RFuture> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos, String pattern, int count) { - return scanIteratorAsync(client, entry, RedisCommands.SCAN, startPos, pattern, count); + return scanIteratorAsync(client, entry, scan, startPos, pattern, count); } private Iterator createKeysIterator(MasterSlaveEntry entry, RedisCommand command, String pattern, int count) { @@ -171,7 +186,7 @@ public RFuture touchAsync(String... names) { return new CompletableFutureWrapper<>(0L); } - return commandExecutor.writeBatchedAsync(null, RedisCommands.TOUCH_LONG, new LongSlotCallback(), names); + return commandExecutor.writeBatchedAsync(null, RedisCommands.TOUCH_LONG, new LongSlotCallback(), map(names)); } @Override @@ -185,11 +200,7 @@ public RFuture countExistsAsync(String... names) { return new CompletableFutureWrapper<>(0L); } - List keysList = Arrays.stream(names) - .map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k)) - .collect(Collectors.toList()); - - return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new LongSlotCallback(), keysList.toArray(new String[0])); + return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new LongSlotCallback(), map(names)); } @Override @@ -197,9 +208,19 @@ public String randomKey() { return commandExecutor.get(randomKeyAsync()); } + private final RedisStrictCommand randomKey = new RedisStrictCommand("RANDOMKEY", new Convertor() { + @Override + public String convert(Object obj) { + if (obj == null) { + return null; + } + return unmap((String) obj); + } + }); + @Override public RFuture randomKeyAsync() { - return commandExecutor.readRandomAsync(StringCodec.INSTANCE, RedisCommands.RANDOM_KEY); + return commandExecutor.readRandomAsync(StringCodec.INSTANCE, randomKey); } @Override @@ -233,7 +254,7 @@ public RFuture deleteByPatternAsync(String pattern) { commandExecutor.getConnectionManager().getExecutor().execute(() -> { long count = 0; try { - Iterator keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize); + Iterator keysIterator = createKeysIterator(entry, scan, pattern, batchSize); List keys = new ArrayList<>(); while (keysIterator.hasNext()) { String key = keysIterator.next(); @@ -290,12 +311,12 @@ public long delete(RObject... objects) { @Override public RFuture deleteAsync(RObject... objects) { - List keys = new ArrayList(); + List keys = new ArrayList<>(); for (RObject obj : objects) { - keys.add(((RedissonObject) obj).getRawName()); + keys.add(obj.getName()); } - return deleteAsync(keys.toArray(new String[keys.size()])); + return deleteAsync(keys.toArray(new String[0])); } @Override @@ -309,7 +330,7 @@ public RFuture unlinkAsync(String... keys) { return new CompletableFutureWrapper<>(0L); } - return commandExecutor.writeBatchedAsync(null, RedisCommands.UNLINK, new LongSlotCallback(), keys); + return commandExecutor.writeBatchedAsync(null, RedisCommands.UNLINK, new LongSlotCallback(), map(keys)); } @Override @@ -318,7 +339,27 @@ public RFuture deleteAsync(String... keys) { return new CompletableFutureWrapper<>(0L); } - return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, new LongSlotCallback(), keys); + return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, new LongSlotCallback(), map(keys)); + } + + private String map(String key) { + return commandExecutor.getConnectionManager().getConfig().getNameMapper().map(key); + } + + private String unmap(String key) { + return commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap(key); + } + + private List unmap(List keys) { + return keys.stream() + .map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap(k)) + .collect(Collectors.toList()); + } + + private String[] map(String[] keys) { + return Arrays.stream(keys) + .map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k)) + .toArray(String[]::new); } @Override @@ -381,7 +422,7 @@ public long remainTimeToLive(String name) { @Override public RFuture remainTimeToLiveAsync(String name) { - return commandExecutor.readAsync(name, StringCodec.INSTANCE, RedisCommands.PTTL, name); + return commandExecutor.readAsync(map(name), StringCodec.INSTANCE, RedisCommands.PTTL, map(name)); } @Override @@ -391,7 +432,7 @@ public void rename(String currentName, String newName) { @Override public RFuture renameAsync(String currentName, String newName) { - return commandExecutor.writeAsync(currentName, RedisCommands.RENAME, currentName, newName); + return commandExecutor.writeAsync(map(currentName), RedisCommands.RENAME, map(currentName), map(newName)); } @Override @@ -401,7 +442,7 @@ public boolean renamenx(String oldName, String newName) { @Override public RFuture renamenxAsync(String oldName, String newName) { - return commandExecutor.writeAsync(oldName, RedisCommands.RENAMENX, oldName, newName); + return commandExecutor.writeAsync(map(oldName), RedisCommands.RENAMENX, map(oldName), map(newName)); } @Override @@ -411,7 +452,7 @@ public boolean clearExpire(String name) { @Override public RFuture clearExpireAsync(String name) { - return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PERSIST, name); + return commandExecutor.writeAsync(map(name), StringCodec.INSTANCE, RedisCommands.PERSIST, map(name)); } @Override @@ -421,7 +462,7 @@ public boolean expireAt(String name, long timestamp) { @Override public RFuture expireAtAsync(String name, long timestamp) { - return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIREAT, name, timestamp); + return commandExecutor.writeAsync(map(name), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, map(name), timestamp); } @Override @@ -431,7 +472,7 @@ public boolean expire(String name, long timeToLive, TimeUnit timeUnit) { @Override public RFuture expireAsync(String name, long timeToLive, TimeUnit timeUnit) { - return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, name, + return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, map(name), timeUnit.toMillis(timeToLive)); } @@ -442,7 +483,7 @@ public void migrate(String name, String host, int port, int database, long timeo @Override public RFuture migrateAsync(String name, String host, int port, int database, long timeout) { - return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database, timeout); + return commandExecutor.writeAsync(map(name), RedisCommands.MIGRATE, host, port, map(name), database, timeout); } @Override @@ -452,7 +493,7 @@ public void copy(String name, String host, int port, int database, long timeout) @Override public RFuture copyAsync(String name, String host, int port, int database, long timeout) { - return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database, timeout, "COPY"); + return commandExecutor.writeAsync(map(name), RedisCommands.MIGRATE, host, port, map(name), database, timeout, "COPY"); } @Override @@ -462,7 +503,7 @@ public boolean move(String name, int database) { @Override public RFuture moveAsync(String name, int database) { - return commandExecutor.writeAsync(name, RedisCommands.MOVE, name, database); + return commandExecutor.writeAsync(map(name), RedisCommands.MOVE, map(name), database); } @Override diff --git a/redisson/src/test/java/org/redisson/RedissonKeysTest.java b/redisson/src/test/java/org/redisson/RedissonKeysTest.java index e7363b00102..76b68c8ff7b 100644 --- a/redisson/src/test/java/org/redisson/RedissonKeysTest.java +++ b/redisson/src/test/java/org/redisson/RedissonKeysTest.java @@ -208,7 +208,7 @@ public void testKeysIterable() throws InterruptedException { Iterator iterator = redisson.getKeys().getKeys().iterator(); for (; iterator.hasNext();) { String key = iterator.next(); - keys.remove(key); + keys.remove(redisson.getConfig().useSingleServer().getNameMapper().map(key)); iterator.remove(); } Assertions.assertEquals(0, keys.size());