Skip to content

Commit

Permalink
spring-projectsGH-1444: Listener Observability Initial Commit
Browse files Browse the repository at this point in the history
- expand scope to include error handler: spring-projects#1287
- tracing can't be used with a batch listener (multiple messages in listener call)
  • Loading branch information
garyrussell committed Sep 13, 2022
1 parent f073c5f commit 6f35711
Show file tree
Hide file tree
Showing 13 changed files with 709 additions and 10 deletions.
2 changes: 2 additions & 0 deletions build.gradle
Expand Up @@ -387,6 +387,7 @@ project('spring-rabbit') {
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
optionalApi 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-observation'
optionalApi 'io.micrometer:micrometer-tracing'
// Spring Data projection message binding support
optionalApi ("org.springframework.data:spring-data-commons") {
Expand All @@ -398,6 +399,7 @@ project('spring-rabbit') {
testApi project(':spring-rabbit-junit')
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
testImplementation 'io.micrometer:micrometer-observation-test'
testImplementation 'io.micrometer:micrometer-tracing-bridge-brave'
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -21,10 +21,13 @@

import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.context.ApplicationContext;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;

import com.rabbitmq.client.Channel;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Fisher
Expand All @@ -40,6 +43,8 @@ public abstract class RabbitAccessor implements InitializingBean {

private volatile boolean transactional;

private ObservationRegistry observationRegistry;

public boolean isChannelTransacted() {
return this.transactional;
}
Expand Down Expand Up @@ -113,4 +118,17 @@ protected RuntimeException convertRabbitAccessException(Exception ex) {
return RabbitExceptionTranslator.convertRabbitAccessException(ex);
}

protected void obtainObservationRegistry(@Nullable ApplicationContext appContext) {
if (this.observationRegistry == null && appContext != null) {
ObjectProvider<ObservationRegistry> registry =
appContext.getBeanProvider(ObservationRegistry.class);
this.observationRegistry = registry.getIfUnique();
}
}

@Nullable
protected ObservationRegistry getObservationRegistry() {
return this.observationRegistry;
}

}
Expand Up @@ -74,6 +74,9 @@
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.RabbitExceptionTranslator;
import org.springframework.amqp.rabbit.support.ValueExpression;
import org.springframework.amqp.rabbit.support.micrometer.MessageSenderContext;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitTemplateObservationConvention;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.amqp.support.converter.SmartMessageConverter;
Expand All @@ -83,6 +86,8 @@
import org.springframework.beans.factory.BeanFactoryAware;
import org.springframework.beans.factory.BeanNameAware;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.expression.BeanFactoryResolver;
import org.springframework.context.expression.MapAccessor;
import org.springframework.core.ParameterizedTypeReference;
Expand All @@ -108,6 +113,8 @@
import com.rabbitmq.client.Return;
import com.rabbitmq.client.ShutdownListener;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* <p>
Expand Down Expand Up @@ -152,7 +159,7 @@
* @since 1.0
*/
public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener,
implements BeanFactoryAware, RabbitOperations, ChannelAwareMessageListener, ApplicationContextAware,
ListenerContainerAware, PublisherCallbackChannel.Listener, BeanNameAware, DisposableBean {

private static final String UNCHECKED = "unchecked";
Expand Down Expand Up @@ -198,6 +205,8 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private final AtomicInteger containerInstance = new AtomicInteger();

private ApplicationContext applicationContext;

private String exchange = DEFAULT_EXCHANGE;

private String routingKey = DEFAULT_ROUTING_KEY;
Expand Down Expand Up @@ -258,13 +267,19 @@ public class RabbitTemplate extends RabbitAccessor // NOSONAR type line count

private ErrorHandler replyErrorHandler;

private boolean useChannelForCorrelation;

private boolean observationEnabled;

private RabbitTemplateObservationConvention observationConvention;

private volatile boolean usingFastReplyTo;

private volatile boolean evaluatedFastReplyTo;

private volatile boolean isListener;

private boolean useChannelForCorrelation;
private volatile boolean observationRegistryObtained;

/**
* Convenient constructor for use with setter injection. Don't forget to set the connection factory.
Expand Down Expand Up @@ -297,6 +312,29 @@ public final void setConnectionFactory(ConnectionFactory connectionFactory) {
}
}

@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}

/**
* Enable observation via micrometer.
* @param observationEnabled true to enable.
* @since 3.0
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set an observation convention; used to add additional key/values to observations.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(RabbitTemplateObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* The name of the default exchange to use for send operations when none is specified. Defaults to <code>""</code>
* which is the default exchange in the broker (per the AMQP specification).
Expand Down Expand Up @@ -2348,7 +2386,7 @@ private boolean isPublisherConfirmsOrReturns(ConnectionFactory connectionFactory
* @throws IOException If thrown by RabbitMQ API methods.
*/
public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Message message,
boolean mandatory, @Nullable CorrelationData correlationData) throws IOException {
boolean mandatory, @Nullable CorrelationData correlationData) {

String exch = nullSafeExchange(exchangeArg);
String rKey = nullSafeRoutingKey(routingKeyArg);
Expand Down Expand Up @@ -2378,14 +2416,38 @@ public void doSend(Channel channel, String exchangeArg, String routingKeyArg, Me
logger.debug("Publishing message [" + messageToUse
+ "] on exchange [" + exch + "], routingKey = [" + rKey + "]");
}
sendToRabbit(channel, exch, rKey, mandatory, messageToUse);
observeTheSend(channel, message, mandatory, exch, rKey, messageToUse);
// Check if commit needed
if (isChannelLocallyTransacted(channel)) {
// Transacted channel created by this template -> commit.
RabbitUtils.commitIfNecessary(channel);
}
}

protected void observeTheSend(Channel channel, Message message, boolean mandatory, String exch, String rKey,
Message messageToUse) {

if (!this.observationRegistryObtained) {
obtainObservationRegistry(this.applicationContext);
this.observationRegistryObtained = true;
}
Observation observation;
ObservationRegistry registry = getObservationRegistry();
if (!this.observationEnabled || registry == null) {
observation = Observation.NOOP;
}
else {
observation = RabbitTemplateObservation.TEMPLATE_OBSERVATION.observation(registry,
new MessageSenderContext(message))
.lowCardinalityKeyValue(RabbitTemplateObservation.TemplateLowCardinalityTags.BEAN_NAME.asString(),
this.beanName);
if (this.observationConvention != null) {
observation.observationConvention(this.observationConvention);
}
}
observation.observe(() -> sendToRabbit(channel, exch, rKey, mandatory, messageToUse));
}

/**
* Return the exchange or the default exchange if null.
* @param exchange the exchange.
Expand All @@ -2407,10 +2469,16 @@ public String nullSafeRoutingKey(String rk) {
}

protected void sendToRabbit(Channel channel, String exchange, String routingKey, boolean mandatory,
Message message) throws IOException {
Message message) {

BasicProperties convertedMessageProperties = this.messagePropertiesConverter
.fromMessageProperties(message.getMessageProperties(), this.encoding);
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
try {
channel.basicPublish(exchange, routingKey, mandatory, convertedMessageProperties, message.getBody());
}
catch (IOException ex) {
throw RabbitExceptionTranslator.convertRabbitAccessException(ex);
}
}

private void setupConfirm(Channel channel, Message message, @Nullable CorrelationData correlationDataArg) {
Expand Down
Expand Up @@ -63,6 +63,9 @@
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.micrometer.MessageReceiverContext;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservationConvention;
import org.springframework.amqp.support.ConditionalExceptionLogger;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
Expand Down Expand Up @@ -91,6 +94,8 @@

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Pollack
Expand Down Expand Up @@ -240,6 +245,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private boolean micrometerEnabled = true;

private boolean observationEnabled = false;

private boolean isBatchListener;

private long consumeDelay;
Expand All @@ -254,6 +261,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { };

private RabbitListenerObservationConvention observationConvention;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1159,6 +1168,24 @@ public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

/**
* Enable observation via micrometer.
* @param observationEnabled true to enable.
* @since 3.0
*/
public void setObservationEnabled(boolean observationEnabled) {
this.observationEnabled = observationEnabled;
}

/**
* Set an observation convention; used to add additional key/values to observations.
* @param observationConvention the convention.
* @since 3.0
*/
public void setObservationConvention(RabbitListenerObservationConvention observationConvention) {
this.observationConvention = observationConvention;
}

/**
* Get the consumeDelay - a time to wait before consuming in ms.
* @return the consume delay.
Expand Down Expand Up @@ -1230,7 +1257,7 @@ public void afterPropertiesSet() {
validateConfiguration();
initialize();
try {
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled && !this.observationEnabled
&& this.applicationContext != null) {
String id = getListenerId();
if (id == null) {
Expand Down Expand Up @@ -1402,6 +1429,7 @@ public void start() {
}
}
}
obtainObservationRegistry(this.applicationContext);
try {
logger.debug("Starting Rabbit listener container.");
configureAdminIfNeeded();
Expand Down Expand Up @@ -1499,8 +1527,26 @@ protected void invokeErrorHandler(Throwable ex) {
* @see #invokeListener
* @see #handleListenerException
*/
@SuppressWarnings(UNCHECKED)
protected void executeListener(Channel channel, Object data) {
Observation observation;
ObservationRegistry registry = getObservationRegistry();
if (!this.observationEnabled || data instanceof List || registry == null) {
observation = Observation.NOOP;
}
else {
observation = RabbitListenerObservation.LISTENER_OBSERVATION.observation(registry,
new MessageReceiverContext((Message) data))
.lowCardinalityKeyValue(RabbitListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
getListenerId());
if (this.observationConvention != null) {
observation.observationConvention(this.observationConvention);
}
}
observation.observe(() -> executeListenerAndHandleException(channel, data));
}

@SuppressWarnings(UNCHECKED)
protected void executeListenerAndHandleException(Channel channel, Object data) {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn(
Expand Down
@@ -0,0 +1,37 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.support.micrometer;

import org.springframework.amqp.core.Message;

import io.micrometer.observation.transport.ReceiverContext;

/**
* {@link ReceiverContext} for {@link Message}s.
*
* @author Gary Russell
* @since 2.8
*
*/
public class MessageReceiverContext extends ReceiverContext<Message> {

public MessageReceiverContext(Message message) {
super((carrier, key) -> carrier.getMessageProperties().getHeader(key));
setCarrier(message);
}

}

0 comments on commit 6f35711

Please sign in to comment.