Skip to content

Commit

Permalink
Dispose store immediately when unneeded during merge
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmetmircik committed Apr 5, 2018
1 parent 0ca109b commit 8d23020
Show file tree
Hide file tree
Showing 21 changed files with 513 additions and 435 deletions.
Expand Up @@ -1650,6 +1650,13 @@ public void destroy() {
onDestroy();
}

/**
 * Destroys only this record store's own internal state: clears entries via
 * {@code reset()}, closes registered listeners, then runs the
 * {@link #onDestroy()} hook. Unlike {@code destroy()}, this variant does not
 * touch state owned by other services (per the
 * {@code ICacheRecordStore#destroyInternals} contract, e.g. the event
 * journal is left alone).
 */
@Override
public void destroyInternals() {
reset();
closeListeners();
onDestroy();
}

/**
 * Extension hook invoked at the end of destruction (called from both
 * {@code destroy()} and {@code destroyInternals()}); the default
 * implementation is a no-op, subclasses override it to release
 * additional resources.
 */
protected void onDestroy() {
}

Expand Down
Expand Up @@ -135,12 +135,13 @@ public Set<Closeable> createNew(String name) {
}
};

protected ILogger logger;
protected NodeEngine nodeEngine;
protected CachePartitionSegment[] segments;
protected CacheEventHandler cacheEventHandler;
protected CacheSplitBrainHandlerService splitBrainHandlerService;
protected RingbufferCacheEventJournalImpl eventJournal;
protected ILogger logger;
protected CacheMergePolicyProvider mergePolicyProvider;
protected CacheSplitBrainHandlerService splitBrainHandlerService;

@Override
public final void init(NodeEngine nodeEngine, Properties properties) {
Expand All @@ -151,15 +152,20 @@ public final void init(NodeEngine nodeEngine, Properties properties) {
segments[i] = newPartitionSegment(i);
}
this.cacheEventHandler = new CacheEventHandler(nodeEngine);
this.splitBrainHandlerService = newSplitBrainHandlerService(nodeEngine);
this.splitBrainHandlerService = new CacheSplitBrainHandlerService(nodeEngine, segments);
this.logger = nodeEngine.getLogger(getClass());
this.eventJournal = new RingbufferCacheEventJournalImpl(nodeEngine);
this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);

postInit(nodeEngine, properties);
}

// this method is overridden on ee
protected CacheSplitBrainHandlerService newSplitBrainHandlerService(NodeEngine nodeEngine) {
return new CacheSplitBrainHandlerService(nodeEngine, configs, segments);
/**
 * Returns the {@link com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider}
 * created during service {@code init}; used by callers to resolve and
 * validate configured cache merge policies.
 */
public CacheMergePolicyProvider getMergePolicyProvider() {
return mergePolicyProvider;
}

/**
 * Returns the live map of cache configurations.
 * NOTE(review): presumably keyed by the prefixed cache name — confirm against
 * callers. This exposes the internal mutable map directly; callers must not
 * assume an immutable snapshot.
 */
public ConcurrentMap<String, CacheConfig> getConfigs() {
return configs;
}

protected void postInit(NodeEngine nodeEngine, Properties properties) {
Expand Down Expand Up @@ -237,7 +243,6 @@ public DistributedObject createDistributedObject(String cacheNameWithPrefix) {
cacheConfig.setManagerPrefix(HazelcastCacheManager.CACHE_MANAGER_PREFIX);
}

CacheMergePolicyProvider mergePolicyProvider = splitBrainHandlerService.getMergePolicyProvider();
checkCacheConfig(cacheConfig, mergePolicyProvider);

Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy());
Expand Down Expand Up @@ -744,10 +749,6 @@ public Runnable prepareMergeRunnable() {
return splitBrainHandlerService.prepareMergeRunnable();
}

public CacheMergePolicyProvider getCacheMergePolicyProvider() {
return splitBrainHandlerService.getMergePolicyProvider();
}

/**
 * Returns the {@link CacheEventHandler} created during service {@code init}.
 */
public CacheEventHandler getCacheEventHandler() {
return cacheEventHandler;
}
Expand Down
Expand Up @@ -19,6 +19,7 @@
import com.hazelcast.cache.CacheEntryView;
import com.hazelcast.cache.CacheMergePolicy;
import com.hazelcast.cache.impl.merge.entry.DefaultCacheEntryView;
import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.cache.impl.operation.CacheLegacyMergeOperation;
import com.hazelcast.cache.impl.record.CacheRecord;
import com.hazelcast.config.CacheConfig;
Expand All @@ -28,49 +29,55 @@
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationFactory;
import com.hazelcast.spi.impl.merge.AbstractMergeRunnable;
import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService;
import com.hazelcast.spi.merge.SplitBrainMergePolicy;
import com.hazelcast.spi.merge.SplitBrainMergeTypes.CacheMergeTypes;
import com.hazelcast.util.function.BiConsumer;

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;

import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static com.hazelcast.config.MergePolicyConfig.DEFAULT_BATCH_SIZE;
import static com.hazelcast.spi.impl.merge.MergingValueFactory.createMergingEntry;

class CacheMergeRunnable extends AbstractMergeRunnable<Data, Data, ICacheRecordStore, CacheMergeTypes> {

private final CacheService cacheService;
private final CacheSplitBrainHandlerService cacheSplitBrainHandlerService;
private final CacheMergePolicyProvider mergePolicyProvider;

CacheMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
Collection<ICacheRecordStore> backupStores,
CacheSplitBrainHandlerService cacheSplitBrainHandlerService,
CacheMergeRunnable(Collection<ICacheRecordStore> mergingStores,
BaseSplitBrainHandlerService<ICacheRecordStore> splitBrainHandlerService,
NodeEngine nodeEngine) {
super(CacheService.SERVICE_NAME, collectedStores, collectedStoresWithLegacyPolicies, backupStores, nodeEngine);
super(CacheService.SERVICE_NAME, mergingStores, splitBrainHandlerService, nodeEngine);

this.cacheService = nodeEngine.getService(SERVICE_NAME);
this.cacheSplitBrainHandlerService = cacheSplitBrainHandlerService;
this.mergePolicyProvider = cacheService.mergePolicyProvider;
}

@Override
protected void consumeStore(ICacheRecordStore store, BiConsumer<Integer, CacheMergeTypes> consumer) {
// Called once per cache after its entries have been merged: broadcasts an
// invalidation event for the whole cache (null key, no source member).
// NOTE(review): null key appears to mean "invalidate all entries" so remote
// near-caches drop stale data after the merge — confirm against
// CacheEventHandler.sendInvalidationEvent.
protected void onMerge(String cacheName) {
cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
}

@Override
protected void mergeStore(ICacheRecordStore store, BiConsumer<Integer, CacheMergeTypes> consumer) {
int partitionId = store.getPartitionId();

for (Map.Entry<Data, CacheRecord> entry : store.getReadOnlyRecords().entrySet()) {
Data key = entry.getKey();
Data key = toHeapData(entry.getKey());
CacheRecord record = entry.getValue();
Data dataValue = toData(record.getValue());
Data dataValue = toHeapData(record.getValue());

consumer.accept(partitionId, createMergingEntry(getSerializationService(), key, dataValue, record));
}
}

@Override
protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Integer, Operation> consumer) {
protected void mergeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Integer, Operation> consumer) {
int partitionId = recordStore.getPartitionId();
String name = recordStore.getName();
CacheMergePolicy mergePolicy = ((CacheMergePolicy) getMergePolicy(name));
Expand All @@ -92,7 +99,7 @@ protected void consumeStoreLegacy(ICacheRecordStore recordStore, BiConsumer<Inte

@Override
protected InMemoryFormat getInMemoryFormat(String dataStructureName) {
return cacheSplitBrainHandlerService.getConfigs().get(dataStructureName).getInMemoryFormat();
return cacheService.getConfigs().get(dataStructureName).getInMemoryFormat();
}

@Override
Expand All @@ -105,12 +112,20 @@ protected int getBatchSize(String dataStructureName) {

@Override
protected Object getMergePolicy(String dataStructureName) {
return cacheSplitBrainHandlerService.getMergePolicy(dataStructureName);
ConcurrentMap<String, CacheConfig> configs = cacheService.getConfigs();
CacheConfig cacheConfig = configs.get(dataStructureName);
String mergePolicyName = cacheConfig.getMergePolicy();
return mergePolicyProvider.getMergePolicy(mergePolicyName);
}

/**
 * Resolves the data-structure name used for config and merge-policy
 * lookups; for caches this is simply the record store's own name.
 */
@Override
protected String getDataStructureName(ICacheRecordStore recordStore) {
    return recordStore.getName();
}

@Override
protected void destroyStores(Collection<ICacheRecordStore> stores) {
cacheSplitBrainHandlerService.destroyStores(stores);
protected int getPartitionId(ICacheRecordStore store) {
return store.getPartitionId();
}

@Override
Expand Down
Expand Up @@ -100,6 +100,7 @@ public Operation prepareReplicationOperation(PartitionReplicationEvent event,

CachePartitionSegment segment = segments[event.getPartitionId()];
CacheReplicationOperation op = newCacheReplicationOperation();
op.setPartitionId(event.getPartitionId());
op.prepare(segment, namespaces, event.getReplicaIndex());
return op.isEmpty() ? null : op;
}
Expand All @@ -111,7 +112,6 @@ private boolean assertAllKnownNamespaces(Collection<ServiceNamespace> namespaces
return true;
}


/**
 * Factory for the replication operation used when preparing partition
 * replication. NOTE(review): presumably overridden in the enterprise (HD)
 * variant, like the other factory methods in this service — confirm.
 */
protected CacheReplicationOperation newCacheReplicationOperation() {
return new CacheReplicationOperation();
}
Expand Down
Expand Up @@ -16,85 +16,52 @@

package com.hazelcast.cache.impl;

import com.hazelcast.cache.impl.merge.policy.CacheMergePolicyProvider;
import com.hazelcast.config.CacheConfig;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.impl.merge.AbstractSplitBrainHandlerService;
import com.hazelcast.spi.impl.merge.BaseSplitBrainHandlerService;

import java.util.Collection;
import java.util.Iterator;
import java.util.Map;

import static com.hazelcast.cache.impl.AbstractCacheRecordStore.SOURCE_NOT_AVAILABLE;
import static com.hazelcast.cache.impl.ICacheService.SERVICE_NAME;
import static com.hazelcast.config.InMemoryFormat.NATIVE;
import static java.util.Collections.singletonList;
import static com.hazelcast.util.ThreadUtil.assertRunningOnPartitionThread;

/**
* Handles split-brain functionality for cache.
*/
class CacheSplitBrainHandlerService extends AbstractSplitBrainHandlerService<ICacheRecordStore> {
class CacheSplitBrainHandlerService extends BaseSplitBrainHandlerService<ICacheRecordStore> {

private final CacheService cacheService;
private final CachePartitionSegment[] segments;
private final Map<String, CacheConfig> configs;
private final CacheMergePolicyProvider mergePolicyProvider;

CacheSplitBrainHandlerService(NodeEngine nodeEngine,
Map<String, CacheConfig> configs,
CachePartitionSegment[] segments) {
super(nodeEngine);
this.configs = configs;
this.segments = segments;
this.mergePolicyProvider = new CacheMergePolicyProvider(nodeEngine);
this.cacheService = nodeEngine.getService(SERVICE_NAME);
}

public CacheMergePolicyProvider getMergePolicyProvider() {
return mergePolicyProvider;
}

@Override
protected Runnable newMergeRunnable(Map<String, Collection<ICacheRecordStore>> collectedStores,
Map<String, Collection<ICacheRecordStore>> collectedStoresWithLegacyPolicies,
Collection<ICacheRecordStore> backupStores,
NodeEngine nodeEngine) {
return new CacheMergeRunnable(collectedStores, collectedStoresWithLegacyPolicies,
backupStores, this, nodeEngine);
}

@Override
public String getDataStructureName(ICacheRecordStore recordStore) {
return recordStore.getName();
protected Runnable newMergeRunnable(Collection<ICacheRecordStore> mergingStores,
BaseSplitBrainHandlerService<ICacheRecordStore> splitBrainHandlerService) {
return new CacheMergeRunnable(mergingStores, splitBrainHandlerService, cacheService.nodeEngine);
}

@Override
protected Object getMergePolicy(String dataStructureName) {
CacheConfig cacheConfig = configs.get(dataStructureName);
String mergePolicyName = cacheConfig.getMergePolicy();
return mergePolicyProvider.getMergePolicy(mergePolicyName);
protected Iterator<ICacheRecordStore> storeIterator(int partitionId) {
return segments[partitionId].recordStoreIterator();
}

@Override
protected void onPrepareMergeRunnableEnd(Collection<String> dataStructureNames) {
for (String cacheName : dataStructureNames) {
cacheService.sendInvalidationEvent(cacheName, null, SOURCE_NOT_AVAILABLE);
}
}
protected void destroyStore(ICacheRecordStore store) {
assertRunningOnPartitionThread();

@Override
protected Collection<Iterator<ICacheRecordStore>> iteratorsOf(int partitionId) {
return singletonList(segments[partitionId].recordStoreIterator());
store.destroyInternals();
}

@Override
protected void destroyStore(ICacheRecordStore store) {
assert store.getConfig().getInMemoryFormat() != NATIVE;

store.destroy();
}
protected boolean hasEntry(ICacheRecordStore store) {
assertRunningOnPartitionThread();

public Map<String, CacheConfig> getConfigs() {
return configs;
return store.size() > 0;
}
}
Expand Up @@ -178,7 +178,7 @@ protected void removeCacheConfigFromLocal(String cacheNameWithPrefix) {

@Override
protected <K, V> void validateCacheConfig(CacheConfig<K, V> cacheConfig) {
CacheMergePolicyProvider mergePolicyProvider = cacheService.getCacheMergePolicyProvider();
CacheMergePolicyProvider mergePolicyProvider = cacheService.getMergePolicyProvider();
checkCacheConfig(cacheConfig, mergePolicyProvider);

Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy());
Expand Down
Expand Up @@ -357,6 +357,12 @@ public interface ICacheRecordStore {
*/
void destroy();

/**
 * Like {@link #destroy()}, but only clears this store's own internal
 * state; it does not touch state owned by other services, such as the
 * event journal service.
 */
void destroyInternals();

/**
* Gets the configuration of the cache that this store belongs to.
*
Expand Down
Expand Up @@ -54,7 +54,7 @@ protected Object call() throws Exception {
CacheService cacheService = getService(CacheService.SERVICE_NAME);

if (cacheConfig != null) {
CacheMergePolicyProvider mergePolicyProvider = cacheService.getCacheMergePolicyProvider();
CacheMergePolicyProvider mergePolicyProvider = cacheService.getMergePolicyProvider();
checkCacheConfig(cacheConfig, mergePolicyProvider);

Object mergePolicy = mergePolicyProvider.getMergePolicy(cacheConfig.getMergePolicy());
Expand Down

0 comments on commit 8d23020

Please sign in to comment.