Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Server-assisted Client-side Caching #3757

Open
wants to merge 34 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
da9c463
Initial support for client-side caching (#3658)
sazzad16 Dec 28, 2023
0ca0dd1
Merge branch 'master' into 5.2.0
sazzad16 Dec 28, 2023
5fa2c80
Merge branch 'master' into 5.2.0
sazzad16 Jan 1, 2024
89617c9
Support for client-side caching - phase 2 (#3673)
sazzad16 Jan 8, 2024
6d4930f
Merge branch 'master' into 5.2.0
sazzad16 Jan 10, 2024
fca975f
Fix transaction failure tests using mock (#3683)
sazzad16 Jan 11, 2024
d87fc6e
Merge branch 'master' into 5.2.0
sazzad16 Jan 17, 2024
3ab6bdc
Support client-side caching from UnifiedJedis (#3691)
sazzad16 Jan 17, 2024
5f1d8c6
Client-side caching by hashing command arguments (#3700)
sazzad16 Feb 15, 2024
4cada22
Cover Redis commands for client side caching (#3702)
sazzad16 Feb 15, 2024
2480b02
Support Client-side caching through URI/URL (#3703)
sazzad16 Feb 15, 2024
26606b9
Test GuavaCSC and CaffeineCSC (#3742)
sazzad16 Feb 28, 2024
333dcd7
Support white-list and black-list commands and keys (#3755)
sazzad16 Mar 6, 2024
e66f498
Introduce interface(s) for hashing CommandObject (#3743)
sazzad16 Mar 6, 2024
c02e5be
Merge branch 'master' into 5.2.0
sazzad16 Mar 6, 2024
1651b26
Client-side cache related naming changes (#3758)
sazzad16 Mar 10, 2024
b897094
Reformat clientSideCache variable names (#3761)
sazzad16 Mar 10, 2024
dc35d45
Merge branch 'master' into 5.2.0
sazzad16 Mar 10, 2024
a2f5d16
Format tabs in pom.xml
sazzad16 Mar 21, 2024
39fc618
Merge branch 'master' into 5.2.0
sazzad16 Mar 21, 2024
6a488b6
Merge branch 'master' into 5.2.0
sazzad16 Mar 27, 2024
a4737e0
Use Experimental annotation
sazzad16 Mar 27, 2024
b7881ac
Merge branch 'master' into 5.2.0
sazzad16 Apr 3, 2024
767fc01
Fix client side cache tests (#3799)
sazzad16 Apr 4, 2024
3bd45a4
Remove openhft hashing from source dependency (#3800)
sazzad16 Apr 5, 2024
bb99c16
Merge branch 'master' into 5.2.0
sazzad16 Apr 29, 2024
82c0226
Test different functionalities of client side cache (#3828)
sazzad16 Apr 29, 2024
6a1dfc8
Test JedisURIHelper#getClientSideCache(URI) (#3835)
sazzad16 May 6, 2024
27e1553
Merge branch 'master' into 5.2.0
sazzad16 May 7, 2024
e45e4a7
Merge branch 'master' into 5.2.0
sazzad16 Jun 6, 2024
103575d
Merge fix: after introducing EndpointConfig in #3836
sazzad16 Jun 6, 2024
a347d7c
Tweak maximumSize test in CaffeineClientSideCacheTest
sazzad16 Jun 6, 2024
11ce88e
Little more tweak maximumSize test in CaffeineClientSideCacheTest
sazzad16 Jun 6, 2024
6b9d338
Fix incompatibilities with the latest RedisStack (#3855)
uglide Jun 6, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
22 changes: 22 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,27 @@
<version>2.10.1</version>
</dependency>

<!-- Optional dependencies -->
<!-- Client-side caching -->
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>33.0.0-jre</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>2.9.3</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>net.openhft</groupId>
<artifactId>zero-allocation-hashing</artifactId>
<version>0.16</version>
<scope>test</scope>
</dependency>

<!-- UNIX socket connection support -->
<dependency>
<groupId>com.kohlschutter.junixsocket</groupId>
Expand All @@ -90,6 +111,7 @@
<version>1.19.0</version>
<scope>test</scope>
</dependency>

<!-- test -->
<dependency>
<groupId>junit</groupId>
Expand Down
3 changes: 1 addition & 2 deletions src/main/java/redis/clients/jedis/CommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -3495,8 +3495,7 @@ public final CommandObject<String> jsonMerge(String key, Path path, Object pojo)
}

public final CommandObject<Object> jsonGet(String key) {
return new CommandObject<>(commandArguments(JsonCommand.GET).key(key),
protocol != RedisProtocol.RESP3 ? JSON_GENERIC_OBJECT : JsonBuilderFactory.JSON_OBJECT);
return new CommandObject<>(commandArguments(JsonCommand.GET).key(key), JSON_GENERIC_OBJECT);
}

@Deprecated
Expand Down
37 changes: 30 additions & 7 deletions src/main/java/redis/clients/jedis/Connection.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@

import redis.clients.jedis.Protocol.Command;
import redis.clients.jedis.Protocol.Keyword;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.args.ClientAttributeOption;
import redis.clients.jedis.args.Rawable;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.exceptions.JedisConnectionException;
import redis.clients.jedis.exceptions.JedisDataException;
import redis.clients.jedis.exceptions.JedisException;
Expand All @@ -34,6 +36,7 @@ public class Connection implements Closeable {
private Socket socket;
private RedisOutputStream outputStream;
private RedisInputStream inputStream;
private ClientSideCache clientSideCache;
private int soTimeout = 0;
private int infiniteSoTimeout = 0;
private boolean broken = false;
Expand All @@ -51,9 +54,7 @@ public Connection(final HostAndPort hostAndPort) {
}

public Connection(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig));
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
this(new DefaultJedisSocketFactory(hostAndPort, clientConfig), clientConfig);
}

public Connection(final JedisSocketFactory socketFactory) {
Expand All @@ -67,6 +68,16 @@ public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clie
initializeFromClientConfig(clientConfig);
}

@Experimental
public Connection(final JedisSocketFactory socketFactory, JedisClientConfig clientConfig,
ClientSideCache clientSideCache) {
this.socketFactory = socketFactory;
this.soTimeout = clientConfig.getSocketTimeoutMillis();
this.infiniteSoTimeout = clientConfig.getBlockingSocketTimeoutMillis();
initializeFromClientConfig(clientConfig);
initializeClientSideCache(clientSideCache);
}

@Override
public String toString() {
return "Connection{" + socketFactory + "}";
Expand Down Expand Up @@ -347,10 +358,7 @@ protected Object readProtocolWithCheckingBroken() {
}

try {
return Protocol.read(inputStream);
// Object read = Protocol.read(inputStream);
// System.out.println(SafeEncoder.encodeObject(read));
// return read;
return Protocol.read(inputStream, clientSideCache);
} catch (JedisConnectionException exc) {
broken = true;
throw exc;
Expand Down Expand Up @@ -514,4 +522,19 @@ public boolean ping() {
}
return true;
}

private void initializeClientSideCache(ClientSideCache csCache) {
this.clientSideCache = csCache;
if (clientSideCache != null) {
if (protocol != RedisProtocol.RESP3) {
throw new JedisException("Client side caching is only supported with RESP3.");
}

sendCommand(Protocol.Command.CLIENT, "TRACKING", "ON");
String reply = getStatusCodeReply();
if (!"OK".equals(reply)) {
throw new JedisException("Could not enable client tracking. Reply: " + reply);
}
}
}
}
20 changes: 13 additions & 7 deletions src/main/java/redis/clients/jedis/ConnectionFactory.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package redis.clients.jedis;


import org.apache.commons.pool2.PooledObject;
import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.DefaultPooledObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.exceptions.JedisException;

/**
Expand All @@ -17,21 +17,28 @@ public class ConnectionFactory implements PooledObjectFactory<Connection> {
private static final Logger logger = LoggerFactory.getLogger(ConnectionFactory.class);

private final JedisSocketFactory jedisSocketFactory;

private final JedisClientConfig clientConfig;
private ClientSideCache clientSideCache = null;

public ConnectionFactory(final HostAndPort hostAndPort) {
this.clientConfig = DefaultJedisClientConfig.builder().build();
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort);
}

public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig) {
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
}

@Experimental
public ConnectionFactory(final HostAndPort hostAndPort, final JedisClientConfig clientConfig, ClientSideCache csCache) {
this.clientConfig = clientConfig;
this.jedisSocketFactory = new DefaultJedisSocketFactory(hostAndPort, this.clientConfig);
this.clientSideCache = csCache;
}

public ConnectionFactory(final JedisSocketFactory jedisSocketFactory, final JedisClientConfig clientConfig) {
this.clientConfig = DefaultJedisClientConfig.copyConfig(clientConfig);
this.clientConfig = clientConfig;
this.jedisSocketFactory = jedisSocketFactory;
}

Expand All @@ -54,9 +61,8 @@ public void destroyObject(PooledObject<Connection> pooledConnection) throws Exce

@Override
public PooledObject<Connection> makeObject() throws Exception {
Connection jedis = null;
try {
jedis = new Connection(jedisSocketFactory, clientConfig);
Connection jedis = new Connection(jedisSocketFactory, clientConfig, clientSideCache);
return new DefaultPooledObject<>(jedis);
} catch (JedisException je) {
logger.debug("Error while makeObject", je);
Expand Down
13 changes: 13 additions & 0 deletions src/main/java/redis/clients/jedis/ConnectionPool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import org.apache.commons.pool2.PooledObjectFactory;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.util.Pool;

public class ConnectionPool extends Pool<Connection> {
Expand All @@ -10,6 +12,11 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig));
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache));
}

public ConnectionPool(PooledObjectFactory<Connection> factory) {
super(factory);
}
Expand All @@ -19,6 +26,12 @@ public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig,
this(new ConnectionFactory(hostAndPort, clientConfig), poolConfig);
}

@Experimental
public ConnectionPool(HostAndPort hostAndPort, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ConnectionFactory(hostAndPort, clientConfig, clientSideCache), poolConfig);
}

public ConnectionPool(PooledObjectFactory<Connection> factory,
GenericObjectPoolConfig<Connection> poolConfig) {
super(factory, poolConfig);
Expand Down
62 changes: 52 additions & 10 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,9 @@
import java.util.Set;

import org.apache.commons.pool2.impl.GenericObjectPoolConfig;

import redis.clients.jedis.annots.Experimental;
import redis.clients.jedis.providers.ClusterConnectionProvider;
import redis.clients.jedis.csc.ClientSideCache;
import redis.clients.jedis.util.JedisClusterCRC16;

public class JedisCluster extends UnifiedJedis {
Expand Down Expand Up @@ -198,28 +199,69 @@ public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfi
Duration.ofMillis((long) clientConfig.getSocketTimeoutMillis() * maxAttempts), poolConfig);
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol());
}

public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, int maxAttempts,
Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, poolConfig), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol());
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts,
@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache) {
this(clusterNodes, clientConfig, clientSideCache, DEFAULT_MAX_ATTEMPTS,
Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()));
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache), maxAttempts, maxTotalRetriesDuration,
clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
int maxAttempts, Duration maxTotalRetriesDuration, GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig) {
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig),
DEFAULT_MAX_ATTEMPTS, Duration.ofMillis(DEFAULT_MAX_ATTEMPTS * clientConfig.getSocketTimeoutMillis()),
clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
public JedisCluster(Set<HostAndPort> clusterNodes, JedisClientConfig clientConfig, ClientSideCache clientSideCache,
GenericObjectPoolConfig<Connection> poolConfig, Duration topologyRefreshPeriod, int maxAttempts,
Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
this(new ClusterConnectionProvider(clusterNodes, clientConfig, clientSideCache, poolConfig, topologyRefreshPeriod),
maxAttempts, maxTotalRetriesDuration, clientConfig.getRedisProtocol(), clientSideCache);
}

@Experimental
private JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration,
RedisProtocol protocol) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol);
RedisProtocol protocol, ClientSideCache clientSideCache) {
super(provider, maxAttempts, maxTotalRetriesDuration, protocol, clientSideCache);
}

// Uses a fetched connection to process protocol. Should be avoided if possible.
public JedisCluster(ClusterConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
super(provider, maxAttempts, maxTotalRetriesDuration);
}

public Map<String, ConnectionPool> getClusterNodes() {
Expand Down