Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Smallrye Reactive Messaging 3.16.0 #25204

Merged
merged 3 commits into from May 1, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion bom/application/pom.xml
Expand Up @@ -51,7 +51,7 @@
<smallrye-reactive-streams-operators.version>1.0.13</smallrye-reactive-streams-operators.version>
<smallrye-reactive-types-converter.version>2.7.0</smallrye-reactive-types-converter.version>
<smallrye-mutiny-vertx-binding.version>2.21.0</smallrye-mutiny-vertx-binding.version>
<smallrye-reactive-messaging.version>3.15.0</smallrye-reactive-messaging.version>
<smallrye-reactive-messaging.version>3.16.0</smallrye-reactive-messaging.version>
<smallrye-stork.version>1.1.0</smallrye-stork.version>
<jakarta.activation.version>1.2.1</jakarta.activation.version>
<jakarta.annotation-api.version>1.3.5</jakarta.annotation-api.version>
Expand Down
2 changes: 2 additions & 0 deletions docs/src/main/asciidoc/kafka-dev-services.adoc
Expand Up @@ -46,6 +46,7 @@ You can set the port by configuring the `quarkus.kafka.devservices.port` propert

Note that the Kafka advertised address is automatically configured with the chosen port.

[[configuring-the-image]]
== Configuring the image

Dev Services for Kafka supports https://redpanda.com[Redpanda] and https://strimzi.io[Strimzi] (in https://github.com/apache/kafka/blob/trunk/config/kraft/README.md[Kraft] mode).
Expand Down Expand Up @@ -83,6 +84,7 @@ without trying to re-partition the existing topic to a different number of parti

You can configure timeout for Kafka admin client calls used in topic creation using `quarkus.kafka.devservices.topic-partitions-timeout`, it defaults to 2 seconds.

[[redpanda-enabling-transactions]]
== Enabling transactions

By default, the Red Panda broker does not act as a transaction coordinator.
Expand Down
303 changes: 303 additions & 0 deletions docs/src/main/asciidoc/kafka.adoc

Large diffs are not rendered by default.

Expand Up @@ -10,6 +10,7 @@ final class DotNames {

static final DotName EMITTER = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Emitter.class.getName());
static final DotName MUTINY_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.MutinyEmitter.class.getName());
static final DotName KAFKA_EMITTER = DotName.createSimple(io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions.class.getName());

static final DotName MESSAGE = DotName.createSimple(org.eclipse.microprofile.reactive.messaging.Message.class.getName());
static final DotName KAFKA_RECORD = DotName.createSimple(io.smallrye.reactive.messaging.kafka.KafkaRecord.class.getName());
Expand Down
Expand Up @@ -167,6 +167,8 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
processIncomingType(discovery, config, incomingType, channelName, generatedClass, reflection,
alreadyGeneratedDeserializers);

processKafkaTransactions(discovery, config, channelName, injectionPointType);

Type outgoingType = getOutgoingTypeFromChannelInjectionPoint(injectionPointType);
processOutgoingType(discovery, outgoingType, (keySerializer, valueSerializer) -> {
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
Expand All @@ -180,6 +182,23 @@ void discoverDefaultSerdeConfig(DefaultSerdeDiscoveryState discovery,
}
}

private void processKafkaTransactions(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, String channelName, Type injectionPointType) {
if (injectionPointType != null && isKafkaEmitter(injectionPointType)) {
LOGGER.infof("Transactional producer detected for channel '%s', setting following default config values: "
+ "'mp.messaging.outgoing.%s.transactional.id=${quarkus.application.name}-${channelName}', "
+ "'mp.messaging.outgoing.%s.enable.idempotence=true', "
+ "'mp.messaging.outgoing.%s.acks=all'", channelName, channelName, channelName, channelName);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".transactional.id",
"${quarkus.application.name}-" + channelName);
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".enable.idempotence", "true");
produceRuntimeConfigurationDefaultBuildItem(discovery, config,
"mp.messaging.outgoing." + channelName + ".acks", "all");
}
}

private void processIncomingType(DefaultSerdeDiscoveryState discovery,
BuildProducer<RunTimeConfigurationDefaultBuildItem> config, Type incomingType, String channelName,
BuildProducer<GeneratedClassBuildItem> generatedClass, BuildProducer<ReflectiveClassBuildItem> reflection,
Expand Down Expand Up @@ -352,7 +371,7 @@ private Type getOutgoingTypeFromChannelInjectionPoint(Type injectionPointType) {
return null;
}

if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType)) {
if (isEmitter(injectionPointType) || isMutinyEmitter(injectionPointType) || isKafkaEmitter(injectionPointType)) {
return injectionPointType.asParameterizedType().arguments().get(0);
} else {
return null;
Expand Down Expand Up @@ -482,6 +501,13 @@ private static boolean isMutinyEmitter(Type type) {
&& type.asParameterizedType().arguments().size() == 1;
}

private static boolean isKafkaEmitter(Type type) {
// raw type KafkaTransactions is wrong, must be KafkaTransactions<Something>
return DotNames.KAFKA_EMITTER.equals(type.name())
&& type.kind() == Type.Kind.PARAMETERIZED_TYPE
&& type.asParameterizedType().arguments().size() == 1;
}

// ---

private static boolean isMessage(Type type) {
Expand Down Expand Up @@ -627,11 +653,11 @@ private Result deserializerFor(DefaultSerdeDiscoveryState discovery, Type type,
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateDeserializer(generatedClass, type);
LOGGER.infof("Generating Jackson deserializer for type %s", type.name().toString());
result = Result.of(clazz);
// Deserializers are access by reflection.
reflection.produce(new ReflectiveClassBuildItem(true, true, false, clazz));
alreadyGeneratedSerializers.put(type.toString(), clazz);
}
result = Result.of(clazz);
}
return result;
}
Expand All @@ -653,11 +679,11 @@ private Result serializerFor(DefaultSerdeDiscoveryState discovery, Type type,
if (clazz == null) {
clazz = JacksonSerdeGenerator.generateSerializer(generatedClass, type);
LOGGER.infof("Generating Jackson serializer for type %s", type.name().toString());
result = Result.of(clazz);
// Serializers are access by reflection.
reflection.produce(new ReflectiveClassBuildItem(true, true, false, clazz));
alreadyGeneratedSerializers.put(type.toString(), clazz);
}
result = Result.of(clazz);
}

return result;
Expand Down
Expand Up @@ -51,6 +51,7 @@
import io.smallrye.reactive.messaging.kafka.KafkaRecord;
import io.smallrye.reactive.messaging.kafka.KafkaRecordBatch;
import io.smallrye.reactive.messaging.kafka.Record;
import io.smallrye.reactive.messaging.kafka.transactions.KafkaTransactions;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;

Expand Down Expand Up @@ -2675,4 +2676,24 @@ void method3(CustomDto payload) {
}
}

@Test
void kafkaTransactions() {
// @formatter:off
Tuple[] expectations = {
tuple("mp.messaging.outgoing.tx.value.serializer", "org.apache.kafka.common.serialization.StringSerializer"),
tuple("mp.messaging.outgoing.tx.transactional.id", "${quarkus.application.name}-tx"),
tuple("mp.messaging.outgoing.tx.enable.idempotence", "true"),
tuple("mp.messaging.outgoing.tx.acks", "all"),
};
doTest(expectations, TransactionalProducer.class);

}

private static class TransactionalProducer {

@Channel("tx")
KafkaTransactions<String> kafkaTransactions;

}

}
Expand Up @@ -17,6 +17,7 @@
import io.smallrye.reactive.messaging.annotations.ConnectorAttribute;
import io.smallrye.reactive.messaging.annotations.ConnectorAttributes;
import io.smallrye.reactive.messaging.annotations.Emitter;
import io.smallrye.reactive.messaging.annotations.EmitterFactoryFor;
import io.smallrye.reactive.messaging.annotations.Incomings;
import io.smallrye.reactive.messaging.annotations.Merge;
import io.smallrye.reactive.messaging.annotations.OnOverflow;
Expand Down Expand Up @@ -49,6 +50,7 @@ public final class ReactiveMessagingDotNames {
static final DotName ACKNOWLEDGMENT = DotName.createSimple(Acknowledgment.class.getName());
static final DotName MERGE = DotName.createSimple(Merge.class.getName());
static final DotName BROADCAST = DotName.createSimple(Broadcast.class.getName());
static final DotName EMITTER_FACTORY_FOR = DotName.createSimple(EmitterFactoryFor.class.getName());

static final DotName INCOMING_CONNECTOR_FACTORY = DotName.createSimple(IncomingConnectorFactory.class.getName());
static final DotName OUTGOING_CONNECTOR_FACTORY = DotName.createSimple(OutgoingConnectorFactory.class.getName());
Expand Down
Expand Up @@ -74,13 +74,13 @@
import io.quarkus.smallrye.reactivemessaging.runtime.WorkerConfiguration;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactory;
import io.quarkus.smallrye.reactivemessaging.runtime.devmode.DevModeSupportConnectorFactoryInterceptor;
import io.smallrye.reactive.messaging.EmitterConfiguration;
import io.smallrye.reactive.messaging.Invoker;
import io.smallrye.reactive.messaging.annotations.Blocking;
import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingLivenessCheck;
import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingReadinessCheck;
import io.smallrye.reactive.messaging.health.SmallRyeReactiveMessagingStartupCheck;
import io.smallrye.reactive.messaging.providers.extension.ChannelConfiguration;
import io.smallrye.reactive.messaging.providers.extension.EmitterConfiguration;

public class SmallRyeReactiveMessagingProcessor {

Expand Down
Expand Up @@ -88,10 +88,17 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
BuildProducer<ConfigDescriptionBuildItem> configDescriptionBuildItemBuildProducer) {

Map<String, AnnotationInstance> emitterFactories = new HashMap<>();
// We need to collect all business methods annotated with @Incoming/@Outgoing first
for (BeanInfo bean : beanDiscoveryFinished.beanStream().classBeans()) {
// TODO: add support for inherited business methods
//noinspection OptionalGetWithoutIsPresent
AnnotationInstance emitterFactory = transformedAnnotations.getAnnotation(bean.getTarget().get(),
ReactiveMessagingDotNames.EMITTER_FACTORY_FOR);
if (emitterFactory != null) {
emitterFactories.put(emitterFactory.value().asClass().name().toString(), emitterFactory);
}

for (MethodInfo method : bean.getTarget().get().asClass().methods()) {
// @Incoming is repeatable
AnnotationInstance incoming = transformedAnnotations.getAnnotation(method,
Expand Down Expand Up @@ -131,31 +138,33 @@ void extractComponents(BeanDiscoveryFinishedBuildItem beanDiscoveryFinished,
ReactiveMessagingDotNames.CHANNEL);
Optional<AnnotationInstance> legacyChannel = WiringHelper.getAnnotation(transformedAnnotations, injectionPoint,
ReactiveMessagingDotNames.LEGACY_CHANNEL);
boolean isEmitter = injectionPoint.getRequiredType().name().equals(ReactiveMessagingDotNames.EMITTER);
boolean isMutinyEmitter = injectionPoint.getRequiredType().name()
.equals(ReactiveMessagingDotNames.MUTINY_EMITTER);

String injectionType = injectionPoint.getRequiredType().name().toString();
AnnotationInstance emitterType = emitterFactories.get(injectionType);

boolean isLegacyEmitter = injectionPoint.getRequiredType().name()
.equals(ReactiveMessagingDotNames.LEGACY_EMITTER);

if (isEmitter || isMutinyEmitter) {
// New emitter from the spec, or Mutiny emitter
handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint, broadcast,
channel, ReactiveMessagingDotNames.ON_OVERFLOW);
}

if (isLegacyEmitter) {
// Deprecated Emitter from SmallRye (emitter, channel and on overflow have been added to the spec)
handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint, broadcast,
legacyChannel, ReactiveMessagingDotNames.LEGACY_ON_OVERFLOW);
}
if (emitterType != null) {
if (isLegacyEmitter) {
// Deprecated Emitter from SmallRye (emitter, channel and on overflow have been added to the spec)
handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint,
emitterType, broadcast, legacyChannel, ReactiveMessagingDotNames.LEGACY_ON_OVERFLOW);
} else {
// New emitter from the spec, or Mutiny emitter
handleEmitter(transformedAnnotations, appChannels, emitters, validationErrors, injectionPoint,
emitterType, broadcast, channel, ReactiveMessagingDotNames.ON_OVERFLOW);
}
} else {
if (channel.isPresent()) {
handleChannelInjection(appChannels, channels, channel.get());
}

if (channel.isPresent() && !(isEmitter || isMutinyEmitter)) {
handleChannelInjection(appChannels, channels, channel.get());
if (legacyChannel.isPresent()) {
handleChannelInjection(appChannels, channels, legacyChannel.get());
}
}

if (legacyChannel.isPresent() && !isLegacyEmitter) {
handleChannelInjection(appChannels, channels, legacyChannel.get());
}
}

}
Expand All @@ -175,6 +184,7 @@ private void handleEmitter(TransformedAnnotationsBuildItem transformedAnnotation
BuildProducer<InjectedEmitterBuildItem> emitters,
BuildProducer<ValidationPhaseBuildItem.ValidationErrorBuildItem> validationErrors,
InjectionPointInfo injectionPoint,
AnnotationInstance emitterType,
Optional<AnnotationInstance> broadcast,
Optional<AnnotationInstance> annotation,
DotName onOverflowAnnotation) {
Expand All @@ -187,7 +197,7 @@ private void handleEmitter(TransformedAnnotationsBuildItem transformedAnnotation
String channelName = annotation.get().value().asString();
Optional<AnnotationInstance> overflow = WiringHelper.getAnnotation(transformedAnnotations, injectionPoint,
onOverflowAnnotation);
createEmitter(appChannels, emitters, injectionPoint, channelName, overflow, broadcast);
createEmitter(appChannels, emitters, injectionPoint, channelName, emitterType, overflow, broadcast);
}
}

Expand Down Expand Up @@ -365,14 +375,18 @@ public void autoConfigureConnectorForOrphansAndProduceManagedChannels(
}

@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private void createEmitter(BuildProducer<ChannelBuildItem> appChannels, BuildProducer<InjectedEmitterBuildItem> emitters,
private void createEmitter(BuildProducer<ChannelBuildItem> appChannels,
BuildProducer<InjectedEmitterBuildItem> emitters,
InjectionPointInfo injectionPoint,
String channelName,
AnnotationInstance emitter,
Optional<AnnotationInstance> overflow,
Optional<AnnotationInstance> broadcast) {
LOGGER.debugf("Emitter injection point '%s' detected, channel name: '%s'",
injectionPoint.getTargetInfo(), channelName);

String emitterTypeName = emitter.value().asClass().name().toString();

boolean hasBroadcast = false;
int awaitSubscribers = -1;
int bufferSize = -1;
Expand All @@ -390,12 +404,10 @@ private void createEmitter(BuildProducer<ChannelBuildItem> appChannels, BuildPro
strategy = annotation.value().asString();
}

boolean isMutinyEmitter = injectionPoint.getRequiredType().name()
.equals(ReactiveMessagingDotNames.MUTINY_EMITTER);
produceOutgoingChannel(appChannels, channelName);
emitters.produce(
InjectedEmitterBuildItem
.of(channelName, isMutinyEmitter, strategy, bufferSize, hasBroadcast, awaitSubscribers));
.of(channelName, emitterTypeName, strategy, bufferSize, hasBroadcast, awaitSubscribers));
}

}