Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Mar 28, 2018
1 parent 8704aaf commit 39f80fc
Show file tree
Hide file tree
Showing 16 changed files with 409 additions and 370 deletions.
Expand Up @@ -150,7 +150,11 @@ public final void init(NodeEngine nodeEngine, Properties properties) {

/**
 * Creates the split-brain handler service for caches.
 * This method is overridden in the enterprise (EE) module.
 */
protected CacheSplitBrainHandlerService newSplitBrainHandlerService(NodeEngine nodeEngine) {
    return new CacheSplitBrainHandlerService(nodeEngine, segments);
}

// Exposes the live registry of cache configurations (name -> config) so that
// split-brain merge code can look up per-cache merge policy and in-memory
// format. NOTE(review): keys are presumably prefixed cache names — confirm.
public ConcurrentMap<String, CacheConfig> getConfigs() {
return configs;
}

protected void postInit(NodeEngine nodeEngine, Properties properties) {
Expand Down Expand Up @@ -228,7 +232,7 @@ public DistributedObject createDistributedObject(String cacheNameWithPrefix) {
cacheConfig.setManagerPrefix(HazelcastCacheManager.CACHE_MANAGER_PREFIX);
}

CacheMergePolicyProvider mergePolicyProvider = splitBrainHandlerService.getMergePolicyProvider();
CacheMergePolicyProvider mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
checkCacheConfig(cacheConfig.getInMemoryFormat(), cacheConfig.getEvictionConfig(),
cacheConfig.isStatisticsEnabled(), cacheConfig.getMergePolicy());

Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.cache.CacheEntryView;
import com.hazelcast.cache.CacheMergePolicy;
import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView;
import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.cache.impl.operation.CacheLegacyMergeOperation;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.config.CacheConfig;
Expand All @@ -35,35 +36,45 @@
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;

import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static com.hazelcast.config.MergePolicyConfig.DEFAULT_BATCH_SIZE;
import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;
import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread;

class CacheMergeRunnable extends AbstractMergeRunnable<ICacheRecordStore, MergingEntry<Data, Data>> {

private final CacheService cacheService;
private final CacheMergePolicyProvider mergePolicyProvider;

/**
 * @param mergingStores   record stores whose entries are candidates for merging
 * @param noMergingStores stores excluded from merging (exact handling lives in
 *                        {@code AbstractMergeRunnable} — not visible here)
 * @param nodeEngine      accessor for node-level services
 */
CacheMergeRunnable(Collection<ICacheRecordStore> mergingStores,
                   Collection<ICacheRecordStore> noMergingStores,
                   NodeEngine nodeEngine) {
    super(CacheService.SERVICE_NAME, mergingStores, noMergingStores, nodeEngine);

    this.cacheService = nodeEngine.getService(SERVICE_NAME);
    this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
}

@Override
protected void onPrepare(Set<String> mergingDataStructureNames) {
// Fire a cache-wide invalidation (null key) for every cache about to be
// merged; SOURCE_NOT_AVAILABLE marks the event's origin as unknown.
for (String cacheName : mergingDataStructureNames) {
cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
}
}

@Override
protected void consumeStore(ICacheRecordStore store, BiConsumer<Integer, MergingEntry<Data, Data>> consumer) {
int partitionId = store.getPartitionId();

for (Map.Entry<Data, CacheRecord> entry : store.getReadOnlyRecords().entrySet()) {
Data key = entry.getKey();
Data key = toHeapData(entry.getKey());
CacheRecord record = entry.getValue();
Data dataValue = toData(record.getValue());
Data dataValue = toHeapData(record.getValue());

consumer.accept(partitionId, createMergingEntry(getSerializationService(), key, dataValue, record));
}
Expand Down Expand Up @@ -92,7 +103,7 @@ protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Inte

@Override
protected InMemoryFormat getInMemoryFormat(String dataStructureName) {
    // The format comes from the config registry owned by the cache service.
    return cacheService.getConfigs().get(dataStructureName).getInMemoryFormat();
}

@Override
Expand All @@ -105,12 +116,27 @@ protected int getBatchSize(String dataStructureName) {

@Override
protected Object getMergePolicy(String dataStructureName) {
    ConcurrentMap<String, CacheConfig> configs = cacheService.getConfigs();
    CacheConfig cacheConfig = configs.get(dataStructureName);
    // Resolve the configured merge-policy name to an actual policy instance.
    String mergePolicyName = cacheConfig.getMergePolicy();
    return mergePolicyProvider.getMergePolicy(mergePolicyName);
}

@Override
protected void destroyStores(Collection<ICacheRecordStore> stores) {
cacheSplitBrainHandlerService.destroyStores(stores);
protected String getDataStructureName(ICacheRecordStore iCacheRecordStore) {
return iCacheRecordStore.getName();
}

// Returns the id of the partition that owns this record store.
@Override
protected int getPartitionId(ICacheRecordStore store) {
return store.getPartitionId();
}

@Override
protected void destroyStore(ICacheRecordStore store) {
// Record stores must only be destroyed on their owning partition thread.
assertRunningOnPartitionThread();

store.destroy();
}

@Override
Expand Down
Expand Up @@ -100,6 +100,7 @@ public Operation prepareReplicationOperation(PartitionReplicationEvent event,

CachePartitionSegment segment = segments[event.getPartitionId()];
CacheReplicationOperation op = newCacheReplicationOperation();
op.setPartitionId(event.getPartitionId());
op.prepare(segment, namespaces, event.getReplicaIndex());
return op.isEmpty() ? null : op;
}
Expand All @@ -111,7 +112,6 @@ private boolean assertAllKnownNamespaces(Collection<ServiceNamespace> namespaces
return true;
}


// Factory for the partition replication operation. NOTE(review): presumably
// overridden in the enterprise module to return an EE-specific subclass,
// mirroring newSplitBrainHandlerService — confirm.
protected CacheReplicationOperation newCacheReplicationOperation() {
return new CacheReplicationOperation();
}
Expand Down
Expand Up @@ -16,19 +16,13 @@

package com.hazelcast.cache.impl;

import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static com.hazelcast.config.InMemoryFormat.NATIVE;
import static java.util.Collections.singletonList;

/**
* Handles split-brain functionality for cache.
Expand All @@ -37,64 +31,27 @@ class CacheSplitBrainHandlerService extends AbstractSplitBrainHandlerService<ICa

private final CacheService cacheService;
private final CachePartitionSegment[] segments;
private final Map<String, CacheConfig> configs;
private final CacheMergePolicyProvider mergePolicyProvider;

CacheSplitBrainHandlerService(NodeEngine nodeEngine,
                              CachePartitionSegment[] segments) {
    super(nodeEngine);
    this.segments = segments;
    this.cacheService = nodeEngine.getService(SERVICE_NAME);
}

public CacheMergePolicyProvider getMergePolicyProvider() {
return mergePolicyProvider;
}

@Override
protected Runnable newMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
Collection<ICacheRecordStore> backupStores,
NodeEngine nodeEngine) {
return new CacheMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies,
backupStores, this, nodeEngine);
}

@Override
public String getDataStructureName(ICacheRecordStore recordStore) {
return recordStore.getName();
}

@Override
protected Object getMergePolicy(String dataStructureName) {
CacheConfig cacheConfig = configs.get(dataStructureName);
String mergePolicyName = cacheConfig.getMergePolicy();
return mergePolicyProvider.getMergePolicy(mergePolicyName);
}

@Override
protected void onPrepareMergeRunnableEnd(Collection<String> dataStructureNames) {
for (String cacheName : dataStructureNames) {
cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
}
protected Runnable newMergeRunnable(Collection<ICacheRecordStore> mergingStores,
Collection<ICacheRecordStore> noMergingStores) {
return new CacheMergeRunnable(mergingStores, noMergingStores, cacheService.nodeEngine);
}

@Override
protected Collection<Iterator<ICacheRecordStore>> iteratorsOf(int partitionId) {
return singletonList(segments[partitionId].recordStoreIterator());
protected Iterator<ICacheRecordStore> storeIterator(int partitionId) {
return segments[partitionId].recordStoreIterator();
}

@Override
protected void destroyStore(ICacheRecordStore store) {
assert store.getConfig().getInMemoryFormat() != NATIVE;

store.destroy();
}

public Map<String, CacheConfig> getConfigs() {
return configs;
protected boolean hasEntry(ICacheRecordStore store) {
return store.size() > 0;
}
}
Expand Up @@ -83,13 +83,7 @@ public void run() {
}
}
} finally {
try {
if (joined) {
tryToPromoteLocalLiteMember();
}
} finally {
lifecycleService.fireLifecycleEvent(joined ? MERGED : MERGE_FAILED);
}
lifecycleService.fireLifecycleEvent(joined ? MERGED : MERGE_FAILED);
}
}

Expand All @@ -106,19 +100,6 @@ private void disposeTasks(Collection<Runnable>... tasks) {
}
}

// After split-brain healing, promote this member back to a data-member —
// unless it already was a lite-member before the merge began.
private void tryToPromoteLocalLiteMember() {
if (wasLiteMember) {
// this node was a lite-member so no promotion needed after merging
return;
}

logger.info("Local lite-member was previously a data-member, now trying to promote it back...");

node.clusterService.promoteLocalLiteMember();

logger.info("Promoted local lite-member upon finish of split brain healing");
}

// True only while the node is running and has joined a cluster.
private boolean isJoined() {
return node.isRunning() && node.getClusterService().isJoined();
}
Expand Down
Expand Up @@ -312,14 +312,10 @@ public void merge(Address newTargetAddress) {

@Override
public void reset() {
reset(false);
}

public void reset(boolean isForceStart) {
lock.lock();
try {
resetJoinState();
resetLocalMember(isForceStart);
resetLocalMemberUuid();
resetClusterId();
clearInternalState();
} finally {
Expand All @@ -328,14 +324,10 @@ public void reset(boolean isForceStart) {
}

/**
* Gives the local member a new UUID while resetting this service.
*/
private void resetLocalMember(boolean isForceStart) {
private void resetLocalMemberUuid() {
assert lock.isHeldByCurrentThread() : "Called without holding cluster service lock!";
assert !isJoined() : "Cannot reset local member UUID when joined.";

Expand All @@ -344,19 +336,13 @@ private void resetLocalMember(boolean isForceStart) {

logger.warning("Resetting local member UUID. Previous: " + localMember.getUuid() + ", new: " + newUuid);

boolean liteMember = !isForceStart || localMember.isLiteMember();

MemberImpl resetLocalMember = new MemberImpl(address, localMember.getVersion(),
true, newUuid, localMember.getAttributes(), liteMember,
true, newUuid, localMember.getAttributes(), localMember.isLiteMember(),
localMember.getMemberListJoinVersion(), node.hazelcastInstance);

localMember = resetLocalMember;

if (!isForceStart) {
logger.info("Converted local member to lite-member before start of split brain healing");
}

node.loggingService.setThisMember(this.localMember);
node.loggingService.setThisMember(localMember);
}

public void resetJoinState() {
Expand Down

0 comments on commit 39f80fc

Please sign in to comment.