Skip to content

Commit

Permalink
Merge pull request #403 from vikinghawk/5.x.x-stable
Browse files Browse the repository at this point in the history
tweaks to recovery retry
  • Loading branch information
acogoluegnes committed Aug 29, 2018
2 parents 8b8c58a + b88f7f5 commit b24a9be
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,11 @@ public RetryResult retryConsumerRecovery(RetryContext context) throws Exception

protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition, RetryOperation<?> operation, RecordedEntity entity, RetryContext context)
throws Exception {
log(entity, context.exception());
int attempts = 0;
Exception exception = context.exception();
while (attempts < retryAttempts) {
if (condition.test(entity, exception)) {
log(entity, exception, attempts);
backoffPolicy.backoff(attempts + 1);
try {
Object result = operation.call(context);
Expand All @@ -122,11 +122,11 @@ protected RetryResult doRetry(BiPredicate<RecordedEntity, Exception> condition,
throw exception;
}
}
throw context.exception();
throw exception;
}

protected void log(RecordedEntity entity, Exception exception) {
LOGGER.info("Error while recovering {}, retrying with {} attempt(s).", entity, retryAttempts, exception);
protected void log(RecordedEntity entity, Exception exception, int attempts) {
LOGGER.info("Error while recovering {}, retrying with {} more attempt(s).", entity, retryAttempts - attempts, exception);
}

public interface RetryOperation<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,9 @@

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.ShutdownSignalException;
import com.rabbitmq.utility.Utility;

import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Predicate;

import static com.rabbitmq.client.impl.recovery.TopologyRecoveryRetryHandlerBuilder.builder;

/**
Expand Down Expand Up @@ -106,7 +104,7 @@ public abstract class TopologyRecoveryRetryLogic {
public static final DefaultRetryHandler.RetryOperation<Void> RECOVER_CONSUMER_QUEUE_BINDINGS = context -> {
if (context.entity() instanceof RecordedConsumer) {
String queue = context.consumer().getQueue();
for (RecordedBinding recordedBinding : context.connection().getRecordedBindings()) {
for (RecordedBinding recordedBinding : Utility.copy(context.connection().getRecordedBindings())) {
if (recordedBinding instanceof RecordedQueueBinding && queue.equals(recordedBinding.getDestination())) {
recordedBinding.recover();
}
Expand All @@ -121,16 +119,15 @@ public abstract class TopologyRecoveryRetryLogic {
public static final DefaultRetryHandler.RetryOperation<String> RECOVER_CONSUMER = context -> context.consumer().recover();

/**
* Pre-configured {@link DefaultRetryHandler} that retries recovery of bindings and consumers
* Pre-configured {@link TopologyRecoveryRetryHandlerBuilder} that retries recovery of bindings and consumers
* when their respective queue is not found.
* This retry handler can be useful for long recovery processes, whereby auto-delete queues
* can be deleted between queue recovery and binding/consumer recovery.
*/
public static final RetryHandler RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
public static final TopologyRecoveryRetryHandlerBuilder RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER = builder()
.bindingRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.consumerRecoveryRetryCondition(CHANNEL_CLOSED_NOT_FOUND)
.bindingRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_BINDING_QUEUE).andThen(RECOVER_BINDING))
.consumerRecoveryRetryOperation(RECOVER_CHANNEL.andThen(RECOVER_CONSUMER_QUEUE.andThen(RECOVER_CONSUMER)
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)))
.build();
.andThen(RECOVER_CONSUMER_QUEUE_BINDINGS)));
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public void topologyRecoveryRetry() throws Exception {
@Override
protected ConnectionFactory newConnectionFactory() {
ConnectionFactory connectionFactory = TestUtils.connectionFactory();
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER);
connectionFactory.setTopologyRecoveryRetryHandler(RETRY_ON_QUEUE_NOT_FOUND_RETRY_HANDLER.build());
connectionFactory.setNetworkRecoveryInterval(1000);
return connectionFactory;
}
Expand Down

0 comments on commit b24a9be

Please sign in to comment.