diff --git a/extensions/redis-client/deployment/src/test/java/io/quarkus/redis/client/deployment/patterns/PubSubOnStartupTest.java b/extensions/redis-client/deployment/src/test/java/io/quarkus/redis/client/deployment/patterns/PubSubOnStartupTest.java new file mode 100644 index 0000000000000..fb76d6723738f --- /dev/null +++ b/extensions/redis-client/deployment/src/test/java/io/quarkus/redis/client/deployment/patterns/PubSubOnStartupTest.java @@ -0,0 +1,137 @@ +package io.quarkus.redis.client.deployment.patterns; + +import static org.awaitility.Awaitility.await; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.Consumer; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; +import javax.inject.Inject; + +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.redis.client.deployment.RedisTestResource; +import io.quarkus.redis.datasource.ReactiveRedisDataSource; +import io.quarkus.redis.datasource.RedisDataSource; +import io.quarkus.redis.datasource.pubsub.PubSubCommands; +import io.quarkus.redis.datasource.pubsub.ReactivePubSubCommands; +import io.quarkus.redis.datasource.string.StringCommands; +import io.quarkus.runtime.Startup; +import io.quarkus.test.QuarkusUnitTest; +import io.quarkus.test.common.QuarkusTestResource; +import io.smallrye.mutiny.Multi; +import io.smallrye.mutiny.subscription.Cancellable; + +@QuarkusTestResource(RedisTestResource.class) +public class PubSubOnStartupTest { + @RegisterExtension + static final QuarkusUnitTest unitTest = new QuarkusUnitTest() + .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class).addClass(MyCache.class) + .addClass(BusinessObject.class).addClass(Notification.class).addClass(MySubscriber.class)) + .overrideConfigKey("quarkus.redis.hosts", "${quarkus.redis.tr}"); + + @Inject + MyCache cache; + + @Inject + MySubscriber subscriber; + + @Test + void cacheWithPubSub() { + BusinessObject foo = cache.get("ps-foo-2"); + BusinessObject bar = cache.get("ps-bar-2"); + Assertions.assertNull(foo); + Assertions.assertNull(bar); + + cache.set("ps-foo-2", new BusinessObject("ps-foo-2")); + cache.set("ps-bar-2", new BusinessObject("ps-bar-2")); + + await().until(() -> subscriber.list().size() == 2); + } + + public static final class BusinessObject { + public String result; + + public BusinessObject() { + + } + + public BusinessObject(String v) { + this.result = v; + } + } + + public static final class Notification { + public String key; + public BusinessObject bo; + + public Notification() { + + } + + public Notification(String key, BusinessObject bo) { + this.key = key; + this.bo = bo; + } + } + + @ApplicationScoped + @Startup + public static class MySubscriber implements Consumer { + private final ReactivePubSubCommands pub; + + private final Cancellable cancellable; + + public List list = new ArrayList<>(); + + public MySubscriber(ReactiveRedisDataSource ds) { + pub = ds.pubsub(Notification.class); + Multi multi = pub.subscribe("notifications"); + cancellable = multi.subscribe().with(n -> list.add(n)); + } + + @PreDestroy + public void terminate() { + cancellable.cancel(); + } + + @Override + public void accept(Notification notification) { + // Received the notification + list.add(notification); + } + + public List list() { + return list; + } + } + + @ApplicationScoped + public static class MyCache { + + private final StringCommands commands; + private final PubSubCommands pub; + + public MyCache(RedisDataSource ds) { + commands = ds.string(BusinessObject.class); + pub = ds.pubsub(Notification.class); + } + + public BusinessObject get(String key) { + return commands.get(key); + } + + public void set(String key, BusinessObject bo) { + commands.set(key, bo); + pub.publish("notifications", new Notification(key, bo)); + } + + } + +} diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/RedisClientRecorder.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/RedisClientRecorder.java index ff7b3eb4f5ec0..9d841ee1fcaa0 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/RedisClientRecorder.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/client/RedisClientRecorder.java @@ -33,14 +33,15 @@ public class RedisClientRecorder { private final RedisConfig config; private static final Map clients = new HashMap<>(); private static final Map dataSources = new HashMap<>(); + private Vertx vertx; public RedisClientRecorder(RedisConfig rc) { this.config = rc; } public void initialize(RuntimeValue vertx, Set names) { - Vertx v = Vertx.newInstance(vertx.getValue()); - _initialize(v, names); + this.vertx = Vertx.newInstance(vertx.getValue()); + _initialize(this.vertx, names); } private void closeAllClients() { @@ -138,7 +139,7 @@ public ReactiveRedisDataSource get() { RedisClientAndApi redisClientAndApi = clients.get(name); Redis redis = redisClientAndApi.redis; RedisAPI api = redisClientAndApi.api; - return new ReactiveRedisDataSourceImpl(redis, api); + return new ReactiveRedisDataSourceImpl(vertx, redis, api); }); } }; diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java index 59b2bb61ad641..81b57589cab3f 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/BlockingRedisDataSourceImpl.java @@ -22,6 +22,7 @@ import io.quarkus.redis.datasource.transactions.OptimisticLockingTransactionResult; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.quarkus.redis.datasource.transactions.TransactionalRedisDataSource; +import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.redis.client.Command; import io.vertx.mutiny.redis.client.Redis; import io.vertx.mutiny.redis.client.RedisAPI; @@ -35,8 +36,8 @@ public class BlockingRedisDataSourceImpl implements RedisDataSource { final ReactiveRedisDataSourceImpl reactive; final RedisConnection connection; - public BlockingRedisDataSourceImpl(Redis redis, RedisAPI api, Duration timeout) { - this(new ReactiveRedisDataSourceImpl(redis, api), timeout); + public BlockingRedisDataSourceImpl(Vertx vertx, Redis redis, RedisAPI api, Duration timeout) { + this(new ReactiveRedisDataSourceImpl(vertx, redis, api), timeout); } public BlockingRedisDataSourceImpl(ReactiveRedisDataSourceImpl reactive, Duration timeout) { @@ -45,13 +46,14 @@ public BlockingRedisDataSourceImpl(ReactiveRedisDataSourceImpl reactive, Duratio this.connection = reactive.connection; } - public BlockingRedisDataSourceImpl(Redis redis, RedisConnection connection, Duration timeout) { - this(new ReactiveRedisDataSourceImpl(redis, connection), timeout); + public BlockingRedisDataSourceImpl(Vertx vertx, Redis redis, RedisConnection connection, Duration timeout) { + this(new ReactiveRedisDataSourceImpl(vertx, redis, connection), timeout); } public TransactionResult withTransaction(Consumer ds) { RedisConnection connection = reactive.redis.connect().await().atMost(timeout); - ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.redis, connection); + ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.getVertx(), reactive.redis, + connection); TransactionHolder th = new TransactionHolder(); BlockingTransactionalRedisDataSourceImpl source = new BlockingTransactionalRedisDataSourceImpl( new ReactiveTransactionalRedisDataSourceImpl(dataSource, th), timeout); @@ -73,7 +75,8 @@ public TransactionResult withTransaction(Consumer @Override public TransactionResult withTransaction(Consumer ds, String... watchedKeys) { RedisConnection connection = reactive.redis.connect().await().atMost(timeout); - ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.redis, connection); + ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.getVertx(), reactive.redis, + connection); TransactionHolder th = new TransactionHolder(); BlockingTransactionalRedisDataSourceImpl source = new BlockingTransactionalRedisDataSourceImpl( new ReactiveTransactionalRedisDataSourceImpl(dataSource, th), timeout); @@ -104,7 +107,8 @@ public TransactionResult withTransaction(Consumer public OptimisticLockingTransactionResult withTransaction(Function preTxBlock, BiConsumer tx, String... watchedKeys) { RedisConnection connection = reactive.redis.connect().await().atMost(timeout); - ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.redis, connection); + ReactiveRedisDataSourceImpl dataSource = new ReactiveRedisDataSourceImpl(reactive.getVertx(), reactive.redis, + connection); TransactionHolder th = new TransactionHolder(); BlockingTransactionalRedisDataSourceImpl source = new BlockingTransactionalRedisDataSourceImpl( new ReactiveTransactionalRedisDataSourceImpl(dataSource, th), timeout); @@ -116,7 +120,8 @@ public OptimisticLockingTransactionResult withTransaction(Function consumer) { } BlockingRedisDataSourceImpl source = reactive.redis.connect() - .map(rc -> new BlockingRedisDataSourceImpl(reactive.redis, rc, timeout)) + .map(rc -> new BlockingRedisDataSourceImpl(reactive.getVertx(), reactive.redis, rc, timeout)) .await().atMost(timeout); try { diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java index df0243ce7067f..7273cadeb82a1 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactivePubSubCommandsImpl.java @@ -14,21 +14,25 @@ import io.smallrye.common.vertx.VertxContext; import io.smallrye.mutiny.Multi; import io.smallrye.mutiny.Uni; +import io.smallrye.mutiny.subscription.UniEmitter; import io.vertx.core.Context; import io.vertx.core.Vertx; import io.vertx.mutiny.redis.client.Command; import io.vertx.mutiny.redis.client.Redis; import io.vertx.mutiny.redis.client.RedisAPI; import io.vertx.mutiny.redis.client.RedisConnection; +import io.vertx.mutiny.redis.client.Response; public class ReactivePubSubCommandsImpl extends AbstractRedisCommands implements ReactivePubSubCommands { private final Class classOfMessage; - private final Redis ds; + private final Redis client; + private final ReactiveRedisDataSourceImpl datasource; public ReactivePubSubCommandsImpl(ReactiveRedisDataSourceImpl ds, Class classOfMessage) { super(ds, new Marshaller(classOfMessage)); - this.ds = ds.redis; + this.client = ds.redis; + this.datasource = ds; this.classOfMessage = classOfMessage; } @@ -67,7 +71,7 @@ public Uni subscribeToPatterns(List patterns, C } } - return ds.connect() + return client.connect() .chain(conn -> { RedisAPI api = RedisAPI.api(conn); ReactiveRedisPatternSubscriberImpl subscriber = new ReactiveRedisPatternSubscriberImpl(conn, api, onMessage, @@ -91,7 +95,7 @@ public Uni subscribe(List channels, Consumer } } - return ds.connect() + return client.connect() .chain(conn -> { RedisAPI api = RedisAPI.api(conn); ReactiveAbstractRedisSubscriberImpl subscriber = new ReactiveAbstractRedisSubscriberImpl(conn, api, @@ -146,14 +150,13 @@ public Uni subscribe() { Uni handled = Uni.createFrom().emitter(emitter -> { connection.handler(r -> { if (r != null && r.size() > 0) { - Context context = VertxContext.getOrCreateDuplicatedContext(Vertx.currentContext()); - String command = r.get(0).toString(); - if ("subscribe".equalsIgnoreCase(command) || "psubscribe".equalsIgnoreCase(command)) { - emitter.complete(null); // Subscribed - } else if ("message".equalsIgnoreCase(command)) { - context.runOnContext(x -> onMessage.accept(marshaller.decode(classOfMessage, r.get(2)))); - } else if ("pmessage".equalsIgnoreCase(command)) { - context.runOnContext(x -> onMessage.accept(marshaller.decode(classOfMessage, r.get(3)))); + Context ctxt = Vertx.currentContext(); + if (ctxt != null) { + handleRedisEvent(emitter, r); + } else { + datasource.getVertx().runOnContext(() -> { + handleRedisEvent(emitter, r); + }); } } }); @@ -165,6 +168,18 @@ public Uni subscribe() { .replaceWith(id); } + private void handleRedisEvent(UniEmitter emitter, Response r) { + Context context = VertxContext.getOrCreateDuplicatedContext(Vertx.currentContext()); + String command = r.get(0).toString(); + if ("subscribe".equalsIgnoreCase(command) || "psubscribe".equalsIgnoreCase(command)) { + emitter.complete(null); // Subscribed + } else if ("message".equalsIgnoreCase(command)) { + context.runOnContext(x -> onMessage.accept(marshaller.decode(classOfMessage, r.get(2)))); + } else if ("pmessage".equalsIgnoreCase(command)) { + context.runOnContext(x -> onMessage.accept(marshaller.decode(classOfMessage, r.get(3)))); + } + } + public Uni closeAndUnregister(Collection collection) { if (collection.isEmpty()) { return connection.close(); diff --git a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java index 11d0efee4a14e..06c8d5d016bbc 100644 --- a/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java +++ b/extensions/redis-client/runtime/src/main/java/io/quarkus/redis/runtime/datasource/ReactiveRedisDataSourceImpl.java @@ -24,6 +24,7 @@ import io.quarkus.redis.datasource.transactions.ReactiveTransactionalRedisDataSource; import io.quarkus.redis.datasource.transactions.TransactionResult; import io.smallrye.mutiny.Uni; +import io.vertx.mutiny.core.Vertx; import io.vertx.mutiny.redis.client.Command; import io.vertx.mutiny.redis.client.Redis; import io.vertx.mutiny.redis.client.RedisAPI; @@ -35,17 +36,22 @@ public class ReactiveRedisDataSourceImpl implements ReactiveRedisDataSource, Red final Redis redis; final RedisConnection connection; + private final Vertx vertx; - public ReactiveRedisDataSourceImpl(Redis redis, RedisAPI api) { + public ReactiveRedisDataSourceImpl(Vertx vertx, Redis redis, RedisAPI api) { nonNull(redis, "redis"); nonNull(api, "api"); + nonNull(vertx, "vertx"); + this.vertx = vertx; this.redis = redis; this.connection = null; } - public ReactiveRedisDataSourceImpl(Redis redis, RedisConnection connection) { + public ReactiveRedisDataSourceImpl(Vertx vertx, Redis redis, RedisConnection connection) { nonNull(redis, "redis"); nonNull(connection, "connection"); + nonNull(vertx, "vertx"); + this.vertx = vertx; this.redis = redis; this.connection = connection; } @@ -63,7 +69,7 @@ public Uni withTransaction(Function { - ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(redis, connection); + ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(vertx, redis, connection); TransactionHolder th = new TransactionHolder(); return connection.send(Request.cmd(Command.MULTI)) .chain(x -> function.apply(new ReactiveTransactionalRedisDataSourceImpl(singleConnectionDS, th))) @@ -87,7 +93,8 @@ public Uni withTransaction(Function { - ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(redis, connection); + ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(vertx, redis, connection); + List watched = List.of(keys); TransactionHolder th = new TransactionHolder(); return watch(connection, keys) // WATCH keys .chain(() -> connection.send(Request.cmd(Command.MULTI)) @@ -128,10 +135,10 @@ public Uni> withTransaction(Function { - ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(redis, connection); + ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(vertx, redis, connection); TransactionHolder th = new TransactionHolder(); return watch(connection, watchedKeys) // WATCH keys - .chain(x -> preTxBlock.apply(new ReactiveRedisDataSourceImpl(redis, connection)))// Execute the pre-tx-block + .chain(x -> preTxBlock.apply(new ReactiveRedisDataSourceImpl(vertx, redis, connection)))// Execute the pre-tx-block .chain(input -> connection.send(Request.cmd(Command.MULTI)) .chain(x -> tx .apply(input, new ReactiveTransactionalRedisDataSourceImpl(singleConnectionDS, th))) @@ -202,7 +209,7 @@ public Uni withConnection(Function> fun } return redis.connect() .onItem().transformToUni(connection -> { - ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(redis, connection); + ReactiveRedisDataSourceImpl singleConnectionDS = new ReactiveRedisDataSourceImpl(vertx, redis, connection); return function.apply(singleConnectionDS) .onTermination().call(connection::close); }); @@ -273,4 +280,8 @@ public ReactivePubSubCommands pubsub(Class messageType) { public Redis getRedis() { return redis; } + + public Vertx getVertx() { + return vertx; + } } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/BitMapCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/BitMapCommandsTest.java index 3f6bd50518e12..04d46ad94caf4 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/BitMapCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/BitMapCommandsTest.java @@ -27,7 +27,7 @@ public class BitMapCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); bitmap = ds.bitmap(); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ConnectionRecyclingTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ConnectionRecyclingTest.java index 06549b10c6a1d..8d3d98581e91c 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ConnectionRecyclingTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ConnectionRecyclingTest.java @@ -12,9 +12,9 @@ public class ConnectionRecyclingTest extends DatasourceTestBase { - RedisDataSource ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + RedisDataSource ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); - ReactiveRedisDataSource rds = new ReactiveRedisDataSourceImpl(redis, api); + ReactiveRedisDataSource rds = new ReactiveRedisDataSourceImpl(vertx, redis, api); @AfterEach public void tearDown() { diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/CustomCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/CustomCommandsTest.java index 68334b52ff361..1f6f8d25f26c9 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/CustomCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/CustomCommandsTest.java @@ -23,7 +23,7 @@ public class CustomCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5)); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/GeoCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/GeoCommandsTest.java index 429b77aaac5a2..7924dfa0e3838 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/GeoCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/GeoCommandsTest.java @@ -54,7 +54,7 @@ public class GeoCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); geo = ds.geo(Place.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HashCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HashCommandsTest.java index c46d762a6b2b6..6a556631f6c17 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HashCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HashCommandsTest.java @@ -28,7 +28,7 @@ public class HashCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); hash = ds.hash(Person.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HyperLogLogCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HyperLogLogCommandsTest.java index c6b1f3d82183a..e32b5dead9f1b 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HyperLogLogCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/HyperLogLogCommandsTest.java @@ -22,7 +22,7 @@ public class HyperLogLogCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); hll = ds.hyperloglog(Person.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/KeyCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/KeyCommandsTest.java index 3dfd2848131aa..613f41a0a5754 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/KeyCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/KeyCommandsTest.java @@ -39,7 +39,7 @@ public class KeyCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); strings = ds.string(Person.class); keys = ds.key(); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ListCommandTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ListCommandTest.java index 5b918515c95b3..1bfd7a616181c 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ListCommandTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/ListCommandTest.java @@ -25,7 +25,7 @@ public class ListCommandTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5)); lists = ds.list(Person.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/NumericCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/NumericCommandsTest.java index 1f1ccd38b821c..11ed36f9b6e2e 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/NumericCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/NumericCommandsTest.java @@ -21,7 +21,7 @@ public class NumericCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5)); num = ds.string(Long.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java index c4a28a4aff81e..80597d1574816 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/OptimisticLockingTest.java @@ -23,8 +23,8 @@ public class OptimisticLockingTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java index 46dd18d671a57..9aa1d65e44abb 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubCommandsTest.java @@ -31,10 +31,10 @@ public class PubSubCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5)); pubsub = ds.pubsub(Person.class); - ReactiveRedisDataSourceImpl reactiveDS = new ReactiveRedisDataSourceImpl(redis, api); + ReactiveRedisDataSourceImpl reactiveDS = new ReactiveRedisDataSourceImpl(vertx, redis, api); reactive = reactiveDS.pubsub(Person.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubTest.java index 365fade2e3359..2ec46f52d8308 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/PubSubTest.java @@ -25,7 +25,7 @@ public class PubSubTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new ReactiveRedisDataSourceImpl(redis, api); + ds = new ReactiveRedisDataSourceImpl(vertx, redis, api); ps = new ReactivePubSubCommandsImpl<>(ds, Person.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SetCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SetCommandsTest.java index 058d2eaeb7fa6..021c697c902fe 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SetCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SetCommandsTest.java @@ -31,7 +31,7 @@ public class SetCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); sets = ds.set(Person.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortCommandsTest.java index 1d2d56f81ed45..439e9dc9dc1a9 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortCommandsTest.java @@ -23,7 +23,7 @@ public class SortCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(5)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(5)); lists = ds.list(String.class); strings = ds.string(String.class); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortedSetCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortedSetCommandsTest.java index 107f89d621056..9e6d105c80da8 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortedSetCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/SortedSetCommandsTest.java @@ -39,7 +39,7 @@ public class SortedSetCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(2)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(2)); setOfPlaces = ds.sortedSet(Place.class); setOfStrings = ds.sortedSet(String.class); diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StringCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StringCommandsTest.java index 1fef39756a35b..e5a0a1572984a 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StringCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/StringCommandsTest.java @@ -30,7 +30,7 @@ public class StringCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - ds = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(1)); + ds = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(1)); strings = ds.string(String.class); } diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalBitMapCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalBitMapCommandsTest.java index 3ac3708e2c9d4..968e46c33141c 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalBitMapCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalBitMapCommandsTest.java @@ -24,8 +24,8 @@ public class TransactionalBitMapCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalGeoCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalGeoCommandsTest.java index 0dccac1bc97d4..00b411bc080f5 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalGeoCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalGeoCommandsTest.java @@ -27,8 +27,8 @@ public class TransactionalGeoCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHashCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHashCommandsTest.java index 539aa746363f6..dfc0ca93c404a 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHashCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHashCommandsTest.java @@ -23,8 +23,8 @@ public class TransactionalHashCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHyperLogLogCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHyperLogLogCommandsTest.java index 0ff8b5fbb9a34..0e35aacaacdf5 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHyperLogLogCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalHyperLogLogCommandsTest.java @@ -21,8 +21,8 @@ public class TransactionalHyperLogLogCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalKeyTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalKeyTest.java index 6b3ebeb9079d2..49e24cde384b7 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalKeyTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalKeyTest.java @@ -25,8 +25,8 @@ public class TransactionalKeyTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalListCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalListCommandsTest.java index 785dba59faaba..4e6859858b46e 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalListCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalListCommandsTest.java @@ -21,8 +21,8 @@ public class TransactionalListCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSetCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSetCommandsTest.java index 2012265dff121..7cf6dd4348624 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSetCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSetCommandsTest.java @@ -21,8 +21,8 @@ public class TransactionalSetCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSortedSetCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSortedSetCommandsTest.java index c35fee02637a9..b1adc85cda942 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSortedSetCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalSortedSetCommandsTest.java @@ -24,8 +24,8 @@ public class TransactionalSortedSetCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach diff --git a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStringCommandsTest.java b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStringCommandsTest.java index 407eb71232392..947f174a2c6fe 100644 --- a/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStringCommandsTest.java +++ b/extensions/redis-client/runtime/src/test/java/io/quarkus/redis/datasource/TransactionalStringCommandsTest.java @@ -22,8 +22,8 @@ public class TransactionalStringCommandsTest extends DatasourceTestBase { @BeforeEach void initialize() { - blocking = new BlockingRedisDataSourceImpl(redis, api, Duration.ofSeconds(60)); - reactive = new ReactiveRedisDataSourceImpl(redis, api); + blocking = new BlockingRedisDataSourceImpl(vertx, redis, api, Duration.ofSeconds(60)); + reactive = new ReactiveRedisDataSourceImpl(vertx, redis, api); } @AfterEach