Skip to content

Commit

Permalink
Fixed - RKeys doesn't use nameMapper. #4673
Browse files Browse the repository at this point in the history
  • Loading branch information
Nikita Koksharov committed Nov 18, 2022
1 parent bd25024 commit 25ff540
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 29 deletions.
97 changes: 69 additions & 28 deletions redisson/src/main/java/org/redisson/RedissonKeys.java
Expand Up @@ -22,8 +22,15 @@
import org.redisson.client.RedisClient;
import org.redisson.client.RedisException;
import org.redisson.client.codec.StringCodec;
import org.redisson.client.handler.State;
import org.redisson.client.protocol.RedisCommand;
import org.redisson.client.protocol.RedisCommands;
import org.redisson.client.protocol.RedisStrictCommand;
import org.redisson.client.protocol.convertor.Convertor;
import org.redisson.client.protocol.decoder.ListMultiDecoder2;
import org.redisson.client.protocol.decoder.ListScanResult;
import org.redisson.client.protocol.decoder.ListScanResultReplayDecoder;
import org.redisson.client.protocol.decoder.ObjectListReplayDecoder;
import org.redisson.command.CommandAsyncExecutor;
import org.redisson.command.CommandBatchService;
import org.redisson.connection.ConnectionManager;
Expand Down Expand Up @@ -71,7 +78,7 @@ public RType getType(String key) {

@Override
public RFuture<RType> getTypeAsync(String key) {
return commandExecutor.readAsync(key, RedisCommands.TYPE, key);
return commandExecutor.readAsync(map(key), RedisCommands.TYPE, map(key));
}

@Override
Expand All @@ -81,17 +88,25 @@ public int getSlot(String key) {

@Override
public RFuture<Integer> getSlotAsync(String key) {
return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, key);
return commandExecutor.readAsync(null, RedisCommands.KEYSLOT, map(key));
}

@Override
public Iterable<String> getKeysByPattern(String pattern) {
return getKeysByPattern(pattern, 10);
}

private final RedisCommand<ListScanResult<String>> scan = new RedisCommand<ListScanResult<String>>("SCAN", new ListMultiDecoder2(
new ListScanResultReplayDecoder() {
@Override
public ListScanResult<Object> decode(List<Object> parts, State state) {
return new ListScanResult<>((Long) parts.get(0), (List<Object>) (Object) unmap((List<String>) parts.get(1)));
}
}, new ObjectListReplayDecoder<String>()));

@Override
public Iterable<String> getKeysByPattern(String pattern, int count) {
return getKeysByPattern(RedisCommands.SCAN, pattern, 0, count);
return getKeysByPattern(scan, pattern, 0, count);
}

public <T> Iterable<T> getKeysByPattern(RedisCommand<?> command, String pattern, int limit, int count) {
Expand All @@ -115,7 +130,7 @@ public Iterable<String> getKeysWithLimit(int limit) {

@Override
public Iterable<String> getKeysWithLimit(String pattern, int limit) {
return getKeysByPattern(RedisCommands.SCAN, pattern, limit, limit);
return getKeysByPattern(scan, pattern, limit, limit);
}

@Override
Expand All @@ -128,7 +143,7 @@ public Iterable<String> getKeys(int count) {
return getKeysByPattern(null, count);
}

public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
private RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, RedisCommand<?> command, long startPos,
String pattern, int count) {
if (pattern == null) {
return commandExecutor.readAsync(client, entry, StringCodec.INSTANCE, command, startPos, "COUNT",
Expand All @@ -140,7 +155,7 @@ public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterS

public RFuture<ScanResult<Object>> scanIteratorAsync(RedisClient client, MasterSlaveEntry entry, long startPos,
String pattern, int count) {
return scanIteratorAsync(client, entry, RedisCommands.SCAN, startPos, pattern, count);
return scanIteratorAsync(client, entry, scan, startPos, pattern, count);
}

private <T> Iterator<T> createKeysIterator(MasterSlaveEntry entry, RedisCommand<?> command, String pattern, int count) {
Expand Down Expand Up @@ -171,7 +186,7 @@ public RFuture<Long> touchAsync(String... names) {
return new CompletableFutureWrapper<>(0L);
}

return commandExecutor.writeBatchedAsync(null, RedisCommands.TOUCH_LONG, new LongSlotCallback(), names);
return commandExecutor.writeBatchedAsync(null, RedisCommands.TOUCH_LONG, new LongSlotCallback(), map(names));
}

@Override
Expand All @@ -185,21 +200,27 @@ public RFuture<Long> countExistsAsync(String... names) {
return new CompletableFutureWrapper<>(0L);
}

List<String> keysList = Arrays.stream(names)
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k))
.collect(Collectors.toList());

return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new LongSlotCallback(), keysList.toArray(new String[0]));
return commandExecutor.readBatchedAsync(StringCodec.INSTANCE, RedisCommands.EXISTS_LONG, new LongSlotCallback(), map(names));
}

@Override
public String randomKey() {
return commandExecutor.get(randomKeyAsync());
}

private final RedisStrictCommand<String> randomKey = new RedisStrictCommand<String>("RANDOMKEY", new Convertor<String>() {
@Override
public String convert(Object obj) {
if (obj == null) {
return null;
}
return unmap((String) obj);
}
});

@Override
public RFuture<String> randomKeyAsync() {
return commandExecutor.readRandomAsync(StringCodec.INSTANCE, RedisCommands.RANDOM_KEY);
return commandExecutor.readRandomAsync(StringCodec.INSTANCE, randomKey);
}

@Override
Expand Down Expand Up @@ -233,7 +254,7 @@ public RFuture<Long> deleteByPatternAsync(String pattern) {
commandExecutor.getConnectionManager().getExecutor().execute(() -> {
long count = 0;
try {
Iterator<String> keysIterator = createKeysIterator(entry, RedisCommands.SCAN, pattern, batchSize);
Iterator<String> keysIterator = createKeysIterator(entry, scan, pattern, batchSize);
List<String> keys = new ArrayList<>();
while (keysIterator.hasNext()) {
String key = keysIterator.next();
Expand Down Expand Up @@ -290,12 +311,12 @@ public long delete(RObject... objects) {

@Override
public RFuture<Long> deleteAsync(RObject... objects) {
List<String> keys = new ArrayList<String>();
List<String> keys = new ArrayList<>();
for (RObject obj : objects) {
keys.add(((RedissonObject) obj).getRawName());
keys.add(obj.getName());
}

return deleteAsync(keys.toArray(new String[keys.size()]));
return deleteAsync(keys.toArray(new String[0]));
}

@Override
Expand All @@ -309,7 +330,7 @@ public RFuture<Long> unlinkAsync(String... keys) {
return new CompletableFutureWrapper<>(0L);
}

return commandExecutor.writeBatchedAsync(null, RedisCommands.UNLINK, new LongSlotCallback(), keys);
return commandExecutor.writeBatchedAsync(null, RedisCommands.UNLINK, new LongSlotCallback(), map(keys));
}

@Override
Expand All @@ -318,7 +339,27 @@ public RFuture<Long> deleteAsync(String... keys) {
return new CompletableFutureWrapper<>(0L);
}

return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, new LongSlotCallback(), keys);
return commandExecutor.writeBatchedAsync(null, RedisCommands.DEL, new LongSlotCallback(), map(keys));
}

private String map(String key) {
return commandExecutor.getConnectionManager().getConfig().getNameMapper().map(key);
}

private String unmap(String key) {
return commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap(key);
}

private List<String> unmap(List<String> keys) {
return keys.stream()
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().unmap(k))
.collect(Collectors.toList());
}

private String[] map(String[] keys) {
return Arrays.stream(keys)
.map(k -> commandExecutor.getConnectionManager().getConfig().getNameMapper().map(k))
.toArray(String[]::new);
}

@Override
Expand Down Expand Up @@ -381,7 +422,7 @@ public long remainTimeToLive(String name) {

@Override
public RFuture<Long> remainTimeToLiveAsync(String name) {
return commandExecutor.readAsync(name, StringCodec.INSTANCE, RedisCommands.PTTL, name);
return commandExecutor.readAsync(map(name), StringCodec.INSTANCE, RedisCommands.PTTL, map(name));
}

@Override
Expand All @@ -391,7 +432,7 @@ public void rename(String currentName, String newName) {

@Override
public RFuture<Void> renameAsync(String currentName, String newName) {
return commandExecutor.writeAsync(currentName, RedisCommands.RENAME, currentName, newName);
return commandExecutor.writeAsync(map(currentName), RedisCommands.RENAME, map(currentName), map(newName));
}

@Override
Expand All @@ -401,7 +442,7 @@ public boolean renamenx(String oldName, String newName) {

@Override
public RFuture<Boolean> renamenxAsync(String oldName, String newName) {
return commandExecutor.writeAsync(oldName, RedisCommands.RENAMENX, oldName, newName);
return commandExecutor.writeAsync(map(oldName), RedisCommands.RENAMENX, map(oldName), map(newName));
}

@Override
Expand All @@ -411,7 +452,7 @@ public boolean clearExpire(String name) {

@Override
public RFuture<Boolean> clearExpireAsync(String name) {
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PERSIST, name);
return commandExecutor.writeAsync(map(name), StringCodec.INSTANCE, RedisCommands.PERSIST, map(name));
}

@Override
Expand All @@ -421,7 +462,7 @@ public boolean expireAt(String name, long timestamp) {

@Override
public RFuture<Boolean> expireAtAsync(String name, long timestamp) {
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIREAT, name, timestamp);
return commandExecutor.writeAsync(map(name), StringCodec.INSTANCE, RedisCommands.PEXPIREAT, map(name), timestamp);
}

@Override
Expand All @@ -431,7 +472,7 @@ public boolean expire(String name, long timeToLive, TimeUnit timeUnit) {

@Override
public RFuture<Boolean> expireAsync(String name, long timeToLive, TimeUnit timeUnit) {
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, name,
return commandExecutor.writeAsync(name, StringCodec.INSTANCE, RedisCommands.PEXPIRE, map(name),
timeUnit.toMillis(timeToLive));
}

Expand All @@ -442,7 +483,7 @@ public void migrate(String name, String host, int port, int database, long timeo

@Override
public RFuture<Void> migrateAsync(String name, String host, int port, int database, long timeout) {
return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database, timeout);
return commandExecutor.writeAsync(map(name), RedisCommands.MIGRATE, host, port, map(name), database, timeout);
}

@Override
Expand All @@ -452,7 +493,7 @@ public void copy(String name, String host, int port, int database, long timeout)

@Override
public RFuture<Void> copyAsync(String name, String host, int port, int database, long timeout) {
return commandExecutor.writeAsync(name, RedisCommands.MIGRATE, host, port, name, database, timeout, "COPY");
return commandExecutor.writeAsync(map(name), RedisCommands.MIGRATE, host, port, map(name), database, timeout, "COPY");
}

@Override
Expand All @@ -462,7 +503,7 @@ public boolean move(String name, int database) {

@Override
public RFuture<Boolean> moveAsync(String name, int database) {
return commandExecutor.writeAsync(name, RedisCommands.MOVE, name, database);
return commandExecutor.writeAsync(map(name), RedisCommands.MOVE, map(name), database);
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion redisson/src/test/java/org/redisson/RedissonKeysTest.java
Expand Up @@ -208,7 +208,7 @@ public void testKeysIterable() throws InterruptedException {
Iterator<String> iterator = redisson.getKeys().getKeys().iterator();
for (; iterator.hasNext();) {
String key = iterator.next();
keys.remove(key);
keys.remove(redisson.getConfig().useSingleServer().getNameMapper().map(key));
iterator.remove();
}
Assertions.assertEquals(0, keys.size());
Expand Down

0 comments on commit 25ff540

Please sign in to comment.