diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
index c1331edd8561..5a16f3e5f6df 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java
@@ -130,6 +130,7 @@
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
+import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
@@ -1385,6 +1386,8 @@ public static final void waitForRemoteBackup(ClientSessionFactory sessionFactory
if (isReplicated) {
if (activation instanceof SharedNothingBackupActivation) {
isRemoteUpToDate = backup.isReplicaSync();
+ } else if (activation instanceof ReplicationBackupActivation) {
+ isRemoteUpToDate = backup.isReplicaSync();
} else {
//we may have already failed over and changed the Activation
if (actualServer.isStarted()) {
@@ -2523,6 +2526,9 @@ protected static ReplicationEndpoint getReplicationEndpoint(ActiveMQServer serve
if (activation instanceof SharedNothingBackupActivation) {
return ((SharedNothingBackupActivation) activation).getReplicationEndpoint();
}
+ if (activation instanceof ReplicationBackupActivation) {
+ return ((ReplicationBackupActivation) activation).getReplicationEndpoint();
+ }
return null;
}
diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml
index 65140fa567df..4667def4c5de 100644
--- a/tests/integration-tests/pom.xml
+++ b/tests/integration-tests/pom.xml
@@ -50,6 +50,12 @@
test
test-jar
+
+ org.apache.activemq
+ artemis-quorum-ri
+ ${project.version}
+ test
+
org.apache.activemq.tests
unit-tests
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
index e0b72662ef7c..bf5363d38d89 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java
@@ -53,6 +53,8 @@
import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy;
import org.apache.activemq.artemis.core.server.files.FileMoveManager;
@@ -778,7 +780,7 @@ public void testFailBack() throws Exception {
((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(1);
}
- simpleFailover(haPolicy instanceof ReplicaPolicy, doFailBack);
+ simpleFailover(haPolicy instanceof ReplicaPolicy || haPolicy instanceof ReplicationBackupPolicy, doFailBack);
}
@Test(timeout = 120000)
@@ -808,7 +810,9 @@ public void testFailBackLiveRestartsBackupIsGone() throws Exception {
Thread.sleep(100);
Assert.assertFalse("backup is not running", backupServer.isStarted());
- Assert.assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy() instanceof BackupPolicy);
+ final boolean isBackup = liveServer.getServer().getHAPolicy() instanceof BackupPolicy ||
+ liveServer.getServer().getHAPolicy() instanceof ReplicationBackupPolicy;
+ Assert.assertFalse("must NOT be a backup", isBackup);
adaptLiveConfigForReplicatedFailBack(liveServer);
beforeRestart(liveServer);
liveServer.start();
@@ -819,7 +823,8 @@ public void testFailBackLiveRestartsBackupIsGone() throws Exception {
ClientSession session2 = createSession(sf, false, false);
session2.start();
ClientConsumer consumer2 = session2.createConsumer(FailoverTestBase.ADDRESS);
- boolean replication = liveServer.getServer().getHAPolicy() instanceof ReplicatedPolicy;
+ final boolean replication = liveServer.getServer().getHAPolicy() instanceof ReplicatedPolicy ||
+ liveServer.getServer().getHAPolicy() instanceof ReplicationPrimaryPolicy;
if (replication)
receiveMessages(consumer2, 0, NUM_MESSAGES, true);
assertNoMoreMessages(consumer2);
@@ -830,7 +835,7 @@ public void testFailBackLiveRestartsBackupIsGone() throws Exception {
public void testSimpleFailover() throws Exception {
HAPolicy haPolicy = backupServer.getServer().getHAPolicy();
- simpleFailover(haPolicy instanceof ReplicaPolicy, false);
+ simpleFailover(haPolicy instanceof ReplicaPolicy || haPolicy instanceof ReplicationBackupPolicy, false);
}
@Test(timeout = 120000)
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
index 1609b5679c10..3fdbe67dce9e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java
@@ -19,6 +19,7 @@
import java.io.IOException;
import java.net.ServerSocket;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -36,15 +37,19 @@
import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMRegistry;
import org.apache.activemq.artemis.core.server.NodeManager;
+import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy;
import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy;
+import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
+import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager;
import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer;
import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
@@ -52,9 +57,13 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
public abstract class FailoverTestBase extends ActiveMQTestBase {
// Constants -----------------------------------------------------
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
@@ -208,7 +217,34 @@ protected void createReplicatedConfigs() throws Exception {
liveServer = createTestableServer(liveConfig);
}
+ protected void createPluggableReplicatedConfigs() throws Exception {
+ final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true);
+ final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false);
+ final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false);
+
+ backupConfig = createDefaultInVMConfig();
+ liveConfig = createDefaultInVMConfig();
+
+ DistributedPrimitiveManagerConfiguration managerConfiguration =
+ new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(),
+ Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString()));
+
+ ReplicatedBackupUtils.configurePluggableQuorumReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null, managerConfiguration, managerConfiguration);
+
+ backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);
+
+ setupHAPolicyConfiguration();
+ nodeManager = createReplicatedBackupNodeManager(backupConfig);
+
+ backupServer = createTestableServer(backupConfig);
+
+ liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true));
+
+ liveServer = createTestableServer(liveConfig);
+ }
+
protected void setupHAPolicyConfiguration() {
+ Assert.assertTrue(backupConfig.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
}
@@ -225,8 +261,13 @@ protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server)
configuration.getConnectorConfigurations().put(backupConnector.getName(), backupConnector);
return;
}
- ReplicatedPolicy haPolicy = (ReplicatedPolicy) server.getServer().getHAPolicy();
- haPolicy.setCheckForLiveServer(true);
+ HAPolicy policy = server.getServer().getHAPolicy();
+ if (policy instanceof ReplicatedPolicy) {
+ ((ReplicatedPolicy) policy).setCheckForLiveServer(true);
+ } else if (policy instanceof ReplicationPrimaryPolicy) {
+ Assert.assertTrue("Adapting won't work for the current configuration", ((ReplicationPrimaryPolicy) policy).isCheckForLiveServer());
+ }
+
}
@Override
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java
new file mode 100644
index 000000000000..b1e6a94dc2ad
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum;
+
+import java.util.Arrays;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
+import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase;
+import org.apache.activemq.artemis.tests.integration.cluster.failover.FakeServiceComponent;
+import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static java.util.Arrays.asList;
+
+@RunWith(Parameterized.class)
+public class PluggableQuorumBackupAuthenticationTest extends FailoverTestBase {
+
+ private static CountDownLatch registrationStarted;
+
+ @Parameterized.Parameter
+ public boolean useNetty;
+
+ @Parameterized.Parameters(name = "useNetty={1}")
+ public static Iterable