Skip to content

Commit

Permalink
spring-projectsGH-1444: Listener Observability Initial Commit
Browse files Browse the repository at this point in the history
- expand scope to include error handler: spring-projects#1287
  • Loading branch information
garyrussell committed Sep 6, 2022
1 parent d4e0f5c commit 6865003
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 4 deletions.
1 change: 1 addition & 0 deletions build.gradle
Expand Up @@ -387,6 +387,7 @@ project('spring-rabbit') {
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
optionalApi 'io.micrometer:micrometer-core'
api 'io.micrometer:micrometer-observation'
optionalApi 'io.micrometer:micrometer-tracing'
// Spring Data projection message binding support
optionalApi ("org.springframework.data:spring-data-commons") {
Expand Down
Expand Up @@ -63,6 +63,8 @@
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.ListenerExecutionFailedException;
import org.springframework.amqp.rabbit.support.MessagePropertiesConverter;
import org.springframework.amqp.rabbit.support.micrometer.MessageReceiverContext;
import org.springframework.amqp.rabbit.support.micrometer.RabbitListenerObservation;
import org.springframework.amqp.support.ConditionalExceptionLogger;
import org.springframework.amqp.support.ConsumerTagStrategy;
import org.springframework.amqp.support.postprocessor.MessagePostProcessorUtils;
Expand Down Expand Up @@ -91,6 +93,8 @@

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ShutdownSignalException;
import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationRegistry;

/**
* @author Mark Pollack
Expand Down Expand Up @@ -238,7 +242,9 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private MicrometerHolder micrometerHolder;

private boolean micrometerEnabled = true;
private boolean micrometerEnabled = false;

private boolean tracingEnabled = false;

private boolean isBatchListener;

Expand All @@ -254,6 +260,8 @@ public abstract class AbstractMessageListenerContainer extends RabbitAccessor

private MessageAckListener messageAckListener = (success, deliveryTag, cause) -> { };

private ObservationRegistry observationRegistry;

@Override
public void setApplicationEventPublisher(ApplicationEventPublisher applicationEventPublisher) {
this.applicationEventPublisher = applicationEventPublisher;
Expand Down Expand Up @@ -1151,14 +1159,24 @@ public void setMicrometerTags(Map<String, String> tags) {
}

/**
* Set to false to disable micrometer listener timers.
* Set to true to enable micrometer listener timers. Ignored when
* {@link #setTracingEnabled(boolean)} is true.
* @param micrometerEnabled false to disable.
* @since 2.2
*/
public void setMicrometerEnabled(boolean micrometerEnabled) {
this.micrometerEnabled = micrometerEnabled;
}

/**
* Enable tracing via micrometer.
* @param tracingEnabled true to enable.
* @since 3.0
*/
public void setTracingEnabled(boolean tracingEnabled) {
this.tracingEnabled = tracingEnabled;
}

/**
* Get the consumeDelay - a time to wait before consuming in ms.
* @return the consume delay.
Expand Down Expand Up @@ -1230,7 +1248,7 @@ public void afterPropertiesSet() {
validateConfiguration();
initialize();
try {
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled
if (this.micrometerHolder == null && MICROMETER_PRESENT && this.micrometerEnabled && !this.tracingEnabled
&& this.applicationContext != null) {
String id = getListenerId();
if (id == null) {
Expand All @@ -1246,6 +1264,7 @@ public void afterPropertiesSet() {
if (this.isAsyncReplies() && !AcknowledgeMode.MANUAL.equals(this.acknowledgeMode)) {
this.acknowledgeMode = AcknowledgeMode.MANUAL;
}
// TODO - get obs registry from context if present
}

@Override
Expand Down Expand Up @@ -1499,8 +1518,22 @@ protected void invokeErrorHandler(Throwable ex) {
* @see #invokeListener
* @see #handleListenerException
*/
@SuppressWarnings(UNCHECKED)
protected void executeListener(Channel channel, Object data) {
Observation observation;
if (!this.tracingEnabled || data instanceof List) {
observation = Observation.NOOP;
}
else {
observation = Observation.createNotStarted(RabbitListenerObservation.LISTENER_OBSERVATION.getName(),
new MessageReceiverContext((Message) data), this.observationRegistry)
.lowCardinalityKeyValue(RabbitListenerObservation.ListenerLowCardinalityTags.LISTENER_ID.asString(),
getListenerId());
}
observation.observe(() -> executeListenerAndHandleException(channel, data));
}

@SuppressWarnings(UNCHECKED)
protected void executeListenerAndHandleException(Channel channel, Object data) {
if (!isRunning()) {
if (logger.isWarnEnabled()) {
logger.warn(
Expand Down
@@ -0,0 +1,36 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.support.micrometer;

import org.springframework.amqp.core.Message;

import io.micrometer.observation.transport.ReceiverContext;

/**
* {@link ReceiverContext} for {@link Message}s.
*
* @author Gary Russell
* @since 2.8
*
*/
public class MessageReceiverContext extends ReceiverContext<Message> {

public MessageReceiverContext(Message message) {
super((carrier, key) -> carrier.getMessageProperties().getHeader(key));
}

}
@@ -0,0 +1,72 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.support.micrometer;

import io.micrometer.common.docs.KeyName;
import io.micrometer.observation.docs.DocumentedObservation;

/**
* Spring Rabbit Observation for listeners.
*
* @author Gary Russell
* @since 3.0
*
*/
public enum RabbitListenerObservation implements DocumentedObservation {

/**
* Observation for Rabbit listeners.
*/
LISTENER_OBSERVATION {

@Override
public String getName() {
return "spring.rabbit.listener";
}

@Override
public String getContextualName() {
return "RabbitListener Observation";
}

@Override
public KeyName[] getLowCardinalityKeyNames() {
return ListenerLowCardinalityTags.values();
}

};

/**
* Low cardinality tags.
*/
public enum ListenerLowCardinalityTags implements KeyName {

/**
* Listener id.
*/
LISTENER_ID {

@Override
public String asString() {
return "listener.id";
}

}

}

}
@@ -0,0 +1,6 @@
/**
* Provides classes for Micrometer support.
*/
@org.springframework.lang.NonNullApi
@org.springframework.lang.NonNullFields
package org.springframework.amqp.rabbit.support.micrometer;

0 comments on commit 6865003

Please sign in to comment.