Skip to content

Commit

Permalink
ARTEMIS-4765 DuplicateIDCache on Mirror Target is using 20K elements …
Browse files Browse the repository at this point in the history
…instead of amqpCredits

in this commit I'm storing a binding record with the address-settings for the correct size
this is also validating eventual merges of the AddressSettings in the same namespace.
  • Loading branch information
clebertsuconic committed May 9, 2024
1 parent 2a43c53 commit cd563b4
Show file tree
Hide file tree
Showing 13 changed files with 226 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -377,6 +377,8 @@ JournalLoadInformation loadBindingJournal(List<QueueBindingInfo> queueBindingInf

List<AbstractPersistedAddressSetting> recoverAddressSettings() throws Exception;

AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address);

void storeSecuritySetting(PersistedSecuritySetting persistedRoles) throws Exception;

void deleteSecuritySetting(SimpleString addressMatch) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,11 @@ public List<AbstractPersistedAddressSetting> recoverAddressSettings() throws Exc
return new ArrayList<>(mapPersistedAddressSettings.values());
}

@Override
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
return mapPersistedAddressSettings.get(address);
}

@Override
public List<PersistedSecuritySetting> recoverSecuritySettings() throws Exception {
return new ArrayList<>(mapPersistedSecuritySettings.values());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ public class NullStorageManager implements StorageManager {

private final IOCriticalErrorListener ioCriticalErrorListener;

@Override
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
return null;
}

public NullStorageManager(IOCriticalErrorListener ioCriticalErrorListener) {
this.ioCriticalErrorListener = ioCriticalErrorListener;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public interface DuplicateIDCache {

void addToCache(byte[] duplicateID, Transaction tx) throws Exception;

int getSize();

/**
* it will add the data to the cache.
* If TX == null it won't use a transaction.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -277,4 +277,9 @@ public List<MessageReference> getRelatedMessageReferences() {
return null;
}
}

@Override
public int getSize() {
return cacheSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -74,4 +74,9 @@ public void clear() throws Exception {
public List<Pair<byte[], Long>> getMap() {
return Collections.emptyList();
}

@Override
public int getSize() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -396,4 +396,9 @@ public List<MessageReference> getRelatedMessageReferences() {
}
}

@Override
public int getSize() {
return cacheSize;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.AbstractPersistedAddressSetting;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSettingJSON;
import org.apache.activemq.artemis.core.postoffice.AddressManager;
import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.BindingType;
Expand Down Expand Up @@ -1443,15 +1445,22 @@ private int resolveIdCacheSize(SimpleString address) {
@Override
public DuplicateIDCache getDuplicateIDCache(final SimpleString address) {
int resolvedIdCacheSize = resolveIdCacheSize(address);
return getDuplicateIDCache(address, resolvedIdCacheSize);
return getDuplicateIDCache(address, resolvedIdCacheSize, false);
}

@Override
public DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cacheSizeToUse) {
return getDuplicateIDCache(address, cacheSizeToUse, true);
}

private DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cacheSizeToUse, boolean allowRegistration) {
DuplicateIDCache cache = duplicateIDCaches.get(address);

if (cache == null) {
if (persistIDCache) {
if (allowRegistration) {
registerCacheSize(address, cacheSizeToUse);
}
cache = DuplicateIDCaches.persistent(address, cacheSizeToUse, storageManager);
} else {
cache = DuplicateIDCaches.inMemory(address, cacheSizeToUse);
Expand All @@ -1467,6 +1476,22 @@ public DuplicateIDCache getDuplicateIDCache(final SimpleString address, int cach
return cache;
}

private void registerCacheSize(SimpleString address, int cacheSizeToUse) {
AbstractPersistedAddressSetting recordedSetting = storageManager.recoverAddressSettings(address);
if (recordedSetting == null || recordedSetting.getSetting().getIDCacheSize() == null || recordedSetting.getSetting().getIDCacheSize().intValue() != cacheSizeToUse) {
AddressSettings settings = recordedSetting != null ? recordedSetting.getSetting() : new AddressSettings();
settings.setIDCacheSize(cacheSizeToUse);
server.getAddressSettingsRepository().addMatch(address.toString(), settings);
try {
storageManager.storeAddressSetting(new PersistedAddressSettingJSON(address, settings, settings.toJSON()));
} catch (Exception e) {
// nothing could be done here, we just log
// if an exception is happening, if IO is compromised the server will eventually be shutdown
ActiveMQServerLogger.LOGGER.errorRegisteringDuplicateCacheSize(String.valueOf(address), e);
}
}
}

public ConcurrentMap<SimpleString, DuplicateIDCache> getDuplicateIDCaches() {
return duplicateIDCaches;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1614,4 +1614,7 @@ void noQueueIdDefined(org.apache.activemq.artemis.api.core.Message message,

@LogMessage(id = 224137, value = "Skipping SSL auto reload for sources of store {} because of {}", level = LogMessage.Level.WARN)
void skipSSLAutoReloadForSourcesOfStore(String storePath, String reason);

@LogMessage(id = 224138, value = "Error Registering DuplicateCacheSize on namespace {}", level = LogMessage.Level.WARN)
void errorRegisteringDuplicateCacheSize(String address, Exception reason);
}
Original file line number Diff line number Diff line change
Expand Up @@ -332,6 +332,11 @@ public void lineUpContext() {

}

@Override
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
return null;
}

@Override
public void asyncCommit(long txID) throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,11 @@ public LargeServerMessage onLargeMessageCreate(long id, LargeServerMessage large
return manager.onLargeMessageCreate(id, largeMessage);
}

@Override
public AbstractPersistedAddressSetting recoverAddressSettings(SimpleString address) {
return null;
}

@Override
public void stop() throws Exception {
manager.stop();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence;

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.ActiveMQServerControl;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.postoffice.DuplicateIDCache;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.activemq.artemis.utils.RandomUtil;
import org.junit.Assert;
import org.junit.Test;

public class ResizeDuplicateCacheTest extends ActiveMQTestBase {

@Test
public void testReloadCache() throws Exception {
internalReloadCache(false, null);
}

@Test
public void testReloadCacheAdditionalSettings() throws Exception {
// this variance will add a settings on the same namespace for something else
// this will validate that the setting should be merged
internalReloadCache(true, null);
}

@Test
public void testReloadCacheAdditionalSettingsValueSet() throws Exception {
// this variance will add a settings on the same namespace for something else
// this will validate that the setting should be merged
internalReloadCache(true, 50_000);
}

public void internalReloadCache(boolean additionalSettings, Integer preExistingCacheValue) throws Exception {
int duplicateSize = 30;
SimpleString randomString = RandomUtil.randomSimpleString();

ActiveMQServer server = createServer(true, false);
server.start();

ActiveMQServerControl serverControl = server.getActiveMQServerControl();

if (additionalSettings) {
AddressSettings settings = new AddressSettings().setDefaultRingSize(3333);
if (preExistingCacheValue != null) {
settings.setIDCacheSize(preExistingCacheValue);
}
serverControl.addAddressSettings(randomString.toString(), settings.toJSON());
String json = serverControl.getAddressSettingsAsJSON(randomString.toString());
AddressSettings settingsFromJson = AddressSettings.fromJSON(json);
Assert.assertEquals(preExistingCacheValue, settingsFromJson.getIDCacheSize());
Assert.assertEquals(3333, settingsFromJson.getDefaultRingSize());
}

DuplicateIDCache duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);

for (int i = 0; i < duplicateSize * 2; i++) {
duplicateIDCache.addToCache(("a" + i).getBytes(StandardCharsets.UTF_8));
}

server.stop();
server = createServer(true, false);
server.start();
serverControl = server.getActiveMQServerControl();

duplicateIDCache = server.getPostOffice().getDuplicateIDCache(randomString, duplicateSize);

Assert.assertEquals(duplicateSize, duplicateIDCache.getSize());

server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());

AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
Assert.assertNotNull(duplicateRecordsCount);
Assert.assertEquals(duplicateSize, duplicateRecordsCount.get());

if (additionalSettings) {
String json = serverControl.getAddressSettingsAsJSON(randomString.toString());

// checking if the previous value was merged as expected
AddressSettings settingsFromJson = AddressSettings.fromJSON(json);
Assert.assertEquals(Integer.valueOf(duplicateSize), settingsFromJson.getIDCacheSize());
Assert.assertEquals(3333, settingsFromJson.getDefaultRingSize());
}

server.stop();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,18 @@
import java.io.File;
import java.io.StringWriter;
import java.lang.invoke.MethodHandles;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.activemq.artemis.api.core.management.SimpleManagement;
import org.apache.activemq.artemis.core.config.amqpBrokerConnectivity.AMQPBrokerConnectionAddressType;
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.tests.soak.SoakTestBase;
import org.apache.activemq.artemis.tests.util.CFUtil;
import org.apache.activemq.artemis.util.ServerUtil;
Expand Down Expand Up @@ -62,6 +66,13 @@ public class SingleMirrorSoakTest extends SoakTestBase {
private static final int SEND_COMMIT = TestParameters.testProperty(TEST_NAME, "SEND_COMMIT", 100);
private static final int KILL_INTERNAL = TestParameters.testProperty(TEST_NAME, "KILL_INTERVAL", 500);
private static final int SNF_TIMEOUT = TestParameters.testProperty(TEST_NAME, "SNF_TIMEOUT", 60_000);
private static final int GENERAL_WAIT_TIMEOUT = TestParameters.testProperty(TEST_NAME, "GENERAL_TIMEOUT", 10_000);

/*
* Time each consumer takes to process a message received to allow some messages accumulating.
* This sleep happens right before the commit.
*/
private static final int CONSUMER_PROCESSING_TIME = TestParameters.testProperty(TEST_NAME, "CONSUMER_PROCESSING_TIME", 0);

private static final String TOPIC_NAME = "topicTest";

Expand All @@ -82,12 +93,16 @@ public class SingleMirrorSoakTest extends SoakTestBase {
volatile Process processDC2;

@After
public void destroyServers() {
public void destroyServers() throws Exception {
if (processDC1 != null) {
processDC1.destroyForcibly();
processDC1.waitFor(1, TimeUnit.MINUTES);
processDC1 = null;
}
if (processDC2 != null) {
processDC2.destroyForcibly();
processDC2.waitFor(1, TimeUnit.MINUTES);
processDC2 = null;
}

}
Expand Down Expand Up @@ -121,6 +136,15 @@ private static void createServer(String serverName,
brokerProperties.put("largeMessageSync", "false");
brokerProperties.put("mirrorAckManagerPageAttempts", "10");
brokerProperties.put("mirrorAckManagerRetryDelay", "1000");

if (paging) {
brokerProperties.put("addressSettings.#.maxSizeMessages", "1");
brokerProperties.put("addressSettings.#.maxReadPageMessages", "1000");
brokerProperties.put("addressSettings.#.maxReadPageBytes", "-1");
brokerProperties.put("addressSettings.#.prefetchPageMessages", "100");
// un-comment this line if you want to rather use the work around without the fix on the PostOfficeImpl
// brokerProperties.put("addressSettings.#.iDCacheSize", "1000");
}
// if we don't use pageTransactions we may eventually get a few duplicates
brokerProperties.put("mirrorPageTransaction", "true");
File brokerPropertiesFile = new File(serverLocation, "broker.properties");
Expand All @@ -130,11 +154,6 @@ private static void createServer(String serverName,
Assert.assertTrue(brokerXml.exists());
// Adding redistribution delay to broker configuration
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<address-setting match=\"#\">", "<address-setting match=\"#\">\n\n" + " <redistribution-delay>0</redistribution-delay> <!-- added by SimpleMirrorSoakTest.java --> \n"));
if (paging) {
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-size-messages>-1</max-size-messages>", "<max-size-messages>1</max-size-messages>"));
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-bytes>20M</max-read-page-bytes>", "<max-read-page-bytes>-1</max-read-page-bytes>"));
Assert.assertTrue(FileUtil.findReplace(brokerXml, "<max-read-page-messages>-1</max-read-page-messages>", "<max-read-page-messages>100000</max-read-page-messages>\n" + " <prefetch-page-messages>10000</prefetch-page-messages>"));
}

if (TRACE_LOGS) {
File log4j = new File(serverLocation, "/etc/log4j2.properties");
Expand Down Expand Up @@ -248,10 +267,27 @@ public void testInterruptedMirrorTransfer() throws Exception {

Wait.assertEquals(0, () -> getCount(managementDC1, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, snfQueue), SNF_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), 10_000);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC1, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDA + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);
Wait.assertEquals(0, () -> getCount(managementDC2, clientIDB + "." + subscriptionID), GENERAL_WAIT_TIMEOUT);

destroyServers();

// counting the number of records on duplicate cache
// to validate if ARTEMIS-4765 is fixed
ActiveMQServer server = createServer(true, false);
server.getConfiguration().setJournalDirectory(getServerLocation(DC2_NODE) + "/data/journal");
server.getConfiguration().setBindingsDirectory(getServerLocation(DC2_NODE) + "/data/bindings");
server.getConfiguration().setPagingDirectory(getServerLocation(DC2_NODE) + "/data/paging");
server.start();
server.getStorageManager().getMessageJournal().scheduleCompactAndBlock(10_000);
HashMap<Integer, AtomicInteger> records = countJournal(server.getConfiguration());
AtomicInteger duplicateRecordsCount = records.get((int) JournalRecordIds.DUPLICATE_ID);
Assert.assertNotNull(duplicateRecordsCount);
// 1000 credits by default
Assert.assertTrue(duplicateRecordsCount.get() <= 1000);

}

private static void consume(ConnectionFactory factory,
Expand Down Expand Up @@ -283,6 +319,9 @@ private static void consume(ConnectionFactory factory,
logger.debug("Consumed {}, large={}", i, message.getBooleanProperty("large"));
pendingCommit++;
if (pendingCommit >= batchCommit) {
if (CONSUMER_PROCESSING_TIME > 0) {
Thread.sleep(CONSUMER_PROCESSING_TIME);
}
logger.info("received {}", i);
session.commit();
pendingCommit = 0;
Expand All @@ -301,7 +340,6 @@ private static void consume(ConnectionFactory factory,
public long getCount(SimpleManagement simpleManagement, String queue) throws Exception {
try {
long value = simpleManagement.getMessageCountOnQueue(queue);
logger.debug("count on queue {} is {}", queue, value);
return value;
} catch (Exception e) {
logger.warn(e.getMessage(), e);
Expand Down

0 comments on commit cd563b4

Please sign in to comment.