Since the Lettuce Redis cluster client is not supported, I modified LettuceBasedProxyManager myself to execute commands with RedisTemplate, but got a default Bucket #322
-
Hello @vladimir-bukhtoyarov, the following is the code I rewrote, but the Bucket I get back is not the one I configured. Can you help me see why I can't get the Bucket I configured myself?
protected CompareAndSwapOperation beginCompareAndSwapOperation(byte[] key) {
byte[][] keys = {key};
return new CompareAndSwapOperation() {
@Override
public Optional<byte[]> getStateData() {
Optional<Object> o = Optional.ofNullable(redisTemplate.opsForValue().get(key));
if (o.isEmpty()) {
return Optional.ofNullable(null);
} else {
if (o.get() instanceof byte[]) {
return Optional.ofNullable((byte[]) o.get());
} else {
return Optional.ofNullable(null);
}
}
}
@Override
public boolean compareAndSwap(
byte[] originalData, byte[] newData, RemoteBucketState newState) {
return compareAndSwapFuture(key, keys, originalData, newData, newState);
}
};
}
Thank you in advance!
-
@licy12306 Hello. I cannot help unless a self-contained, executable example is provided. Please create a self-executable test based on Testcontainers, like this one: https://github.com/bucket4j/bucket4j/blob/master/bucket4j-redis/src/test/java/io/github/bucket4j/redis/redisson/cas/RedissonBasedProxyManagerTest.java
-
@vladimir-bukhtoyarov Hello. Because my project uses a Spring-configured Redis, I want to use RedisTemplate directly when integrating bucket4j. This is my rewritten LettuceBasedProxyManager: the RedisTemplate is passed in from outside, and commands.get(keys) and commands.eval() are replaced by redisTemplate.opsForValue().get(key) and redisTemplate.execute(RedisScript script, List keys, Object... args). However, redisTemplate.opsForValue().get(key) does not return the same result as commands.get(keys). I'm sorry, I can't work out why; please help me.
import io.github.bucket4j.TimeMeter;
import io.github.bucket4j.distributed.ExpirationAfterWriteStrategy;
import io.github.bucket4j.distributed.proxy.generic.compare_and_swap.AbstractCompareAndSwapBasedProxyManager;
import io.github.bucket4j.distributed.proxy.generic.compare_and_swap.AsyncCompareAndSwapOperation;
import io.github.bucket4j.distributed.proxy.generic.compare_and_swap.CompareAndSwapOperation;
import io.github.bucket4j.distributed.remote.RemoteBucketState;
import io.github.bucket4j.redis.AbstractRedisProxyManagerBuilder;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.RedisScript;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
public class LettuceProxyManager extends AbstractCompareAndSwapBasedProxyManager<byte[]> {
// private final RedisAdvancedClusterAsyncCommands<byte[], byte[]> commands;
private final RedisTemplate redisTemplate;
private final ExpirationAfterWriteStrategy expirationStrategy;
// public static LettuceProxyManagerBuilder builderFor(
// RedisAdvancedClusterAsyncCommands<byte[], byte[]> redisAdvancedClusterAsyncCommands) {
// return new LettuceProxyManagerBuilder(redisAdvancedClusterAsyncCommands);
// }
public static LettuceProxyManagerBuilder builderFor(RedisTemplate redisTemplate) {
return new LettuceProxyManagerBuilder(redisTemplate);
}
// public static LettuceProxyManagerBuilder builderFor(
// StatefulRedisClusterConnection<byte[], byte[]> statefulRedisClusterConnection) {
// return new LettuceProxyManagerBuilder(statefulRedisClusterConnection.async());
// }
// public static LettuceProxyManagerBuilder builderFor(RedisClusterClient redisClusterClient) {
// return builderFor(redisClusterClient.connect(ByteArrayCodec.INSTANCE));
// }
public static class LettuceProxyManagerBuilder
extends AbstractRedisProxyManagerBuilder<LettuceProxyManagerBuilder> {
// private final RedisAdvancedClusterAsyncCommands<byte[], byte[]> commands;
private final RedisTemplate redisTemplate;
// private LettuceProxyManagerBuilder(RedisAdvancedClusterAsyncCommands<byte[], byte[]>
// commands) {
// this.commands = Objects.requireNonNull(commands);
// }
private LettuceProxyManagerBuilder(RedisTemplate redisTemplate) {
this.redisTemplate = Objects.requireNonNull(redisTemplate);
}
public LettuceProxyManager build() {
return new LettuceProxyManager(this);
}
}
private LettuceProxyManager(LettuceProxyManagerBuilder builder) {
super(builder.getClientSideConfig());
this.expirationStrategy = builder.getNotNullExpirationStrategy();
this.redisTemplate = builder.redisTemplate;
}
@Override
protected CompareAndSwapOperation beginCompareAndSwapOperation(byte[] key) {
byte[][] keys = {key};
return new CompareAndSwapOperation() {
@Override
public Optional<byte[]> getStateData() {
Optional<Object> o = Optional.ofNullable(redisTemplate.opsForValue().get(key));
if (o.isEmpty()) {
return Optional.ofNullable(null);
} else {
if (o.get() instanceof byte[]) {
return Optional.ofNullable((byte[]) o.get());
} else {
return Optional.ofNullable(null);
}
}
}
@Override
public boolean compareAndSwap(
byte[] originalData, byte[] newData, RemoteBucketState newState) {
return compareAndSwapFuture(key, keys, originalData, newData, newState);
}
};
}
@Override
protected AsyncCompareAndSwapOperation beginAsyncCompareAndSwapOperation(byte[] key) {
byte[][] keys = {key};
return new AsyncCompareAndSwapOperation() {
@Override
public CompletableFuture<Optional<byte[]>> getStateData() {
// RedisFuture<byte[]> stateFuture = commands.get(key);
// return convertToCompletableFuture(stateFuture)
// .thenApply((byte[] resultBytes) -> Optional.ofNullable(resultBytes));
return convertToCompletableFuture(key)
.thenApply((byte[] resultBytes) -> Optional.ofNullable(resultBytes));
}
@Override
public CompletableFuture<Boolean> compareAndSwap(
byte[] originalData, byte[] newData, RemoteBucketState newState) {
// return convertToCompletableFuture(
// compareAndSwapFuture(key, keys, originalData, newData, newState));
return CompletableFuture.supplyAsync(
() -> compareAndSwapFuture(key, keys, originalData, newData, newState));
}
};
}
@Override
public void removeProxy(byte[] key) {
Boolean delete = redisTemplate.delete(key);
// RedisFuture<?> future = commands.del(key);
// getFutureValue(future);
}
@Override
protected CompletableFuture<Void> removeAsync(byte[] key) {
// RedisFuture<?> future = commands.del(key);
// return convertToCompletableFuture(future).thenApply(bytes -> null);
return convertToCompletableFuture(key).thenApply(bytes -> null);
}
@Override
public boolean isAsyncModeSupported() {
return true;
}
private Boolean compareAndSwapFuture(
byte[] key, byte[][] keys, byte[] originalData, byte[] newData, RemoteBucketState newState) {
long ttlMillis = calculateTtlMillis(newState);
if (ttlMillis > 0) {
if (originalData == null) {
// nulls are prohibited as values, so "replace" must not be used in such cases
String script = "return redis.call('set', KEYS[1], ARGV[1], 'nx', 'px', ARGV[2])";
byte[][] params = {newData, encodeLong(ttlMillis)};
// return commands.eval(script, ScriptOutputType.BOOLEAN, keys, params);
return executeEval(script, keys, params);
} else {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('psetex', KEYS[1], ARGV[3], ARGV[2]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end";
byte[][] params = {originalData, newData, encodeLong(ttlMillis)};
// return commands.eval(script, ScriptOutputType.BOOLEAN, keys, params);
return executeEval(script, keys, params);
}
} else {
if (originalData == null) {
// nulls are prohibited as values, so "replace" must not be used in such cases
String script = "return redis.call('set', KEYS[1], ARGV[1], 'nx')";
byte[][] params = {newData};
// return commands.eval(script, ScriptOutputType.BOOLEAN, keys, params);
return executeEval(script, keys, params);
} else {
String script =
"if redis.call('get', KEYS[1]) == ARGV[1] then "
+ "redis.call('set', KEYS[1], ARGV[2]); "
+ "return 1; "
+ "else "
+ "return 0; "
+ "end";
byte[][] params = {originalData, newData};
// return commands.eval(script, ScriptOutputType.BOOLEAN, keys, params);
return executeEval(script, keys, params);
}
}
}
// private <T> CompletableFuture<T> convertToCompletableFuture(RedisFuture<T> redissonFuture) {
// CompletableFuture<T> jdkFuture = new CompletableFuture<>();
// redissonFuture.whenComplete(
// (result, error) -> {
// if (error != null) {
// jdkFuture.completeExceptionally(error);
// } else {
// jdkFuture.complete(result);
// }
// });
// return jdkFuture;
// }
private <T> CompletableFuture<T> convertToCompletableFuture(T key) {
CompletableFuture<T> jdkFuture = new CompletableFuture<>();
CompletableFuture.supplyAsync(
() -> {
Optional<Object> optional = Optional.ofNullable(redisTemplate.opsForValue().get(key));
if (!optional.isEmpty() && optional.get() instanceof byte[] bytes) {
return Optional.ofNullable((byte[]) optional.get());
}
return Optional.ofNullable(null);
})
.whenComplete(
(result, error) -> {
if (error != null) {
jdkFuture.completeExceptionally(error);
} else {
jdkFuture.complete((T) result);
}
});
return jdkFuture;
}
//
// private <V> V getFutureValue(RedisFuture<V> value) {
// try {
// return value.get();
// } catch (InterruptedException e) {
// value.cancel(true);
// Thread.currentThread().interrupt();
// throw new RedisException(e);
// } catch (ExecutionException e) {
// throw e.getCause() instanceof RedisException
// ? (RedisException) e.getCause()
// : new RedisException("Unexpected exception while processing command", e.getCause());
// }
// }
//
// private <V> V getCompletableFutureValue(CompletableFuture<V> value) {
// try {
// return value.get();
// } catch (InterruptedException e) {
// value.cancel(true);
// Thread.currentThread().interrupt();
// throw new RedisException(e);
// } catch (ExecutionException e) {
// throw e.getCause() instanceof RedisException
// ? (RedisException) e.getCause()
// : new RedisException("Unexpected exception while processing command", e.getCause());
// }
// }
private byte[] encodeLong(Long value) {
return ("" + value).getBytes(StandardCharsets.UTF_8);
}
private long calculateTtlMillis(RemoteBucketState state) {
Optional<TimeMeter> clock = getClientSideConfig().getClientSideClock();
long currentTimeNanos =
clock.isPresent() ? clock.get().currentTimeNanos() : System.currentTimeMillis() * 1_000_000;
return expirationStrategy.calculateTimeToLiveMillis(state, currentTimeNanos);
}
// execute script
private Boolean executeEval(String script, byte[][] keys, byte[][] params) {
RedisScript redisScript = RedisScript.of(script, Boolean.class);
Object result = redisTemplate.execute(redisScript, Arrays.asList(keys), params); //
if (result != null && result instanceof Boolean) {
return (Boolean) result;
}
return false;
}
}
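For readers following the Lua scripts in compareAndSwapFuture, here is a minimal in-memory sketch of the compare-and-swap contract they implement. It is not part of bucket4j and does not talk to Redis: the class name InMemoryCasSketch, the String keys, and the ConcurrentHashMap store are all assumptions made for illustration, and TTL handling ('px' / 'psetex') is deliberately left out.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical in-memory stand-in for the Redis scripts above; not a bucket4j class.
class InMemoryCasSketch {
    // String keys are used because byte[] has identity-based equals/hashCode.
    private final ConcurrentMap<String, byte[]> store = new ConcurrentHashMap<>();

    // Counterpart of getStateData(): empty when no state has been stored yet.
    Optional<byte[]> getStateData(String key) {
        return Optional.ofNullable(store.get(key));
    }

    // Counterpart of compareAndSwap(): returns true only if the write was applied.
    boolean compareAndSwap(String key, byte[] originalData, byte[] newData) {
        if (originalData == null) {
            // Mirrors "set KEYS[1] ARGV[1] nx": succeed only if the key is absent.
            return store.putIfAbsent(key, newData) == null;
        }
        // Mirrors "if get(KEYS[1]) == ARGV[1] then set(KEYS[1], ARGV[2])".
        boolean[] swapped = {false};
        store.computeIfPresent(key, (k, current) -> {
            if (Arrays.equals(current, originalData)) {
                swapped[0] = true;
                return newData;
            }
            return current; // stored bytes differ, so leave the value untouched
        });
        return swapped[0];
    }

    public static void main(String[] args) {
        InMemoryCasSketch cas = new InMemoryCasSketch();
        byte[] v1 = {1};
        byte[] v2 = {2};
        System.out.println(cas.compareAndSwap("bucket", null, v1)); // first write
        System.out.println(cas.compareAndSwap("bucket", v2, v1));   // stale original
        System.out.println(cas.compareAndSwap("bucket", v1, v2));   // matching original
    }
}
```

The comparison is byte-for-byte, so in this sketch a swap succeeds only when the bytes passed as originalData are exactly the bytes that were stored earlier.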
-
Hello @vladimir-bukhtoyarov
-
Redis Cluster support has been added for both Lettuce and Jedis in the following release:
<dependency>
<groupId>com.bucket4j</groupId>
<artifactId>bucket4j-redis</artifactId>
<version>8.2.RC2</version>
</dependency>
Example usage for Jedis:
JedisCluster jedisCluster = ...;
ProxyManager<byte[]> proxyManager = JedisBasedProxyManager.builderFor(jedisCluster)
.withExpirationStrategy(ExpirationAfterWriteStrategy.fixedTimeToLive(Duration.ofSeconds(10)))
.build();
Example usage for Lettuce:
RedisClusterClient redisClusterClient = ...;
ProxyManager<byte[]> proxyManager = LettuceBasedProxyManager.builderFor(redisClusterClient)
.withExpirationStrategy(ExpirationAfterWriteStrategy.fixedTimeToLive(Duration.ofSeconds(10)))
.build();
Also, support for spring-data-redis has been removed, as it was an endless source of problems.