Commit 3fa85e6: Dispose store immediately after merge

ahmetmircik committed Mar 29, 2018
1 parent 33421a9 commit 3fa85e6
Showing 13 changed files with 417 additions and 333 deletions.
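In short: the split-brain handler no longer collects merged stores and destroys them in bulk through a destroyStores(Collection) callback. Instead, the merge runnables get a per-store destroyStore(...) callback that asserts it runs on the partition thread, so each record store can be disposed as soon as it has been merged, and stores that take no part in the merge can be disposed up front. Below is a minimal sketch of that shape; the RecordStore and MergeRunnable types and the method names are hypothetical stand-ins, not Hazelcast's actual API.

// Sketch only: hypothetical types and names, illustrating the per-store
// dispose-after-merge pattern this commit introduces.
import java.util.Collection;

interface RecordStore {
    int getPartitionId();

    void destroy();
}

abstract class MergeRunnable implements Runnable {

    private final Collection<RecordStore> mergingStores;
    private final Collection<RecordStore> notMergingStores;

    MergeRunnable(Collection<RecordStore> mergingStores,
                  Collection<RecordStore> notMergingStores) {
        this.mergingStores = mergingStores;
        this.notMergingStores = notMergingStores;
    }

    @Override
    public void run() {
        // Stores that take no part in the merge are disposed up front.
        for (RecordStore store : notMergingStores) {
            destroyStoreOnItsPartitionThread(store);
        }
        // Each merging store is consumed and then disposed immediately,
        // instead of being collected and destroyed in bulk at the end.
        for (RecordStore store : mergingStores) {
            mergeEntriesOf(store);
            destroyStoreOnItsPartitionThread(store);
        }
    }

    // Emits the store's entries as merge operations (elided here).
    abstract void mergeEntriesOf(RecordStore store);

    // Runs store.destroy() on the thread that owns the store's partition.
    abstract void destroyStoreOnItsPartitionThread(RecordStore store);
}

Keeping destruction on the partition thread is what the added assertRunningOnPartitionThread() calls in the diff enforce: a store is only ever touched from the thread that owns its partition.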
@@ -142,15 +142,14 @@ public final void init(NodeEngine nodeEngine, Properties properties) {
             segments[i] = newPartitionSegment(i);
         }
         this.cacheEventHandler = new CacheEventHandler(nodeEngine);
-        this.splitBrainHandlerService = newSplitBrainHandlerService(nodeEngine);
+        this.splitBrainHandlerService = new CacheSplitBrainHandlerService(nodeEngine, segments);
         this.logger = nodeEngine.getLogger(getClass());
         this.eventJournal = new RingbufferCacheEventJournalImpl(nodeEngine);
         postInit(nodeEngine, properties);
     }
 
-    // this method is overridden on ee
-    protected CacheSplitBrainHandlerService newSplitBrainHandlerService(NodeEngine nodeEngine) {
-        return new CacheSplitBrainHandlerService(nodeEngine, configs, segments);
+    public ConcurrentMap<String, CacheConfig> getConfigs() {
+        return configs;
     }
 
     protected void postInit(NodeEngine nodeEngine, Properties properties) {
@@ -228,7 +227,7 @@ public DistributedObject createDistributedObject(String cacheNameWithPrefix) {
             cacheConfig.setManagerPrefix(HazelcastCacheManager.CACHE_MANAGER_PREFIX);
         }
 
-        CacheMergePolicyProvider mergePolicyProvider = splitBrainHandlerService.getMergePolicyProvider();
+        CacheMergePolicyProvider mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
         checkCacheConfig(cacheConfig.getInMemoryFormat(), cacheConfig.getEvictionConfig(),
                 cacheConfig.isStatisticsEnabled(), cacheConfig.getMergePolicy());
 
@@ -19,6 +19,7 @@
 import com.hazelcast.cache.CacheEntryView;
 import com.hazelcast.cache.CacheMergePolicy;
 import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView;
+import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
 import com.hazelcast.cache.impl.operation.CacheLegacyMergeOperation;
 import com.hazelcast.cache.impl.record.CacheRecord;
 import com.hazelcast.config.CacheConfig;
@@ -35,35 +36,44 @@
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentMap;
 
+import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
 import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
 import static com.hazelcast.config.MergePolicyConfig.DEFAULT_BATCH_SIZE;
 import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;
+import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread;
 
 class CacheMergeRunnable extends AbstractMergeRunnable<ICacheRecordStore, MergingEntry<Data, Data>> {
 
     private final CacheService cacheService;
-    private final CacheSplitBrainHandlerService cacheSplitBrainHandlerService;
+    private final CacheMergePolicyProvider mergePolicyProvider;
 
-    CacheMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
-                       Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
-                       Collection<ICacheRecordStore> backupStores,
-                       CacheSplitBrainHandlerService cacheSplitBrainHandlerService,
+    CacheMergeRunnable(Collection<ICacheRecordStore> mergingStores,
+                       Collection<ICacheRecordStore> notMergingStores,
                        NodeEngine nodeEngine) {
-        super(CacheService.SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies, backupStores, nodeEngine);
+        super(CacheService.SERVICE_NAME, mergingStores, notMergingStores, nodeEngine);
 
         this.cacheService = nodeEngine.getService(SERVICE_NAME);
-        this.cacheSplitBrainHandlerService = cacheSplitBrainHandlerService;
+        this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
     }
 
+    @Override
+    protected void onPrepare(Set<String> mergingCacheNames) {
+        for (String cacheName : mergingCacheNames) {
+            cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
+        }
+    }
+
     @Override
     protected void consumeStore(ICacheRecordStore store, BiConsumer<Integer, MergingEntry<Data, Data>> consumer) {
         int partitionId = store.getPartitionId();
 
         for (Map.Entry<Data, CacheRecord> entry : store.getReadOnlyRecords().entrySet()) {
-            Data key = entry.getKey();
+            Data key = toHeapData(entry.getKey());
             CacheRecord record = entry.getValue();
-            Data dataValue = toData(record.getValue());
+            Data dataValue = toHeapData(record.getValue());
 
             consumer.accept(partitionId, createMergingEntry(getSerializationService(), key, dataValue, record));
         }
@@ -92,7 +102,7 @@ protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Integer, Operation> consumer) {
 
     @Override
     protected InMemoryFormat getInMemoryFormat(String dataStructureName) {
-        return cacheSplitBrainHandlerService.getConfigs().get(dataStructureName).getInMemoryFormat();
+        return cacheService.getConfigs().get(dataStructureName).getInMemoryFormat();
     }
 
     @Override
@@ -105,12 +115,27 @@ protected int getBatchSize(String dataStructureName) {
 
     @Override
     protected Object getMergePolicy(String dataStructureName) {
-        return cacheSplitBrainHandlerService.getMergePolicy(dataStructureName);
+        ConcurrentMap<String, CacheConfig> configs = cacheService.getConfigs();
+        CacheConfig cacheConfig = configs.get(dataStructureName);
+        String mergePolicyName = cacheConfig.getMergePolicy();
+        return mergePolicyProvider.getMergePolicy(mergePolicyName);
     }
 
+    @Override
+    protected String getDataStructureName(ICacheRecordStore iCacheRecordStore) {
+        return iCacheRecordStore.getName();
+    }
+
+    @Override
+    protected int getPartitionId(ICacheRecordStore store) {
+        return store.getPartitionId();
+    }
+
     @Override
-    protected void destroyStores(Collection<ICacheRecordStore> stores) {
-        cacheSplitBrainHandlerService.destroyStores(stores);
+    protected void destroyStore(ICacheRecordStore store) {
+        assertRunningOnPartitionThread();
+
+        store.destroy();
     }
 
     @Override
@@ -100,6 +100,7 @@ public Operation prepareReplicationOperation(PartitionReplicationEvent event, Collection<ServiceNamespace> namespaces) {
 
         CachePartitionSegment segment = segments[event.getPartitionId()];
         CacheReplicationOperation op = newCacheReplicationOperation();
+        op.setPartitionId(event.getPartitionId());
         op.prepare(segment, namespaces, event.getReplicaIndex());
         return op.isEmpty() ? null : op;
     }
@@ -111,7 +112,6 @@ private boolean assertAllKnownNamespaces(Collection<ServiceNamespace> namespaces) {
         return true;
     }
 
-
     protected CacheReplicationOperation newCacheReplicationOperation() {
         return new CacheReplicationOperation();
     }
@@ -16,19 +16,14 @@
 
 package com.hazelcast.cache.impl;
 
-import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
-import com.hazelcast.config.CacheConfig;
 import com.hazelcast.spi.NodeEngine;
 import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService;
 
 import java.util.Collection;
 import java.util.Iterator;
-import java.util.Map;
 
-import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
 import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
-import static com.hazelcast.config.InMemoryFormat.NATIVE;
-import static java.util.Collections.singletonList;
+import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread;
 
 /**
  * Handles split-brain functionality for cache.
@@ -37,64 +32,29 @@ class CacheSplitBrainHandlerService extends AbstractSplitBrainHandlerService<ICacheRecordStore> {
 
     private final CacheService cacheService;
     private final CachePartitionSegment[] segments;
-    private final Map<String, CacheConfig> configs;
-    private final CacheMergePolicyProvider mergePolicyProvider;
 
     CacheSplitBrainHandlerService(NodeEngine nodeEngine,
-                                  Map<String, CacheConfig> configs,
                                   CachePartitionSegment[] segments) {
         super(nodeEngine);
-        this.configs = configs;
         this.segments = segments;
-        this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
         this.cacheService = nodeEngine.getService(SERVICE_NAME);
     }
 
-    public CacheMergePolicyProvider getMergePolicyProvider() {
-        return mergePolicyProvider;
-    }
-
     @Override
-    protected Runnable newMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
-                                        Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
-                                        Collection<ICacheRecordStore> backupStores,
-                                        NodeEngine nodeEngine) {
-        return new CacheMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies,
-                backupStores, this, nodeEngine);
+    protected Runnable newMergeRunnable(Collection<ICacheRecordStore> mergingStores,
+                                        Collection<ICacheRecordStore> notMergingStores) {
+        return new CacheMergeRunnable(mergingStores, notMergingStores, cacheService.nodeEngine);
     }
 
     @Override
-    public String getDataStructureName(ICacheRecordStore recordStore) {
-        return recordStore.getName();
+    protected Iterator<ICacheRecordStore> storeIterator(int partitionId) {
+        return segments[partitionId].recordStoreIterator();
     }
 
     @Override
-    protected Object getMergePolicy(String dataStructureName) {
-        CacheConfig cacheConfig = configs.get(dataStructureName);
-        String mergePolicyName = cacheConfig.getMergePolicy();
-        return mergePolicyProvider.getMergePolicy(mergePolicyName);
-    }
-
-    @Override
-    protected void onPrepareMergeRunnableEnd(Collection<String> dataStructureNames) {
-        for (String cacheName : dataStructureNames) {
-            cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
-        }
-    }
-
-    @Override
-    protected Collection<Iterator<ICacheRecordStore>> iteratorsOf(int partitionId) {
-        return singletonList(segments[partitionId].recordStoreIterator());
-    }
-
-    @Override
-    protected void destroyStore(ICacheRecordStore store) {
-        assert store.getConfig().getInMemoryFormat() != NATIVE;
-
-        store.destroy();
-    }
-
-    public Map<String, CacheConfig> getConfigs() {
-        return configs;
+    protected boolean hasEntry(ICacheRecordStore store) {
+        assertRunningOnPartitionThread();
+
+        return store.size() > 0;
     }
 }
@@ -23,7 +23,9 @@
 import com.hazelcast.map.impl.operation.MapOperationProvider;
 import com.hazelcast.map.impl.record.Record;
 import com.hazelcast.map.impl.recordstore.RecordStore;
+import com.hazelcast.map.merge.IgnoreMergingEntryMapMergePolicy;
 import com.hazelcast.map.merge.MapMergePolicy;
+import com.hazelcast.map.merge.MergePolicyProvider;
 import com.hazelcast.nio.serialization.Data;
 import com.hazelcast.spi.Operation;
 import com.hazelcast.spi.OperationFactory;
@@ -36,25 +38,29 @@
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Map;
 
 import static com.hazelcast.map.impl.EntryViews.createSimpleEntryView;
 import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;
+import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread;
 
 class MapMergeRunnable extends AbstractMergeRunnable<RecordStore, MergingEntry<Data, Data>> {
 
     private final MapServiceContext mapServiceContext;
-    private final MapSplitBrainHandlerService mapSplitBrainHandlerService;
+    private final MergePolicyProvider mergePolicyProvider;
 
-    MapMergeRunnable(Map<String, Collection<RecordStore>> collectedStores,
-                     Map<String, Collection<RecordStore>> collectedStoresWithLegacyPolicies,
-                     Collection<RecordStore> backupStores, MapServiceContext mapServiceContext,
-                     MapSplitBrainHandlerService mapSplitBrainHandlerService) {
-        super(MapService.SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies,
-                backupStores, mapServiceContext.getNodeEngine());
+    MapMergeRunnable(Collection<RecordStore> mergingStores,
+                     Collection<RecordStore> notMergingStores,
+                     MapServiceContext mapServiceContext) {
+        super(MapService.SERVICE_NAME, mergingStores, notMergingStores, mapServiceContext.getNodeEngine());
 
         this.mapServiceContext = mapServiceContext;
-        this.mapSplitBrainHandlerService = mapSplitBrainHandlerService;
+        this.mergePolicyProvider = mapServiceContext.getMergePolicyProvider();
     }
 
+    @Override
+    protected boolean hasDiscardPolicy(Object mergePolicy) {
+        return mergePolicy instanceof IgnoreMergingEntryMapMergePolicy
+                || super.hasDiscardPolicy(mergePolicy);
+    }
+
     @Override
@@ -66,9 +72,11 @@ protected void consumeStore(RecordStore store, BiConsumer<Integer, MergingEntry<Data, Data>> consumer) {
         while (iterator.hasNext()) {
             Record record = iterator.next();
 
-            Data dataValue = toData(record.getValue());
-            MergingEntry<Data, Data> mergingEntry = createMergingEntry(getSerializationService(), record, dataValue);
-            consumer.accept(partitionId, mergingEntry);
+            Data dataKey = toHeapData(record.getKey());
+            Data dataValue = toHeapData(record.getValue());
+
+            consumer.accept(partitionId,
+                    createMergingEntry(getSerializationService(), dataKey, dataValue, record));
         }
     }
 
@@ -95,25 +103,45 @@ protected void consumeStoreLegacy(RecordStore store, BiConsumer<Integer, Operation> consumer) {
 
     @Override
     protected int getBatchSize(String dataStructureName) {
-        MapConfig mapConfig = mapSplitBrainHandlerService.getMapConfig(dataStructureName);
+        MapConfig mapConfig = getMapConfig(dataStructureName);
         MergePolicyConfig mergePolicyConfig = mapConfig.getMergePolicyConfig();
         return mergePolicyConfig.getBatchSize();
     }
 
     @Override
     protected InMemoryFormat getInMemoryFormat(String dataStructureName) {
-        MapConfig mapConfig = mapSplitBrainHandlerService.getMapConfig(dataStructureName);
+        MapConfig mapConfig = getMapConfig(dataStructureName);
         return mapConfig.getInMemoryFormat();
     }
 
     @Override
     protected Object getMergePolicy(String dataStructureName) {
-        return mapSplitBrainHandlerService.getMergePolicy(dataStructureName);
+        MapConfig mapConfig = getMapConfig(dataStructureName);
+        MergePolicyConfig mergePolicyConfig = mapConfig.getMergePolicyConfig();
+        return mergePolicyProvider.getMergePolicy(mergePolicyConfig.getPolicy());
     }
 
+    private MapConfig getMapConfig(String dataStructureName) {
+        MapContainer mapContainer = mapServiceContext.getMapContainer(dataStructureName);
+        return mapContainer.getMapConfig();
+    }
+
     @Override
-    protected void destroyStores(Collection<RecordStore> stores) {
-        mapSplitBrainHandlerService.destroyStores(stores);
+    protected String getDataStructureName(RecordStore recordStore) {
+        return recordStore.getName();
     }
 
+    @Override
+    protected int getPartitionId(RecordStore store) {
+        return store.getPartitionId();
+    }
+
+    @Override
+    protected void destroyStore(RecordStore store) {
+        assertRunningOnPartitionThread();
+
+        store.destroy();
+        store.getMapContainer().getIndexes(store.getPartitionId()).clearIndexes();
+    }
+
     @Override
