diff --git a/bom/application/pom.xml b/bom/application/pom.xml
index a0d13d221a243..c9bbbfbaeda62 100644
--- a/bom/application/pom.xml
+++ b/bom/application/pom.xml
@@ -51,7 +51,7 @@
1.0.132.7.02.21.0
- 3.15.0
+ 3.16.01.1.01.2.11.3.5
diff --git a/docs/src/main/asciidoc/kafka-dev-services.adoc b/docs/src/main/asciidoc/kafka-dev-services.adoc
index 7082f8629bfd9..2bea2abbfe50c 100644
--- a/docs/src/main/asciidoc/kafka-dev-services.adoc
+++ b/docs/src/main/asciidoc/kafka-dev-services.adoc
@@ -46,6 +46,7 @@ You can set the port by configuring the `quarkus.kafka.devservices.port` propert
Note that the Kafka advertised address is automatically configured with the chosen port.
+[[configuring-the-image]]
== Configuring the image
Dev Services for Kafka supports https://redpanda.com[Redpanda] and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode).
@@ -83,6 +84,7 @@ without trying to re-partition the existing topic to a different number of parti
You can configure timeout for Kafka admin client calls used in topic creation using `quarkus.kafka.devservices.topic-partitions-timeout`, it defaults to 2 seconds.
+[[redpanda-enabling-transactions]]
== Enabling transactions
By default, the Red Panda broker does not act as a transaction coordinator.
diff --git a/docs/src/main/asciidoc/kafka.adoc b/docs/src/main/asciidoc/kafka.adoc
index 00270c48de843..daf5cfb45c6c6 100644
--- a/docs/src/main/asciidoc/kafka.adoc
+++ b/docs/src/main/asciidoc/kafka.adoc
@@ -1052,6 +1052,80 @@ Reciprocally, multiple producers on the same channel can be merged by setting `m
On the `@Incoming` methods, you can control how multiple channels are merged using the `@Merge` annotation.
====
+=== Kafka Transactions
+
+Kafka transactions enable atomic writes to multiple Kafka topics and partitions.
+The Kafka connector provides `KafkaTransactions` custom emitter for writing Kafka records inside a transaction.
+It can be injected as a regular emitter `@Channel`:
+
+[source, java]
+----
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.kafka.KafkaRecord;
+import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
+
+@ApplicationScoped
+public class KafkaTransactionalProducer {
+
+ @Channel("tx-out-example")
+ KafkaTransactions txProducer;
+
+ public Uni emitInTransaction() {
+ return txProducer.withTransaction(emitter -> {
+ emitter.send(KafkaRecord.of(1, "a"));
+ emitter.send(KafkaRecord.of(2, "b"));
+ emitter.send(KafkaRecord.of(3, "c"));
+ return Uni.createFrom().voidItem();
+ });
+ }
+
+}
+----
+
+The function given to the `withTransaction` method receives a `TransactionalEmitter` for producing records, and returns a `Uni` that provides the result of the transaction.
+
+* If the processing completes successfully, the producer is flushed and the transaction is committed.
+* If the processing throws an exception, returns a failing `Uni`, or marks the `TransactionalEmitter` for abort, the transaction is aborted.
+
+Kafka transactional producers require configuring `acks=all` client property, and a unique id for `transactional.id`, which implies `enable.idempotence=true`.
+When Quarkus detects the use of `KafkaTransactions` for an outgoing channel it configures these properties on the channel,
+providing a default value of `"${quarkus.application.name}-${channelName}"` for `transactional.id` property.
+
+Note that for production use the `transactional.id` must be unique across all application instances.
+
+
+[IMPORTANT]
+====
+While a normal message emitter would support concurrent calls to `send` methods and consequently queues outgoing messages to be written to Kafka,
+a `KafkaTransactions` emitter only supports one transaction at a time.
+A transaction is considered in progress from the call to the `withTransaction` until the returned `Uni` results in success or failure.
+While a transaction is in progress, subsequent calls to the `withTransaction`, including nested ones inside the given function, will throw `IllegalStateException`.
+
+Note that in Reactive Messaging, the execution of processing methods, is already serialized, unless `@Blocking(ordered = false)` is used.
+If `withTransaction` can be called concurrently, for example from a REST endpoint, it is recommended to limit the concurrency of the execution.
+This can be done using the `@Bulkhead` annotation from link:https://quarkus.io/guides/smallrye-fault-tolerance[_Microprofile Fault Tolerance_].
+
+An example usage can be found in <>.
+====
+
+==== Transaction-aware consumers
+
+If you'd like to consume records only written and committed inside a Kafka transaction you need to configure the `isolation.level` property on the incoming channel as such:
+
+[source, properties]
+----
+mp.messaging.incoming.prices-in.isolation.level=read_committed
+----
+
+[NOTE]
+====
+If you are using Dev Services for Kafka using Redpanda, you need to <>.
+====
+
== Processing Messages
Applications streaming data often need to consume some events from a topic, process them and publish the result to a different topic.
@@ -1116,6 +1190,82 @@ record key propagation produces the outgoing record with the same _key_ as the i
If the outgoing record already contains a _key_, it *won't be overridden* by the incoming record key.
If the incoming record does have a _null_ key, the `mp.messaging.outgoing.$channel.key` property is used.
+=== Exactly-Once Processing
+
+Kafka Transactions allows managing consumer offsets inside a transaction, together with produced messages.
+This enables coupling a consumer with a transactional producer in a _consume-transform-produce_ pattern, also known as *exactly-once processing*.
+
+The `KafkaTransactions` custom emitter provides a way to apply exactly-once processing to an incoming Kafka message inside a transaction.
+
+The following example includes a batch of Kafka records inside a transaction.
+
+[source, java]
+----
+import javax.enterprise.context.ApplicationScoped;
+
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Incoming;
+import org.eclipse.microprofile.reactive.messaging.OnOverflow;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.kafka.KafkaRecord;
+import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
+import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
+
+@ApplicationScoped
+public class KafkaExactlyOnceProcessor {
+
+ @Channel("prices-out")
+ @OnOverflow(value = OnOverflow.Strategy.BUFFER, bufferSize = 500) // <3>
+ KafkaTransactions txProducer;
+
+ @Incoming("prices-in")
+ public Uni emitInTransaction(KafkaRecordBatch batch) { // <1>
+ return txProducer.withTransactionAndAck(batch, emitter -> { // <2>
+ for (KafkaRecord record : batch) {
+ emitter.send(KafkaRecord.of(record.getKey(), record.getPayload() + 1)); // <3>
+ }
+ return Uni.createFrom().voidItem();
+ });
+ }
+
+}
+----
+
+<1> It is recommended to use exactly-once processing along with the batch consumption mode.
+While it is possible to use it with a single Kafka message, it'll have a significant performance impact.
+<2> The consumed `KafkaRecordBatch` message is passed to the `KafkaTransactions#withTransactionAndAck` in order to handle the offset commits and message acks.
+<3> The `send` method writes records to Kafka inside the transaction, without waiting for send receipt from the broker.
+Messages pending to be written to Kafka will be buffered, and flushed before committing the transaction.
+It is therefore recommended configuring the `@OnOverflow` `bufferSize` in order to fit enough messages, for example the `max.poll.records`, maximum amount of records returned in a batch.
+
+- If the processing completes successfully, _before committing the transaction_, the topic partition offsets of the given batch message will be committed to the transaction.
+- If the processing needs to abort, _after aborting the transaction_, the consumer's position is reset to the last committed offset, effectively resuming the consumption from that offset. If no consumer offset has been committed to a topic-partition, the consumer's position is reset to the beginning of the topic-partition, _even if the offset reset policy is `latest`_.
+
+When using exactly-once processing, consumed message offset commits are handled by the transaction and therefore the application should not commit offsets through other means.
+The consumer should have `enable.auto.commit=false` (the default) and set explicitly `commit-strategy=ignore`:
+
+[source, properties]
+----
+mp.messaging.incoming.prices-in.commit-strategy=ignore
+mp.messaging.incoming.prices-in.failure-strategy=ignore
+----
+
+==== Error handling for the exactly-once processing
+
+The `Uni` returned from the `KafkaTransactions#withTransaction` will yield a failure if the transaction fails and is aborted.
+The application can choose to handle the error case, but if a failing `Uni` is returned from the `@Incoming` method, the incoming channel will effectively fail and stop the reactive stream.
+
+The `KafkaTransactions#withTransactionAndAck` method acks and nacks the message but will *not* return a failing `Uni`.
+Nacked messages will be handled by the failure strategy of the incoming channel, (see <>).
+Configuring `failure-strategy=ignore` simply resets the Kafka consumer to the last committed offsets and resumes the consumption from there.
+
+[NOTE]
+====
+Redpanda does not yet support link:https://github.com/redpanda-data/redpanda/issues/3279[producer scalability for exactly-once processing].
+In order to use Kafka exactly-once processing with Quarkus you can configure Dev Services for Kafka to <>.
+====
+
[[kafka-bare-clients]]
== Accessing Kafka clients directly
@@ -2386,6 +2536,118 @@ private String toJson(Fruit f) {
The workaround is a bit more complex as besides sending the fruits coming from Kafka, we need to send pings periodically.
To achieve this we merge the stream coming from Kafka and a periodic stream emitting `{}` every 10 seconds.
+[[chaining-kafka-transactions-with-hibernate-reactive-transactions]]
+=== Chaining Kafka Transactions with Hibernate Reactive transactions
+
+By chaining a Kafka transaction with a Hibernate Reactive transaction you can send records to a Kafka transaction,
+perform database updates and commit the Kafka transaction only if the database transaction is successful.
+
+The following example demonstrates:
+
+* Receive a payload by serving HTTP requests using RESTEasy Reactive,
+* Limit concurrency of that HTTP endpoint using Smallrye Fault Tolerance,
+* Start a Kafka transaction and send the payload to Kafka record,
+* Store the payload in the database using Hibernate Reactive with Panache,
+* Commit the Kafka transaction only if the entity is persisted successfully.
+
+[source, java]
+----
+package org.acme;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
+
+import org.eclipse.microprofile.faulttolerance.Bulkhead;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.hibernate.reactive.mutiny.Mutiny;
+
+import io.quarkus.hibernate.reactive.panache.Panache;
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
+
+@Path("/")
+public class FruitProducer {
+
+ @Channel("kafka") KafkaTransactions kafkaTx; // <1>
+
+ @POST
+ @Path("/fruits")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Bulkhead(1) // <2>
+ public Uni post(Fruit fruit) { // <3>
+ return kafkaTx.withTransaction(emitter -> { // <4>
+ emitter.send(fruit); // <5>
+ return Panache.withTransaction(() -> { // <6>
+ return fruit.persist(); // <7>
+ });
+ }).replaceWithVoid();
+ }
+}
+
+----
+<1> Inject a `KafkaTransactions` which exposes a Mutiny API. It allows the integration with the Mutiny API exposed by Hibernate Reactive with Panache.
+<2> Limit the concurrency of the HTTP endpoint to "1", preventing starting multiple transactions at a given time.
+<3> The HTTP method receiving the payload returns a `Uni`. The HTTP response is written when the operation completes (the entity is persisted and Kafka transaction is committed).
+<4> Begin a Kafka transaction.
+<5> Send the payload to Kafka inside the Kafka transaction.
+<6> Persist the entity into the database in a Hibernate Reactive transaction.
+<7> Once the persist operation completes, and there is no errors, the Kafka transaction is committed.
+The result is omitted and returned as the HTTP response.
+
+In the previous example the database transaction (inner) will commit followed by the Kafka transaction (outer).
+If you wish to commit the Kafka transaction first and the database transaction second, you need to nest them in the reverse order.
+
+The next example demostrates that using the Hibernate Reactive API (without Panache):
+
+[source, java]
+----
+import javax.inject.Inject;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.core.MediaType;
+
+import org.eclipse.microprofile.faulttolerance.Bulkhead;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.hibernate.reactive.mutiny.Mutiny;
+
+import io.smallrye.mutiny.Uni;
+import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
+import io.vertx.mutiny.core.Context;
+import io.vertx.mutiny.core.Vertx;
+
+@Path("/")
+public class FruitProducer {
+
+ @Channel("kafka") KafkaTransactions kafkaTx;
+
+ @Inject Mutiny.SessionFactory sf; // <1>
+
+ @POST
+ @Path("/fruits")
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Bulkhead(1)
+ public Uni post(Fruit fruit) {
+ Context context = Vertx.currentContext(); // <2>
+ return sf.withTransaction(session -> // <3>
+ kafkaTx.withTransaction(emitter -> // <4>
+ session.persist(fruit).invoke(() -> emitter.send(fruit)) // <5>
+ ).emitOn(context::runOnContext) // <6>
+ );
+ }
+}
+----
+
+<1> Inject the Hibernate Reactive `SessionFactory`.
+<2> Capture the caller Vert.x context.
+<3> Begin a Hibernate Reactive transaction.
+<4> Begin a Kafka transaction.
+<5> Persist the payload and send the entity to Kafka.
+<6> The Kafka transaction terminates on the Kafka producer sender thread.
+We need to switch to the Vert.x context previously captured in order to terminate the Hibernate Reactive transaction on the same context we started it.
+
== Logging
To reduce the amount of log written by the Kafka client, Quarkus sets the level of the following log categories to `WARNING`:
@@ -2500,6 +2762,47 @@ To allow your Quarkus application to use that secret, add the following line to
%prod.quarkus.openshift.env.secrets=kafka-credentials
----
+==== Red Hat OpenShift Service Registry
+
+https://www.redhat.com/en/technologies/cloud-computing/openshift/openshift-service-registry[Red Hat OpenShift Service Registry]
+provides fully managed service registry for handling Kafka schemas.
+
+You can follow the instructions from
+https://access.redhat.com/documentation/en-us/red_hat_openshift_service_registry/1/guide/ab1894d1-cae0-4d11-b185-81d62b4aabc7#_60472331-fa00-48ec-a621-bbd039500c7d[Getting started with Red Hat OpenShift Service Registry],
+or use the `rhoas` CLI to create a new service registry instance:
+
+[source, shell]
+----
+rhoas service-registry create --name my-schema-registry
+----
+
+Make sure to note the _Registry URL_ of the instance created.
+For authentication, you can use the same _ServiceAccount_ you created previously.
+You need to make sure that it has the necessary permissions to access the service registry.
+
+For example, using the `rhoas` CLI, you can grant the `MANAGER` role to the service account:
+
+[source, shell]
+----
+rhoas service-registry role add --role manager --service-account [SERVICE_ACCOUNT_CLIENT_ID]
+----
+
+Then, you can configure the Quarkus application to connect to the schema registry as follows:
+
+[source, properties]
+----
+mp.messaging.connector.smallrye-kafka.apicurio.registry.url=${RHOAS_SERVICE_REGISTRY_URL} <1>
+mp.messaging.connector.smallrye-kafka.apicurio.auth.service.token.endpoint=${RHOAS_OAUTH_TOKEN_ENDPOINT} <2>
+mp.messaging.connector.smallrye-kafka.apicurio.auth.client.id=${RHOAS_CLIENT_ID} <3>
+mp.messaging.connector.smallrye-kafka.apicurio.auth.client.secret=${RHOAS_CLIENT_ID} <4>
+----
+<1> The service registry URL, given on the admin console, such as `https://bu98.serviceregistry.rhcloud.com/t/0e95af2c-6e11-475e-82ee-f13bd782df24/apis/registry/v2`
+<2> The OAuth token endpoint URL, such as `https://identity.api.openshift.com/auth/realms/rhoas/protocol/openid-connect/token`
+<3> The client id (from the service account)
+<4> The client secret (from the service account)
+
+You will need to include additional configuration
+
== Going further
This guide has shown how you can interact with Kafka using Quarkus.
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java
index 8501d99eedcfb..66366a7095eb7 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DotNames.java
@@ -10,6 +10,7 @@ final class DotNames {
static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
+ static final DotName KAFKA_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());
static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());
static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName());
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
index b6fa969d50584..2d4081420b350 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/SmallRyeReactiveMessagingKafkaProcessor.java
@@ -167,6 +167,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers);
+ processKafkaTransactions(discovery, config, channelName, injectionPointType);
+
Type outgoingType = getOutgoingTypeFromChannelInjectionPoint(injectionPointType);
processOutgoingType(discovery, outgoingType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
@@ -180,6 +182,23 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
}
}
+ private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
+ BuildProducer config, String channelName, Type injectionPointType) {
+ if (injectionPointType != null && isKafkaEmitter(injectionPointType)) {
+ LOGGER.infof("Transactional producer detected for channel '%s', setting following default config values: "
+ + "'mp.messaging.outgoing.%s.transactional.id=${quarkus.application.name}-${channelName}', "
+ + "'mp.messaging.outgoing.%s.enable.idempotence=true', "
+ + "'mp.messaging.outgoing.%s.acks=all'", channelName, channelName, channelName, channelName);
+ produceRuntimeConfigurationDefaultBuildItem(discovery, config,
+ "mp.messaging.outgoing." + channelName + ".transactional.id",
+ "${quarkus.application.name}-" + channelName);
+ produceRuntimeConfigurationDefaultBuildItem(discovery, config,
+ "mp.messaging.outgoing." + channelName + ".enable.idempotence", "true");
+ produceRuntimeConfigurationDefaultBuildItem(discovery, config,
+ "mp.messaging.outgoing." + channelName + ".acks", "all");
+ }
+ }
+
private void processIncomingType(DefaultSerdeDiscoveryState discovery,
BuildProducer config, Type incomingType, String channelName,
BuildProducer generatedClass, BuildProducer reflection,
@@ -352,7 +371,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}
- if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)) {
+ if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isKafkaEmitter(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
return null;
@@ -482,6 +501,13 @@ private static boolean isMutinyEmitter(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}
+ private static boolean isKafkaEmitter(Type type) {
+ // raw type KafkaTransactions is wrong, must be KafkaTransactions
+ return DotNames.KAFKA_EMITTER.equals(type.name())
+ && type.kind() == Type.Kind.PARAMETERIZED_TYPE
+ && type.asParameterizedType().arguments().size() == 1;
+ }
+
// ---
private static boolean isMessage(Type type) {
@@ -627,11 +653,11 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
LOGGER.infof("Generating Jackson deserializer for type %s", type.name().toString());
- result = Result.of(clazz);
// Deserializers are access by reflection.
reflection.produce(new ReflectiveClassBuildItem(true, true, false, clazz));
alreadyGeneratedSerializers.put(type.toString(), clazz);
}
+ result = Result.of(clazz);
}
return result;
}
@@ -653,11 +679,11 @@ private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
LOGGER.infof("Generating Jackson serializer for type %s", type.name().toString());
- result = Result.of(clazz);
// Serializers are access by reflection.
reflection.produce(new ReflectiveClassBuildItem(true, true, false, clazz));
alreadyGeneratedSerializers.put(type.toString(), clazz);
}
+ result = Result.of(clazz);
}
return result;
diff --git a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java
index 3f21351a981f6..0b1c82670c545 100644
--- a/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java
+++ b/extensions/smallrye-reactive-messaging-kafka/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/kafka/deployment/DefaultSerdeConfigTest.java
@@ -51,6 +51,7 @@
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.Record;
+import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
@@ -2675,4 +2676,24 @@ void method3(CustomDto payload) {
}
}
+ @Test
+ void kafkaTransactions() {
+ // @formatter:off
+ Tuple[] expectations = {
+ tuple("mp.messaging.outgoing.tx.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
+ tuple("mp.messaging.outgoing.tx.transactional.id", "${quarkus.application.name}-tx"),
+ tuple("mp.messaging.outgoing.tx.enable.idempotence", "true"),
+ tuple("mp.messaging.outgoing.tx.acks", "all"),
+ };
+ doTest(expectations, TransactionalProducer.class);
+
+ }
+
+ private static class TransactionalProducer {
+
+ @Channel("tx")
+ KafkaTransactions kafkaTransactions;
+
+ }
+
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
index 6634a6b2bc2bb..19c664f2c09be 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/ReactiveMessagingDotNames.java
@@ -17,6 +17,7 @@
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.annotations.Emitter;
+import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor;
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
@@ -49,6 +50,7 @@ public final class ReactiveMessagingDotNames {
static final DotName ACKNOWLEDGMENT = DotName.createSimple(Acknowledgment.class.getName());
static final DotName MERGE = DotName.createSimple(Merge.class.getName());
static final DotName BROADCAST = DotName.createSimple(Broadcast.class.getName());
+ static final DotName EMITTER_FACTORY_FOR = DotName.createSimple(EmitterFactoryFor.class.getName());
static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName());
static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
index b9ca570ca90e5..87e9596456d51 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/SmallRyeReactiveMessagingProcessor.java
@@ -74,13 +74,13 @@
import io.quarkus.smallrye.reactivemessaging.runtime.WorkerConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor;
+import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingLivenessCheck;
import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingReadinessCheck;
import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingStartupCheck;
import io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration;
-import io.smallrye.reactive.messaging.providers.extension.EmitterConfiguration;
public class SmallRyeReactiveMessagingProcessor {
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java
index 64d3e0d1f9a9e..ce4c563f749b9 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/WiringProcessor.java
@@ -88,10 +88,17 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
BuildProducer validationErrors,
BuildProducer configDescriptionBuildItemBuildProducer) {
+ Map emitterFactories = new HashMap<>();
// We need to collect all business methods annotated with @Incoming/@Outgoing first
for (BeanInfo bean : beanDiscoveryFinished.beanStream().classBeans()) {
// TODO: add support for inherited business methods
//noinspection OptionalGetWithoutIsPresent
+ AnnotationInstance emitterFactory = transformedAnnotations.getAnnotation(bean.getTarget().get(),
+ ReactiveMessagingDotNames.EMITTER_FACTORY_FOR);
+ if (emitterFactory != null) {
+ emitterFactories.put(emitterFactory.value().asClass().name().toString(), emitterFactory);
+ }
+
for (MethodInfo method : bean.getTarget().get().asClass().methods()) {
// @Incoming is repeatable
AnnotationInstance incoming = transformedAnnotations.getAnnotation(method,
@@ -131,31 +138,33 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
ReactiveMessagingDotNames.CHANNEL);
Optional legacyChannel = WiringHelper.getAnnotation(transformedAnnotations, injectionPoint,
ReactiveMessagingDotNames.LEGACY_CHANNEL);
- boolean isEmitter = injectionPoint.getRequiredType().name().equals(ReactiveMessagingDotNames.EMITTER);
- boolean isMutinyEmitter = injectionPoint.getRequiredType().name()
- .equals(ReactiveMessagingDotNames.MUTINY_EMITTER);
+
+ String injectionType = injectionPoint.getRequiredType().name().toString();
+ AnnotationInstance emitterType = emitterFactories.get(injectionType);
+
boolean isLegacyEmitter = injectionPoint.getRequiredType().name()
.equals(ReactiveMessagingDotNames.LEGACY_EMITTER);
- if (isEmitter || isMutinyEmitter) {
- // New emitter from the spec, or Mutiny emitter
- handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint, broadcast,
- channel, ReactiveMessagingDotNames.ON_OVERFLOW);
- }
-
- if (isLegacyEmitter) {
- // Deprecated Emitter from SmallRye (emitter, channel and on overflow have been added to the spec)
- handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint, broadcast,
- legacyChannel, ReactiveMessagingDotNames.LEGACY_ON_OVERFLOW);
- }
+ if (emitterType != null) {
+ if (isLegacyEmitter) {
+ // Deprecated Emitter from SmallRye (emitter, channel and on overflow have been added to the spec)
+ handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint,
+ emitterType, broadcast, legacyChannel, ReactiveMessagingDotNames.LEGACY_ON_OVERFLOW);
+ } else {
+ // New emitter from the spec, or Mutiny emitter
+ handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint,
+ emitterType, broadcast, channel, ReactiveMessagingDotNames.ON_OVERFLOW);
+ }
+ } else {
+ if (channel.isPresent()) {
+ handleChannelInjection(appChannels, channels, channel.get());
+ }
- if (channel.isPresent() && !(isEmitter || isMutinyEmitter)) {
- handleChannelInjection(appChannels, channels, channel.get());
+ if (legacyChannel.isPresent()) {
+ handleChannelInjection(appChannels, channels, legacyChannel.get());
+ }
}
- if (legacyChannel.isPresent() && !isLegacyEmitter) {
- handleChannelInjection(appChannels, channels, legacyChannel.get());
- }
}
}
@@ -175,6 +184,7 @@ private void handleEmitter(TransformedAnnotationsBuildItem transformedAnnotation
BuildProducer emitters,
BuildProducer validationErrors,
InjectionPointInfo injectionPoint,
+ AnnotationInstance emitterType,
Optional broadcast,
Optional annotation,
DotName onOverflowAnnotation) {
@@ -187,7 +197,7 @@ private void handleEmitter(TransformedAnnotationsBuildItem transformedAnnotation
String channelName = annotation.get().value().asString();
Optional overflow = WiringHelper.getAnnotation(transformedAnnotations, injectionPoint,
onOverflowAnnotation);
- createEmitter(appChannels, emitters, injectionPoint, channelName, overflow, broadcast);
+ createEmitter(appChannels, emitters, injectionPoint, channelName, emitterType, overflow, broadcast);
}
}
@@ -365,14 +375,18 @@ public void autoConfigureConnectorForOrphansAndProduceManagedChannels(
}
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
- private void createEmitter(BuildProducer appChannels, BuildProducer emitters,
+ private void createEmitter(BuildProducer appChannels,
+ BuildProducer emitters,
InjectionPointInfo injectionPoint,
String channelName,
+ AnnotationInstance emitter,
Optional overflow,
Optional broadcast) {
LOGGER.debugf("Emitter injection point '%s' detected, channel name: '%s'",
injectionPoint.getTargetInfo(), channelName);
+ String emitterTypeName = emitter.value().asClass().name().toString();
+
boolean hasBroadcast = false;
int awaitSubscribers = -1;
int bufferSize = -1;
@@ -390,12 +404,10 @@ private void createEmitter(BuildProducer appChannels, BuildPro
strategy = annotation.value().asString();
}
- boolean isMutinyEmitter = injectionPoint.getRequiredType().name()
- .equals(ReactiveMessagingDotNames.MUTINY_EMITTER);
produceOutgoingChannel(appChannels, channelName);
emitters.produce(
InjectedEmitterBuildItem
- .of(channelName, isMutinyEmitter, strategy, bufferSize, hasBroadcast, awaitSubscribers));
+ .of(channelName, emitterTypeName, strategy, bufferSize, hasBroadcast, awaitSubscribers));
}
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/items/InjectedEmitterBuildItem.java b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/items/InjectedEmitterBuildItem.java
index c50e22c60dec5..7d534c7e41494 100644
--- a/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/items/InjectedEmitterBuildItem.java
+++ b/extensions/smallrye-reactive-messaging/deployment/src/main/java/io/quarkus/smallrye/reactivemessaging/deployment/items/InjectedEmitterBuildItem.java
@@ -3,7 +3,9 @@
import io.quarkus.builder.item.MultiBuildItem;
import io.quarkus.smallrye.reactivemessaging.deployment.BroadcastLiteral;
import io.quarkus.smallrye.reactivemessaging.deployment.OnOverflowLiteral;
-import io.smallrye.reactive.messaging.providers.extension.EmitterConfiguration;
+import io.quarkus.smallrye.reactivemessaging.runtime.EmitterFactoryForLiteral;
+import io.quarkus.smallrye.reactivemessaging.runtime.QuarkusEmitterConfiguration;
+import io.smallrye.reactive.messaging.EmitterConfiguration;
/**
* Represents an emitter injection.
@@ -14,15 +16,15 @@ public final class InjectedEmitterBuildItem extends MultiBuildItem {
* Creates a new instance of {@link InjectedEmitterBuildItem} setting the overflow strategy.
*
* @param name the name of the stream
- * @param isMutinyEmitter if the emitter is a {@link io.smallrye.reactive.messaging.MutinyEmitter}
+ * @param emitterType emitterType
* @param overflow the overflow strategy
* @param bufferSize the buffer size, if overflow is set to {@code BUFFER}
* @return the new {@link InjectedEmitterBuildItem}
*/
- public static InjectedEmitterBuildItem of(String name, boolean isMutinyEmitter, String overflow, int bufferSize,
+ public static InjectedEmitterBuildItem of(String name, String emitterType, String overflow, int bufferSize,
boolean hasBroadcast,
int awaitSubscribers) {
- return new InjectedEmitterBuildItem(name, isMutinyEmitter, overflow, bufferSize, hasBroadcast, awaitSubscribers);
+ return new InjectedEmitterBuildItem(name, emitterType, overflow, bufferSize, hasBroadcast, awaitSubscribers);
}
/**
@@ -48,9 +50,9 @@ public static InjectedEmitterBuildItem of(String name, boolean isMutinyEmitter,
private final boolean hasBroadcast;
/**
- * Whether the emitter is a {@link io.smallrye.reactive.messaging.MutinyEmitter} or a regular (non-mutiny) emitter.
+ * The emitter type
*/
- private final boolean isMutinyEmitter;
+ private final String emitterType;
/**
* If the emitter uses the {@link io.smallrye.reactive.messaging.annotations.Broadcast} annotation, indicates the
@@ -58,19 +60,30 @@ public static InjectedEmitterBuildItem of(String name, boolean isMutinyEmitter,
*/
private final int awaitSubscribers;
- public InjectedEmitterBuildItem(String name, boolean isMutinyEmitter, String overflow, int bufferSize, boolean hasBroadcast,
+ public InjectedEmitterBuildItem(String name, String emitterType, String overflow, int bufferSize,
+ boolean hasBroadcast,
int awaitSubscribers) {
this.name = name;
this.overflow = overflow;
- this.isMutinyEmitter = isMutinyEmitter;
+ this.emitterType = emitterType;
this.bufferSize = bufferSize;
this.hasBroadcast = hasBroadcast;
this.awaitSubscribers = hasBroadcast ? awaitSubscribers : -1;
}
public EmitterConfiguration getEmitterConfig() {
- return new EmitterConfiguration(name, isMutinyEmitter, OnOverflowLiteral.create(overflow, bufferSize),
+ return new QuarkusEmitterConfiguration(name, EmitterFactoryForLiteral.of(loadEmitterClass()),
+ OnOverflowLiteral.create(overflow, bufferSize),
hasBroadcast ? new BroadcastLiteral(awaitSubscribers) : null);
}
+ private Class> loadEmitterClass() {
+ try {
+ return Class.forName(emitterType, false, Thread.currentThread().getContextClassLoader());
+ } catch (ClassNotFoundException e) {
+ // should not happen
+ throw new RuntimeException(e);
+ }
+ }
+
}
diff --git a/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java
new file mode 100644
index 0000000000000..652321db30178
--- /dev/null
+++ b/extensions/smallrye-reactive-messaging/deployment/src/test/java/io/quarkus/smallrye/reactivemessaging/wiring/ConnectorAttachmentCustomEmitterTest.java
@@ -0,0 +1,117 @@
+package io.quarkus.smallrye.reactivemessaging.wiring;
+
+import static org.awaitility.Awaitility.await;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+import javax.annotation.PostConstruct;
+import javax.enterprise.context.ApplicationScoped;
+import javax.enterprise.inject.Produces;
+import javax.enterprise.inject.Typed;
+import javax.enterprise.inject.spi.InjectionPoint;
+import javax.inject.Inject;
+
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.reactive.messaging.Channel;
+import org.eclipse.microprofile.reactive.messaging.Message;
+import org.eclipse.microprofile.reactive.messaging.spi.Connector;
+import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
+import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
+import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.reactivestreams.Publisher;
+
+import io.quarkus.test.QuarkusUnitTest;
+import io.smallrye.mutiny.Multi;
+import io.smallrye.reactive.messaging.ChannelRegistry;
+import io.smallrye.reactive.messaging.EmitterConfiguration;
+import io.smallrye.reactive.messaging.EmitterFactory;
+import io.smallrye.reactive.messaging.EmitterType;
+import io.smallrye.reactive.messaging.MessagePublisherProvider;
+import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor;
+import io.smallrye.reactive.messaging.providers.extension.ChannelProducer;
+
+public class ConnectorAttachmentCustomEmitterTest {
+
+ @RegisterExtension
+ static final QuarkusUnitTest config = new QuarkusUnitTest()
+ .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+ .addClasses(MyDummyConnector.class, MySink.class,
+ CustomEmitter.class, CustomEmitterImpl.class, CustomEmitterFactory.class));
+
+ @Inject
+ @Connector("dummy")
+ MyDummyConnector connector;
+
+ @Test
+ public void testAutoAttachmentOfOutgoingChannel() {
+ await().until(() -> connector.getList().size() == 5);
+ }
+
+ @ApplicationScoped
+ static class MySink {
+
+ @Channel("sink")
+ CustomEmitter channel;
+
+ @PostConstruct
+ public void init() {
+ assert Objects.nonNull(channel);
+ }
+ }
+
+ @ApplicationScoped
+ @Connector("dummy")
+ static class MyDummyConnector implements OutgoingConnectorFactory {
+
+ private final List> list = new CopyOnWriteArrayList<>();
+
+ @Override
+ public SubscriberBuilder extends Message>, Void> getSubscriberBuilder(Config config) {
+ return ReactiveStreams.> builder().forEach(list::add);
+ }
+
+ public List> getList() {
+ return list;
+ }
+
+ }
+
+ interface CustomEmitter extends EmitterType {
+
+ }
+
+ static class CustomEmitterImpl implements MessagePublisherProvider, CustomEmitter {
+
+ @Override
+ public Publisher> getPublisher() {
+ return Multi.createFrom().range(0, 5).map(Message::of).map(m -> (Message) m);
+ }
+ }
+
+ @ApplicationScoped
+ @EmitterFactoryFor(CustomEmitter.class)
+ static class CustomEmitterFactory implements EmitterFactory> {
+ @Inject
+ ChannelRegistry channelRegistry;
+
+ @Override
+ public CustomEmitterImpl