reference reactor Operator #294
-
Hi!
|
Beta Was this translation helpful? Give feedback.
Replies: 2 comments
-
Hello Ilya, Which version do you use? In the current version there is no AsyncScheduledBucket, it is removed since I have played a bit with ProjectReactor and got the following: public static <T> Mono<T> decorate(SchedulingBucket bucket, ScheduledExecutorService scheduledExecutorService, int tokenToConsume, Supplier<Mono<T>> monoSupplier) {
// if there is enough tokens in the bucket then returned future is immoderately completed
// otherwise it will be completed inside some thread belonged to scheduledExecutorService after refilling requested amount of tokens
CompletableFuture<Void> rateLimitingFuture = bucket.consume(tokenToConsume, scheduledExecutorService);
CompletableFuture<T> resultFuture = rateLimitingFuture.thenCompose((Void nothing) -> {
Mono<T> mono = monoSupplier.get();
return mono.toFuture();
});
return Mono.fromFuture(resultFuture);
}
// this variant looks better for me than unlimited "decorate" because allows to specify maximum time that you are agree to wait for tokens refill
public static <T> Mono<T> decorateWithWaitingLimit(SchedulingBucket bucket, Duration waitingLimit, ScheduledExecutorService scheduledExecutorService, int tokenToConsume, Supplier<Mono<T>> monoSupplier) {
// if there is enough tokens in the bucket then returned future is immoderately completed
// otherwise it will be completed inside some thread belonged to scheduledExecutorService after refilling requested amount of tokens.
//
// If there are no requested tokens in the bucket and requested amount can not be refilled in waitingLimit then future immediately completed by {@code false}.
CompletableFuture<Boolean> rateLimitingFuture = bucket.tryConsume(tokenToConsume, waitingLimit, scheduledExecutorService);
CompletableFuture<T> resultFuture = rateLimitingFuture.thenCompose((Boolean wasConsumed) -> {
if (!wasConsumed) {
// TODO replace with another exaption type that is more suitable for your application
Exception exception = new TimeoutException(tokenToConsume + " tokens can not be consumed in " + waitingLimit.toMillis() + " milliseconds");
CompletableFuture<T> failedFuture = new CompletableFuture<>();
failedFuture.completeExceptionally(exception);
return failedFuture;
}
Mono<T> mono = monoSupplier.get();
return mono.toFuture();
});
return Mono.fromFuture(resultFuture);
} You also can play with code above by checkouting this commit. xample of usage can be something like this: Mono<MyClass> mono = decorate(bucket.asScheduler(), someScheduledExecutorService, requestWeight,
() -> webClient.get().uri("some://uri").retrieve() .bodyToMono<MyClass>());
// or
Duration maxTimeBeforeCancelByRateLimiting = Duration.ofSeconds(10);
Mono<MyClass> mono = decorateWithWaitingLimit(bucket.asScheduler(), maxTimeBeforeCancelByRateLimiting, someScheduledExecutorService, requestWeight,
() -> webClient.get().uri("some://uri").retrieve() .bodyToMono<MyClass>()); P.S. in case your Mono's pipeline is fully lazy, in other words it is cheap until subscription, then |
Beta Was this translation helpful? Give feedback.
-
Hello, Vladimir! |
Beta Was this translation helpful? Give feedback.
Hello Ilya,
Which version do you use? In the current version there is no AsyncScheduledBucket, it is removed since
7.0
, there is SchedulingBucket interface that is common for both blocking and non-blocking. Semantic did not change.I have played a bit with ProjectReactor and got the following: