From 06eba0b173edaebad5adfa3141075959f02b1289 Mon Sep 17 00:00:00 2001 From: Ahmet Mircik Date: Wed, 29 Aug 2018 18:21:01 +0300 Subject: [PATCH] Added storage destroy to release HD resources. To release key+value pairs storage#clear should be called, storage#destroy only releases internal resources of backing data structure --- .../map/impl/MapServiceContextImpl.java | 4 +- .../map/impl/MapSplitBrainHandlerService.java | 4 +- .../impl/recordstore/DefaultRecordStore.java | 263 +++++++++--------- .../map/impl/recordstore/RecordStore.java | 72 +++-- .../map/impl/recordstore/Storage.java | 3 + 5 files changed, 185 insertions(+), 161 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java index e659075190a9d..0003f8edba849 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapServiceContextImpl.java @@ -316,7 +316,7 @@ public void clearMapsHavingLesserBackupCountThan(int partitionId, int backupCoun RecordStore recordStore = iter.next(); final MapContainer mapContainer = recordStore.getMapContainer(); if (backupCount > mapContainer.getTotalBackupCount()) { - recordStore.clearPartition(false, false); + recordStore.clearPartition(false, true); iter.remove(); } } @@ -328,7 +328,7 @@ public void clearPartitionData(int partitionId) { final PartitionContainer container = partitionContainers[partitionId]; if (container != null) { for (RecordStore mapPartition : container.getMaps().values()) { - mapPartition.clearPartition(false, false); + mapPartition.clearPartition(false, true); } container.getMaps().clear(); } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java b/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java index aaaa90fa2b29c..0bc821670f9ca 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/MapSplitBrainHandlerService.java @@ -58,14 +58,14 @@ protected Iterator storeIterator(int partitionId) { protected void onStoreCollection(RecordStore recordStore) { assertRunningOnPartitionThread(); - ((DefaultRecordStore) recordStore).clearIndexedData(); + ((DefaultRecordStore) recordStore).clearOtherResourcesThanStorage(true); } @Override protected void destroyStore(RecordStore store) { assertRunningOnPartitionThread(); - store.destroyInternals(); + ((DefaultRecordStore) store).clearAndDestroyStorage(false); } @Override diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java index e430ac51b29ce..9c6ee291827a8 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/DefaultRecordStore.java @@ -122,20 +122,6 @@ public MapDataStore getMapDataStore() { return mapDataStore; } - @Override - public void destroy() { - clearPartition(false, true); - storage.destroy(false); - eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId); - } - - @Override - public void destroyInternals() { - clearMapStore(); - clearStorage(false); - storage.destroy(false); - } - @Override public long softFlush() { updateStoreStats(); @@ -148,10 +134,8 @@ public long softFlush() { * @param recordsToBeFlushed records to be flushed to map-store. * @param backup true if backup, false otherwise. */ - protected void flush(Collection recordsToBeFlushed, boolean backup) { - Iterator iterator = recordsToBeFlushed.iterator(); - while (iterator.hasNext()) { - Record record = iterator.next(); + private void flush(Collection recordsToBeFlushed, boolean backup) { + for (Record record : recordsToBeFlushed) { mapDataStore.flush(record.getKey(), record.getValue(), backup); } } @@ -221,91 +205,6 @@ public Iterator loadAwareIterator(long now, boolean backup) { return iterator(now, backup); } - @Override - public void clearPartition(boolean onShutdown, boolean onRecordStoreDestroy) { - clearLockStore(); - if (onRecordStoreDestroy) { - destroyIndexes(); - } else { - clearIndexedData(); - } - clearMapStore(); - clearStorage(onShutdown); - } - - protected void clearLockStore() { - NodeEngine nodeEngine = mapServiceContext.getNodeEngine(); - LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME); - if (lockService != null) { - ObjectNamespace namespace = MapService.getObjectNamespace(name); - lockService.clearLockStore(partitionId, namespace); - } - } - - protected void clearStorage(boolean onShutdown) { - if (onShutdown) { - NodeEngine nodeEngine = mapServiceContext.getNodeEngine(); - NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig(); - boolean shouldClear = (nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() != POOLED); - if (shouldClear) { - storage.clear(true); - } - storage.destroy(true); - } else { - storage.clear(false); - } - } - - protected void clearMapStore() { - mapDataStore.reset(); - } - - /** - * Only indexed data will be removed, index info will stay. - */ - public void clearIndexedData() { - Indexes indexes = mapContainer.getIndexes(partitionId); - if (indexes.isGlobal()) { - if (indexes.hasIndex()) { - // clears indexed data of this partition - // from shared global index. - fullScanLocalDataToClear(indexes); - } - } else { - // if index is partitioned, we can clear indexed - // data with clearAll here. - indexes.clearAll(); - } - } - - public void destroyIndexes() { - Indexes indexes = mapContainer.getIndexes(partitionId); - indexes.destroyIndexes(); - if (indexes.isGlobal()) { - if (indexes.hasIndex()) { - // clears indexed data of this partition - // from shared global index. - fullScanLocalDataToClear(indexes); - } - } else { - // if index is partitioned, we can destroy indexed - // data with destroyIndexes here. - indexes.destroyIndexes(); - } - } - - /** - * Clears local data of this partition from global index by doing - * partition full-scan. - */ - private void fullScanLocalDataToClear(Indexes indexes) { - for (Record record : storage.values()) { - Data key = record.getKey(); - Object value = Records.getValueOrCachedValue(record, serializationService); - indexes.removeEntryIndex(key, value); - } - } - /** * Size may not give precise size at a specific moment * due to the expiration logic. But eventually, it should be correct. @@ -431,20 +330,6 @@ record = createRecord(value, DEFAULT_TTL, getNow()); return record; } - @Override - public int clear() { - checkIfLoaded(); - - // we don't remove locked keys. These are clearable records. - Collection clearableRecords = getNotLockedRecords(); - // This conversion is required by mapDataStore#removeAll call. - List keys = getKeysFromRecords(clearableRecords); - mapDataStore.removeAll(keys); - clearMapStore(); - removeIndex(clearableRecords); - return removeRecords(clearableRecords); - } - protected List getKeysFromRecords(Collection clearableRecords) { List keys = new ArrayList(clearableRecords.size()); for (Record clearableRecord : clearableRecords) { @@ -503,16 +388,6 @@ protected Collection getNotLockedRecords() { return notLockedRecords; } - /** - * Resets the record store to it's initial state. - */ - @Override - public void reset() { - clearMapStore(); - storage.clear(false); - stats.reset(); - } - @Override public Object evict(Data key, boolean backup) { Record record = storage.get(key); @@ -1230,4 +1105,138 @@ private String getStateMessage() { + " loadedOnCreate=" + loadedOnCreate + " loadedOnPreMigration=" + loadedOnPreMigration + " isLoaded=" + isLoaded(); } + + @Override + public int clear() { + checkIfLoaded(); + // we don't remove locked keys. These are clearable records. + Collection clearableRecords = getNotLockedRecords(); + // This conversion is required by mapDataStore#removeAll call. + List keys = getKeysFromRecords(clearableRecords); + mapDataStore.removeAll(keys); + clearMapStore(); + removeIndex(clearableRecords); + return removeRecords(clearableRecords); + } + + @Override + public void reset() { + clearMapStore(); + storage.clear(false); + stats.reset(); + } + + @Override + public void destroy() { + clearPartition(false, true); + eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId); + } + + @Override + public void clearPartition(boolean onShutdown, boolean onStorageDestroy) { + clearOtherResourcesThanStorage(onStorageDestroy); + + if (onShutdown) { + if (hasPooledMemoryAllocator()) { + destroyStorage(true); + } else { + clearAndDestroyStorage(true); + } + } else { + if (onStorageDestroy) { + clearAndDestroyStorage(false); + } else { + clearStorage(false); + } + } + } + + private boolean hasPooledMemoryAllocator() { + NodeEngine nodeEngine = mapServiceContext.getNodeEngine(); + NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig(); + return nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() == POOLED; + } + + public void clearOtherResourcesThanStorage(boolean onStorageDestroy) { + clearMapStore(); + clearLockStore(); + clearIndexedData(onStorageDestroy); + } + + /** + * Calls also {@link #clearStorage(boolean)} to release allocated HD memory + * of key+value pairs because {@link #destroyStorage(boolean)} + * only releases internal resources of backing data structure. + * + * @param isDuringShutdown {@link Storage#clear(boolean)} + */ + public void clearAndDestroyStorage(boolean isDuringShutdown) { + clearStorage(isDuringShutdown); + destroyStorage(isDuringShutdown); + } + + private void destroyStorage(boolean isDuringShutdown) { + storage.destroy(isDuringShutdown); + } + + private void clearStorage(boolean isDuringShutdown) { + storage.clear(isDuringShutdown); + } + + private void clearLockStore() { + NodeEngine nodeEngine = mapServiceContext.getNodeEngine(); + LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME); + if (lockService != null) { + ObjectNamespace namespace = MapService.getObjectNamespace(name); + lockService.clearLockStore(partitionId, namespace); + } + } + + private void clearMapStore() { + mapDataStore.reset(); + } + + /** + * Only indexed data will be removed, index info will stay. + */ + private void clearIndexedData(boolean onStorageDestroy) { + clearGlobalIndexes(); + clearPartitionedIndexes(onStorageDestroy); + } + + private void clearGlobalIndexes() { + Indexes indexes = mapContainer.getIndexes(partitionId); + if (indexes.isGlobal()) { + if (indexes.hasIndex()) { + // clears indexed data of this partition + // from shared global index. + fullScanLocalDataToClear(indexes); + } + } + } + + private void clearPartitionedIndexes(boolean onStorageDestroy) { + Indexes indexes = mapContainer.getIndexes(partitionId); + if (indexes.isGlobal()) { + return; + } + + if (onStorageDestroy) { + indexes.destroyIndexes(); + } else { + indexes.clearAll(); + } + } + + /** + * Clears local data of this partition from global index by doing + * partition full-scan. + */ + private void fullScanLocalDataToClear(Indexes indexes) { + for (Record record : storage.values()) { + Data key = record.getKey(); + Object value = Records.getValueOrCachedValue(record, serializationService); + indexes.removeEntryIndex(key, value); + } + } } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java index 3f2d3790fbbbe..3cb1caae8fb54 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/RecordStore.java @@ -18,6 +18,7 @@ import com.hazelcast.config.InMemoryFormat; import com.hazelcast.core.EntryView; +import com.hazelcast.core.IMap; import com.hazelcast.internal.nearcache.impl.invalidation.InvalidationQueue; import com.hazelcast.map.impl.MapContainer; import com.hazelcast.map.impl.MapEntries; @@ -278,28 +279,10 @@ public interface RecordStore { */ long softFlush(); - /** - * Clears internal partition data. - * - * @param onShutdown true if {@code close} is called during MapService shutdown, - * false otherwise. - */ - void clearPartition(boolean onShutdown, boolean onMapDestroy); - - /** - * Resets the record store to it's initial state. - * Used in replication operations. - * - * @see #putRecord(Data, Record) - */ - void reset(); - boolean forceUnlock(Data dataKey); long getOwnedEntryCost(); - int clear(); - boolean isEmpty(); /** @@ -379,14 +362,6 @@ public interface RecordStore { */ void disposeDeferredBlocks(); - void destroy(); - - /** - * Like {@link #destroy()} but does not touch state on other services - * like lock service or event journal service. - */ - void destroyInternals(); - /** * Initialize the recordStore after creation */ @@ -430,8 +405,7 @@ public interface RecordStore { */ boolean isLoaded(); - void checkIfLoaded() - throws RetryableHazelcastException; + void checkIfLoaded() throws RetryableHazelcastException; /** * Triggers key and value loading if there is no ongoing or completed @@ -464,9 +438,9 @@ void checkIfLoaded() /** * Advances the state of the map key loader for this partition and sets the key * loading future result if the {@code lastBatch} is {@code true}. - *

+ * * If there was an exception during key loading, you may pass it as the - * {@code exception} paramter and it will be set as the result of the future. + * {@code exception} parameter and it will be set as the result of the future. * * @param lastBatch if the last key batch was sent * @param exception an exception that occurred during key loading @@ -478,4 +452,42 @@ void checkIfLoaded() * for this map. */ boolean hasQueryCache(); + + /** + * Called by {@link IMap#destroy()} or {@link + * com.hazelcast.map.impl.MapMigrationAwareService} + * + * Clears internal partition data. + * + * @param onShutdown true if {@code close} is called during + * MapService shutdown, false otherwise. + * @param onRecordStoreDestroy true if record-store will be destroyed, + * otherwise false. + */ + void clearPartition(boolean onShutdown, boolean onRecordStoreDestroy); + + /** + * Called by {@link IMap#clear()}. + * + * Clears data in this record store. + * + * @return number of cleared entries. + */ + int clear(); + + /** + * Resets the record store to it's initial state. + * + * Used in replication operations. + * + * @see #putRecord(Data, Record) + */ + void reset(); + + /** + * Called by {@link IMap#destroy()}. + * + * Destroys data in this record store. + */ + void destroy(); } diff --git a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/Storage.java b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/Storage.java index 519f19d3293a1..cae7f10e1004b 100644 --- a/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/Storage.java +++ b/hazelcast/src/main/java/com/hazelcast/map/impl/recordstore/Storage.java @@ -69,6 +69,9 @@ public interface Storage { boolean isEmpty(); + /** + * @param isDuringShutdown only used by hot-restart. + */ void clear(boolean isDuringShutdown); void destroy(boolean isDuringShutdown);