From 362ef2b305df45346e4ee0fb7752c1bb03383166 Mon Sep 17 00:00:00 2001 From: franz1981 Date: Thu, 29 Apr 2021 11:29:08 +0200 Subject: [PATCH] ARTEMIS-2716 Support existing shared nothing replication tests --- .../artemis/tests/util/ActiveMQTestBase.java | 6 + tests/integration-tests/pom.xml | 6 + .../cluster/failover/FailoverTest.java | 13 +- .../cluster/failover/FailoverTestBase.java | 45 +++- ...uggableQuorumBackupAuthenticationTest.java | 121 +++++++++ .../PluggableQuorumBackupSyncJournalTest.java | 39 +++ ...orumExtraBackupReplicatedFailoverTest.java | 104 ++++++++ ...uggableQuorumNettyReplicationStopTest.java | 36 +++ ...rumPageCleanupWhileReplicaCatchupTest.java | 36 +++ ...PluggableQuorumReplicatedFailoverTest.java | 239 ++++++++++++++++++ ...rumReplicatedLargeMessageFailoverTest.java | 36 +++ ...atedLargeMessageWithDelayFailoverTest.java | 36 +++ ...bleQuorumReplicatedPagingFailoverTest.java | 35 +++ .../cluster/util/BackupSyncDelay.java | 16 +- ...gableQuorumReplicationFlowControlTest.java | 61 +++++ .../PluggableQuorumReplicationOrderTest.java | 63 +++++ .../PluggableQuorumReplicationTest.java | 110 ++++++++ .../replication/ReplicationTest.java | 29 ++- ...aredNothingReplicationFlowControlTest.java | 21 +- .../SharedNothingReplicationTest.java | 4 +- ...uorumBasicSecurityManagerFailoverTest.java | 128 ++++++++++ .../tests/util/ReplicatedBackupUtils.java | 29 +++ 22 files changed, 1194 insertions(+), 19 deletions(-) create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupSyncJournalTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumExtraBackupReplicatedFailoverTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumNettyReplicationStopTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumPageCleanupWhileReplicaCatchupTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedFailoverTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageFailoverTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageWithDelayFailoverTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedPagingFailoverTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationFlowControlTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationOrderTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java create mode 100644 tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/PluggableQuorumBasicSecurityManagerFailoverTest.java diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java index c1331edd8561..5a16f3e5f6df 100644 --- a/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java +++ b/artemis-server/src/test/java/org/apache/activemq/artemis/tests/util/ActiveMQTestBase.java @@ -130,6 +130,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.AddressInfo; import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation; +import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; @@ -1385,6 +1386,8 @@ public static final void waitForRemoteBackup(ClientSessionFactory sessionFactory if (isReplicated) { if (activation instanceof SharedNothingBackupActivation) { isRemoteUpToDate = backup.isReplicaSync(); + } else if (activation instanceof ReplicationBackupActivation) { + isRemoteUpToDate = backup.isReplicaSync(); } else { //we may have already failed over and changed the Activation if (actualServer.isStarted()) { @@ -2523,6 +2526,9 @@ protected static ReplicationEndpoint getReplicationEndpoint(ActiveMQServer serve if (activation instanceof SharedNothingBackupActivation) { return ((SharedNothingBackupActivation) activation).getReplicationEndpoint(); } + if (activation instanceof ReplicationBackupActivation) { + return ((ReplicationBackupActivation) activation).getReplicationEndpoint(); + } return null; } diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 5618d7638225..319bce22040a 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -51,6 +51,12 @@ test test-jar + + org.apache.activemq + artemis-quorum-ri + ${project.version} + test + org.apache.activemq.tests unit-tests diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java index e0b72662ef7c..bf5363d38d89 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTest.java @@ -53,6 +53,8 @@ import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicaPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; +import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationBackupPolicy; +import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreMasterPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.SharedStoreSlavePolicy; import org.apache.activemq.artemis.core.server.files.FileMoveManager; @@ -778,7 +780,7 @@ public void testFailBack() throws Exception { ((ReplicaPolicy) haPolicy).setMaxSavedReplicatedJournalsSize(1); } - simpleFailover(haPolicy instanceof ReplicaPolicy, doFailBack); + simpleFailover(haPolicy instanceof ReplicaPolicy || haPolicy instanceof ReplicationBackupPolicy, doFailBack); } @Test(timeout = 120000) @@ -808,7 +810,9 @@ public void testFailBackLiveRestartsBackupIsGone() throws Exception { Thread.sleep(100); Assert.assertFalse("backup is not running", backupServer.isStarted()); - Assert.assertFalse("must NOT be a backup", liveServer.getServer().getHAPolicy() instanceof BackupPolicy); + final boolean isBackup = liveServer.getServer().getHAPolicy() instanceof BackupPolicy || + liveServer.getServer().getHAPolicy() instanceof ReplicationBackupPolicy; + Assert.assertFalse("must NOT be a backup", isBackup); adaptLiveConfigForReplicatedFailBack(liveServer); beforeRestart(liveServer); liveServer.start(); @@ -819,7 +823,8 @@ public void testFailBackLiveRestartsBackupIsGone() throws Exception { ClientSession session2 = createSession(sf, false, false); session2.start(); ClientConsumer consumer2 = session2.createConsumer(FailoverTestBase.ADDRESS); - boolean replication = liveServer.getServer().getHAPolicy() instanceof ReplicatedPolicy; + final boolean replication = liveServer.getServer().getHAPolicy() instanceof ReplicatedPolicy || + liveServer.getServer().getHAPolicy() instanceof ReplicationPrimaryPolicy; if (replication) receiveMessages(consumer2, 0, NUM_MESSAGES, true); assertNoMoreMessages(consumer2); @@ -830,7 +835,7 @@ public void testFailBackLiveRestartsBackupIsGone() throws Exception { public void testSimpleFailover() throws Exception { HAPolicy haPolicy = backupServer.getServer().getHAPolicy(); - simpleFailover(haPolicy instanceof ReplicaPolicy, false); + simpleFailover(haPolicy instanceof ReplicaPolicy || haPolicy instanceof ReplicationBackupPolicy, false); } @Test(timeout = 120000) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java index 1609b5679c10..3fdbe67dce9e 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/FailoverTestBase.java @@ -19,6 +19,7 @@ import java.io.IOException; import java.net.ServerSocket; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -36,15 +37,19 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMConnector; import org.apache.activemq.artemis.core.remoting.impl.invm.InVMRegistry; import org.apache.activemq.artemis.core.server.NodeManager; +import org.apache.activemq.artemis.core.server.cluster.ha.HAPolicy; import org.apache.activemq.artemis.core.server.cluster.ha.ReplicatedPolicy; +import org.apache.activemq.artemis.core.server.cluster.ha.ReplicationPrimaryPolicy; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.InVMNodeManager; +import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager; import org.apache.activemq.artemis.tests.integration.cluster.util.SameProcessActiveMQServer; import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; @@ -52,9 +57,13 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; public abstract class FailoverTestBase extends ActiveMQTestBase { // Constants ----------------------------------------------------- + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); protected static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress"); @@ -208,7 +217,34 @@ protected void createReplicatedConfigs() throws Exception { liveServer = createTestableServer(liveConfig); } + protected void createPluggableReplicatedConfigs() throws Exception { + final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); + final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); + final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false); + + backupConfig = createDefaultInVMConfig(); + liveConfig = createDefaultInVMConfig(); + + DistributedPrimitiveManagerConfiguration managerConfiguration = + new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(), + Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString())); + + ReplicatedBackupUtils.configurePluggableQuorumReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null, managerConfiguration, managerConfiguration); + + backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false); + + setupHAPolicyConfiguration(); + nodeManager = createReplicatedBackupNodeManager(backupConfig); + + backupServer = createTestableServer(backupConfig); + + liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(getAcceptorTransportConfiguration(true)); + + liveServer = createTestableServer(liveConfig); + } + protected void setupHAPolicyConfiguration() { + Assert.assertTrue(backupConfig.getHAPolicyConfiguration() instanceof ReplicaPolicyConfiguration); ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true); ((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false); } @@ -225,8 +261,13 @@ protected final void adaptLiveConfigForReplicatedFailBack(TestableServer server) configuration.getConnectorConfigurations().put(backupConnector.getName(), backupConnector); return; } - ReplicatedPolicy haPolicy = (ReplicatedPolicy) server.getServer().getHAPolicy(); - haPolicy.setCheckForLiveServer(true); + HAPolicy policy = server.getServer().getHAPolicy(); + if (policy instanceof ReplicatedPolicy) { + ((ReplicatedPolicy) policy).setCheckForLiveServer(true); + } else if (policy instanceof ReplicationPrimaryPolicy) { + Assert.assertTrue("Adapting won't work for the current configuration", ((ReplicationPrimaryPolicy) policy).isCheckForLiveServer()); + } + } @Override diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java new file mode 100644 index 000000000000..b1e6a94dc2ad --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupAuthenticationTest.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.protocol.core.Packet; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; +import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; +import org.apache.activemq.artemis.tests.integration.cluster.failover.FakeServiceComponent; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static java.util.Arrays.asList; + +@RunWith(Parameterized.class) +public class PluggableQuorumBackupAuthenticationTest extends FailoverTestBase { + + private static CountDownLatch registrationStarted; + + @Parameterized.Parameter + public boolean useNetty; + + @Parameterized.Parameters(name = "useNetty={1}") + public static Iterable getParams() { + return asList(new Object[][]{{false}, {true}}); + } + + @Override + @Before + public void setUp() throws Exception { + startBackupServer = false; + registrationStarted = new CountDownLatch(1); + super.setUp(); + } + + @Test + public void testWrongPasswordSetting() throws Exception { + FakeServiceComponent fakeServiceComponent = new FakeServiceComponent("fake web server"); + Wait.assertTrue(liveServer.getServer()::isActive); + waitForServerToStart(liveServer.getServer()); + backupServer.start(); + backupServer.getServer().addExternalComponent(fakeServiceComponent, true); + assertTrue(registrationStarted .await(5, TimeUnit.SECONDS)); + /* + * can't intercept the message at the backup, so we intercept the registration message at the + * live. + */ + Wait.waitFor(() -> !backupServer.isStarted()); + assertFalse("backup should have stopped", backupServer.isStarted()); + Wait.assertFalse(fakeServiceComponent::isStarted); + backupServer.stop(); + liveServer.stop(); + } + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + backupConfig.setClusterPassword("crocodile"); + liveConfig.setIncomingInterceptorClassNames(Arrays.asList(NotifyingInterceptor.class.getName())); + backupConfig.setSecurityEnabled(true); + liveConfig.setSecurityEnabled(true); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return useNetty ? getNettyAcceptorTransportConfiguration(live) : + TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return useNetty ? getNettyConnectorTransportConfiguration(live) : + TransportConfigurationUtils.getInVMConnector(live); + } + + public static final class NotifyingInterceptor implements Interceptor { + + @Override + public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { + if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) { + registrationStarted.countDown(); + } else if (packet.getType() == PacketImpl.CLUSTER_CONNECT) { + registrationStarted.countDown(); + } + return true; + } + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupSyncJournalTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupSyncJournalTest.java new file mode 100644 index 000000000000..6f620a64f22b --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumBackupSyncJournalTest.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.tests.integration.cluster.failover.BackupSyncJournalTest; + +public class PluggableQuorumBackupSyncJournalTest extends BackupSyncJournalTest { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()) + .setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()) + .setMaxSavedReplicatedJournalsSize(2) + .setAllowFailBack(true); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumExtraBackupReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumExtraBackupReplicatedFailoverTest.java new file mode 100644 index 000000000000..e9f7576dc060 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumExtraBackupReplicatedFailoverTest.java @@ -0,0 +1,104 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import java.util.Arrays; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; +import org.apache.activemq.artemis.tests.integration.cluster.util.TestableServer; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +public class PluggableQuorumExtraBackupReplicatedFailoverTest extends FailoverTestBase { + + private static final String GROUP_NAME = "foo"; + + @Parameterized.Parameter + public boolean useGroupName; + + @Parameterized.Parameters(name = "useGroupName={0}") + public static Iterable getParams() { + return Arrays.asList(new Object[][]{{false}, {true}}); + } + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + if (useGroupName) { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setGroupName(GROUP_NAME); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setGroupName(GROUP_NAME); + } + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + + @Test + public void testExtraBackupReplicates() throws Exception { + Configuration secondBackupConfig = backupConfig.copy(); + String secondBackupGroupName = ((ReplicationBackupPolicyConfiguration) secondBackupConfig.getHAPolicyConfiguration()).getGroupName(); + Assert.assertEquals(((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).getGroupName(), + secondBackupGroupName); + if (useGroupName) { + Assert.assertEquals(GROUP_NAME, secondBackupGroupName); + } else { + Assert.assertNull(secondBackupGroupName); + } + TestableServer secondBackupServer = createTestableServer(secondBackupConfig); + secondBackupConfig.setBindingsDirectory(getBindingsDir(1, true)) + .setJournalDirectory(getJournalDir(1, true)) + .setPagingDirectory(getPageDir(1, true)) + .setLargeMessagesDirectory(getLargeMessagesDir(1, true)) + .setSecurityEnabled(false); + + waitForRemoteBackupSynchronization(backupServer.getServer()); + + secondBackupServer.start(); + Thread.sleep(5000); + backupServer.stop(); + waitForSync(secondBackupServer.getServer()); + waitForRemoteBackupSynchronization(secondBackupServer.getServer()); + + } + + private void waitForSync(ActiveMQServer server) throws Exception { + Wait.waitFor(server::isReplicaSync); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumNettyReplicationStopTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumNettyReplicationStopTest.java new file mode 100644 index 000000000000..1381cad3c417 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumNettyReplicationStopTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.tests.integration.cluster.failover.NettyReplicationStopTest; + +public class PluggableQuorumNettyReplicationStopTest extends NettyReplicationStopTest { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumPageCleanupWhileReplicaCatchupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumPageCleanupWhileReplicaCatchupTest.java new file mode 100644 index 000000000000..b5b8b3af0279 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumPageCleanupWhileReplicaCatchupTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.tests.integration.cluster.failover.PageCleanupWhileReplicaCatchupTest; + +public class PluggableQuorumPageCleanupWhileReplicaCatchupTest extends PageCleanupWhileReplicaCatchupTest { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedFailoverTest.java new file mode 100644 index 000000000000..b47200cbbd26 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedFailoverTest.java @@ -0,0 +1,239 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.concurrent.TimeUnit; + +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpHandler; +import com.sun.net.httpserver.HttpServer; +import org.apache.activemq.artemis.api.core.QueueConfiguration; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.component.WebServerComponent; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.ServiceComponent; +import org.apache.activemq.artemis.dto.AppDTO; +import org.apache.activemq.artemis.dto.WebServerDTO; +import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTest; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static java.util.Arrays.asList; + +@RunWith(Parameterized.class) +public class PluggableQuorumReplicatedFailoverTest extends FailoverTest { + + private static final String GROUP_NAME = "foo"; + + @Parameterized.Parameter + public boolean useGroupName; + @Parameterized.Parameter(1) + public boolean useNetty; + + @Parameterized.Parameters(name = "useGroupName={0}, useNetty={1}") + public static Iterable getParams() { + return asList(new Object[][]{ + {false, false}, + {false, true}, + {true, false}, + {true, true}}); + } + + + protected void beforeWaitForRemoteBackupSynchronization() { + } + + private void waitForSync(ActiveMQServer server) throws Exception { + Wait.waitFor(server::isReplicaSync); + } + + /** + * Default maxSavedReplicatedJournalsSize is 2, this means the backup will fall back to replicated only twice, after this + * it is stopped permanently. + */ + @Test(timeout = 120000) + public void testReplicatedFailback() throws Exception { + try { + beforeWaitForRemoteBackupSynchronization(); + + waitForSync(backupServer.getServer()); + + createSessionFactory(); + + ClientSession session = createSession(sf, true, true); + + session.createQueue(new QueueConfiguration(ADDRESS)); + + crash(session); + + liveServer.start(); + + waitForSync(liveServer.getServer()); + + waitForSync(backupServer.getServer()); + + waitForServerToStart(liveServer.getServer()); + + session = createSession(sf, true, true); + + crash(session); + + liveServer.start(); + + waitForSync(liveServer.getServer()); + + waitForSync(backupServer.getServer()); + + waitForServerToStart(liveServer.getServer()); + + session = createSession(sf, true, true); + + crash(session); + + liveServer.start(); + + waitForSync(liveServer.getServer()); + + liveServer.getServer().waitForActivation(5, TimeUnit.SECONDS); + + waitForSync(liveServer.getServer()); + + waitForServerToStart(backupServer.getServer()); + + assertTrue(backupServer.getServer().isStarted()); + + } finally { + if (sf != null) { + sf.close(); + } + try { + liveServer.getServer().stop(); + } catch (Throwable ignored) { + } + try { + backupServer.getServer().stop(); + } catch (Throwable ignored) { + } + } + } + + @Test + public void testReplicatedFailbackBackupFromLiveBackToBackup() throws Exception { + + InetSocketAddress address = new InetSocketAddress("127.0.0.1", 8787); + HttpServer httpServer = HttpServer.create(address, 100); + httpServer.start(); + + try { + httpServer.createContext("/", new HttpHandler() { + @Override + public void handle(HttpExchange t) throws IOException { + String response = "This is a unit test"; + t.sendResponseHeaders(200, response.length()); + OutputStream os = t.getResponseBody(); + os.write(response.getBytes()); + os.close(); + } + }); + WebServerDTO wdto = new WebServerDTO(); + AppDTO appDTO = new AppDTO(); + appDTO.war = "console.war"; + appDTO.url = "console"; + wdto.apps = new ArrayList(); + wdto.apps.add(appDTO); + wdto.bind = "http://localhost:0"; + wdto.path = "console"; + WebServerComponent webServerComponent = new WebServerComponent(); + webServerComponent.configure(wdto, ".", "."); + webServerComponent.start(); + + backupServer.getServer().getNetworkHealthCheck().parseURIList("http://localhost:8787"); + Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted()); + backupServer.getServer().addExternalComponent(webServerComponent, false); + // this is called when backup servers go from live back to backup + backupServer.getServer().fail(true); + Assert.assertTrue(backupServer.getServer().getNetworkHealthCheck().isStarted()); + Assert.assertTrue(backupServer.getServer().getExternalComponents().get(0).isStarted()); + ((ServiceComponent) (backupServer.getServer().getExternalComponents().get(0))).stop(true); + } finally { + httpServer.stop(0); + } + + } + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()) + .setCheckForLiveServer(true) + .setGroupName(useGroupName ? GROUP_NAME : null); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()) + .setMaxSavedReplicatedJournalsSize(2) + .setAllowFailBack(true) + .setGroupName(useGroupName ? GROUP_NAME : null); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return useNetty ? getNettyAcceptorTransportConfiguration(live) : + super.getAcceptorTransportConfiguration(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return useNetty ? getNettyConnectorTransportConfiguration(live) : + super.getConnectorTransportConfiguration(live); + } + + @Override + protected void crash(boolean waitFailure, ClientSession... sessions) throws Exception { + if (sessions.length > 0) { + for (ClientSession session : sessions) { + waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer()); + } + } else { + waitForRemoteBackup(null, 5, true, backupServer.getServer()); + } + super.crash(waitFailure, sessions); + } + + @Override + protected void crash(ClientSession... sessions) throws Exception { + if (sessions.length > 0) { + for (ClientSession session : sessions) { + waitForRemoteBackup(session.getSessionFactory(), 5, true, backupServer.getServer()); + } + } else { + waitForRemoteBackup(null, 5, true, backupServer.getServer()); + } + super.crash(sessions); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageFailoverTest.java new file mode 100644 index 000000000000..23e728154b24 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageFailoverTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.tests.integration.cluster.failover.ReplicatedLargeMessageFailoverTest; + +public class PluggableQuorumReplicatedLargeMessageFailoverTest extends ReplicatedLargeMessageFailoverTest { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageWithDelayFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageWithDelayFailoverTest.java new file mode 100644 index 000000000000..bf23c9b1246f --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedLargeMessageWithDelayFailoverTest.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.tests.integration.cluster.failover.ReplicatedLargeMessageWithDelayFailoverTest; + +public class PluggableQuorumReplicatedLargeMessageWithDelayFailoverTest extends ReplicatedLargeMessageWithDelayFailoverTest { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } + +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedPagingFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedPagingFailoverTest.java new file mode 100644 index 000000000000..d80460855ba3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/failover/quorum/PluggableQuorumReplicatedPagingFailoverTest.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.cluster.failover.quorum; + +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.tests.integration.cluster.failover.PagingFailoverTest; + +public class PluggableQuorumReplicatedPagingFailoverTest extends PagingFailoverTest { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java index 9ddffd389a2e..b0af71b3675a 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/cluster/util/BackupSyncDelay.java @@ -32,6 +32,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationStartSyncMessage; import org.apache.activemq.artemis.core.replication.ReplicationEndpoint; import org.apache.activemq.artemis.core.server.ActiveMQServer; +import org.apache.activemq.artemis.core.server.impl.Activation; +import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation; import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; @@ -94,8 +96,18 @@ public BackupSyncDelay(TestableServer backupServer, TestableServer liveServer) { public boolean intercept(Packet packet, RemotingConnection connection) throws ActiveMQException { if (packet.getType() == PacketImpl.BACKUP_REGISTRATION) { try { - SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backup.getActivation(); - ReplicationEndpoint repEnd = activation.getReplicationEndpoint(); + Activation backupActivation = backup.getActivation(); + ReplicationEndpoint repEnd = null; + if (backupActivation instanceof SharedNothingBackupActivation) { + SharedNothingBackupActivation activation = (SharedNothingBackupActivation) backupActivation; + repEnd = activation.getReplicationEndpoint(); + } else if (backupActivation instanceof ReplicationBackupActivation) { + ReplicationBackupActivation activation = (ReplicationBackupActivation) backupActivation; + repEnd = activation.getReplicationEndpoint(); + } + if (repEnd == null) { + throw new NullPointerException("replication endpoint isn't supposed to be null"); + } handler.addSubHandler(repEnd); Channel repChannel = repEnd.getChannel(); repChannel.setHandler(handler); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationFlowControlTest.java new file mode 100644 index 000000000000..a359af9d87e1 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationFlowControlTest.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.replication; + +import java.io.IOException; +import java.util.Collections; + +import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +public class PluggableQuorumReplicationFlowControlTest extends SharedNothingReplicationFlowControlTest { + + private DistributedPrimitiveManagerConfiguration managerConfiguration; + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void init() throws IOException { + managerConfiguration = new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(), Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString())); + } + + @Override + protected HAPolicyConfiguration createReplicationBackupConfiguration() { + ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + haPolicy.setClusterName("cluster"); + // fail-fast in order to let the backup to quickly retry syncing with primary + haPolicy.setQuorumVoteWait(1); + haPolicy.setVoteRetries(0); + return haPolicy; + } + + @Override + protected HAPolicyConfiguration createReplicationLiveConfiguration() { + ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + haPolicy.setCheckForLiveServer(false); + return haPolicy; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationOrderTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationOrderTest.java new file mode 100644 index 000000000000..f5ed720792b0 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationOrderTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.replication; + +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import static java.util.Arrays.asList; + +@RunWith(Parameterized.class) +public class PluggableQuorumReplicationOrderTest extends ReplicationOrderTest { + + @Parameterized.Parameter + public boolean useNetty; + + @Parameterized.Parameters(name = "useNetty={1}") + public static Iterable getParams() { + return asList(new Object[][]{{false}, {true}}); + } + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()) + .setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()) + .setMaxSavedReplicatedJournalsSize(2) + .setAllowFailBack(true); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return useNetty ? getNettyAcceptorTransportConfiguration(live) : + super.getAcceptorTransportConfiguration(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return useNetty ? getNettyConnectorTransportConfiguration(live) : + super.getConnectorTransportConfiguration(live); + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java new file mode 100644 index 000000000000..b125666bd11a --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/PluggableQuorumReplicationTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.replication; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; + +import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; +import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; +import org.apache.activemq.artemis.core.server.JournalType; +import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager; +import org.junit.Before; +import org.junit.Rule; +import org.junit.rules.TemporaryFolder; + +public class PluggableQuorumReplicationTest extends SharedNothingReplicationTest { + + private DistributedPrimitiveManagerConfiguration managerConfiguration; + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Before + public void init() throws IOException { + managerConfiguration = + new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(), + Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString())); + } + + @Override + protected Configuration createLiveConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::live"); + + File liveDir = brokersFolder.newFolder("live"); + conf.setBrokerInstance(liveDir); + + conf.addAcceptorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ReplicationPrimaryPolicyConfiguration haPolicy = ReplicationPrimaryPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + haPolicy.setCheckForLiveServer(false); + conf.setHAPolicyConfiguration(haPolicy); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); + ccconf.setName("cluster"); + ccconf.setConnectorName("live"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } + + @Override + protected Configuration createBackupConfiguration() throws Exception { + Configuration conf = new ConfigurationImpl(); + conf.setName("localhost::backup"); + + File backupDir = brokersFolder.newFolder("backup"); + conf.setBrokerInstance(backupDir); + + ReplicationBackupPolicyConfiguration haPolicy = ReplicationBackupPolicyConfiguration.withDefault(); + haPolicy.setDistributedManagerConfiguration(managerConfiguration); + haPolicy.setClusterName("cluster"); + conf.setHAPolicyConfiguration(haPolicy); + + conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); + conf.addConnectorConfiguration("live", "tcp://localhost:61616"); + conf.addConnectorConfiguration("backup", "tcp://localhost:61617"); + + conf.setClusterUser("mycluster"); + conf.setClusterPassword("mypassword"); + + ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); + ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("live"); + ccconf.setName("cluster"); + ccconf.setConnectorName("backup"); + conf.addClusterConfiguration(ccconf); + + conf.setSecurityEnabled(false).setJMXManagementEnabled(false).setJournalType(JournalType.MAPPED).setJournalFileSize(1024 * 512).setConnectionTTLOverride(60_000L); + + return conf; + } +} diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java index d344a17ae508..94070489fa78 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/ReplicationTest.java @@ -18,6 +18,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.concurrent.CountDownLatch; @@ -48,6 +49,7 @@ import org.apache.activemq.artemis.api.core.client.ServerLocator; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration; import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration; import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.SequentialFileFactory; @@ -84,6 +86,7 @@ import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.settings.HierarchicalRepository; import org.apache.activemq.artemis.core.settings.impl.AddressSettings; +import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager; import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; @@ -96,10 +99,26 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +@RunWith(Parameterized.class) public final class ReplicationTest extends ActiveMQTestBase { + @Rule + public TemporaryFolder tmpFolder = new TemporaryFolder(); + + @Parameterized.Parameter + public boolean pluggableQuorum; + + @Parameterized.Parameters(name = "PluggableQuorum={0}") + public static Iterable data() { + return Arrays.asList(new Object[][]{{true}, {false}}); + } + private ThreadFactory tFactory; private ExecutorService executor; private ExecutorFactory factory; @@ -143,7 +162,15 @@ private void setupServer(boolean useNetty, Configuration backupConfig = createDefaultInVMConfig().setHAPolicyConfiguration(new SharedStoreSlavePolicyConfiguration()).setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setIncomingInterceptorClassNames(incomingInterceptors.length > 0 ? Arrays.asList(incomingInterceptors) : new ArrayList()); - ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + if (!pluggableQuorum) { + ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor); + } else { + DistributedPrimitiveManagerConfiguration managerConfiguration = + new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(), + Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString())); + + ReplicatedBackupUtils.configurePluggableQuorumReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, liveAcceptor, managerConfiguration, managerConfiguration); + } if (extraConfig != null) { extraConfig.config(liveConfig, backupConfig); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java index f2a8a2830976..1d923731d601 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationFlowControlTest.java @@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.HAPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl; @@ -173,7 +174,6 @@ public void testReplicationIfFlowControlled() throws Exception { SequentialFileFactory fileFactory; - File liveJournalDir = brokersFolder.getRoot().toPath().resolve("live").resolve("data").resolve("journal").toFile(); fileFactory = new MappedSequentialFileFactory(liveConfiguration.getJournalLocation(), liveConfiguration.getJournalFileSize(), false, liveConfiguration.getJournalBufferSize_NIO(), liveConfiguration.getJournalBufferTimeout_NIO(), null); JournalImpl liveMessageJournal = new JournalImpl(liveConfiguration.getJournalFileSize(), liveConfiguration.getJournalMinFiles(), liveConfiguration.getJournalPoolFiles(), liveConfiguration.getJournalCompactMinFiles(), liveConfiguration.getJournalCompactPercentage(), fileFactory, "activemq-data", "amq", fileFactory.getMaxIO()); @@ -355,6 +355,12 @@ public synchronized void close(boolean waitSync, boolean block) throws IOExcepti } } + protected HAPolicyConfiguration createReplicationLiveConfiguration() { + return new ReplicatedPolicyConfiguration() + .setVoteOnReplicationFailure(false) + .setCheckForLiveServer(false); + } + // Set a small call timeout and write buffer high water mark value to trigger replication flow control private Configuration createLiveConfiguration() throws Exception { Configuration conf = new ConfigurationImpl(); @@ -370,10 +376,7 @@ private Configuration createLiveConfiguration() throws Exception { conf.setClusterUser("mycluster"); conf.setClusterPassword("mypassword"); - ReplicatedPolicyConfiguration haPolicy = new ReplicatedPolicyConfiguration(); - haPolicy.setVoteOnReplicationFailure(false); - haPolicy.setCheckForLiveServer(false); - conf.setHAPolicyConfiguration(haPolicy); + conf.setHAPolicyConfiguration(createReplicationLiveConfiguration()); ClusterConnectionConfiguration ccconf = new ClusterConnectionConfiguration(); ccconf.setStaticConnectors(new ArrayList<>()).getStaticConnectors().add("backup"); @@ -387,6 +390,10 @@ private Configuration createLiveConfiguration() throws Exception { return conf; } + protected HAPolicyConfiguration createReplicationBackupConfiguration() { + return new ReplicaPolicyConfiguration().setClusterName("cluster"); + } + private Configuration createBackupConfiguration() throws Exception { Configuration conf = new ConfigurationImpl(); conf.setName("localhost::backup"); @@ -394,9 +401,7 @@ private Configuration createBackupConfiguration() throws Exception { File backupDir = brokersFolder.newFolder("backup"); conf.setBrokerInstance(backupDir); - ReplicaPolicyConfiguration haPolicy = new ReplicaPolicyConfiguration(); - haPolicy.setClusterName("cluster"); - conf.setHAPolicyConfiguration(haPolicy); + conf.setHAPolicyConfiguration(createReplicationBackupConfiguration()); conf.addAcceptorConfiguration("backup", "tcp://localhost:61617"); conf.addConnectorConfiguration("live", "tcp://localhost:61616"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java index 7cc8bb93978a..fc04b3b379c9 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/replication/SharedNothingReplicationTest.java @@ -237,7 +237,7 @@ public void addRecord(RecordInfo info) { Assert.assertTrue("The test is not valid, slow persister stopped being used", SlowMessagePersister._getInstance().used); } - private Configuration createLiveConfiguration() throws Exception { + protected Configuration createLiveConfiguration() throws Exception { Configuration conf = new ConfigurationImpl(); conf.setName("localhost::live"); @@ -267,7 +267,7 @@ private Configuration createLiveConfiguration() throws Exception { return conf; } - private Configuration createBackupConfiguration() throws Exception { + protected Configuration createBackupConfiguration() throws Exception { Configuration conf = new ConfigurationImpl(); conf.setName("localhost::backup"); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/PluggableQuorumBasicSecurityManagerFailoverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/PluggableQuorumBasicSecurityManagerFailoverTest.java new file mode 100644 index 000000000000..60f27a13c272 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/security/PluggableQuorumBasicSecurityManagerFailoverTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.tests.integration.security; + +import java.util.Collections; + +import org.apache.activemq.artemis.api.core.ActiveMQException; +import org.apache.activemq.artemis.api.core.TransportConfiguration; +import org.apache.activemq.artemis.api.core.client.ClientSession; +import org.apache.activemq.artemis.api.core.client.ClientSessionFactory; +import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; +import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager; +import org.apache.activemq.artemis.spi.core.security.ActiveMQBasicSecurityManager; +import org.apache.activemq.artemis.tests.integration.cluster.failover.FailoverTestBase; +import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils; +import org.apache.activemq.artemis.tests.util.TransportConfigurationUtils; +import org.junit.Assert; +import org.junit.Test; + +public class PluggableQuorumBasicSecurityManagerFailoverTest extends FailoverTestBase { + + @Override + protected void createConfigs() throws Exception { + createPluggableReplicatedConfigs(); + } + + @Override + protected void createPluggableReplicatedConfigs() throws Exception { + final TransportConfiguration liveConnector = getConnectorTransportConfiguration(true); + final TransportConfiguration backupConnector = getConnectorTransportConfiguration(false); + final TransportConfiguration backupAcceptor = getAcceptorTransportConfiguration(false); + + backupConfig = createDefaultInVMConfig(); + liveConfig = createDefaultInVMConfig(); + + DistributedPrimitiveManagerConfiguration managerConfiguration = + new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(), + Collections.singletonMap("locks-folder", + tmpFolder.newFolder("manager").toString())); + + ReplicatedBackupUtils.configurePluggableQuorumReplicationPair(backupConfig, backupConnector, backupAcceptor, + liveConfig, liveConnector, null, + managerConfiguration, managerConfiguration); + + backupConfig + .setSecurityEnabled(true) + .setBindingsDirectory(getBindingsDir(0, true)) + .setJournalDirectory(getJournalDir(0, true)) + .setPagingDirectory(getPageDir(0, true)) + .setLargeMessagesDirectory(getLargeMessagesDir(0, true)); + + setupHAPolicyConfiguration(); + nodeManager = createReplicatedBackupNodeManager(backupConfig); + + backupServer = createTestableServer(backupConfig); + + backupServer.getServer().setSecurityManager(new ActiveMQBasicSecurityManager()); + + liveConfig + .setSecurityEnabled(true) + .clearAcceptorConfigurations() + .addAcceptorConfiguration(getAcceptorTransportConfiguration(true)); + + liveServer = createTestableServer(liveConfig); + + liveServer.getServer().setSecurityManager(new ActiveMQBasicSecurityManager()); + } + + @Override + protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMAcceptor(live); + } + + @Override + protected TransportConfiguration getConnectorTransportConfiguration(final boolean live) { + return TransportConfigurationUtils.getInVMConnector(live); + } + + @Override + protected void setupHAPolicyConfiguration() { + ((ReplicationPrimaryPolicyConfiguration) liveConfig.getHAPolicyConfiguration()).setCheckForLiveServer(true); + ((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(2).setAllowFailBack(true); + } + + @Test + public void testFailover() throws Exception { + + liveServer.getServer().getActiveMQServerControl().addUser("foo", "bar", "baz", false); + + ClientSessionFactory cf = createSessionFactory(getServerLocator()); + ClientSession session = null; + + try { + session = cf.createSession("foo", "bar", false, true, true, false, 0); + } catch (ActiveMQException e) { + e.printStackTrace(); + Assert.fail("should not throw exception"); + } + + crash(session); + waitForServerToStart(backupServer.getServer()); + + try { + cf = createSessionFactory(getServerLocator()); + session = cf.createSession("foo", "bar", false, true, true, false, 0); + } catch (ActiveMQException e) { + e.printStackTrace(); + Assert.fail("should not throw exception"); + } + } +} + diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/ReplicatedBackupUtils.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/ReplicatedBackupUtils.java index 1a38a6ac7020..e2a4fcf8f47d 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/ReplicatedBackupUtils.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/util/ReplicatedBackupUtils.java @@ -18,8 +18,11 @@ import org.apache.activemq.artemis.api.core.TransportConfiguration; import org.apache.activemq.artemis.core.config.Configuration; +import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration; import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration; +import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration; public final class ReplicatedBackupUtils { @@ -48,4 +51,30 @@ public static void configureReplicationPair(Configuration backupConfig, liveConfig.setName(LIVE_NODE_NAME).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).setSecurityEnabled(false).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(LIVE_NODE_NAME, BACKUP_NODE_NAME)).setHAPolicyConfiguration(new ReplicatedPolicyConfiguration()); } + + + public static void configurePluggableQuorumReplicationPair(Configuration backupConfig, + TransportConfiguration backupConnector, + TransportConfiguration backupAcceptor, + Configuration liveConfig, + TransportConfiguration liveConnector, + TransportConfiguration liveAcceptor, + DistributedPrimitiveManagerConfiguration primaryManagerConfiguration, + DistributedPrimitiveManagerConfiguration backupManagerConfiguration) { + if (backupAcceptor != null) { + backupConfig.clearAcceptorConfigurations().addAcceptorConfiguration(backupAcceptor); + } + + if (liveAcceptor != null) { + liveConfig.clearAcceptorConfigurations().addAcceptorConfiguration(liveAcceptor); + } + + backupConfig.addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(BACKUP_NODE_NAME, LIVE_NODE_NAME)) + .setHAPolicyConfiguration(ReplicationBackupPolicyConfiguration.withDefault() + .setDistributedManagerConfiguration(backupManagerConfiguration)); + + liveConfig.setName(LIVE_NODE_NAME).addConnectorConfiguration(LIVE_NODE_NAME, liveConnector).addConnectorConfiguration(BACKUP_NODE_NAME, backupConnector).setSecurityEnabled(false).addClusterConfiguration(ActiveMQTestBase.basicClusterConnectionConfig(LIVE_NODE_NAME, BACKUP_NODE_NAME)) + .setHAPolicyConfiguration(ReplicationPrimaryPolicyConfiguration.withDefault() + .setDistributedManagerConfiguration(primaryManagerConfiguration)); + } }