Retry map and cache partition destroy operations
When a partition is migrating, the destroy operations might fail with a
PartitionMigratingException. We then need to retry the operation,
otherwise we leak memory. As with other partition operations, we wait
for the migration to complete, using the default invocation retry count
and retry pause.

Fixes: https://github.com/hazelcast/hazelcast-enterprise/issues/930
Fixes: https://github.com/hazelcast/hazelcast-enterprise/issues/1933
Matko Medenjak committed Mar 27, 2018
1 parent 37330c5 commit 890c299
Showing 5 changed files with 290 additions and 22 deletions.
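The commit message above boils down to a simple pattern: run the per-partition destroy, and if it fails because the partition is migrating, pause and try again up to a bounded number of attempts. The following framework-free sketch illustrates that pattern; the task, the RetryableException type and the numeric limits are illustrative placeholders, not Hazelcast APIs.

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

public final class RetrySketch {

    /** Illustrative stand-in for a retryable failure such as a partition migration in progress. */
    static class RetryableException extends RuntimeException {
    }

    /** Runs the task, pausing and retrying on RetryableException, up to maxRetryCount attempts. */
    static Object runWithRetry(Callable<Object> task, int maxRetryCount, long pauseMillis) throws Exception {
        for (int attempt = 1; ; attempt++) {
            try {
                return task.call();
            } catch (RetryableException e) {
                if (attempt >= maxRetryCount) {
                    throw e;
                }
                TimeUnit.MILLISECONDS.sleep(pauseMillis);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        // Simulate a destroy that hits a migrating partition on its first two attempts.
        final int[] calls = {0};
        Object result = runWithRetry(new Callable<Object>() {
            @Override
            public Object call() {
                if (++calls[0] < 3) {
                    throw new RetryableException();
                }
                return "partition destroyed";
            }
        }, 250, 500);
        System.out.println(result);  // prints "partition destroyed" after two retries
    }
}

In the commit itself, the counterparts of maxRetryCount and pauseMillis come from GroupProperty.INVOCATION_MAX_RETRY_COUNT and GroupProperty.INVOCATION_RETRY_PAUSE, and the retry is scheduled asynchronously instead of blocking, as shown in the new LocalRetryableExecution class further down.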
@@ -31,6 +31,7 @@
import com.hazelcast.spi.ExecutionService;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationResponseHandler;
import com.hazelcast.spi.impl.AbstractCompletableFuture;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.util.executor.CompletedFuture;
@@ -90,6 +91,34 @@ public static ICompletableFuture<Object> invokeOnStableClusterSerial(NodeEngine
return new ChainingFuture<Object>(invocationIterator, executor, memberIterator, logger);
}

/**
* Constructs a local execution with retry logic. The operation must not
* have an {@link OperationResponseHandler} set, it must return a response
* and it must not validate the target.
*
* @param nodeEngine the node engine on which the operation is executed
* @param operation  the operation to execute locally with retry
* @return the local execution
* @throws IllegalArgumentException if the operation has a response handler
*                                  set, if it does not return a response
*                                  or if it validates the operation target
* @see Operation#returnsResponse()
* @see Operation#getOperationResponseHandler()
* @see Operation#validatesTarget()
*/
public static LocalRetryableExecution executeLocallyWithRetry(NodeEngine nodeEngine, Operation operation) {
if (operation.getOperationResponseHandler() != null) {
throw new IllegalArgumentException("Operation must not have a response handler set");
}
if (!operation.returnsResponse()) {
throw new IllegalArgumentException("Operation must return a response");
}
if (operation.validatesTarget()) {
throw new IllegalArgumentException("Operation must not validate the target");
}
final LocalRetryableExecution execution = new LocalRetryableExecution(nodeEngine, operation);
execution.run();
return execution;
}

private static void warmUpPartitions(NodeEngine nodeEngine) {
ClusterService clusterService = nodeEngine.getClusterService();
if (!clusterService.getClusterState().isMigrationAllowed()) {
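A hedged usage sketch of the helper added above, based on its javadoc contract: the operation must not have an OperationResponseHandler set, it must return a response, and it must not validate the target. CleanupOperation, LocalCleanupExample and the 30-second timeout are hypothetical names introduced only for illustration; AbstractLocalOperation is the base class this commit also uses for MapPartitionDestroyOperation further down.

// Hypothetical caller of InvocationUtil.executeLocallyWithRetry; not part of the commit.
import com.hazelcast.internal.util.InvocationUtil;
import com.hazelcast.internal.util.LocalRetryableExecution;
import com.hazelcast.spi.AbstractLocalOperation;
import com.hazelcast.spi.NodeEngine;

import java.util.concurrent.TimeUnit;

public class LocalCleanupExample {

    /** Local-only operation: no response handler is set, it returns a response and never validates the target. */
    public static class CleanupOperation extends AbstractLocalOperation {

        @Override
        public void run() {
            // clean up some node-local state here
        }

        @Override
        public boolean validatesTarget() {
            // required by executeLocallyWithRetry; Operation#validatesTarget() returns true by default
            return false;
        }

        // returnsResponse() is inherited from Operation and returns true, so the remaining check passes
    }

    public static void cleanUpWithRetry(NodeEngine nodeEngine) throws InterruptedException {
        LocalRetryableExecution execution =
                InvocationUtil.executeLocallyWithRetry(nodeEngine, new CleanupOperation());
        if (!execution.awaitCompletion(30, TimeUnit.SECONDS)) {
            nodeEngine.getLogger(LocalCleanupExample.class).warning("Cleanup did not finish in time");
        }
    }
}

The real MapPartitionDestroyOperation added by this commit follows the same shape, with the destroy work in run() and the partition id set in its constructor.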
@@ -0,0 +1,118 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.internal.util;

import com.hazelcast.logging.ILogger;
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.OperationResponseHandler;
import com.hazelcast.spi.exception.RetryableHazelcastException;
import com.hazelcast.spi.properties.GroupProperty;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;

import static java.util.logging.Level.FINEST;
import static java.util.logging.Level.WARNING;

/**
* Class encapsulating local execution with retry logic. The operation must
* not have an {@link OperationResponseHandler} set and it must return a
* response.
* The retry will use the configured
* {@link GroupProperty#INVOCATION_MAX_RETRY_COUNT} and
* {@link GroupProperty#INVOCATION_RETRY_PAUSE}.
*
* @see Operation#returnsResponse()
* @see Operation#getOperationResponseHandler()
* @see GroupProperty#INVOCATION_MAX_RETRY_COUNT
* @see GroupProperty#INVOCATION_RETRY_PAUSE
* @see InvocationUtil#executeLocallyWithRetry(NodeEngine, Operation)
*/
public class LocalRetryableExecution implements Runnable, OperationResponseHandler {
/** Number of retries after which further retries are logged at WARNING instead of FINEST level */
private static final int LOG_MAX_INVOCATION_COUNT = 99;
private final ILogger logger;
private final CountDownLatch done = new CountDownLatch(1);
private final Operation op;
private final NodeEngine nodeEngine;
private final long invocationRetryPauseMillis;
private final int invocationMaxRetryCount;
private volatile Object response;
private int tryCount;

LocalRetryableExecution(NodeEngine nodeEngine, Operation op) {
this.nodeEngine = nodeEngine;
this.logger = nodeEngine.getLogger(LocalRetryableExecution.class);
this.invocationMaxRetryCount = nodeEngine.getProperties().getInteger(GroupProperty.INVOCATION_MAX_RETRY_COUNT);
this.invocationRetryPauseMillis = nodeEngine.getProperties().getMillis(GroupProperty.INVOCATION_RETRY_PAUSE);
this.op = op;
op.setOperationResponseHandler(this);
}

/**
* Causes the current thread to wait until the operation has finished, the
* thread is {@linkplain Thread#interrupt interrupted}, or the specified
* waiting time elapses. The operation might have finished because it has
* completed (successfully or with an error) or the retry count has been
* exceeded.
*
* @param timeout the maximum time to wait
* @param unit the time unit of the {@code timeout} argument
* @return {@code true} if the operation completed or the operation retry
* count has been exceeded, else {@code false}
* @throws InterruptedException if the current thread is interrupted
* while waiting
*/
public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
return done.await(timeout, unit);
}

/**
* The response of the operation execution. It may be an exception if the
* exception was not an instance of {@link RetryableHazelcastException} or
* the maximum number of retries was exceeded.
* The response may also be {@code null} if the operation has no response
* or the operation has not completed yet.
*
* @return the operation response
*/
public Object getResponse() {
return response;
}

@Override
public void run() {
nodeEngine.getOperationService().execute(op);
}

@Override
public void sendResponse(Operation op, Object response) {
tryCount++;
if (response instanceof RetryableHazelcastException && tryCount < invocationMaxRetryCount) {
Level level = tryCount > LOG_MAX_INVOCATION_COUNT ? WARNING : FINEST;
if (logger.isLoggable(level)) {
logger.log(level, "Retrying local execution: " + toString() + ", Reason: " + response);
}
nodeEngine.getExecutionService().schedule(this, invocationRetryPauseMillis, TimeUnit.MILLISECONDS);
} else {
this.response = response;
done.countDown();
}
}
}
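For readers outside the Hazelcast codebase, the class above can be approximated with plain JDK types: a task reports its outcome to a callback, on a retryable failure the callback re-schedules the task after a pause up to a retry budget, and callers block on a latch before reading the recorded response. The sketch below follows that shape; all names in it are illustrative, and it deliberately ignores Hazelcast's operation and partition-thread machinery.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Simplified, JDK-only analogue of LocalRetryableExecution. */
public class RetryingExecution implements Runnable {

    /** Illustrative stand-in for RetryableHazelcastException. */
    public static class RetryableException extends RuntimeException {
    }

    /** Illustrative stand-in for an operation that produces a response. */
    public interface Task {
        Object call() throws Exception;
    }

    private final ScheduledExecutorService scheduler;
    private final Task task;
    private final int maxRetryCount;
    private final long retryPauseMillis;
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Object response;
    private int tryCount;

    public RetryingExecution(ScheduledExecutorService scheduler, Task task, int maxRetryCount, long retryPauseMillis) {
        this.scheduler = scheduler;
        this.task = task;
        this.maxRetryCount = maxRetryCount;
        this.retryPauseMillis = retryPauseMillis;
    }

    @Override
    public void run() {
        Object result;
        try {
            result = task.call();
        } catch (Exception e) {
            result = e;
        }
        // plays the role of sendResponse(): retry on a retryable failure, otherwise record the result and release waiters
        tryCount++;
        if (result instanceof RetryableException && tryCount < maxRetryCount) {
            scheduler.schedule(this, retryPauseMillis, TimeUnit.MILLISECONDS);
        } else {
            response = result;
            done.countDown();
        }
    }

    public boolean awaitCompletion(long timeout, TimeUnit unit) throws InterruptedException {
        return done.await(timeout, unit);
    }

    public Object getResponse() {
        return response;
    }

    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        final int[] attempts = {0};
        RetryingExecution execution = new RetryingExecution(scheduler, new Task() {
            @Override
            public Object call() {
                if (++attempts[0] < 3) {
                    throw new RetryableException();  // e.g. the partition is still migrating
                }
                return "destroyed";
            }
        }, 250, 10);
        scheduler.execute(execution);
        System.out.println(execution.awaitCompletion(5, TimeUnit.SECONDS) + " -> " + execution.getResponse());
        scheduler.shutdown();
    }
}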
@@ -22,6 +22,8 @@
import com.hazelcast.config.PartitioningStrategyConfig;
import com.hazelcast.core.PartitioningStrategy;
import com.hazelcast.internal.serialization.InternalSerializationService;
import com.hazelcast.internal.util.InvocationUtil;
import com.hazelcast.internal.util.LocalRetryableExecution;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.impl.event.MapEventPublisher;
@@ -36,7 +38,7 @@
import com.hazelcast.map.impl.operation.GetOperation;
import com.hazelcast.map.impl.operation.MapOperationProvider;
import com.hazelcast.map.impl.operation.MapOperationProviders;
-import com.hazelcast.map.impl.operation.MapPartitionDestroyTask;
+import com.hazelcast.map.impl.operation.MapPartitionDestroyOperation;
import com.hazelcast.map.impl.query.AccumulationExecutor;
import com.hazelcast.map.impl.query.AggregationResult;
import com.hazelcast.map.impl.query.AggregationResultProcessor;
@@ -75,15 +77,14 @@
import com.hazelcast.spi.NodeEngine;
import com.hazelcast.spi.Operation;
import com.hazelcast.spi.impl.eventservice.impl.TrueEventFilter;
import com.hazelcast.spi.impl.operationservice.InternalOperationService;
import com.hazelcast.spi.partition.IPartitionService;
import com.hazelcast.spi.serialization.SerializationService;
import com.hazelcast.util.ConcurrencyUtil;
import com.hazelcast.util.ConstructorFunction;
import com.hazelcast.util.ContextMutexFactory;
import com.hazelcast.util.ExceptionUtil;
import com.hazelcast.util.executor.ManagedExecutorService;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -94,7 +95,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@@ -109,6 +109,7 @@
import static com.hazelcast.spi.properties.GroupProperty.INDEX_COPY_BEHAVIOR;
import static com.hazelcast.spi.properties.GroupProperty.OPERATION_CALL_TIMEOUT_MILLIS;
import static com.hazelcast.spi.properties.GroupProperty.QUERY_PREDICATE_PARALLEL_EVALUATION;
import static java.lang.Thread.currentThread;

/**
* Default implementation of {@link MapServiceContext}.
@@ -152,9 +153,11 @@ class MapServiceContextImpl implements MapServiceContext {
protected final EventService eventService;
protected final MapOperationProviders operationProviders;
protected final ResultProcessorRegistry resultProcessorRegistry;
protected ILogger logger;

protected MapService mapService;

@SuppressWarnings("checkstyle:executablestatementcount")
MapServiceContextImpl(NodeEngine nodeEngine) {
this.nodeEngine = nodeEngine;
this.serializationService = ((InternalSerializationService) nodeEngine.getSerializationService());
@@ -175,6 +178,7 @@ class MapServiceContextImpl implements MapServiceContext {
this.eventService = nodeEngine.getEventService();
this.operationProviders = createOperationProviders();
this.partitioningStrategyFactory = new PartitioningStrategyFactory(nodeEngine.getConfigClassLoader());
this.logger = nodeEngine.getLogger(getClass());

initRecordComparators();
}
@@ -392,18 +396,30 @@ public void destroyMap(String mapName) {
destroyPartitionsAndMapContainer(mapContainer);
}

/**
* Destroys the map data on local partition threads and waits for
* {@value #DESTROY_TIMEOUT_SECONDS} seconds
* for each partition segment destruction to complete.
*
* @param mapContainer the map container to destroy
*/
private void destroyPartitionsAndMapContainer(MapContainer mapContainer) {
-Semaphore semaphore = new Semaphore(0);
-InternalOperationService operationService = (InternalOperationService) nodeEngine.getOperationService();
+final List<LocalRetryableExecution> executions = new ArrayList<LocalRetryableExecution>();

for (PartitionContainer container : partitionContainers) {
-MapPartitionDestroyTask partitionDestroyTask = new MapPartitionDestroyTask(container, mapContainer, semaphore);
-operationService.execute(partitionDestroyTask);
+final MapPartitionDestroyOperation op = new MapPartitionDestroyOperation(container, mapContainer);
+executions.add(InvocationUtil.executeLocallyWithRetry(nodeEngine, op));
}

-try {
-semaphore.tryAcquire(partitionContainers.length, DESTROY_TIMEOUT_SECONDS, TimeUnit.SECONDS);
-} catch (Throwable t) {
-throw ExceptionUtil.rethrow(t);
+for (LocalRetryableExecution execution : executions) {
+try {
+if (!execution.awaitCompletion(DESTROY_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+logger.warning("Map partition was not destroyed in expected time, possible leak");
+}
+} catch (InterruptedException e) {
+currentThread().interrupt();
+nodeEngine.getLogger(getClass()).warning(e);
+}
}
}

@@ -19,29 +19,31 @@

import com.hazelcast.map.impl.MapContainer;
import com.hazelcast.map.impl.PartitionContainer;
-import com.hazelcast.spi.impl.PartitionSpecificRunnable;
+import com.hazelcast.spi.AbstractLocalOperation;
+import com.hazelcast.spi.PartitionAwareOperation;
+import com.hazelcast.spi.impl.AllowedDuringPassiveState;

-import java.util.concurrent.Semaphore;

-public class MapPartitionDestroyTask implements PartitionSpecificRunnable {
+/**
+* Operation to destroy the map data on the partition thread
+*/
+public class MapPartitionDestroyOperation extends AbstractLocalOperation
+implements PartitionAwareOperation, AllowedDuringPassiveState {
private final PartitionContainer partitionContainer;
private final MapContainer mapContainer;
-private Semaphore semaphore;

-public MapPartitionDestroyTask(PartitionContainer container, MapContainer mapContainer, Semaphore semaphore) {
+public MapPartitionDestroyOperation(PartitionContainer container, MapContainer mapContainer) {
this.partitionContainer = container;
this.mapContainer = mapContainer;
-this.semaphore = semaphore;
+setPartitionId(partitionContainer.getPartitionId());
}

@Override
public void run() {
partitionContainer.destroyMap(mapContainer);
-semaphore.release();
}

@Override
-public int getPartitionId() {
-return partitionContainer.getPartitionId();
+public boolean validatesTarget() {
+return false;
}
}
