Skip to content

Commit

Permalink
Fail fast at startup for invalid WAN merge policy (hazelcast#18875)
Browse files Browse the repository at this point in the history
Modifications:
- Validate wan and split-brain merge policies at node-engine creation time to fail-fast at node start time.
- Print a warning log at execution phase of MergeOperation to catch unreasonable merge-policy 
configs that cannot be catch at start-up time. Example to this is, you have WAN replication between 
2 clusters, even though source cluster has per-entry-stats enabled, target cluster doesn't have it 
enabled. In this case, MergeOperation catches this anomaly and prints it as a warning.
  • Loading branch information
ahmetmircik committed Jun 17, 2021
1 parent dfa600c commit 35a27c5
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 28 deletions.
Expand Up @@ -328,7 +328,7 @@ public void testCacheConfig() {

WanReplicationRef wanRef = cacheConfig.getWanReplicationRef();
assertEquals("testWan", wanRef.getName());
assertEquals("PUT_IF_ABSENT", wanRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanRef.getMergePolicyClassName());
assertEquals(1, wanRef.getFilters().size());
assertEquals("com.example.SampleFilter", wanRef.getFilters().get(0));
}
Expand Down Expand Up @@ -415,7 +415,7 @@ public void testMapConfig() {
// test testMapConfig2's WanReplicationConfig
WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef();
assertEquals("testWan", wanReplicationRef.getName());
assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName());
assertTrue(wanReplicationRef.isRepublishingEnabled());

assertEquals(1000, testMapConfig2.getEvictionConfig().getSize());
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testMapNoWanMergePolicy() {
// test testMapConfig2's WanReplicationConfig
WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef();
assertEquals("testWan", wanReplicationRef.getName());
assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName());
}

@Test
Expand Down
Expand Up @@ -374,7 +374,7 @@
implementation="dummyMapStore"
write-delay-seconds="0"
initial-mode="LAZY"/>
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PUT_IF_ABSENT"/>
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PutIfAbsentMergePolicy"/>

<hz:entry-listeners>
<hz:entry-listener class-name="com.hazelcast.spring.DummyEntryListener" include-value="true"/>
Expand Down Expand Up @@ -508,7 +508,7 @@
</hz:map>

<hz:cache name="testCache" disable-per-entry-invalidation-events="true">
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PUT_IF_ABSENT"
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PutIfAbsentMergePolicy"
republishing-enabled="false">
<hz:filters>
<hz:filter-impl>com.example.SampleFilter</hz:filter-impl>
Expand Down
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class MapContainer {
* Holds number of registered {@link InvalidationListener} from clients.
*/
protected final AtomicInteger invalidationListenerCount = new AtomicInteger();
protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong();

protected SplitBrainMergePolicy wanMergePolicy;
protected DelegatingWanScheme wanReplicationDelegate;
Expand Down Expand Up @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) {
.build();
}

public AtomicLong getLastInvalidMergePolicyCheckTime() {
return lastInvalidMergePolicyCheckTime;
}

private class IndexResultFilterFactory implements Supplier<Predicate<QueryableEntry>> {

@Override
Expand Down
Expand Up @@ -16,7 +16,9 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.nio.ObjectDataInput;
Expand All @@ -31,8 +33,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

import static com.hazelcast.core.EntryEventType.MERGED;
import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;

/**
* Contains multiple merge entries for split-brain
Expand All @@ -43,6 +49,8 @@
public class MergeOperation extends MapOperation
implements PartitionAwareOperation, BackupAwareOperation {

private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1);

private boolean disableWanReplicationEvent;
private List<MapMergeTypes<Object, Object>> mergingEntries;
private SplitBrainMergePolicy<Object, MapMergeTypes<Object, Object>, Object> mergePolicy;
Expand Down Expand Up @@ -77,6 +85,16 @@ protected boolean disableWanReplicationEvent() {

@Override
protected void runInternal() {
// Check once in a minute as earliest to avoid log bursts.
if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) {
try {
checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(),
getNodeEngine().getSplitBrainMergePolicyProvider());
} catch (InvalidConfigurationException e) {
logger().log(Level.WARNING, e.getMessage(), e);
}
}

hasMapListener = mapEventPublisher.hasEventListener(name);
hasWanReplication = mapContainer.isWanReplicationEnabled()
&& !disableWanReplicationEvent;
Expand All @@ -100,6 +118,16 @@ protected void runInternal() {
}
}

private static boolean shouldCheckNow(AtomicLong lastLogTime) {
long now = Clock.currentTimeMillis();
long lastLogged = lastLogTime.get();
if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) {
return lastLogTime.compareAndSet(lastLogged, now);
}

return false;
}

private void merge(MapMergeTypes<Object, Object> mergingEntry) {
Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey());
Data oldValue = hasMapListener ? getValue(dataKey) : null;
Expand Down
39 changes: 31 additions & 8 deletions hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java
Expand Up @@ -19,6 +19,8 @@
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.WanReplicationRef;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.Node;
Expand Down Expand Up @@ -80,9 +82,11 @@
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;
import static com.hazelcast.internal.metrics.MetricDescriptorConstants.MEMORY_PREFIX;
import static com.hazelcast.internal.metrics.impl.MetricsConfigHelper.memberMetricsLevel;
import static com.hazelcast.internal.util.EmptyStatement.ignore;
Expand Down Expand Up @@ -151,17 +155,20 @@ public NodeEngineImpl(Node node) {
this.wanReplicationService = node.getNodeExtension().createService(WanReplicationService.class);
this.sqlService = new SqlServiceImpl(this);
this.packetDispatcher = new PacketDispatcher(
logger,
operationService.getOperationExecutor(),
operationService.getInboundResponseHandlerSupplier().get(),
operationService.getInvocationMonitor(),
eventService,
getJetPacketConsumer(node.getNodeExtension()),
sqlService
logger,
operationService.getOperationExecutor(),
operationService.getInboundResponseHandlerSupplier().get(),
operationService.getInvocationMonitor(),
eventService,
getJetPacketConsumer(node.getNodeExtension()),
sqlService
);
this.splitBrainProtectionService = new SplitBrainProtectionServiceImpl(this);
this.diagnostics = newDiagnostics();
this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(this);
this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(configClassLoader);

checkMapMergePolicies(node);

this.tenantControlService = new TenantControlServiceImpl(this);
serviceManager.registerService(OperationServiceImpl.SERVICE_NAME, operationService);
serviceManager.registerService(OperationParker.SERVICE_NAME, operationParker);
Expand All @@ -178,6 +185,22 @@ public NodeEngineImpl(Node node) {
}
}

private void checkMapMergePolicies(Node node) {
Map<String, MapConfig> mapConfigs = node.config.getMapConfigs();
for (MapConfig mapConfig : mapConfigs.values()) {
WanReplicationRef wanReplicationRef = mapConfig.getWanReplicationRef();
if (wanReplicationRef != null) {
String wanMergePolicyClassName = mapConfig.getWanReplicationRef().getMergePolicyClassName();
checkMapMergePolicy(mapConfig,
wanMergePolicyClassName, splitBrainMergePolicyProvider);
}

String splitBrainMergePolicyClassName = mapConfig.getMergePolicyConfig().getPolicy();
checkMapMergePolicy(mapConfig,
splitBrainMergePolicyClassName, splitBrainMergePolicyProvider);
}
}

private ConcurrencyDetection newConcurrencyDetection() {
HazelcastProperties properties = node.getProperties();
boolean writeThrough = properties.getBoolean(ClusterProperty.IO_WRITE_THROUGH_ENABLED);
Expand Down
Expand Up @@ -17,8 +17,8 @@
package com.hazelcast.spi.merge;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.internal.util.ConstructorFunction;
import com.hazelcast.spi.impl.NodeEngine;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -51,7 +51,7 @@ public final class SplitBrainMergePolicyProvider {
addPolicy(PutIfAbsentMergePolicy.class, new PutIfAbsentMergePolicy());
}

private final NodeEngine nodeEngine;
private final ClassLoader configClassLoader;

private final ConcurrentMap<String, SplitBrainMergePolicy> mergePolicyMap
= new ConcurrentHashMap<String, SplitBrainMergePolicy>();
Expand All @@ -61,7 +61,7 @@ public final class SplitBrainMergePolicyProvider {
@Override
public SplitBrainMergePolicy createNew(String className) {
try {
return newInstance(nodeEngine.getConfigClassLoader(), className);
return newInstance(configClassLoader, className);
} catch (Exception e) {
throw new InvalidConfigurationException("Invalid SplitBrainMergePolicy: " + className, e);
}
Expand All @@ -73,8 +73,8 @@ public SplitBrainMergePolicy createNew(String className) {
*
* @param nodeEngine the {@link NodeEngine} to retrieve the classloader from
*/
public SplitBrainMergePolicyProvider(NodeEngine nodeEngine) {
this.nodeEngine = nodeEngine;
public SplitBrainMergePolicyProvider(ClassLoader configClassLoader) {
this.configClassLoader = configClassLoader;
this.mergePolicyMap.putAll(OUT_OF_THE_BOX_MERGE_POLICIES);
}

Expand Down
Expand Up @@ -65,7 +65,7 @@ public void setUp() {
NodeEngine nodeEngine = Mockito.mock(NodeEngine.class);
when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader());

splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine);
splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader());
when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(splitBrainMergePolicyProvider);

properties = nodeEngine.getProperties();
Expand Down
Expand Up @@ -40,7 +40,7 @@
@Category({QuickTest.class, ParallelJVMTest.class})
public class MergePolicyValidatorMapIntegrationTest extends AbstractMergePolicyValidatorIntegrationTest {

private boolean perEntryStatsEnabled = false;
private boolean perEntryStatsEnabled = true;

@Override
void addConfig(Config config, String name, MergePolicyConfig mergePolicyConfig) {
Expand All @@ -60,9 +60,9 @@ public void testMap_withPutIfAbsentMergePolicy() {

@Test
public void testMap_withHyperLogLogMergePolicy() {
expectCardinalityEstimatorException();
HazelcastInstance hz = getHazelcastInstance("cardinalityEstimator", hyperLogLogMergePolicy);

expectCardinalityEstimatorException();
hz.getMap("cardinalityEstimator");
}

Expand All @@ -76,9 +76,9 @@ public void testMap_withHigherHitsMergePolicy() {

@Test
public void testMap_withInvalidMergePolicy() {
expectedInvalidMergePolicyException();
HazelcastInstance hz = getHazelcastInstance("invalid", invalidMergePolicyConfig);

expectedInvalidMergePolicyException();
hz.getMap("invalid");
}

Expand All @@ -105,9 +105,11 @@ public void testMap_withExpirationTimeMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withLastStoredTimeMergePolicy() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy);

HazelcastInstance hz = getHazelcastInstance("lastStoredTime", lastStoredTimeMergePolicy);

expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy);
hz.getMap("lastStoredTime");
}

Expand All @@ -124,9 +126,11 @@ public void testMap_withLastStoredMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withLastStoredTimeMergePolicyNoTypeVariable() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable);

HazelcastInstance hz = getHazelcastInstance("lastStoredTimeNoTypeVariable", lastStoredTimeMergePolicyNoTypeVariable);

expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable);
hz.getMap("lastStoredTimeNoTypeVariable");
}

Expand Down Expand Up @@ -163,9 +167,10 @@ public void testMap_withComplexCustomMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withCustomMapMergePolicy() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(customMapMergePolicy);
HazelcastInstance hz = getHazelcastInstance("customMap", customMapMergePolicy);

expectedMapStatisticsDisabledException(customMapMergePolicy);
hz.getMap("customMap");
}

Expand All @@ -183,9 +188,11 @@ public void testMap_withCustomMapMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withCustomMapMergePolicyNoTypeVariable() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable);

HazelcastInstance hz = getHazelcastInstance("customMapNoTypeVariable", customMapMergePolicyNoTypeVariable);

expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable);
hz.getMap("customMapNoTypeVariable");
}

Expand Down
Expand Up @@ -43,7 +43,7 @@ public void setUp() {
NodeEngine nodeEngine = Mockito.mock(NodeEngine.class);
when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader());

mapMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine);
mapMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader());
when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(mapMergePolicyProvider);
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ public class SplitBrainMergePolicyProviderTest extends HazelcastTestSupport {

@Before
public void setup() {
mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getNodeEngine());
mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getConfigClassLoader());
}

@Test
Expand Down

0 comments on commit 35a27c5

Please sign in to comment.