Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Revert conversion to Lite member during Split-Brain healing #12691

Merged
merged 2 commits into from Apr 6, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -1650,6 +1650,13 @@ public void destroy() {
onDestroy();
}

/**
 * Destroys only this record store's internal state: clears records, detaches
 * listeners, then runs the {@link #onDestroy()} subclass hook. Unlike a full
 * {@code destroy()}, this is meant to leave state owned by other services
 * (e.g. the event journal) untouched — NOTE(review): inferred from the
 * {@code ICacheRecordStore#destroyInternals} javadoc; confirm.
 */
@Override
public void destroyInternals() {
reset();
closeListeners();
onDestroy();
}

// Template hook invoked as the last step of destruction; no-op by default,
// intended for subclasses to release additional resources.
protected void onDestroy() {
}

Expand Down
Expand Up @@ -135,12 +135,13 @@ public Set<Closeable> createNew(String name) {
}
};

protected ILogger logger;
protected NodeEngine nodeEngine;
protected CachePartitionSegment[] segments;
protected CacheEventHandler cacheEventHandler;
protected CacheSplitBrainHandlerService splitBrainHandlerService;
protected RingbufferCacheEventJournalImpl eventJournal;
protected ILogger logger;
protected CacheMergePolicyProvider mergePolicyProvider;
protected CacheSplitBrainHandlerService splitBrainHandlerService;

@Override
public final void init(NodeEngine nodeEngine, Properties properties) {
Expand All @@ -151,15 +152,20 @@ public final void init(NodeEngine nodeEngine, Properties properties) {
segments[i] = newPartitionSegment(i);
}
this.cacheEventHandler = new CacheEventHandler(nodeEngine);
this.splitBrainHandlerService = newSplitBrainHandlerService(nodeEngine);
this.splitBrainHandlerService = new CacheSplitBrainHandlerService(nodeEngine, segments);
this.logger = nodeEngine.getLogger(getClass());
this.eventJournal = new RingbufferCacheEventJournalImpl(nodeEngine);
this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);

postInit(nodeEngine, properties);
}

// this method is overridden on ee
protected CacheSplitBrainHandlerService newSplitBrainHandlerService(NodeEngine nodeEngine) {
return new CacheSplitBrainHandlerService(nodeEngine, configs, segments);
public CacheMergePolicyProvider getMergePolicyProvider() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: This can be package-private if the test is moved to the same package. Also, if the field is package-private, we can remove the getter altogether.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kept this as is; it is being used by different packages on the EE side.

return mergePolicyProvider;
}

/**
 * Returns the cache-name-to-{@link CacheConfig} map. Note this exposes the
 * service's internal live map (no defensive copy), so callers observe
 * concurrent updates.
 */
public ConcurrentMap<String, CacheConfig> getConfigs() {
return configs;
}

protected void postInit(NodeEngine nodeEngine, Properties properties) {
Expand Down Expand Up @@ -237,7 +243,6 @@ public DistributedObject createDistributedObject(String cacheNameWithPrefix) {
cacheConfig.setManagerPrefix(HazelcastCacheManager.CACHE_MANAGER_PREFIX);
}

CacheMergePolicyProvider mergePolicyProvider = splitBrainHandlerService.getMergePolicyProvider();
checkCacheConfig(cacheConfig, mergePolicyProvider);

Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy());
Expand Down Expand Up @@ -744,10 +749,6 @@ public Runnable prepareMergeRunnable() {
return splitBrainHandlerService.prepareMergeRunnable();
}

public CacheMergePolicyProvider getCacheMergePolicyProvider() {
return splitBrainHandlerService.getMergePolicyProvider();
}

public CacheEventHandler getCacheEventHandler() {
return cacheEventHandler;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.cache.CacheEntryView;
import com.hazelcast.cache.CacheMergePolicy;
import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView;
import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.cache.impl.operation.CacheLegacyMergeOperation;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.config.CacheConfig;
Expand All @@ -28,49 +29,55 @@
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.impl.merge.AbstractMergeRunnable;
import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.CacheMergeTypes;
import com.hazelcast.util.function.BiConsumer;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static com.hazelcast.config.MergePolicyConfig.DEFAULT_BATCH_SIZE;
import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;

class CacheMergeRunnable extends AbstractMergeRunnable<Data, Data, ICacheRecordStore, CacheMergeTypes> {

private final CacheService cacheService;
private final CacheSplitBrainHandlerService cacheSplitBrainHandlerService;
private final CacheMergePolicyProvider mergePolicyProvider;

CacheMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
Collection<ICacheRecordStore> backupStores,
CacheSplitBrainHandlerService cacheSplitBrainHandlerService,
CacheMergeRunnable(Collection<ICacheRecordStore> mergingStores,
BaseSplitBrainHandlerService<ICacheRecordStore> splitBrainHandlerService,
NodeEngine nodeEngine) {
super(CacheService.SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies, backupStores, nodeEngine);
super(CacheService.SERVICE_NAME, mergingStores, splitBrainHandlerService, nodeEngine);

this.cacheService = nodeEngine.getService(SERVICE_NAME);
this.cacheSplitBrainHandlerService = cacheSplitBrainHandlerService;
this.mergePolicyProvider = cacheService.mergePolicyProvider;
}

@Override
protected void consumeStore(ICacheRecordStore store, BiConsumer<Integer, CacheMergeTypes> consumer) {
// After a cache's entries have been merged, broadcast an invalidation event
// for that cache. The null key presumably means "invalidate everything" —
// TODO confirm against CacheService#sendInvalidationEvent.
protected void onMerge(String cacheName) {
cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
}

@Override
protected void mergeStore(ICacheRecordStore store, BiConsumer<Integer, CacheMergeTypes> consumer) {
int partitionId = store.getPartitionId();

for (Map.Entry<Data, CacheRecord> entry : store.getReadOnlyRecords().entrySet()) {
Data key = entry.getKey();
Data key = toHeapData(entry.getKey());
CacheRecord record = entry.getValue();
Data dataValue = toData(record.getValue());
Data dataValue = toHeapData(record.getValue());

consumer.accept(partitionId, createMergingEntry(getSerializationService(), key, dataValue, record));
}
}

@Override
protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Integer, Operation> consumer) {
protected void mergeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Integer, Operation> consumer) {
int partitionId = recordStore.getPartitionId();
String name = recordStore.getName();
CacheMergePolicy mergePolicy = ((CacheMergePolicy) getMergePolicy(name));
Expand All @@ -92,7 +99,7 @@ protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Inte

@Override
protected InMemoryFormat getInMemoryFormat(String dataStructureName) {
return cacheSplitBrainHandlerService.getConfigs().get(dataStructureName).getInMemoryFormat();
return cacheService.getConfigs().get(dataStructureName).getInMemoryFormat();
}

@Override
Expand All @@ -105,12 +112,20 @@ protected int getBatchSize(String dataStructureName) {

@Override
protected Object getMergePolicy(String dataStructureName) {
return cacheSplitBrainHandlerService.getMergePolicy(dataStructureName);
ConcurrentMap<String, CacheConfig> configs = cacheService.getConfigs();
CacheConfig cacheConfig = configs.get(dataStructureName);
String mergePolicyName = cacheConfig.getMergePolicy();
return mergePolicyProvider.getMergePolicy(mergePolicyName);
}

// A record store's data-structure name is simply its cache name.
@Override
protected String getDataStructureName(ICacheRecordStore iCacheRecordStore) {
return iCacheRecordStore.getName();
}

@Override
protected void destroyStores(Collection<ICacheRecordStore> stores) {
cacheSplitBrainHandlerService.destroyStores(stores);
// Delegates to the store itself, which knows the partition it belongs to.
protected int getPartitionId(ICacheRecordStore store) {
return store.getPartitionId();
}

@Override
Expand Down
Expand Up @@ -100,6 +100,7 @@ public Operation prepareReplicationOperation(PartitionReplicationEvent event,

CachePartitionSegment segment = segments[event.getPartitionId()];
CacheReplicationOperation op = newCacheReplicationOperation();
op.setPartitionId(event.getPartitionId());
op.prepare(segment, namespaces, event.getReplicaIndex());
return op.isEmpty() ? null : op;
}
Expand All @@ -111,7 +112,6 @@ private boolean assertAllKnownNamespaces(Collection<ServiceNamespace> namespaces
return true;
}


// Factory hook for the partition-replication operation; presumably overridden
// on the EE side to return an enterprise variant — TODO confirm.
protected CacheReplicationOperation newCacheReplicationOperation() {
return new CacheReplicationOperation();
}
Expand Down
Expand Up @@ -16,85 +16,52 @@

package com.hazelcast.cache.impl;

import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService;
import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static com.hazelcast.config.InMemoryFormat.NATIVE;
import static java.util.Collections.singletonList;
import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread;

/**
* Handles split-brain functionality for cache.
*/
class CacheSplitBrainHandlerService extends AbstractSplitBrainHandlerService<ICacheRecordStore> {
class CacheSplitBrainHandlerService extends BaseSplitBrainHandlerService<ICacheRecordStore> {

private final CacheService cacheService;
private final CachePartitionSegment[] segments;
private final Map<String, CacheConfig> configs;
private final CacheMergePolicyProvider mergePolicyProvider;

CacheSplitBrainHandlerService(NodeEngine nodeEngine,
Map<String, CacheConfig> configs,
CachePartitionSegment[] segments) {
super(nodeEngine);
this.configs = configs;
this.segments = segments;
this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
this.cacheService = nodeEngine.getService(SERVICE_NAME);
}

// Accessor for the merge-policy provider used during split-brain healing.
public CacheMergePolicyProvider getMergePolicyProvider() {
return mergePolicyProvider;
}

@Override
protected Runnable newMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
Collection<ICacheRecordStore> backupStores,
NodeEngine nodeEngine) {
return new CacheMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies,
backupStores, this, nodeEngine);
}

@Override
public String getDataStructureName(ICacheRecordStore recordStore) {
return recordStore.getName();
protected Runnable newMergeRunnable(Collection<ICacheRecordStore> mergingStores,
BaseSplitBrainHandlerService<ICacheRecordStore> splitBrainHandlerService) {
return new CacheMergeRunnable(mergingStores, splitBrainHandlerService, cacheService.nodeEngine);
}

@Override
protected Object getMergePolicy(String dataStructureName) {
CacheConfig cacheConfig = configs.get(dataStructureName);
String mergePolicyName = cacheConfig.getMergePolicy();
return mergePolicyProvider.getMergePolicy(mergePolicyName);
// Iterates the cache record stores that live in the given partition's segment.
protected Iterator<ICacheRecordStore> storeIterator(int partitionId) {
return segments[partitionId].recordStoreIterator();
}

@Override
protected void onPrepareMergeRunnableEnd(Collection<String> dataStructureNames) {
for (String cacheName : dataStructureNames) {
cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
}
}
protected void destroyStore(ICacheRecordStore store) {
assertRunningOnPartitionThread();

@Override
protected Collection<Iterator<ICacheRecordStore>> iteratorsOf(int partitionId) {
return singletonList(segments[partitionId].recordStoreIterator());
store.destroyInternals();
}

@Override
protected void destroyStore(ICacheRecordStore store) {
assert store.getConfig().getInMemoryFormat() != NATIVE;

store.destroy();
}
protected boolean hasEntry(ICacheRecordStore store) {
assertRunningOnPartitionThread();

public Map<String, CacheConfig> getConfigs() {
return configs;
return store.size() > 0;
}
}
Expand Up @@ -178,7 +178,7 @@ protected void removeCacheConfigFromLocal(String cacheNameWithPrefix) {

@Override
protected <K, V> void validateCacheConfig(CacheConfig<K, V> cacheConfig) {
CacheMergePolicyProvider mergePolicyProvider = cacheService.getCacheMergePolicyProvider();
CacheMergePolicyProvider mergePolicyProvider = cacheService.getMergePolicyProvider();
checkCacheConfig(cacheConfig, mergePolicyProvider);

Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy());
Expand Down
Expand Up @@ -357,6 +357,12 @@ public interface ICacheRecordStore {
*/
void destroy();

/**
* Like {@link #destroy()}, but does not touch state owned by other services,
* such as the event journal service.
*/
void destroyInternals();

/**
* Gets the configuration of the cache that this store belongs to.
*
Expand Down
Expand Up @@ -54,7 +54,7 @@ protected Object call() throws Exception {
CacheService cacheService = getService(CacheService.SERVICE_NAME);

if (cacheConfig != null) {
CacheMergePolicyProvider mergePolicyProvider = cacheService.getCacheMergePolicyProvider();
CacheMergePolicyProvider mergePolicyProvider = cacheService.getMergePolicyProvider();
checkCacheConfig(cacheConfig, mergePolicyProvider);

Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy());
Expand Down
Expand Up @@ -20,7 +20,6 @@
import com.hazelcast.instance.LifecycleServiceImpl;
import com.hazelcast.instance.Node;
import com.hazelcast.internal.cluster.ClusterService;
import com.hazelcast.logging.ILogger;
import com.hazelcast.nio.Disposable;
import com.hazelcast.spi.CoreService;
import com.hazelcast.spi.ManagedService;
Expand All @@ -46,16 +45,12 @@ class ClusterMergeTask implements Runnable {

private static final String MERGE_TASKS_EXECUTOR = "hz:cluster-merge";

private final boolean wasLiteMember;
private final Node node;
private final ILogger logger;
private final LifecycleServiceImpl lifecycleService;

ClusterMergeTask(Node node) {
this.node = node;
this.logger = node.getLogger(getClass());
this.lifecycleService = node.hazelcastInstance.getLifecycleService();
this.wasLiteMember = node.clusterService.getLocalMember().isLiteMember();
}

public void run() {
Expand Down Expand Up @@ -83,13 +78,7 @@ public void run() {
}
}
} finally {
try {
if (joined) {
tryToPromoteLocalLiteMember();
}
} finally {
lifecycleService.fireLifecycleEvent(joined ? MERGED : MERGE_FAILED);
}
lifecycleService.fireLifecycleEvent(joined ? MERGED : MERGE_FAILED);
}
}

Expand All @@ -106,19 +95,6 @@ private void disposeTasks(Collection<Runnable>... tasks) {
}
}

private void tryToPromoteLocalLiteMember() {
if (wasLiteMember) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

wasLiteMember field can be deleted too.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

// this node was a lite-member so no promotion needed after merging
return;
}

logger.info("Local lite-member was previously a data-member, now trying to promote it back...");

node.clusterService.promoteLocalLiteMember();

logger.info("Promoted local lite-member upon finish of split brain healing");
}

// The merge counts as joined only while the node is still running AND the
// cluster service reports that it has joined; short-circuits on a dead node.
private boolean isJoined() {
if (!node.isRunning()) {
return false;
}
return node.getClusterService().isJoined();
}
Expand Down