Skip to content

Commit

Permalink
Cleared storage before destroy to release all HD resources.
Browse files Browse the repository at this point in the history
To release the key+value pairs, storage#clear must be called in addition to
storage#destroy, because destroy only releases the internal resources of the
backing data structure, not the key+value pairs themselves.
  • Loading branch information
ahmetmircik committed Sep 11, 2018
1 parent 8550196 commit a1a7c16
Show file tree
Hide file tree
Showing 5 changed files with 189 additions and 161 deletions.
Expand Up @@ -316,7 +316,7 @@ public void clearMapsHavingLesserBackupCountThan(int partitionId, int backupCoun
RecordStore recordStore = iter.next();
final MapContainer mapContainer = recordStore.getMapContainer();
if (backupCount > mapContainer.getTotalBackupCount()) {
recordStore.clearPartition(false, false);
recordStore.clearPartition(false, true);
iter.remove();
}
}
Expand All @@ -328,7 +328,7 @@ public void clearPartitionData(int partitionId) {
final PartitionContainer container = partitionContainers[partitionId];
if (container != null) {
for (RecordStore mapPartition : container.getMaps().values()) {
mapPartition.clearPartition(false, false);
mapPartition.clearPartition(false, true);
}
container.getMaps().clear();
}
Expand Down
Expand Up @@ -58,14 +58,14 @@ protected Iterator<RecordStore> storeIterator(int partitionId) {
protected void onStoreCollection(RecordStore recordStore) {
assertRunningOnPartitionThread();

((DefaultRecordStore) recordStore).clearIndexedData();
((DefaultRecordStore) recordStore).clearOtherDataThanStorage(true);
}

@Override
protected void destroyStore(RecordStore store) {
assertRunningOnPartitionThread();

store.destroyInternals();
((DefaultRecordStore) store).destroyStorageAfterClear(false);
}

@Override
Expand Down
Expand Up @@ -122,20 +122,6 @@ public MapDataStore<Data, Object> getMapDataStore() {
return mapDataStore;
}

/**
 * Destroys this record store: clears the partition with
 * onRecordStoreDestroy=true (so indexes are destroyed rather than only
 * cleared — see clearPartition), then destroys the backing storage and
 * this partition's event journal for the map's namespace.
 */
@Override
public void destroy() {
clearPartition(false, true);
storage.destroy(false);
eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId);
}

/**
 * Destroys internal resources of this record store: resets the map-store
 * state, clears storage contents (not during shutdown), then destroys the
 * backing storage. Unlike destroy(), this does not touch the lock store,
 * indexes or the event journal.
 */
@Override
public void destroyInternals() {
clearMapStore();
clearStorage(false);
storage.destroy(false);
}

@Override
public long softFlush() {
updateStoreStats();
Expand All @@ -148,10 +134,8 @@ public long softFlush() {
* @param recordsToBeFlushed records to be flushed to map-store.
* @param backup <code>true</code> if backup, false otherwise.
*/
protected void flush(Collection<Record> recordsToBeFlushed, boolean backup) {
Iterator<Record> iterator = recordsToBeFlushed.iterator();
while (iterator.hasNext()) {
Record record = iterator.next();
private void flush(Collection<Record> recordsToBeFlushed, boolean backup) {
for (Record record : recordsToBeFlushed) {
mapDataStore.flush(record.getKey(), record.getValue(), backup);
}
}
Expand Down Expand Up @@ -221,91 +205,6 @@ public Iterator<Record> loadAwareIterator(long now, boolean backup) {
return iterator(now, backup);
}

/**
 * Clears this partition's data held by this record store.
 *
 * @param onShutdown           true when clearing as part of node shutdown;
 *                             forwarded to clearStorage which handles HD
 *                             memory differently in that case
 * @param onRecordStoreDestroy true when the record store itself is being
 *                             destroyed — indexes are then destroyed,
 *                             otherwise only the indexed data is cleared
 */
@Override
public void clearPartition(boolean onShutdown, boolean onRecordStoreDestroy) {
clearLockStore();
if (onRecordStoreDestroy) {
destroyIndexes();
} else {
clearIndexedData();
}
clearMapStore();
clearStorage(onShutdown);
}

/**
 * Clears this partition's lock store for this map's namespace.
 * The lock service is a shared service and may be absent (null),
 * in which case this is a no-op.
 */
protected void clearLockStore() {
NodeEngine nodeEngine = mapServiceContext.getNodeEngine();
LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME);
if (lockService != null) {
ObjectNamespace namespace = MapService.getObjectNamespace(name);
lockService.clearLockStore(partitionId, namespace);
}
}

/**
 * Clears backing storage.
 *
 * @param onShutdown true when called during node shutdown. In that case the
 *                   storage is also destroyed; it is additionally cleared
 *                   first only when the native-memory allocator is not
 *                   POOLED (presumably the pooled allocator reclaims all
 *                   memory on destroy, so a per-entry clear is unnecessary
 *                   — TODO confirm against NativeMemoryConfig docs).
 */
protected void clearStorage(boolean onShutdown) {
if (onShutdown) {
NodeEngine nodeEngine = mapServiceContext.getNodeEngine();
NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig();
boolean shouldClear = (nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() != POOLED);
if (shouldClear) {
storage.clear(true);
}
storage.destroy(true);
} else {
storage.clear(false);
}
}

/** Resets the map-data-store (write-behind/write-through state) of this record store. */
protected void clearMapStore() {
mapDataStore.reset();
}

/**
 * Only indexed data will be removed, index info will stay.
 * <p>
 * For a global (shared) index, this partition's entries are removed via a
 * full partition scan; for a partitioned index the whole per-partition
 * index content can simply be cleared.
 */
public void clearIndexedData() {
Indexes indexes = mapContainer.getIndexes(partitionId);
if (indexes.isGlobal()) {
if (indexes.hasIndex()) {
// clears indexed data of this partition
// from shared global index.
fullScanLocalDataToClear(indexes);
}
} else {
// if index is partitioned, we can clear indexed
// data with clearAll here.
indexes.clearAll();
}
}

/**
 * Destroys index structures for this partition and, when the index is a
 * shared global index, additionally removes this partition's indexed data
 * from it via a full partition scan.
 */
public void destroyIndexes() {
    Indexes indexes = mapContainer.getIndexes(partitionId);
    // A single destroyIndexes() call covers both the global and the
    // partitioned case. The previous version invoked it a second time in
    // the partitioned branch, which was redundant.
    indexes.destroyIndexes();
    if (indexes.isGlobal() && indexes.hasIndex()) {
        // clears indexed data of this partition from the shared global index
        fullScanLocalDataToClear(indexes);
    }
}

/**
 * Clears local data of this partition from global index by doing
 * partition full-scan.
 *
 * @param indexes the (global) indexes to remove this partition's
 *                entries from
 */
private void fullScanLocalDataToClear(Indexes indexes) {
for (Record record : storage.values()) {
Data key = record.getKey();
// use the cached value if present to avoid re-deserialization
Object value = Records.getValueOrCachedValue(record, serializationService);
indexes.removeEntryIndex(key, value);
}
}

/**
* Size may not give precise size at a specific moment
* due to the expiration logic. But eventually, it should be correct.
Expand Down Expand Up @@ -431,20 +330,6 @@ record = createRecord(value, DEFAULT_TTL, getNow());
return record;
}

/**
 * Removes all not-locked entries from this record store.
 * Map-store removals are issued before the in-memory records and index
 * entries are removed.
 *
 * @return the number of removed records
 */
@Override
public int clear() {
checkIfLoaded();

// we don't remove locked keys. These are clearable records.
Collection<Record> clearableRecords = getNotLockedRecords();
// This conversion is required by mapDataStore#removeAll call.
List<Data> keys = getKeysFromRecords(clearableRecords);
mapDataStore.removeAll(keys);
clearMapStore();
removeIndex(clearableRecords);
return removeRecords(clearableRecords);
}

protected List<Data> getKeysFromRecords(Collection<Record> clearableRecords) {
List<Data> keys = new ArrayList<Data>(clearableRecords.size());
for (Record clearableRecord : clearableRecords) {
Expand Down Expand Up @@ -503,16 +388,6 @@ protected Collection<Record> getNotLockedRecords() {
return notLockedRecords;
}

/**
 * Resets the record store to it's initial state: map-store state is
 * reset, storage contents are cleared (without destroying the backing
 * structure) and local statistics are reset.
 */
@Override
public void reset() {
clearMapStore();
storage.clear(false);
stats.reset();
}

@Override
public Object evict(Data key, boolean backup) {
Record record = storage.get(key);
Expand Down Expand Up @@ -1230,4 +1105,142 @@ private String getStateMessage() {
+ " loadedOnCreate=" + loadedOnCreate + " loadedOnPreMigration=" + loadedOnPreMigration
+ " isLoaded=" + isLoaded();
}

/**
 * Removes all not-locked entries from this record store.
 * Map-store removals are issued before the in-memory records and index
 * entries are removed.
 *
 * @return the number of removed records
 */
@Override
public int clear() {
checkIfLoaded();
// we don't remove locked keys. These are clearable records.
Collection<Record> clearableRecords = getNotLockedRecords();
// This conversion is required by mapDataStore#removeAll call.
List<Data> keys = getKeysFromRecords(clearableRecords);
mapDataStore.removeAll(keys);
clearMapStore();
removeIndex(clearableRecords);
return removeRecords(clearableRecords);
}

/**
 * Resets the record store to its initial state: map-store state is reset,
 * storage contents are cleared (the backing structure itself is kept) and
 * local statistics are reset.
 */
@Override
public void reset() {
clearMapStore();
storage.clear(false);
stats.reset();
}

/**
 * Destroys this record store. Delegates to clearPartition with
 * onStorageDestroy=true, which clears the storage before destroying it
 * (releasing key+value pairs, not only the backing structure's internal
 * resources), then destroys this partition's event journal.
 */
@Override
public void destroy() {
clearPartition(false, true);
eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId);
}

/**
 * Clears this partition's data held by this record store. Lock-store data
 * and non-storage data (map-store state, indexed data) are always cleared;
 * the treatment of the backing storage depends on the flags.
 *
 * @param onShutdown       true when clearing during node shutdown; with a
 *                         POOLED native-memory allocator the storage is
 *                         destroyed directly, otherwise it is cleared
 *                         first and then destroyed
 * @param onStorageDestroy true when the backing storage is being
 *                         destroyed; the storage is then cleared before
 *                         destruction so that key+value pairs are
 *                         released too, otherwise it is only cleared
 */
@Override
public void clearPartition(boolean onShutdown, boolean onStorageDestroy) {
clearLockStore();
clearOtherDataThanStorage(onStorageDestroy);

if (onShutdown) {
if (hasPooledMemoryAllocator()) {
destroyStorageImmediate(true);
} else {
destroyStorageAfterClear(true);
}
} else {
if (onStorageDestroy) {
destroyStorageAfterClear(false);
} else {
clearStorage(false);
}
}
}

/**
 * Returns true when the node's native-memory configuration is present and
 * uses the POOLED allocator type.
 */
private boolean hasPooledMemoryAllocator() {
    NativeMemoryConfig memoryConfig = mapServiceContext.getNodeEngine()
            .getConfig().getNativeMemoryConfig();
    if (memoryConfig == null) {
        return false;
    }
    return POOLED == memoryConfig.getAllocatorType();
}

/**
 * Only cleans the data other than storage-data that is held on this record
 * store. Other services data like lock-service-data is not cleared here.
 *
 * @param onStorageDestroy true when the backing storage is being
 *                         destroyed — indexed data is then destroyed
 *                         rather than only cleared (see clearIndexedData)
 */
public void clearOtherDataThanStorage(boolean onStorageDestroy) {
clearMapStore();
clearIndexedData(onStorageDestroy);
}

/**
 * Destroys the backing storage without clearing it first. Note that this
 * releases only the internal resources of the backing data structure, not
 * the key+value pairs — see destroyStorageAfterClear.
 *
 * @param isDuringShutdown forwarded to {@link Storage#destroy(boolean)}
 */
private void destroyStorageImmediate(boolean isDuringShutdown) {
storage.destroy(isDuringShutdown);
}

/**
 * Calls also {@link #clearStorage(boolean)} to release allocated HD memory
 * of key+value pairs because {@link #destroyStorageImmediate(boolean)}
 * only releases internal resources of backing data structure.
 *
 * @param isDuringShutdown forwarded to {@link Storage#clear(boolean)} and
 *                         {@link Storage#destroy(boolean)}
 */
public void destroyStorageAfterClear(boolean isDuringShutdown) {
clearStorage(isDuringShutdown);
destroyStorageImmediate(isDuringShutdown);
}

/**
 * Clears the contents of the backing storage, keeping the backing data
 * structure itself alive.
 *
 * @param isDuringShutdown forwarded to {@link Storage#clear(boolean)}
 */
private void clearStorage(boolean isDuringShutdown) {
storage.clear(isDuringShutdown);
}

/**
 * Clears this partition's lock store for this map's namespace. The lock
 * service is a shared service and may be absent, in which case this is a
 * no-op.
 */
private void clearLockStore() {
    NodeEngine engine = mapServiceContext.getNodeEngine();
    LockService locks = engine.getSharedService(LockService.SERVICE_NAME);
    if (locks == null) {
        return;
    }
    locks.clearLockStore(partitionId, MapService.getObjectNamespace(name));
}

/** Resets the map-data-store (write-behind/write-through state) of this record store. */
private void clearMapStore() {
mapDataStore.reset();
}

/**
 * Only indexed data will be removed, index info will stay.
 *
 * @param onStorageDestroy true when the backing storage is being
 *                         destroyed; partitioned indexes are then
 *                         destroyed instead of merely cleared
 */
private void clearIndexedData(boolean onStorageDestroy) {
clearGlobalIndexes();
clearPartitionedIndexes(onStorageDestroy);
}

/**
 * Removes this partition's indexed data from the shared global index, if a
 * global index with at least one index exists. No-op for partitioned
 * indexes.
 */
private void clearGlobalIndexes() {
    Indexes indexes = mapContainer.getIndexes(partitionId);
    if (indexes.isGlobal() && indexes.hasIndex()) {
        // clears indexed data of this partition from the shared global index
        fullScanLocalDataToClear(indexes);
    }
}

/**
 * Clears (or destroys, when the storage is being destroyed) the
 * per-partition indexes. No-op for a global index — that case is handled
 * by clearGlobalIndexes.
 *
 * @param onStorageDestroy true to destroy the indexes instead of only
 *                         clearing their contents
 */
private void clearPartitionedIndexes(boolean onStorageDestroy) {
    Indexes partitionedIndexes = mapContainer.getIndexes(partitionId);
    if (!partitionedIndexes.isGlobal()) {
        if (onStorageDestroy) {
            partitionedIndexes.destroyIndexes();
        } else {
            partitionedIndexes.clearAll();
        }
    }
}

/**
 * Clears local data of this partition from global index by doing
 * partition full-scan.
 *
 * @param indexes the (global) indexes to remove this partition's entries
 *                from
 */
private void fullScanLocalDataToClear(Indexes indexes) {
    for (Record localRecord : storage.values()) {
        // prefer the cached value if present to avoid re-deserialization
        Object indexedValue = Records.getValueOrCachedValue(localRecord, serializationService);
        indexes.removeEntryIndex(localRecord.getKey(), indexedValue);
    }
}
}

0 comments on commit a1a7c16

Please sign in to comment.