Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FORWARD PORT] Fail fast at startup for invalid WAN merge policy (#18875) #18928

Merged
merged 1 commit into from
Jun 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ public void testCacheConfig() {

WanReplicationRef wanRef = cacheConfig.getWanReplicationRef();
assertEquals("testWan", wanRef.getName());
assertEquals("PUT_IF_ABSENT", wanRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanRef.getMergePolicyClassName());
assertEquals(1, wanRef.getFilters().size());
assertEquals("com.example.SampleFilter", wanRef.getFilters().get(0));
}
Expand Down Expand Up @@ -415,7 +415,7 @@ public void testMapConfig() {
// test testMapConfig2's WanReplicationConfig
WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef();
assertEquals("testWan", wanReplicationRef.getName());
assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName());
assertTrue(wanReplicationRef.isRepublishingEnabled());

assertEquals(1000, testMapConfig2.getEvictionConfig().getSize());
Expand Down Expand Up @@ -482,7 +482,7 @@ public void testMapNoWanMergePolicy() {
// test testMapConfig2's WanReplicationConfig
WanReplicationRef wanReplicationRef = testMapConfig2.getWanReplicationRef();
assertEquals("testWan", wanReplicationRef.getName());
assertEquals("PUT_IF_ABSENT", wanReplicationRef.getMergePolicyClassName());
assertEquals("PutIfAbsentMergePolicy", wanReplicationRef.getMergePolicyClassName());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@
implementation="dummyMapStore"
write-delay-seconds="0"
initial-mode="LAZY"/>
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PUT_IF_ABSENT"/>
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PutIfAbsentMergePolicy"/>

<hz:entry-listeners>
<hz:entry-listener class-name="com.hazelcast.spring.DummyEntryListener" include-value="true"/>
Expand Down Expand Up @@ -508,7 +508,7 @@
</hz:map>

<hz:cache name="testCache" disable-per-entry-invalidation-events="true">
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PUT_IF_ABSENT"
<hz:wan-replication-ref name="testWan" merge-policy-class-name="PutIfAbsentMergePolicy"
republishing-enabled="false">
<hz:filters>
<hz:filter-impl>com.example.SampleFilter</hz:filter-impl>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -106,6 +107,7 @@ public class MapContainer {
* Holds number of registered {@link InvalidationListener} from clients.
*/
protected final AtomicInteger invalidationListenerCount = new AtomicInteger();
protected final AtomicLong lastInvalidMergePolicyCheckTime = new AtomicLong();

protected SplitBrainMergePolicy wanMergePolicy;
protected DelegatingWanScheme wanReplicationDelegate;
Expand Down Expand Up @@ -169,6 +171,10 @@ public Indexes createIndexes(boolean global) {
.build();
}

public AtomicLong getLastInvalidMergePolicyCheckTime() {
return lastInvalidMergePolicyCheckTime;
}

private class IndexResultFilterFactory implements Supplier<Predicate<QueryableEntry>> {

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

package com.hazelcast.map.impl.operation;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.internal.serialization.Data;
import com.hazelcast.internal.util.Clock;
import com.hazelcast.map.impl.MapDataSerializerHook;
import com.hazelcast.map.impl.record.Record;
import com.hazelcast.nio.ObjectDataInput;
Expand All @@ -31,8 +33,12 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;

import static com.hazelcast.core.EntryEventType.MERGED;
import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;

/**
* Contains multiple merge entries for split-brain
Expand All @@ -43,6 +49,8 @@
public class MergeOperation extends MapOperation
implements PartitionAwareOperation, BackupAwareOperation {

private static final long MERGE_POLICY_CHECK_PERIOD = TimeUnit.MINUTES.toMillis(1);

private boolean disableWanReplicationEvent;
private List<MapMergeTypes<Object, Object>> mergingEntries;
private SplitBrainMergePolicy<Object, MapMergeTypes<Object, Object>, Object> mergePolicy;
Expand Down Expand Up @@ -77,6 +85,16 @@ protected boolean disableWanReplicationEvent() {

@Override
protected void runInternal() {
// Check once in a minute as earliest to avoid log bursts.
if (shouldCheckNow(mapContainer.getLastInvalidMergePolicyCheckTime())) {
try {
checkMapMergePolicy(mapContainer.getMapConfig(), mergePolicy.getClass().getName(),
getNodeEngine().getSplitBrainMergePolicyProvider());
} catch (InvalidConfigurationException e) {
logger().log(Level.WARNING, e.getMessage(), e);
}
}

hasMapListener = mapEventPublisher.hasEventListener(name);
hasWanReplication = mapContainer.isWanReplicationEnabled()
&& !disableWanReplicationEvent;
Expand All @@ -100,6 +118,16 @@ protected void runInternal() {
}
}

private static boolean shouldCheckNow(AtomicLong lastLogTime) {
long now = Clock.currentTimeMillis();
long lastLogged = lastLogTime.get();
if (now - lastLogged >= MERGE_POLICY_CHECK_PERIOD) {
return lastLogTime.compareAndSet(lastLogged, now);
}

return false;
}

private void merge(MapMergeTypes<Object, Object> mergingEntry) {
Data dataKey = getNodeEngine().toData(mergingEntry.getRawKey());
Data oldValue = hasMapListener ? getValue(dataKey) : null;
Expand Down
39 changes: 31 additions & 8 deletions hazelcast/src/main/java/com/hazelcast/spi/impl/NodeEngineImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.hazelcast.cluster.Address;
import com.hazelcast.cluster.impl.MemberImpl;
import com.hazelcast.config.Config;
import com.hazelcast.config.MapConfig;
import com.hazelcast.config.WanReplicationRef;
import com.hazelcast.core.HazelcastException;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.instance.impl.Node;
Expand Down Expand Up @@ -80,9 +82,11 @@
import javax.annotation.Nonnull;
import java.util.Collection;
import java.util.LinkedList;
import java.util.Map;
import java.util.UUID;
import java.util.function.Consumer;

import static com.hazelcast.internal.config.MergePolicyValidator.checkMapMergePolicy;
import static com.hazelcast.internal.metrics.MetricDescriptorConstants.MEMORY_PREFIX;
import static com.hazelcast.internal.metrics.impl.MetricsConfigHelper.memberMetricsLevel;
import static com.hazelcast.internal.util.EmptyStatement.ignore;
Expand Down Expand Up @@ -151,17 +155,20 @@ public NodeEngineImpl(Node node) {
this.wanReplicationService = node.getNodeExtension().createService(WanReplicationService.class);
this.sqlService = new SqlServiceImpl(this);
this.packetDispatcher = new PacketDispatcher(
logger,
operationService.getOperationExecutor(),
operationService.getInboundResponseHandlerSupplier().get(),
operationService.getInvocationMonitor(),
eventService,
getJetPacketConsumer(node.getNodeExtension()),
sqlService
logger,
operationService.getOperationExecutor(),
operationService.getInboundResponseHandlerSupplier().get(),
operationService.getInvocationMonitor(),
eventService,
getJetPacketConsumer(node.getNodeExtension()),
sqlService
);
this.splitBrainProtectionService = new SplitBrainProtectionServiceImpl(this);
this.diagnostics = newDiagnostics();
this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(this);
this.splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(configClassLoader);

checkMapMergePolicies(node);

this.tenantControlService = new TenantControlServiceImpl(this);
serviceManager.registerService(OperationServiceImpl.SERVICE_NAME, operationService);
serviceManager.registerService(OperationParker.SERVICE_NAME, operationParker);
Expand All @@ -178,6 +185,22 @@ public NodeEngineImpl(Node node) {
}
}

private void checkMapMergePolicies(Node node) {
Map<String, MapConfig> mapConfigs = node.config.getMapConfigs();
for (MapConfig mapConfig : mapConfigs.values()) {
WanReplicationRef wanReplicationRef = mapConfig.getWanReplicationRef();
if (wanReplicationRef != null) {
String wanMergePolicyClassName = mapConfig.getWanReplicationRef().getMergePolicyClassName();
checkMapMergePolicy(mapConfig,
wanMergePolicyClassName, splitBrainMergePolicyProvider);
}

String splitBrainMergePolicyClassName = mapConfig.getMergePolicyConfig().getPolicy();
checkMapMergePolicy(mapConfig,
splitBrainMergePolicyClassName, splitBrainMergePolicyProvider);
}
}

private ConcurrencyDetection newConcurrencyDetection() {
HazelcastProperties properties = node.getProperties();
boolean writeThrough = properties.getBoolean(ClusterProperty.IO_WRITE_THROUGH_ENABLED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@
package com.hazelcast.spi.merge;

import com.hazelcast.config.InvalidConfigurationException;
import com.hazelcast.spi.impl.NodeEngine;
import com.hazelcast.internal.util.ConstructorFunction;
import com.hazelcast.spi.impl.NodeEngine;

import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -51,7 +51,7 @@ public final class SplitBrainMergePolicyProvider {
addPolicy(PutIfAbsentMergePolicy.class, new PutIfAbsentMergePolicy());
}

private final NodeEngine nodeEngine;
private final ClassLoader configClassLoader;

private final ConcurrentMap<String, SplitBrainMergePolicy> mergePolicyMap
= new ConcurrentHashMap<String, SplitBrainMergePolicy>();
Expand All @@ -61,7 +61,7 @@ public final class SplitBrainMergePolicyProvider {
@Override
public SplitBrainMergePolicy createNew(String className) {
try {
return newInstance(nodeEngine.getConfigClassLoader(), className);
return newInstance(configClassLoader, className);
} catch (Exception e) {
throw new InvalidConfigurationException("Invalid SplitBrainMergePolicy: " + className, e);
}
Expand All @@ -73,8 +73,8 @@ public SplitBrainMergePolicy createNew(String className) {
*
* @param nodeEngine the {@link NodeEngine} to retrieve the classloader from
*/
public SplitBrainMergePolicyProvider(NodeEngine nodeEngine) {
this.nodeEngine = nodeEngine;
public SplitBrainMergePolicyProvider(ClassLoader configClassLoader) {
this.configClassLoader = configClassLoader;
this.mergePolicyMap.putAll(OUT_OF_THE_BOX_MERGE_POLICIES);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public void setUp() {
NodeEngine nodeEngine = Mockito.mock(NodeEngine.class);
when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader());

splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine);
splitBrainMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader());
when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(splitBrainMergePolicyProvider);

properties = nodeEngine.getProperties();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
@Category({QuickTest.class, ParallelJVMTest.class})
public class MergePolicyValidatorMapIntegrationTest extends AbstractMergePolicyValidatorIntegrationTest {

private boolean perEntryStatsEnabled = false;
private boolean perEntryStatsEnabled = true;

@Override
void addConfig(Config config, String name, MergePolicyConfig mergePolicyConfig) {
Expand All @@ -60,9 +60,9 @@ public void testMap_withPutIfAbsentMergePolicy() {

@Test
public void testMap_withHyperLogLogMergePolicy() {
expectCardinalityEstimatorException();
HazelcastInstance hz = getHazelcastInstance("cardinalityEstimator", hyperLogLogMergePolicy);

expectCardinalityEstimatorException();
hz.getMap("cardinalityEstimator");
}

Expand All @@ -76,9 +76,9 @@ public void testMap_withHigherHitsMergePolicy() {

@Test
public void testMap_withInvalidMergePolicy() {
expectedInvalidMergePolicyException();
HazelcastInstance hz = getHazelcastInstance("invalid", invalidMergePolicyConfig);

expectedInvalidMergePolicyException();
hz.getMap("invalid");
}

Expand All @@ -105,9 +105,11 @@ public void testMap_withExpirationTimeMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withLastStoredTimeMergePolicy() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy);

HazelcastInstance hz = getHazelcastInstance("lastStoredTime", lastStoredTimeMergePolicy);

expectedMapStatisticsDisabledException(lastStoredTimeMergePolicy);
hz.getMap("lastStoredTime");
}

Expand All @@ -124,9 +126,11 @@ public void testMap_withLastStoredMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withLastStoredTimeMergePolicyNoTypeVariable() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable);

HazelcastInstance hz = getHazelcastInstance("lastStoredTimeNoTypeVariable", lastStoredTimeMergePolicyNoTypeVariable);

expectedMapStatisticsDisabledException(lastStoredTimeMergePolicyNoTypeVariable);
hz.getMap("lastStoredTimeNoTypeVariable");
}

Expand Down Expand Up @@ -163,9 +167,10 @@ public void testMap_withComplexCustomMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withCustomMapMergePolicy() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(customMapMergePolicy);
HazelcastInstance hz = getHazelcastInstance("customMap", customMapMergePolicy);

expectedMapStatisticsDisabledException(customMapMergePolicy);
hz.getMap("customMap");
}

Expand All @@ -183,9 +188,11 @@ public void testMap_withCustomMapMergePolicy_withStatsEnabled() {
*/
@Test
public void testMap_withCustomMapMergePolicyNoTypeVariable() {
perEntryStatsEnabled = false;
expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable);

HazelcastInstance hz = getHazelcastInstance("customMapNoTypeVariable", customMapMergePolicyNoTypeVariable);

expectedMapStatisticsDisabledException(customMapMergePolicyNoTypeVariable);
hz.getMap("customMapNoTypeVariable");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public void setUp() {
NodeEngine nodeEngine = Mockito.mock(NodeEngine.class);
when(nodeEngine.getConfigClassLoader()).thenReturn(config.getClassLoader());

mapMergePolicyProvider = new SplitBrainMergePolicyProvider(nodeEngine);
mapMergePolicyProvider = new SplitBrainMergePolicyProvider(config.getClassLoader());
when(nodeEngine.getSplitBrainMergePolicyProvider()).thenReturn(mapMergePolicyProvider);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public class SplitBrainMergePolicyProviderTest extends HazelcastTestSupport {

@Before
public void setup() {
mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getNodeEngine());
mergePolicyProvider = new SplitBrainMergePolicyProvider(getNode(createHazelcastInstance()).getConfigClassLoader());
}

@Test
Expand Down