Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support automatic namespacing #3781

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions src/main/java/redis/clients/jedis/AbstractTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

public abstract class AbstractTransaction extends PipeliningBase implements Closeable {

@Deprecated
protected AbstractTransaction() {
super(new CommandObjects());
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/redis/clients/jedis/ClusterCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@ public class ClusterCommandObjects extends CommandObjects {

@Override
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
return new ClusterCommandArguments(command);
ClusterCommandArguments comArgs = new ClusterCommandArguments(command);
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
return comArgs;
}

private static final String CLUSTER_UNSUPPORTED_MESSAGE = "Not supported in cluster mode.";
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/redis/clients/jedis/CommandArguments.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Collection;
import java.util.Iterator;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;
import redis.clients.jedis.commands.ProtocolCommand;
Expand All @@ -12,6 +13,7 @@

public class CommandArguments implements Iterable<Rawable> {

private CommandKeyArgumentPreProcessor keyPreProc = null;
private final ArrayList<Rawable> args;

private boolean blocking;
Expand All @@ -29,6 +31,11 @@ public ProtocolCommand getCommand() {
return (ProtocolCommand) args.get(0);
}

@Experimental
void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) {
this.keyPreProc = keyPreProcessor;
}

public CommandArguments add(Object arg) {
if (arg == null) {
throw new IllegalArgumentException("null is not a valid argument.");
Expand Down Expand Up @@ -68,6 +75,10 @@ public CommandArguments addObjects(Collection args) {
}

public CommandArguments key(Object key) {
if (keyPreProc != null) {
key = keyPreProc.actualKey(key);
}

if (key instanceof Rawable) {
Rawable raw = (Rawable) key;
processKey(raw.getRaw());
Expand All @@ -83,6 +94,7 @@ public CommandArguments key(Object key) {
} else {
throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

return this;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package redis.clients.jedis;

import redis.clients.jedis.annots.Experimental;

@Experimental
public interface CommandKeyArgumentPreProcessor {

/**
* @param paramKey key name in application
* @return key name in Redis server
*/
Object actualKey(Object paramKey);
}
24 changes: 21 additions & 3 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.args.*;
import redis.clients.jedis.bloom.*;
import redis.clients.jedis.bloom.RedisBloomProtocol.*;
Expand Down Expand Up @@ -50,17 +51,24 @@ protected RedisProtocol getProtocol() {
return protocol;
}

private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;
protected volatile CommandKeyArgumentPreProcessor keyPreProcessor = null;
private volatile JsonObjectMapper jsonObjectMapper;
private final AtomicInteger searchDialect = new AtomicInteger(0);

private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;

void setBroadcastAndRoundRobinConfig(JedisBroadcastAndRoundRobinConfig config) {
this.broadcastAndRoundRobinConfig = config;
}

@Experimental
void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) {
this.keyPreProcessor = keyPreProcessor;
}

protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArguments(command);
CommandArguments comArgs = new CommandArguments(command);
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
return comArgs;
}

private final CommandObject<String> PING_COMMAND_OBJECT = new CommandObject<>(commandArguments(PING), BuilderFactory.STRING);
Expand Down Expand Up @@ -4280,6 +4288,16 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
}
// RedisGears commands

// Transaction commands
public final CommandObject<String> watch(String... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}

public final CommandObject<String> watch(byte[]... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}
// Transaction commands

/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,23 @@ public Pipeline(Connection connection) {
}

public Pipeline(Connection connection, boolean closeConnection) {
super(new CommandObjects());
this(connection, closeConnection, createCommandObjects(connection));
}

private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

Pipeline(Connection connection, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
RedisProtocol proto = this.connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
}

@Override
Expand Down
37 changes: 29 additions & 8 deletions src/main/java/redis/clients/jedis/ReliableTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import static redis.clients.jedis.Protocol.Command.WATCH;

import java.util.ArrayList;
import java.util.LinkedList;
Expand All @@ -17,8 +16,7 @@
import redis.clients.jedis.graph.GraphCommandObjects;

/**
* ReliableTransaction is a transaction where commands are immediately sent to Redis server and the
* 'QUEUED' reply checked.
* A transaction where commands are immediately sent to Redis server and the {@code QUEUED} reply checked.
*/
public class ReliableTransaction extends TransactionBase {

Expand Down Expand Up @@ -66,9 +64,34 @@ public ReliableTransaction(Connection connection, boolean doMulti) {
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection) {
this(connection, doMulti, closeConnection, createCommandObjects(connection));
}

private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects command objects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

Expand All @@ -84,16 +107,14 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/redis/clients/jedis/ShardedCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ public ShardedCommandObjects(Hashing algo, Pattern tagPattern) {

@Override
protected ShardedCommandArguments commandArguments(ProtocolCommand command) {
return new ShardedCommandArguments(algo, tagPattern, command);
ShardedCommandArguments comArgs = new ShardedCommandArguments(algo, tagPattern, command);
if (keyPreProcessor != null) comArgs.setKeyArgumentPreProcessor(keyPreProcessor);
return comArgs;
}

@Override
Expand Down
38 changes: 30 additions & 8 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
import static redis.clients.jedis.Protocol.Command.EXEC;
import static redis.clients.jedis.Protocol.Command.MULTI;
import static redis.clients.jedis.Protocol.Command.UNWATCH;
import static redis.clients.jedis.Protocol.Command.WATCH;

import java.util.ArrayList;
import java.util.LinkedList;
Expand All @@ -16,7 +15,7 @@
import redis.clients.jedis.graph.GraphCommandObjects;

/**
* A pipeline based transaction.
* A transaction based on <a href="https://redis.io/docs/manual/pipelining/">pipelining</a>.
*/
public class Transaction extends TransactionBase {

Expand Down Expand Up @@ -59,7 +58,7 @@ public Transaction(Connection connection) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public Transaction(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
this(connection, doMulti, false, createCommandObjects(connection));
}

/**
Expand All @@ -73,12 +72,37 @@ public Transaction(Connection connection, boolean doMulti) {
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, boolean doMulti, boolean closeConnection) {
this(connection, doMulti, closeConnection, createCommandObjects(connection));
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects command objects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
Transaction(Connection connection, boolean doMulti, boolean closeConnection, CommandObjects commandObjects) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

private static CommandObjects createCommandObjects(Connection connection) {
CommandObjects commandObjects = new CommandObjects();
RedisProtocol proto = connection.getRedisProtocol();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

@Override
public final void multi() {
connection.sendCommand(MULTI);
Expand All @@ -88,16 +112,14 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
Expand Down
1 change: 1 addition & 0 deletions src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
@Deprecated
public abstract class TransactionBase extends AbstractTransaction {

@Deprecated
protected TransactionBase() {
super();
}
Expand Down
9 changes: 7 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4874,7 +4874,7 @@ public PipelineBase pipelined() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
return new Pipeline(provider.getConnection(), true, commandObjects);
}
}

Expand All @@ -4895,7 +4895,7 @@ public AbstractTransaction transaction(boolean doMulti) {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, doMulti, commandObjects);
} else {
return new Transaction(provider.getConnection(), doMulti, true);
return new Transaction(provider.getConnection(), doMulti, true, commandObjects);
}
}

Expand Down Expand Up @@ -4939,6 +4939,11 @@ public Object executeCommand(CommandArguments args) {
return executeCommand(new CommandObject<>(args, BuilderFactory.RAW_OBJECT));
}

@Experimental
public void setKeyArgumentPreProcessor(CommandKeyArgumentPreProcessor keyPreProcessor) {
this.commandObjects.setKeyArgumentPreProcessor(keyPreProcessor);
}

public void setJsonObjectMapper(JsonObjectMapper jsonObjectMapper) {
this.commandObjects.setJsonObjectMapper(jsonObjectMapper);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public final void multi() {
*/
@Override
public final String watch(String... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand All @@ -104,7 +104,7 @@ public final String watch(String... keys) {
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand Down