Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Proof-of-concept for automatic key prefixing #3770

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -4258,6 +4258,16 @@ public final CommandObject<Object> tFunctionCallAsync(String library, String fun
}
// RedisGears commands

// Transaction commands
public final CommandObject<String> watch(String... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}

public final CommandObject<String> watch(byte[]... keys) {
return new CommandObject<>(commandArguments(WATCH).keys((Object[]) keys), BuilderFactory.STRING);
}
// Transaction commands

/**
* Get the instance for JsonObjectMapper if not null, otherwise a new instance reference with
* default implementation will be created and returned.
Expand Down
17 changes: 17 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.ClusterCommandExecutor;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.util.JedisClusterCRC16;

Expand Down Expand Up @@ -222,6 +223,22 @@ private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Durati
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The ClusterConnectionProvider.
* @param maxAttempts Max number of attempts execute a command.
* @param maxTotalRetriesDuration Max amount of time to execute a command.
* @param commandObjects The CommandObjects.
* @param protocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
ClusterCommandObjects commandObjects, RedisProtocol protocol) {
super(new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration), provider, commandObjects,
protocol);
}

public Map<String, ConnectionPool> getClusterNodes() {
return ((ClusterConnectionProvider) provider).getNodes();
}
Expand Down
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/JedisPooled.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.executors.DefaultCommandExecutor;
import redis.clients.jedis.providers.PooledConnectionProvider;
import redis.clients.jedis.util.JedisURIHelper;
import redis.clients.jedis.util.Pool;
Expand Down Expand Up @@ -394,6 +396,19 @@ public JedisPooled(PooledConnectionProvider provider) {
super(provider);
}

/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The PooledConnectionProvider.
* @param commandObjects The CommandObjects.
* @param redisProtocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisPooled(PooledConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol redisProtocol) {
super(new DefaultCommandExecutor(provider), provider, commandObjects, redisProtocol);
}

public final Pool<Connection> getPool() {
return ((PooledConnectionProvider) provider).getPool();
}
Expand Down
14 changes: 14 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSentineled.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import java.util.Set;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.executors.CommandExecutor;
import redis.clients.jedis.executors.DefaultCommandExecutor;
import redis.clients.jedis.providers.SentineledConnectionProvider;

public class JedisSentineled extends UnifiedJedis {
Expand All @@ -23,6 +25,18 @@ public JedisSentineled(SentineledConnectionProvider sentineledConnectionProvider
super(sentineledConnectionProvider);
}

/**
* Constructor that allows CommandObjects to be customized. The RedisProtocol specified will be written into the given
* CommandObjects.
*
* @param provider The SentineledConnectionProvider.
* @param commandObjects The CommandObjects.
* @param redisProtocol The RedisProtocol that will be written into the given CommandObjects.
*/
public JedisSentineled(SentineledConnectionProvider provider, CommandObjects commandObjects, RedisProtocol redisProtocol) {
super(new DefaultCommandExecutor(provider), provider, commandObjects, redisProtocol);
}

public HostAndPort getCurrentMaster() {
return ((SentineledConnectionProvider) provider).getCurrentMaster();
}
Expand Down
19 changes: 15 additions & 4 deletions src/main/java/redis/clients/jedis/Pipeline.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,23 @@ public Pipeline(Connection connection) {
}

public Pipeline(Connection connection, boolean closeConnection) {
super(new CommandObjects());
this(connection, commandObjects(connection), closeConnection);
}

private static CommandObjects commandObjects(Connection connection) {
RedisProtocol proto = connection.getRedisProtocol();
CommandObjects commandObjects = new CommandObjects();
if (proto != null) commandObjects.setProtocol(proto);
return commandObjects;
}

public Pipeline(Connection connection, CommandObjects commandObjects, boolean closeConnection) {
sazzad16 marked this conversation as resolved.
Show resolved Hide resolved
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
RedisProtocol proto = this.connection.getRedisProtocol();
if (proto != null) this.commandObjects.setProtocol(proto);
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
}

@Override
Expand Down
29 changes: 23 additions & 6 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ public Transaction(Connection connection) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public Transaction(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
this(connection, new CommandObjects(), doMulti, false);
}


/**
* Creates a new transaction.
*
Expand All @@ -73,9 +74,27 @@ public Transaction(Connection connection, boolean doMulti) {
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, boolean doMulti, boolean closeConnection) {
killergerbah marked this conversation as resolved.
Show resolved Hide resolved
this(connection, new CommandObjects(), doMulti, closeConnection);
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param commandObjects commandObjects
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, CommandObjects commandObjects, boolean doMulti, boolean closeConnection) {
super(commandObjects);
this.connection = connection;
this.closeConnection = closeConnection;
setGraphCommands(new GraphCommandObjects(this.connection));
GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this.connection);
graphCommandObjects.setBaseCommandArgumentsCreator(protocolCommand -> commandObjects.commandArguments(protocolCommand));
setGraphCommands(graphCommandObjects);
if (doMulti) multi();
}

Expand All @@ -88,16 +107,14 @@ public final void multi() {

@Override
public String watch(final String... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}

@Override
public String watch(final byte[]... keys) {
connection.sendCommand(WATCH, keys);
String status = connection.getStatusCodeReply();
String status = connection.executeCommand(commandObjects.watch(keys));
inWatch = true;
return status;
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -206,12 +206,12 @@ public UnifiedJedis(CommandExecutor executor) {
this(executor, (ConnectionProvider) null);
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider) {
this(executor, provider, new CommandObjects());
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects) {
this(executor, provider, commandObjects, null);
if (this.provider != null) {
try (Connection conn = this.provider.getConnection()) {
Expand All @@ -223,7 +223,7 @@ private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, Comm
}
}

private UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
public UnifiedJedis(CommandExecutor executor, ConnectionProvider provider, CommandObjects commandObjects,
RedisProtocol protocol) {
this.provider = provider;
this.executor = executor;
Expand Down Expand Up @@ -4859,7 +4859,7 @@ public PipelineBase pipelined() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterPipeline((MultiClusterPooledConnectionProvider) provider, commandObjects);
} else {
return new Pipeline(provider.getConnection(), true);
return new Pipeline(provider.getConnection(), commandObjects, true);
killergerbah marked this conversation as resolved.
Show resolved Hide resolved
}
}

Expand All @@ -4869,7 +4869,7 @@ public AbstractTransaction multi() {
} else if (provider instanceof MultiClusterPooledConnectionProvider) {
return new MultiClusterTransaction((MultiClusterPooledConnectionProvider) provider, true, commandObjects);
} else {
return new Transaction(provider.getConnection(), true, true);
return new Transaction(provider.getConnection(), commandObjects, true, true);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public final void multi() {
*/
@Override
public final String watch(String... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand All @@ -102,7 +102,7 @@ public final String watch(String... keys) {
*/
@Override
public final String watch(byte[]... keys) {
appendCommand(new CommandObject<>(new CommandArguments(WATCH).addObjects((Object[]) keys), NO_OP_BUILDER));
appendCommand(commandObjects.watch(keys));
extraCommandCount.incrementAndGet();
inWatch = true;
return null;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.ClusterCommandArguments;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.commands.ProtocolCommand;

public class ClusterCommandArgumentsWithPrefixedKeys extends ClusterCommandArguments {
private final byte[] prefixBytes;
private final String prefixString;

public ClusterCommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString, byte[] prefixBytes) {
super(command);
this.prefixString = prefixString;
this.prefixBytes = prefixBytes;
}

public CommandArguments key(Object key) {
return super.key(Prefixer.prefixKey(key, prefixString, prefixBytes));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.ClusterCommandArguments;
import redis.clients.jedis.ClusterCommandObjects;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class ClusterCommandObjectsWithPrefixedKeys extends ClusterCommandObjects {
private final String prefixString;
private final byte[] prefixBytes;

public ClusterCommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
prefixBytes = SafeEncoder.encode(prefixString);
}

@Override
protected ClusterCommandArguments commandArguments(ProtocolCommand command) {
return new ClusterCommandArgumentsWithPrefixedKeys(command, prefixString, prefixBytes);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.commands.ProtocolCommand;

public class CommandArgumentsWithPrefixedKeys extends CommandArguments {
private final byte[] prefixBytes;
private final String prefixString;

public CommandArgumentsWithPrefixedKeys(ProtocolCommand command, String prefixString, byte[] prefixBytes) {
super(command);
this.prefixString = prefixString;
this.prefixBytes = prefixBytes;
}

public CommandArguments key(Object key) {
return super.key(Prefixer.prefixKey(key, prefixString, prefixBytes));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObjects;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.util.SafeEncoder;

public class CommandObjectsWithPrefixedKeys extends CommandObjects {
private final String prefixString;
private final byte[] prefixBytes;

public CommandObjectsWithPrefixedKeys(String prefixString) {
this.prefixString = prefixString;
prefixBytes = SafeEncoder.encode(prefixString);
}

@Override
protected CommandArguments commandArguments(ProtocolCommand command) {
return new CommandArgumentsWithPrefixedKeys(command, prefixString, prefixBytes);
}
}
34 changes: 34 additions & 0 deletions src/main/java/redis/clients/jedis/util/prefix/Prefixer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package redis.clients.jedis.util.prefix;

import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.args.RawableFactory;

final class Prefixer {
private Prefixer() {
}

static Object prefixKey(Object key, String prefixString, byte[] prefixBytes) {
if (key instanceof Rawable) {
byte[] raw = ((Rawable) key).getRaw();
return RawableFactory.from(prefixKeyWithBytes(raw, prefixBytes));
}

if (key instanceof byte[]) {
return prefixKeyWithBytes((byte[]) key, prefixBytes);
}

if (key instanceof String) {
String raw = (String) key;
return prefixString + raw;
}

throw new IllegalArgumentException("\"" + key.toString() + "\" is not a valid argument.");
}

private static byte[] prefixKeyWithBytes(byte[] key, byte[] prefixBytes) {
byte[] namespaced = new byte[prefixBytes.length + key.length];
System.arraycopy(prefixBytes, 0, namespaced, 0, prefixBytes.length);
System.arraycopy(key, 0, namespaced, prefixBytes.length, key.length);
return namespaced;
}
}