Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BACKPORT]Added storage destroy to release HD resources. #13659

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -316,7 +316,7 @@ public void clearMapsHavingLesserBackupCountThan(int partitionId, int backupCoun
RecordStore recordStore = iter.next();
final MapContainer mapContainer = recordStore.getMapContainer();
if (backupCount > mapContainer.getTotalBackupCount()) {
recordStore.clearPartition(false, false);
recordStore.clearPartition(false, true);
iter.remove();
}
}
Expand All @@ -328,7 +328,7 @@ public void clearPartitionData(int partitionId) {
final PartitionContainer container = partitionContainers[partitionId];
if (container != null) {
for (RecordStore mapPartition : container.getMaps().values()) {
mapPartition.clearPartition(false, false);
mapPartition.clearPartition(false, true);
}
container.getMaps().clear();
}
Expand Down
Expand Up @@ -58,14 +58,14 @@ protected Iterator<RecordStore> storeIterator(int partitionId) {
protected void onStoreCollection(RecordStore recordStore) {
assertRunningOnPartitionThread();

((DefaultRecordStore) recordStore).clearIndexedData();
((DefaultRecordStore) recordStore).clearOtherDataThanStorage(true);
}

@Override
protected void destroyStore(RecordStore store) {
assertRunningOnPartitionThread();

store.destroyInternals();
((DefaultRecordStore) store).destroyStorageAfterClear(false);
}

@Override
Expand Down
Expand Up @@ -122,20 +122,6 @@ public MapDataStore<Data, Object> getMapDataStore() {
return mapDataStore;
}

@Override
public void destroy() {
clearPartition(false, true);
storage.destroy(false);
eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId);
}

@Override
public void destroyInternals() {
clearMapStore();
clearStorage(false);
storage.destroy(false);
}

@Override
public long softFlush() {
updateStoreStats();
Expand All @@ -148,10 +134,8 @@ public long softFlush() {
* @param recordsToBeFlushed records to be flushed to map-store.
* @param backup <code>true</code> if backup, false otherwise.
*/
protected void flush(Collection<Record> recordsToBeFlushed, boolean backup) {
Iterator<Record> iterator = recordsToBeFlushed.iterator();
while (iterator.hasNext()) {
Record record = iterator.next();
private void flush(Collection<Record> recordsToBeFlushed, boolean backup) {
for (Record record : recordsToBeFlushed) {
mapDataStore.flush(record.getKey(), record.getValue(), backup);
}
}
Expand Down Expand Up @@ -221,91 +205,6 @@ public Iterator<Record> loadAwareIterator(long now, boolean backup) {
return iterator(now, backup);
}

@Override
public void clearPartition(boolean onShutdown, boolean onRecordStoreDestroy) {
clearLockStore();
if (onRecordStoreDestroy) {
destroyIndexes();
} else {
clearIndexedData();
}
clearMapStore();
clearStorage(onShutdown);
}

protected void clearLockStore() {
NodeEngine nodeEngine = mapServiceContext.getNodeEngine();
LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME);
if (lockService != null) {
ObjectNamespace namespace = MapService.getObjectNamespace(name);
lockService.clearLockStore(partitionId, namespace);
}
}

protected void clearStorage(boolean onShutdown) {
if (onShutdown) {
NodeEngine nodeEngine = mapServiceContext.getNodeEngine();
NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig();
boolean shouldClear = (nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() != POOLED);
if (shouldClear) {
storage.clear(true);
}
storage.destroy(true);
} else {
storage.clear(false);
}
}

protected void clearMapStore() {
mapDataStore.reset();
}

/**
* Only indexed data will be removed, index info will stay.
*/
public void clearIndexedData() {
Indexes indexes = mapContainer.getIndexes(partitionId);
if (indexes.isGlobal()) {
if (indexes.hasIndex()) {
// clears indexed data of this partition
// from shared global index.
fullScanLocalDataToClear(indexes);
}
} else {
// if index is partitioned, we can clear indexed
// data with clearAll here.
indexes.clearAll();
}
}

public void destroyIndexes() {
Indexes indexes = mapContainer.getIndexes(partitionId);
indexes.destroyIndexes();
if (indexes.isGlobal()) {
if (indexes.hasIndex()) {
// clears indexed data of this partition
// from shared global index.
fullScanLocalDataToClear(indexes);
}
} else {
// if index is partitioned, we can destroy indexed
// data with destroyIndexes here.
indexes.destroyIndexes();
}
}

/**
* Clears local data of this partition from global index by doing
* partition full-scan.
*/
private void fullScanLocalDataToClear(Indexes indexes) {
for (Record record : storage.values()) {
Data key = record.getKey();
Object value = Records.getValueOrCachedValue(record, serializationService);
indexes.removeEntryIndex(key, value);
}
}

/**
* Size may not give precise size at a specific moment
* due to the expiration logic. But eventually, it should be correct.
Expand Down Expand Up @@ -431,20 +330,6 @@ record = createRecord(value, DEFAULT_TTL, getNow());
return record;
}

@Override
public int clear() {
checkIfLoaded();

// we don't remove locked keys. These are clearable records.
Collection<Record> clearableRecords = getNotLockedRecords();
// This conversion is required by mapDataStore#removeAll call.
List<Data> keys = getKeysFromRecords(clearableRecords);
mapDataStore.removeAll(keys);
clearMapStore();
removeIndex(clearableRecords);
return removeRecords(clearableRecords);
}

protected List<Data> getKeysFromRecords(Collection<Record> clearableRecords) {
List<Data> keys = new ArrayList<Data>(clearableRecords.size());
for (Record clearableRecord : clearableRecords) {
Expand Down Expand Up @@ -503,16 +388,6 @@ protected Collection<Record> getNotLockedRecords() {
return notLockedRecords;
}

/**
* Resets the record store to it's initial state.
*/
@Override
public void reset() {
clearMapStore();
storage.clear(false);
stats.reset();
}

@Override
public Object evict(Data key, boolean backup) {
Record record = storage.get(key);
Expand Down Expand Up @@ -1230,4 +1105,142 @@ private String getStateMessage() {
+ " loadedOnCreate=" + loadedOnCreate + " loadedOnPreMigration=" + loadedOnPreMigration
+ " isLoaded=" + isLoaded();
}

@Override
public int clear() {
checkIfLoaded();
// we don't remove locked keys. These are clearable records.
Collection<Record> clearableRecords = getNotLockedRecords();
// This conversion is required by mapDataStore#removeAll call.
List<Data> keys = getKeysFromRecords(clearableRecords);
mapDataStore.removeAll(keys);
clearMapStore();
removeIndex(clearableRecords);
return removeRecords(clearableRecords);
}

@Override
public void reset() {
clearMapStore();
storage.clear(false);
stats.reset();
}

@Override
public void destroy() {
clearPartition(false, true);
eventJournal.destroy(mapContainer.getObjectNamespace(), partitionId);
}

@Override
public void clearPartition(boolean onShutdown, boolean onStorageDestroy) {
clearLockStore();
clearOtherDataThanStorage(onStorageDestroy);

if (onShutdown) {
if (hasPooledMemoryAllocator()) {
destroyStorageImmediate(true);
} else {
destroyStorageAfterClear(true);
}
} else {
if (onStorageDestroy) {
destroyStorageAfterClear(false);
} else {
clearStorage(false);
}
}
}

private boolean hasPooledMemoryAllocator() {
NodeEngine nodeEngine = mapServiceContext.getNodeEngine();
NativeMemoryConfig nativeMemoryConfig = nodeEngine.getConfig().getNativeMemoryConfig();
return nativeMemoryConfig != null && nativeMemoryConfig.getAllocatorType() == POOLED;
}

/**
* Only cleans the data other than storage-data that is held on this record
* store. Other services data like lock-service-data is not cleared here.
*/
public void clearOtherDataThanStorage(boolean onStorageDestroy) {
clearMapStore();
clearIndexedData(onStorageDestroy);
}

private void destroyStorageImmediate(boolean isDuringShutdown) {
storage.destroy(isDuringShutdown);
}

/**
* Calls also {@link #clearStorage(boolean)} to release allocated HD memory
* of key+value pairs because {@link #destroyStorageImmediate(boolean)}
* only releases internal resources of backing data structure.
*
* @param isDuringShutdown {@link Storage#clear(boolean)}
*/
public void destroyStorageAfterClear(boolean isDuringShutdown) {
clearStorage(isDuringShutdown);
destroyStorageImmediate(isDuringShutdown);
}

private void clearStorage(boolean isDuringShutdown) {
storage.clear(isDuringShutdown);
}

private void clearLockStore() {
NodeEngine nodeEngine = mapServiceContext.getNodeEngine();
LockService lockService = nodeEngine.getSharedService(LockService.SERVICE_NAME);
if (lockService != null) {
ObjectNamespace namespace = MapService.getObjectNamespace(name);
lockService.clearLockStore(partitionId, namespace);
}
}

private void clearMapStore() {
mapDataStore.reset();
}

/**
* Only indexed data will be removed, index info will stay.
*/
private void clearIndexedData(boolean onStorageDestroy) {
clearGlobalIndexes();
clearPartitionedIndexes(onStorageDestroy);
}

private void clearGlobalIndexes() {
Indexes indexes = mapContainer.getIndexes(partitionId);
if (indexes.isGlobal()) {
if (indexes.hasIndex()) {
// clears indexed data of this partition
// from shared global index.
fullScanLocalDataToClear(indexes);
}
}
}

private void clearPartitionedIndexes(boolean onStorageDestroy) {
Indexes indexes = mapContainer.getIndexes(partitionId);
if (indexes.isGlobal()) {
return;
}

if (onStorageDestroy) {
indexes.destroyIndexes();
} else {
indexes.clearAll();
}
}

/**
* Clears local data of this partition from global index by doing
* partition full-scan.
*/
private void fullScanLocalDataToClear(Indexes indexes) {
for (Record record : storage.values()) {
Data key = record.getKey();
Object value = Records.getValueOrCachedValue(record, serializationService);
indexes.removeEntryIndex(key, value);
}
}
}