Skip to content

Commit

Permalink
Add option for connection recovery triggering
Browse files Browse the repository at this point in the history
Fixes #379
  • Loading branch information
acogoluegnes committed Jul 25, 2018
1 parent 3f675f4 commit 2b251b0
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 11 deletions.
18 changes: 18 additions & 0 deletions src/main/java/com/rabbitmq/client/ConnectionFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
import javax.net.SocketFactory;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLSocketFactory;
Expand Down Expand Up @@ -177,6 +178,12 @@ public class ConnectionFactory implements Cloneable {
*/
private int workPoolTimeout = DEFAULT_WORK_POOL_TIMEOUT;

/**
* Condition to trigger automatic connection recovery.
* @since 5.4.0
*/
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

/** @return the default host to use for connections */
public String getHost() {
return host;
Expand Down Expand Up @@ -1070,6 +1077,7 @@ public ConnectionParams params(ExecutorService consumerWorkServiceExecutor) {
result.setChannelShouldCheckRpcResponseType(channelShouldCheckRpcResponseType);
result.setWorkPoolTimeout(workPoolTimeout);
result.setErrorOnWriteListener(errorOnWriteListener);
result.setConnectionRecoveryTriggeringCondition(connectionRecoveryTriggeringCondition);
return result;
}

Expand Down Expand Up @@ -1419,4 +1427,14 @@ public int getWorkPoolTimeout() {
public void setErrorOnWriteListener(ErrorOnWriteListener errorOnWriteListener) {
this.errorOnWriteListener = errorOnWriteListener;
}

/**
* Allows to decide on automatic connection recovery is triggered.
* Default is for shutdown not initiated by application or missed heartbeat errors.
* @param connectionRecoveryTriggeringCondition
*/
public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition) {
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
}

}
11 changes: 11 additions & 0 deletions src/main/java/com/rabbitmq/client/impl/ConnectionParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@
import com.rabbitmq.client.RecoveryDelayHandler;
import com.rabbitmq.client.RecoveryDelayHandler.DefaultRecoveryDelayHandler;
import com.rabbitmq.client.SaslConfig;
import com.rabbitmq.client.ShutdownSignalException;

import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.function.Predicate;

public class ConnectionParams {
private CredentialsProvider credentialsProvider;
Expand All @@ -46,6 +48,7 @@ public class ConnectionParams {
private boolean channelShouldCheckRpcResponseType;
private ErrorOnWriteListener errorOnWriteListener;
private int workPoolTimeout = -1;
private Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

private ExceptionHandler exceptionHandler;
private ThreadFactory threadFactory;
Expand Down Expand Up @@ -235,4 +238,12 @@ public void setWorkPoolTimeout(int workPoolTimeout) {
public int getWorkPoolTimeout() {
return workPoolTimeout;
}

public void setConnectionRecoveryTriggeringCondition(Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition) {
this.connectionRecoveryTriggeringCondition = connectionRecoveryTriggeringCondition;
}

public Predicate<ShutdownSignalException> getConnectionRecoveryTriggeringCondition() {
return connectionRecoveryTriggeringCondition;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Predicate;

/**
* Connection implementation that performs automatic recovery when
Expand All @@ -61,6 +62,9 @@
*/
public class AutorecoveringConnection implements RecoverableConnection, NetworkConnection {

public static final Predicate<ShutdownSignalException> DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION =
cause -> !cause.isInitiatedByApplication() || (cause.getCause() instanceof MissedHeartbeatException);

private static final Logger LOGGER = LoggerFactory.getLogger(AutorecoveringConnection.class);

private final RecoveryAwareAMQConnectionFactory cf;
Expand All @@ -87,6 +91,8 @@ public class AutorecoveringConnection implements RecoverableConnection, NetworkC
// be created after application code has initiated shutdown.
private final Object recoveryLock = new Object();

private final Predicate<ShutdownSignalException> connectionRecoveryTriggeringCondition;

public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f, List<Address> addrs) {
this(params, f, new ListAddressResolver(addrs));
}
Expand All @@ -99,9 +105,14 @@ public AutorecoveringConnection(ConnectionParams params, FrameHandlerFactory f,
this.cf = new RecoveryAwareAMQConnectionFactory(params, f, addressResolver, metricsCollector);
this.params = params;

this.connectionRecoveryTriggeringCondition = params.getConnectionRecoveryTriggeringCondition() == null ?
DEFAULT_CONNECTION_RECOVERY_TRIGGERING_CONDITION : params.getConnectionRecoveryTriggeringCondition();

System.out.println(this.connectionRecoveryTriggeringCondition);

setupErrorOnWriteListenerForPotentialRecovery();

this.channels = new ConcurrentHashMap<Integer, AutorecoveringChannel>();
this.channels = new ConcurrentHashMap<>();
}

private void setupErrorOnWriteListenerForPotentialRecovery() {
Expand Down Expand Up @@ -484,16 +495,13 @@ private void addAutomaticRecoveryListener(final RecoveryAwareAMQConnection newCo
final AutorecoveringConnection c = this;
// this listener will run after shutdown listeners,
// see https://github.com/rabbitmq/rabbitmq-java-client/issues/135
RecoveryCanBeginListener starter = new RecoveryCanBeginListener() {
@Override
public void recoveryCanBegin(ShutdownSignalException cause) {
try {
if (shouldTriggerConnectionRecovery(cause)) {
c.beginAutomaticRecovery();
}
} catch (Exception e) {
newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
RecoveryCanBeginListener starter = cause -> {
try {
if (shouldTriggerConnectionRecovery(cause)) {
c.beginAutomaticRecovery();
}
} catch (Exception e) {
newConn.getExceptionHandler().handleConnectionRecoveryException(c, e);
}
};
synchronized (this) {
Expand All @@ -502,7 +510,7 @@ public void recoveryCanBegin(ShutdownSignalException cause) {
}

protected boolean shouldTriggerConnectionRecovery(ShutdownSignalException cause) {
return !cause.isInitiatedByApplication() || (cause.getCause() instanceof MissedHeartbeatException);
return connectionRecoveryTriggeringCondition.test(cause);
}

/**
Expand Down

0 comments on commit 2b251b0

Please sign in to comment.