Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fail fast at startup for invalid WAN merge policy #18875

Merged
merged 4 commits into from Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -319,7 +319,7 @@ public void testCacheConfig() {

WanReplicationRef wanRef = cacheConfig.getWanReplicationRef();
assertEquals("testWan", wanRef.getName());
assertEquals("PUT_IF_ABSENT", wanRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanRef.getMergePolicyClassName());
assertEquals(1, wanRef.getFilters().size());
assertEquals("com.example.SampleFilter", wanRef.getFilters().get(0));
}
Expand Down Expand Up @@ -404,7 +404,7 @@ public void testMapConfig() {
// test testMapConfig2's WanReplicationConfig
WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef();
assertEquals("testWan", wanReplicationRef.getName());
assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName());
assertTrue(wanReplicationRef.isRepublishingEnabled());

assertEquals(1000, testMapConfig2.getEvictionConfig().getSize());
Expand Down Expand Up @@ -471,7 +471,7 @@ public void testMapNoWanMergePolicy() {
// test testMapConfig2's WanReplicationConfig
WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef();
assertEquals("testWan", wanReplicationRef.getName());
assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName());
}

@Test
Expand Down
Expand Up @@ -374,7 +374,7 @@
implementation="dummyMapStore"
write-delay-seconds="0"
initial-mode="LAZY"/>
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PUT_IF_ABSENT"/>
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PutIfAbsentMergePolicy"/>

<hz:entry-listeners>
<hz:entry-listener class-name="com.hazelcast.spring.DummyEntryListener" include-value="true"/>
Expand Down Expand Up @@ -508,7 +508,7 @@
</hz:map>

<hz:cache name="testCache" disable-per-entry-invalidation-events="true">
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PUT_IF_ABSENT"
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PutIfAbsentMergePolicy"
republishing-enabled="false">
<hz:filters>
<hz:filter-impl>com.example.SampleFilter</hz:filter-impl>
Expand Down
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class MapContainer {
* Holds number of registered {@link InvalidationListener} from clients.
*/
protected final AtomicInteger invalidationListenerCount = new AtomicInteger();
protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong();

protected SplitBrainMergePolicy wanMergePolicy;
protected DelegatingWanScheme wanReplicationDelegate;
Expand Down Expand Up @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) {
.build();
}

public AtomicLong getLastInvalidMergePolicyCheckTime() {
return lastInvalidMergePolicyCheckTime;
}

private class IndexResultFilterFactory implements Supplier<Predicate<QueryableEntry>> {

@Override
Expand Down
Expand Up @@ -16,7 +16,9 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.nio.ObjectDataInput;
Expand All @@ -31,8 +33,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

import static com.hazelcast.core.EntryEventType.MERGED;
import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;

/**
* Contains multiple merge entries for split-brain
Expand All @@ -43,6 +49,8 @@
public class MergeOperation extends MapOperation
implements PartitionAwareOperation, BackupAwareOperation {

private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1);

private boolean disableWanReplicationEvent;
private List<MapMergeTypes<Object, Object>> mergingEntries;
private SplitBrainMergePolicy<Object, MapMergeTypes<Object, Object>, Object> mergePolicy;
Expand Down Expand Up @@ -77,6 +85,16 @@ protected boolean disableWanReplicationEvent() {

@Override
protected void runInternal() {
// Check once in a minute as earliest to avoid log bursts.
if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) {
try {
checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(),
getNodeEngine().getSplitBrainMergePolicyProvider());
} catch (InvalidConfigurationException e) {
logger().log(Level.WARNING, e.getMessage(), e);
}
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we testing the case if this call fails? If I'm right, this fails in the WAN target cluster if it has stats disabled while the used merge policy relies on the stats. Can the split-brain healing get here with an invalid config? As I get it, with the recent changes we fail-fast on startup. Or can a RU config change drive us here and fail it during split-brain healing?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added PR description to explain this.
You are right that this check can fail after a RU config change but not sure what can be the other expected behavior in such a case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably we can get here. I don't think we do any checks for dynamically added configs, so you can add invalid config and get here after split-brain. Don't know about the other questions.

I'm also curious about a related topic. Is this operation failed or retried after this throws an exception? Wondering if we ever give up, both in split-brain healing and WAN, or do we keep filling the logs and retrying.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ahmetmircik which RU config is this? And what does this check cover? I forgot.
I'm wondering if it's something we can simply avoid checking for now, and just check WAN merge policies at startup, not runtime.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It covers the scenario i mentioned in PR description: You have WAN replication between 2 clusters, even though source cluster has per-entry-stats enabled, target cluster doesn't have it enabled. In this case having same merge-policy in both sides doesn't make sense(since it can require per-entry-stats-enabled on both sides) hence we catch this anomaly. I see this as an edge case but still possible.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that's the RU case that Zoltan was mentioning? Someone having this kind of setup and doing RU from 4.2.0 to 4.2.1 would start getting failures? In this case we assume that the 4.2.0 policy would simply always choose one of the two entries, because one of the values is always -1?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about SB, but WAN expects the replicated event to succeed. We keep sending it until we get an ACK. So the clusters will start playing a neverending ping-pong if this check fails. Well, if whatever fails on the target. The only solution I can think of is extending the WAN protocol with a feature to respond with "hey, the merge policy you try to use for DS XXX doesn't work for me, stop replicating that DS".

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Complex :) I'm leaning towards simply removing this check. For now?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I didn't mean to do this change in a patch release :) We'll fail here (or soon after the check) and the situation is the same. So I'd opt for the most accurate error message. Maybe we can optionally delay the NAK to prevent the source cluster to overwhelm the target and leave time for the ops to intervene. Need to check how it can be done.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm for removing the check but adding a log message.

hasMapListener = mapEventPublisher.hasEventListener(name);
hasWanReplication = mapContainer.isWanReplicationEnabled()
&& !disableWanReplicationEvent;
Expand All @@ -100,6 +118,16 @@ protected void runInternal() {
}
}

private static boolean shouldCheckNow(AtomicLong lastLogTime) {
long now = Clock.currentTimeMillis();
long lastLogged = lastLogTime.get();
if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) {
return lastLogTime.compareAndSet(lastLogged, now);
}

return false;
}

private void merge(MapMergeTypes<Object, Object> mergingEntry) {
Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey());
Data oldValue = hasMapListener ? getValue(dataKey) : null;
Expand Down
39 changes: 31 additions & 8 deletions hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java
Expand Up @@ -19,6 +19,8 @@
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.WanReplicationRef;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.Node;
Expand Down Expand Up @@ -80,9 +82,11 @@
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;
import static com.hazelcast.internal.metrics.MetricDescriptorConstants.MEMORY_PREFIX;
import static com.hazelcast.internal.metrics.impl.MetricsConfigHelper.memberMetricsLevel;
import static com.hazelcast.internal.util.EmptyStatement.ignore;
Expand Down Expand Up @@ -151,17 +155,20 @@ public NodeEngineImpl(Node node) {
this.wanReplicationService = node.getNodeExtension().createService(WanReplicationService.class);
this.sqlService = new SqlServiceImpl(this);
this.packetDispatcher = new PacketDispatcher(
logger,
operationService.getOperationExecutor(),
operationService.getInboundResponseHandlerSupplier().get(),
operationService.getInvocationMonitor(),
eventService,
getJetPacketConsumer(node.getNodeExtension()),
sqlService
logger,
operationService.getOperationExecutor(),
operationService.getInboundResponseHandlerSupplier().get(),
operationService.getInvocationMonitor(),
eventService,
getJetPacketConsumer(node.getNodeExtension()),
sqlService
);
this.splitBrainProtectionService = new SplitBrainProtectionServiceImpl(this);
this.diagnostics = newDiagnostics();
this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(this);
this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(configClassLoader);

checkMapMergePolicies(node);

this.tenantControlService = new TenantControlServiceImpl(this);
serviceManager.registerService(OperationServiceImpl.SERVICE_NAME, operationService);
serviceManager.registerService(OperationParker.SERVICE_NAME, operationParker);
Expand All @@ -178,6 +185,22 @@ public NodeEngineImpl(Node node) {
}
}

private void checkMapMergePolicies(Node node) {
mmedenjak marked this conversation as resolved.
Show resolved Hide resolved
Map<String, MapConfig> mapConfigs = node.config.getMapConfigs();
for (MapConfig mapConfig : mapConfigs.values()) {
WanReplicationRef wanReplicationRef = mapConfig.getWanReplicationRef();
if (wanReplicationRef != null) {
String wanMergePolicyClassName = mapConfig.getWanReplicationRef().getMergePolicyClassName();
checkMapMergePolicy(mapConfig,
wanMergePolicyClassName, splitBrainMergePolicyProvider);
}

String splitBrainMergePolicyClassName = mapConfig.getMergePolicyConfig().getPolicy();
checkMapMergePolicy(mapConfig,
splitBrainMergePolicyClassName, splitBrainMergePolicyProvider);
}
}

private ConcurrencyDetection newConcurrencyDetection() {
HazelcastProperties properties = node.getProperties();
boolean writeThrough = properties.getBoolean(ClusterProperty.IO_WRITE_THROUGH_ENABLED);
Expand Down
Expand Up @@ -17,8 +17,8 @@
package com.hazelcast.spi.merge;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.internal.util.ConstructorFunction;
import com.hazelcast.spi.impl.NodeEngine;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -51,7 +51,7 @@ public final class SplitBrainMergePolicyProvider {
addPolicy(PutIfAbsentMergePolicy.class, new PutIfAbsentMergePolicy());
}

private final NodeEngine nodeEngine;
private final ClassLoader configClassLoader;

private final ConcurrentMap<String, SplitBrainMergePolicy> mergePolicyMap
= new ConcurrentHashMap<String, SplitBrainMergePolicy>();
Expand All @@ -61,7 +61,7 @@ public final class SplitBrainMergePolicyProvider {
@Override
public SplitBrainMergePolicy createNew(String className) {
try {
return newInstance(nodeEngine.getConfigClassLoader(), className);
return newInstance(configClassLoader, className);
} catch (Exception e) {
throw new InvalidConfigurationException("Invalid SplitBrainMergePolicy: " + className, e);
}
Expand All @@ -73,8 +73,8 @@ public SplitBrainMergePolicy createNew(String className) {
*
* @param nodeEngine the {@link NodeEngine} to retrieve the classloader from
*/
public SplitBrainMergePolicyProvider(NodeEngine nodeEngine) {
this.nodeEngine = nodeEngine;
public SplitBrainMergePolicyProvider(ClassLoader configClassLoader) {
this.configClassLoader = configClassLoader;
this.mergePolicyMap.putAll(OUT_OF_THE_BOX_MERGE_POLICIES);
}

Expand Down
Expand Up @@ -65,7 +65,7 @@ public void setUp() {
NodeEngine nodeEngine = Mockito.mock(NodeEngine.class);
when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader());

splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine);
splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader());
when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(splitBrainMergePolicyProvider);

properties = nodeEngine.getProperties();
Expand Down
Expand Up @@ -40,7 +40,7 @@
@Category({QuickTest.class, ParallelJVMTest.class})
public class MergePolicyValidatorMapIntegrationTest extends AbstractMergePolicyValidatorIntegrationTest {

private boolean perEntryStatsEnabled = false;
private boolean perEntryStatsEnabled = true;

@Override
void addConfig(Config config, String name, MergePolicyConfig mergePolicyConfig) {
Expand All @@ -60,9 +60,9 @@ public void testMap_withPutIfAbsentMergePolicy() {

@Test
public void testMap_withHyperLogLogMergePolicy() {
expectCardinalityEstimatorException();
HazelcastInstance hz = getHazelcastInstance("cardinalityEstimator", hyperLogLogMergePolicy);

expectCardinalityEstimatorException();
hz.getMap("cardinalityEstimator");
}

Expand All @@ -76,9 +76,9 @@ public void testMap_withHigherHitsMergePolicy() {

@Test
public void testMap_withInvalidMergePolicy() {
expectedInvalidMergePolicyException();
HazelcastInstance hz = getHazelcastInstance("invalid", invalidMergePolicyConfig);

expectedInvalidMergePolicyException();
hz.getMap("invalid");
}

Expand All @@ -105,9 +105,11 @@ public void testMap_withExpirationTimeMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withLastStoredTimeMergePolicy() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy);

HazelcastInstance hz = getHazelcastInstance("lastStoredTime", lastStoredTimeMergePolicy);

expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy);
hz.getMap("lastStoredTime");
}

Expand All @@ -124,9 +126,11 @@ public void testMap_withLastStoredMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withLastStoredTimeMergePolicyNoTypeVariable() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable);

HazelcastInstance hz = getHazelcastInstance("lastStoredTimeNoTypeVariable", lastStoredTimeMergePolicyNoTypeVariable);

expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable);
hz.getMap("lastStoredTimeNoTypeVariable");
}

Expand Down Expand Up @@ -163,9 +167,10 @@ public void testMap_withComplexCustomMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withCustomMapMergePolicy() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(customMapMergePolicy);
HazelcastInstance hz = getHazelcastInstance("customMap", customMapMergePolicy);

expectedMapStatisticsDisabledException(customMapMergePolicy);
hz.getMap("customMap");
}

Expand All @@ -183,9 +188,11 @@ public void testMap_withCustomMapMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withCustomMapMergePolicyNoTypeVariable() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable);

HazelcastInstance hz = getHazelcastInstance("customMapNoTypeVariable", customMapMergePolicyNoTypeVariable);

expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable);
hz.getMap("customMapNoTypeVariable");
}

Expand Down
Expand Up @@ -43,7 +43,7 @@ public void setUp() {
NodeEngine nodeEngine = Mockito.mock(NodeEngine.class);
when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader());

mapMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine);
mapMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader());
when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(mapMergePolicyProvider);
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ public class SplitBrainMergePolicyProviderTest extends HazelcastTestSupport {

@Before
public void setup() {
mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getNodeEngine());
mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getConfigClassLoader());
}

@Test
Expand Down