Skip to content

Commit

Permalink
ARTEMIS-2716 Porting replication integration tests
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Jun 25, 2021
1 parent 1278abe commit ab818f3
Show file tree
Hide file tree
Showing 48 changed files with 1,613 additions and 180 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.LiveOnlyActivation;
import org.apache.activemq.artemis.core.server.impl.ReplicationBackupActivation;
import org.apache.activemq.artemis.core.server.impl.SharedNothingBackupActivation;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
Expand Down Expand Up @@ -1385,6 +1386,8 @@ public static final void waitForRemoteBackup(ClientSessionFactory sessionFactory
if (isReplicated) {
if (activation instanceof SharedNothingBackupActivation) {
isRemoteUpToDate = backup.isReplicaSync();
} else if (activation instanceof ReplicationBackupActivation) {
isRemoteUpToDate = backup.isReplicaSync();
} else {
//we may have already failed over and changed the Activation
if (actualServer.isStarted()) {
Expand Down Expand Up @@ -2523,6 +2526,9 @@ protected static ReplicationEndpoint getReplicationEndpoint(ActiveMQServer serve
if (activation instanceof SharedNothingBackupActivation) {
return ((SharedNothingBackupActivation) activation).getReplicationEndpoint();
}
if (activation instanceof ReplicationBackupActivation) {
return ((ReplicationBackupActivation) activation).getReplicationEndpoint();
}
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ public class ScaleDownFailoverTest extends ClusterTestBase {
public void setUp() throws Exception {
super.setUp();
stopCount = 0;
setupLiveServer(0, isFileStorage(), false, isNetty(), true);
setupLiveServer(1, isFileStorage(), false, isNetty(), true);
setupLiveServer(2, isFileStorage(), false, isNetty(), true);
setupLiveServer(0, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
setupLiveServer(1, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
setupLiveServer(2, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
ScaleDownConfiguration scaleDownConfiguration2 = new ScaleDownConfiguration();
scaleDownConfiguration2.setEnabled(false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ public class ScaleDownFailureTest extends ClusterTestBase {
@Before
public void setUp() throws Exception {
super.setUp();
setupLiveServer(0, isFileStorage(), false, isNetty(), true);
setupLiveServer(1, isFileStorage(), false, isNetty(), true);
setupLiveServer(0, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
setupLiveServer(1, isFileStorage(), HAType.SharedNothingReplication, isNetty(), true);
if (isGrouped()) {
ScaleDownConfiguration scaleDownConfiguration = new ScaleDownConfiguration();
scaleDownConfiguration.setGroupName("bill");
Expand Down
6 changes: 6 additions & 0 deletions tests/integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>artemis-quorum-ri</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq.tests</groupId>
<artifactId>unit-tests</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,11 @@ protected void createReplicatedConfigs() throws Exception {
backupConfig = createDefaultConfig(0, true);
liveConfig = createDefaultConfig(0, true);

ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
configureReplicationPair(backupConnector, backupAcceptor, liveConnector);

backupConfig.setBindingsDirectory(getBindingsDir(0, true)).setJournalDirectory(getJournalDir(0, true)).setPagingDirectory(getPageDir(0, true)).setLargeMessagesDirectory(getLargeMessagesDir(0, true)).setSecurityEnabled(false);

((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);


nodeManager = new InVMNodeManager(true, backupConfig.getJournalLocation());

Expand All @@ -109,6 +108,14 @@ protected void createReplicatedConfigs() throws Exception {
liveServer = createTestableServer(liveConfig, nodeManager);
}

protected void configureReplicationPair(TransportConfiguration backupConnector,
TransportConfiguration backupAcceptor,
TransportConfiguration liveConnector) {
ReplicatedBackupUtils.configureReplicationPair(backupConfig, backupConnector, backupAcceptor, liveConfig, liveConnector, null);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true);
((ReplicaPolicyConfiguration) backupConfig.getHAPolicyConfiguration()).setRestartBackup(false);
}


@Before
@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.client;

import java.util.Collections;

import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager;
import org.apache.activemq.artemis.tests.util.ReplicatedBackupUtils;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public class PluggableQuorumInfiniteRedeliveryTest extends InfiniteRedeliveryTest {

@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();

private DistributedPrimitiveManagerConfiguration managerConfiguration;

public PluggableQuorumInfiniteRedeliveryTest(String protocol, boolean useCLI) {
super(protocol, useCLI);
}

@Before
@Override
public void setUp() throws Exception {
super.setUp();
this.managerConfiguration = new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(),
Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString()));
}

@Override
protected void configureReplicationPair(TransportConfiguration backupConnector,
TransportConfiguration backupAcceptor,
TransportConfiguration liveConnector) {

ReplicatedBackupUtils.configurePluggableQuorumReplicationPair(backupConfig, backupConnector, backupAcceptor,
liveConfig, liveConnector, null,
managerConfiguration, managerConfiguration);
((ReplicationBackupPolicyConfiguration) backupConfig.getHAPolicyConfiguration())
.setMaxSavedReplicatedJournalsSize(-1).setAllowFailBack(true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.cluster.distribution;

import java.io.File;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.net.URI;
Expand Down Expand Up @@ -56,9 +57,12 @@
import org.apache.activemq.artemis.core.config.ClusterConnectionConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.HAPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.DistributedPrimitiveManagerConfiguration;
import org.apache.activemq.artemis.core.config.ha.LiveOnlyPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicaPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicatedPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationBackupPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.ReplicationPrimaryPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreMasterPolicyConfiguration;
import org.apache.activemq.artemis.core.config.ha.SharedStoreSlavePolicyConfiguration;
import org.apache.activemq.artemis.core.postoffice.Binding;
Expand All @@ -85,16 +89,22 @@
import org.apache.activemq.artemis.core.server.group.impl.GroupingHandlerConfiguration;
import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.core.server.impl.InVMNodeManager;
import org.apache.activemq.artemis.quorum.file.FileBasedPrimitiveManager;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.PortCheckRule;
import org.jboss.logging.Logger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;

public abstract class ClusterTestBase extends ActiveMQTestBase {

@Rule
public TemporaryFolder tmpFolder = new TemporaryFolder();

private static final Logger log = Logger.getLogger(ClusterTestBase.class);

private static final int[] PORTS = {TransportConstants.DEFAULT_PORT, TransportConstants.DEFAULT_PORT + 1, TransportConstants.DEFAULT_PORT + 2, TransportConstants.DEFAULT_PORT + 3, TransportConstants.DEFAULT_PORT + 4, TransportConstants.DEFAULT_PORT + 5, TransportConstants.DEFAULT_PORT + 6, TransportConstants.DEFAULT_PORT + 7, TransportConstants.DEFAULT_PORT + 8, TransportConstants.DEFAULT_PORT + 9,};
Expand Down Expand Up @@ -134,6 +144,21 @@ protected boolean isForceUniqueStorageManagerIds() {
return true;
}

private DistributedPrimitiveManagerConfiguration pluggableQuorumConfiguration = null;

private DistributedPrimitiveManagerConfiguration getOrCreatePluggableQuorumConfiguration() {
if (pluggableQuorumConfiguration != null) {
return pluggableQuorumConfiguration;
}
try {
pluggableQuorumConfiguration = new DistributedPrimitiveManagerConfiguration(FileBasedPrimitiveManager.class.getName(), Collections.singletonMap("locks-folder", tmpFolder.newFolder("manager").toString()));
} catch (IOException ioException) {
log.error(ioException);
return null;
}
return pluggableQuorumConfiguration;
}

@Override
@Before
public void setUp() throws Exception {
Expand All @@ -159,11 +184,19 @@ public void setUp() throws Exception {

}

public enum HAType {
SharedStore, SharedNothingReplication, PluggableQuorumReplication
}

protected HAType haType() {
return HAType.SharedNothingReplication;
}

/**
* Whether the servers share the storage or not.
*/
protected boolean isSharedStore() {
return false;
protected final boolean isSharedStore() {
return HAType.SharedStore.equals(haType());
}

@Override
Expand Down Expand Up @@ -1481,14 +1514,14 @@ protected ActiveMQServer getServer(final int node) {
}

protected void setupServer(final int node, final boolean fileStorage, final boolean netty) throws Exception {
setupLiveServer(node, fileStorage, false, netty, false);
setupLiveServer(node, fileStorage, HAType.SharedNothingReplication, netty, false);
}

protected void setupLiveServer(final int node,
final boolean fileStorage,
final boolean netty,
boolean isLive) throws Exception {
setupLiveServer(node, fileStorage, false, netty, isLive);
setupLiveServer(node, fileStorage, HAType.SharedNothingReplication, netty, isLive);
}

protected boolean isResolveProtocols() {
Expand All @@ -1497,27 +1530,26 @@ protected boolean isResolveProtocols() {

protected void setupLiveServer(final int node,
final boolean fileStorage,
final boolean sharedStorage,
final HAType haType,
final boolean netty,
boolean liveOnly) throws Exception {
if (servers[node] != null) {
throw new IllegalArgumentException("Already a server at node " + node);
}

HAPolicyConfiguration haPolicyConfiguration = null;
final HAPolicyConfiguration haPolicyConfiguration;
if (liveOnly) {
haPolicyConfiguration = new LiveOnlyPolicyConfiguration();
} else {
if (sharedStorage)
haPolicyConfiguration = new SharedStoreMasterPolicyConfiguration();
else
haPolicyConfiguration = new ReplicatedPolicyConfiguration();
haPolicyConfiguration = haPolicyLiveConfiguration(haType);
}

Configuration configuration = createBasicConfig(node).setJournalMaxIO_AIO(1000).setThreadPoolMaxSize(10).clearAcceptorConfigurations().addAcceptorConfiguration(createTransportConfiguration(netty, true, generateParams(node, netty))).setHAPolicyConfiguration(haPolicyConfiguration).setResolveProtocols(isResolveProtocols());

ActiveMQServer server;

final boolean sharedStorage = HAType.SharedStore.equals(haType);

if (fileStorage) {
if (sharedStorage) {
server = createInVMFailoverServer(true, configuration, nodeManagers[node], node);
Expand All @@ -1538,6 +1570,20 @@ protected void setupLiveServer(final int node,
servers[node] = addServer(server);
}

private HAPolicyConfiguration haPolicyLiveConfiguration(HAType haType) {
switch (haType) {
case SharedStore:
return new SharedStoreMasterPolicyConfiguration();
case SharedNothingReplication:
return new ReplicatedPolicyConfiguration();
case PluggableQuorumReplication:
return ReplicationPrimaryPolicyConfiguration.withDefault()
.setDistributedManagerConfiguration(getOrCreatePluggableQuorumConfiguration());
default:
throw new AssertionError("Unsupported haType = " + haType);
}
}

/**
* Server lacks a {@link ClusterConnectionConfiguration} necessary for the remote (replicating)
* backup case.
Expand All @@ -1549,14 +1595,14 @@ protected void setupLiveServer(final int node,
* @param node
* @param liveNode
* @param fileStorage
* @param sharedStorage
* @param haType
* @param netty
* @throws Exception
*/
protected void setupBackupServer(final int node,
final int liveNode,
final boolean fileStorage,
final boolean sharedStorage,
final HAType haType,
final boolean netty) throws Exception {
if (servers[node] != null) {
throw new IllegalArgumentException("Already a server at node " + node);
Expand All @@ -1566,7 +1612,9 @@ protected void setupBackupServer(final int node,
TransportConfiguration backupConfig = createTransportConfiguration(netty, false, generateParams(node, netty));
TransportConfiguration acceptorConfig = createTransportConfiguration(netty, true, generateParams(node, netty));

Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(acceptorConfig).addConnectorConfiguration(liveConfig.getName(), liveConfig).addConnectorConfiguration(backupConfig.getName(), backupConfig).setHAPolicyConfiguration(sharedStorage ? new SharedStoreSlavePolicyConfiguration() : new ReplicaPolicyConfiguration());
final boolean sharedStorage = HAType.SharedStore.equals(haType);

Configuration configuration = createBasicConfig(sharedStorage ? liveNode : node).clearAcceptorConfigurations().addAcceptorConfiguration(acceptorConfig).addConnectorConfiguration(liveConfig.getName(), liveConfig).addConnectorConfiguration(backupConfig.getName(), backupConfig).setHAPolicyConfiguration(haPolicyBackupConfiguration(haType));

ActiveMQServer server;

Expand All @@ -1580,6 +1628,21 @@ protected void setupBackupServer(final int node,
servers[node] = addServer(server);
}

private HAPolicyConfiguration haPolicyBackupConfiguration(HAType haType) {
switch (haType) {

case SharedStore:
return new SharedStoreSlavePolicyConfiguration();
case SharedNothingReplication:
return new ReplicaPolicyConfiguration();
case PluggableQuorumReplication:
return ReplicationBackupPolicyConfiguration.withDefault()
.setDistributedManagerConfiguration(getOrCreatePluggableQuorumConfiguration());
default:
throw new AssertionError("Unsupported ha type = " + haType);
}
}

protected void setupLiveServerWithDiscovery(final int node,
final String groupAddress,
final int port,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ protected void setupCluster(final MessageLoadBalancingType messageLoadBalancingT

protected void setupServers() throws Exception {
// The backups
setupBackupServer(0, 3, isFileStorage(), true, isNetty());
setupBackupServer(1, 4, isFileStorage(), true, isNetty());
setupBackupServer(2, 5, isFileStorage(), true, isNetty());
setupBackupServer(0, 3, isFileStorage(), HAType.SharedStore, isNetty());
setupBackupServer(1, 4, isFileStorage(), HAType.SharedStore, isNetty());
setupBackupServer(2, 5, isFileStorage(), HAType.SharedStore, isNetty());

// The lives
setupLiveServer(3, isFileStorage(), true, isNetty(), false);
setupLiveServer(4, isFileStorage(), true, isNetty(), false);
setupLiveServer(5, isFileStorage(), true, isNetty(), false);
setupLiveServer(3, isFileStorage(), HAType.SharedStore, isNetty(), false);
setupLiveServer(4, isFileStorage(), HAType.SharedStore, isNetty(), false);
setupLiveServer(5, isFileStorage(), HAType.SharedStore, isNetty(), false);

}
}

0 comments on commit ab818f3

Please sign in to comment.