Skip to content

Commit

Permalink
Second iteration on key-prefixing POC
Browse files Browse the repository at this point in the history
- Restore cached key-less commands in CommandObjects
- Support Transactions
- New constructors do not take CommandExecutor
- Requested JavaDoc regarding new constructors specifying RedisProtocol
- New classes moved into 'prefix' packages
- De-duplicate prefixing code
  • Loading branch information
R-J Lim committed Mar 16, 2024
1 parent 84ee29d commit 872fcfd
Show file tree
Hide file tree
Showing 21 changed files with 307 additions and 194 deletions.

This file was deleted.

This file was deleted.

35 changes: 29 additions & 6 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,16 +63,22 @@ protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArguments(command);
}

private final CommandObject<String> PING_COMMAND_OBJECT = new CommandObject<>(commandArguments(PING), BuilderFactory.STRING);

public final CommandObject<String> ping() {
return new CommandObject<>(commandArguments(PING), BuilderFactory.STRING);
return PING_COMMAND_OBJECT;
}

private final CommandObject<String> FLUSHALL_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);

public final CommandObject<String> flushAll() {
return new CommandObject<>(commandArguments(FLUSHALL), BuilderFactory.STRING);
return FLUSHALL_COMMAND_OBJECT;
}

private final CommandObject<String> FLUSHDB_COMMAND_OBJECT = new CommandObject<>(commandArguments(FLUSHDB), BuilderFactory.STRING);

public final CommandObject<String> flushDB() {
return new CommandObject<>(commandArguments(FLUSHDB), BuilderFactory.STRING);
return FLUSHDB_COMMAND_OBJECT;
}

public final CommandObject<String> configSet(String parameter, String value) {
Expand Down Expand Up @@ -2812,8 +2818,10 @@ public final CommandObject<String> scriptLoad(String script, String sampleKey) {
return new CommandObject<>(commandArguments(SCRIPT).add(LOAD).add(script).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SCRIPT_FLUSH_COMMAND_OBJECT = new CommandObject<>(commandArguments(SCRIPT).add(FLUSH), BuilderFactory.STRING);

public final CommandObject<String> scriptFlush() {
return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH), BuilderFactory.STRING);
return SCRIPT_FLUSH_COMMAND_OBJECT;
}

public final CommandObject<String> scriptFlush(String sampleKey) {
Expand All @@ -2824,8 +2832,10 @@ public final CommandObject<String> scriptFlush(String sampleKey, FlushMode flush
return new CommandObject<>(commandArguments(SCRIPT).add(FLUSH).add(flushMode).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SCRIPT_KILL_COMMAND_OBJECT = new CommandObject<>(commandArguments(SCRIPT).add(KILL), BuilderFactory.STRING);

public final CommandObject<String> scriptKill() {
return new CommandObject<>(commandArguments(SCRIPT).add(KILL), BuilderFactory.STRING);
return SCRIPT_KILL_COMMAND_OBJECT;
}

public final CommandObject<String> scriptKill(String sampleKey) {
Expand Down Expand Up @@ -2853,8 +2863,11 @@ public final CommandObject<String> scriptKill(byte[] sampleKey) {
return new CommandObject<>(commandArguments(SCRIPT).add(KILL).processKey(sampleKey), BuilderFactory.STRING);
}

private final CommandObject<String> SLOWLOG_RESET_COMMAND_OBJECT
= new CommandObject<>(commandArguments(SLOWLOG).add(Keyword.RESET), BuilderFactory.STRING);

public final CommandObject<String> slowlogReset() {
return new CommandObject<>(commandArguments(SLOWLOG).add(Keyword.RESET), BuilderFactory.STRING);
return SLOWLOG_RESET_COMMAND_OBJECT;
}

public final CommandObject<Object> fcall(String name, List<String> keys, List<String> args) {
Expand Down Expand Up @@ -4245,6 +4258,16 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
}
// RedisGears commands

// Transaction commands
public final CommandObject<Object> watch(String... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.RAW_OBJECT);
}

public final CommandObject<Object> watch(byte[]... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.RAW_OBJECT);
}
// Transaction commands

/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
Expand Down
16 changes: 14 additions & 2 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,20 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

public JedisCluster(ClusterCommandExecutor executor, ClusterConnectionProvider provider, ClusterCommandObjects commandObjects, RedisProtocol protocol) {
super(executor, provider, commandObjects, protocol);
/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The ClusterConnectionProvider.
* @param maxAttempts Max number of attempts execute a command.
* @param maxTotalRetriesDuration Max amount of time to execute a command.
* @param commandObjects The CommandObjects.
* @param protocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
ClusterCommandObjects commandObjects, RedisProtocol protocol) {
super(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider, commandObjects,
protocol);
}

public Map<String, ConnectionPool> getClusterNodes() {
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.executors.DefaultCommandExecutor;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -395,9 +396,17 @@ public JedisPooled(PooledConnectionProvider provider) {
super(provider);
}

public JedisPooled(CommandExecutor executor, PooledConnectionProvider provider, CommandObjects commandObjects,
/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The PooledConnectionProvider.
* @param commandObjects The CommandObjects.
* @param redisProtocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisPooled(PooledConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol redisProtocol) {
super(executor, provider, commandObjects, redisProtocol);
super(new DefaultCommandExecutor(provider), provider, commandObjects, redisProtocol);
}

public final Pool<Connection> getPool() {
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.executors.DefaultCommandExecutor;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -24,8 +25,16 @@ public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider
super(sentineledConnectionProvider);
}

public JedisSentineled(CommandExecutor executor, SentineledConnectionProvider sentineledConnectionProvider, CommandObjects commandObjects, RedisProtocol redisProtocol) {
super(executor, sentineledConnectionProvider, commandObjects, redisProtocol);
/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The SentineledConnectionProvider.
* @param commandObjects The CommandObjects.
* @param redisProtocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisSentineled(SentineledConnectionProvider provider, CommandObjects commandObjects, RedisProtocol redisProtocol) {
super(new DefaultCommandExecutor(provider), provider, commandObjects, redisProtocol);
}

public HostAndPort getCurrentMaster() {
Expand Down
15 changes: 11 additions & 4 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,23 @@ public Pipeline(Connection connection) {
}

public Pipeline(Connection connection, boolean closeConnection) {
this(connection, new CommandObjects(), closeConnection);
this(connection, commandObjects(connection), closeConnection);
}

private static CommandObjects commandObjects(Connection connection) {
RedisProtocol proto = connection.getRedisProtocol();
CommandObjects commandObjects = new CommandObjects();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

public Pipeline(Connection connection, CommandObjects commandObjects, boolean closeConnection) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
RedisProtocol proto = this.connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
}

@Override
Expand Down
13 changes: 8 additions & 5 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public Transaction(Connection connection) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public Transaction(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
this(connection, new CommandObjects(), doMulti, false);
}

/**
Expand All @@ -72,10 +72,13 @@ public Transaction(Connection connection, boolean doMulti) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, boolean doMulti, boolean closeConnection) {
public Transaction(Connection connection, CommandObjects commandObjects, boolean doMulti, boolean closeConnection) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

Expand All @@ -88,15 +91,15 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
connection.sendCommand(commandObjects.watch(keys).getArguments());
String status = connection.getStatusCodeReply();
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
connection.sendCommand(commandObjects.watch(keys).getArguments());
String status = connection.getStatusCodeReply();
inWatch = true;
return status;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4869,7 +4869,7 @@ public AbstractTransaction multi() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, true, commandObjects);
} else {
return new Transaction(provider.getConnection(), true, true);
return new Transaction(provider.getConnection(), commandObjects, true, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final void multi() {
*/
@Override
public final String watch(String... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand All @@ -102,7 +102,7 @@ public final String watch(String... keys) {
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.ClusterCommandArguments;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.commands.ProtocolCommand;

public class ClusterCommandArgumentsWithPrefixedKeys extends ClusterCommandArguments {
private final byte[] prefixBytes;
private final String prefixString;

public ClusterCommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString, byte[] prefixBytes) {
super(command);
this.prefixString = prefixString;
this.prefixBytes = prefixBytes;
}

public CommandArguments key(Object key) {
return super.key(Prefixer.prefixKey(key, prefixString, prefixBytes));
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,21 @@
package redis.clients.jedis;
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.ClusterCommandArguments;
import redis.clients.jedis.ClusterCommandObjects;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class ClusterCommandObjectsWithPrefixedKeys extends ClusterCommandObjects {
private final String prefixString;
private final byte[] prefixBytes;

public ClusterCommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
prefixBytes = SafeEncoder.encode(prefixString);
}

@Override
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
return new ClusterCommandArgumentsWithPrefixedKeys(command, prefixString);
return new ClusterCommandArgumentsWithPrefixedKeys(command, prefixString, prefixBytes);
}
}

0 comments on commit 872fcfd

Please sign in to comment.