From d85d1a99596ff5b93b88b6ccb4bc339f71e948db Mon Sep 17 00:00:00 2001 From: tkountis Date: Mon, 27 Jul 2020 14:54:47 +0100 Subject: [PATCH] Fix HD leak for EP that run on Map keys --- .../protocol/task/AbstractMultiPartitionMessageTask.java | 9 +++++++++ .../protocol/task/map/MapExecuteOnKeysMessageTask.java | 9 +++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMultiPartitionMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMultiPartitionMessageTask.java index 268ef8a964b01..83ca9bce1fd25 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMultiPartitionMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/AbstractMultiPartitionMessageTask.java @@ -20,6 +20,9 @@ import com.hazelcast.client.impl.protocol.ClientMessage; import com.hazelcast.core.ExecutionCallback; import com.hazelcast.instance.Node; +import com.hazelcast.map.impl.MapService; +import com.hazelcast.map.impl.MapServiceContext; +import com.hazelcast.map.impl.operation.MapOperationProvider; import com.hazelcast.nio.Connection; import com.hazelcast.spi.OperationFactory; import com.hazelcast.spi.impl.operationservice.InternalOperationService; @@ -41,6 +44,12 @@ protected void processMessage() { operationService.invokeOnPartitionsAsync(getServiceName(), operationFactory, getPartitions()).andThen(this); } + protected final MapOperationProvider getMapOperationProvider(String mapName) { + MapService mapService = getService(MapService.SERVICE_NAME); + MapServiceContext mapServiceContext = mapService.getMapServiceContext(); + return mapServiceContext.getMapOperationProvider(mapName); + } + public abstract Collection getPartitions(); protected abstract OperationFactory createOperationFactory(); diff --git a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapExecuteOnKeysMessageTask.java b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapExecuteOnKeysMessageTask.java index 20a7e2dc816d3..935a7328cfdbc 100644 --- a/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapExecuteOnKeysMessageTask.java +++ b/hazelcast/src/main/java/com/hazelcast/client/impl/protocol/task/map/MapExecuteOnKeysMessageTask.java @@ -23,7 +23,7 @@ import com.hazelcast.map.EntryProcessor; import com.hazelcast.map.impl.MapEntries; import com.hazelcast.map.impl.MapService; -import com.hazelcast.map.impl.operation.MultipleEntryOperationFactory; +import com.hazelcast.map.impl.operation.MapOperationProvider; import com.hazelcast.nio.Connection; import com.hazelcast.nio.serialization.Data; import com.hazelcast.security.permission.ActionConstants; @@ -51,9 +51,10 @@ public MapExecuteOnKeysMessageTask(ClientMessage clientMessage, Node node, Conne @Override protected OperationFactory createOperationFactory() { - EntryProcessor entryProcessor = serializationService.toObject(parameters.entryProcessor); - Set keys = new HashSet(parameters.keys); - return new MultipleEntryOperationFactory(parameters.name, keys, entryProcessor); + EntryProcessor processor = serializationService.toObject(parameters.entryProcessor); + MapOperationProvider operationProvider = getMapOperationProvider(parameters.name); + return operationProvider.createMultipleEntryOperationFactory(parameters.name, + new HashSet<>(parameters.keys), processor); } @Override