Skip to content

Commit

Permalink
Fix lock timeout behaviour when key owner is gone
Browse files Browse the repository at this point in the history
When a client waiting on lock to get it, if the member that it
is waiting on shuts down/terminated , client retries it on new
owner of the lock.

If a client waits for longer than client.invocation.timeout.seconds,
when exception came from server, it is wrapped inside
OperationTimeoutException and thrown to user.
This pr changes this behaviour so that waiting lock operations
(map.lock/trylock lock.lock/trylock condition.await) operations will
not throw OperationTimeoutException but retry the operation.

fixes hazelcast#13551
  • Loading branch information
sancar committed Aug 9, 2018
1 parent 134ae02 commit 79fac3c
Show file tree
Hide file tree
Showing 7 changed files with 130 additions and 9 deletions.
Expand Up @@ -86,7 +86,7 @@ private boolean doAwait(long time, TimeUnit unit, long threadId) throws Interrup
final long timeoutInMillis = unit.toMillis(time);
ClientMessage request = ConditionAwaitCodec
.encodeRequest(conditionId, threadId, timeoutInMillis, name, referenceIdGenerator.getNextReferenceId());
ClientMessage response = invokeOnPartition(request);
ClientMessage response = invokeOnPartition(request, Long.MAX_VALUE);
return ConditionAwaitCodec.decodeResponse(response).response;
}

Expand Down
Expand Up @@ -113,14 +113,14 @@ public ICondition newCondition(String name) {
public void lock() {
ClientMessage request = LockLockCodec
.encodeRequest(name, -1, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
invokeOnPartition(request, Long.MAX_VALUE);
}

@Override
public void lockInterruptibly() throws InterruptedException {
ClientMessage request = LockLockCodec
.encodeRequest(name, -1, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartitionInterruptibly(request);
invokeOnPartitionInterruptibly(request, Long.MAX_VALUE);
}

@Override
Expand All @@ -145,7 +145,8 @@ public boolean tryLock(long timeout, TimeUnit unit, long leaseTime, TimeUnit lea
long threadId = ThreadUtil.getThreadId();
ClientMessage request = LockTryLockCodec
.encodeRequest(name, threadId, leaseTimeInMillis, timeoutInMillis, referenceIdGenerator.getNextReferenceId());
LockTryLockCodec.ResponseParameters resultParameters = LockTryLockCodec.decodeResponse(invokeOnPartition(request));
ClientMessage clientMessage = invokeOnPartition(request, Long.MAX_VALUE);
LockTryLockCodec.ResponseParameters resultParameters = LockTryLockCodec.decodeResponse(clientMessage);
return resultParameters.response;
}

Expand Down
Expand Up @@ -624,7 +624,19 @@ public void lock(K key, long leaseTime, TimeUnit timeUnit) {
Data keyData = toData(key);
ClientMessage request = MapLockCodec.encodeRequest(name, keyData, getThreadId(), getTimeInMillis(leaseTime, timeUnit),
lockReferenceIdGenerator.getNextReferenceId());
invoke(request, keyData);
invoke(request, keyData, Long.MAX_VALUE);
}

private <T> T invoke(ClientMessage clientMessage, Object key, long invocationTimeoutSeconds) {
final int partitionId = getContext().getPartitionService().getPartitionId(key);
try {
ClientInvocation clientInvocation = new ClientInvocation(getClient(), clientMessage, getName(), partitionId);
clientInvocation.setInvocationTimeoutMillis(invocationTimeoutSeconds);
final Future future = clientInvocation.invoke();
return (T) future.get();
} catch (Exception e) {
throw rethrow(e);
}
}

@Override
Expand Down Expand Up @@ -661,7 +673,7 @@ public boolean tryLock(K key, long time, TimeUnit timeunit, long leaseTime, Time
ClientMessage request = MapTryLockCodec.encodeRequest(name, keyData, getThreadId(), leaseTimeMillis, timeoutMillis,
lockReferenceIdGenerator.getNextReferenceId());

ClientMessage response = invoke(request, keyData);
ClientMessage response = invoke(request, keyData, Long.MAX_VALUE);
MapTryLockCodec.ResponseParameters resultParameters = MapTryLockCodec.decodeResponse(response);
return resultParameters.response;
}
Expand Down
Expand Up @@ -24,6 +24,9 @@
import com.hazelcast.client.spi.impl.ClientInvocationFuture;
import com.hazelcast.client.util.ClientDelegatingFuture;
import com.hazelcast.partition.strategy.StringPartitioningStrategy;
import com.hazelcast.util.ExceptionUtil;

import java.util.concurrent.Future;

import static com.hazelcast.util.ExceptionUtil.rethrow;

Expand Down Expand Up @@ -61,4 +64,27 @@ protected <T> ClientDelegatingFuture<T> invokeOnPartitionAsync(ClientMessage cli
throw rethrow(e);
}
}

protected <T> T invokeOnPartition(ClientMessage clientMessage, long invocationTimeoutSeconds) {
try {
ClientInvocation clientInvocation = new ClientInvocation(getClient(), clientMessage, getName(), partitionId);
clientInvocation.setInvocationTimeoutMillis(invocationTimeoutSeconds);
final Future future = clientInvocation.invoke();
return (T) future.get();
} catch (Exception e) {
throw rethrow(e);
}
}

protected <T> T invokeOnPartitionInterruptibly(ClientMessage clientMessage,
long invocationTimeoutSeconds) throws InterruptedException {
try {
ClientInvocation clientInvocation = new ClientInvocation(getClient(), clientMessage, getName(), partitionId);
clientInvocation.setInvocationTimeoutMillis(invocationTimeoutSeconds);
final Future future = clientInvocation.invoke();
return (T) future.get();
} catch (Exception e) {
throw ExceptionUtil.rethrowAllowInterrupted(e);
}
}
}
Expand Up @@ -75,6 +75,7 @@ public class ClientInvocation implements Runnable {
private volatile ClientConnection sendConnection;
private EventHandler handler;
private volatile long invokeCount;
private volatile long invocationTimeoutMillis;

protected ClientInvocation(HazelcastClientInstanceImpl client,
ClientMessage clientMessage,
Expand All @@ -97,6 +98,7 @@ protected ClientInvocation(HazelcastClientInstanceImpl client,
this.callIdSequence = invocationService.getCallIdSequence();
this.clientInvocationFuture = new ClientInvocationFuture(this, executionService,
clientMessage, logger, callIdSequence);
this.invocationTimeoutMillis = invocationService.getInvocationTimeoutMillis();
}

/**
Expand Down Expand Up @@ -195,6 +197,10 @@ private void retry() {
}
}

public void setInvocationTimeoutMillis(long invocationTimeoutMillis) {
this.invocationTimeoutMillis = invocationTimeoutMillis;
}

public void notify(ClientMessage clientMessage) {
if (clientMessage == null) {
throw new IllegalArgumentException("response can't be null");
Expand Down Expand Up @@ -225,7 +231,7 @@ public void notifyException(Throwable exception) {
}

long timePassed = System.currentTimeMillis() - startTimeMillis;
if (timePassed > invocationService.getInvocationTimeoutMillis()) {
if (timePassed > invocationTimeoutMillis) {
if (logger.isFinestEnabled()) {
logger.finest("Exception will not be retried because invocation timed out", exception);
}
Expand Down
Expand Up @@ -16,6 +16,8 @@

package com.hazelcast.client.lock;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
Expand Down Expand Up @@ -65,7 +67,7 @@ public void run() {
}
}
}.start();
assertTrue(latch.await(5, TimeUnit.SECONDS));
assertOpenEventually(latch);
lock.forceUnlock();
}

Expand Down Expand Up @@ -362,4 +364,78 @@ public void run() {

assertTrue("Lock should have been released after lease expires", lock.tryLock(2, TimeUnit.MINUTES));
}

@Test
public void testKeyOwnerTerminates_afterInvocationTimeout() throws Exception {
HazelcastInstance keyOwner = factory.newHazelcastInstance();
HazelcastInstance instance = factory.newHazelcastInstance();
warmUpPartitions(keyOwner, instance);

ClientConfig clientConfig = new ClientConfig();
long invocationTimeoutMillis = 1000;
clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis)));
final HazelcastInstance client = factory.newHazelcastClient(clientConfig);

final String key = generateKeyOwnedBy(keyOwner);
ILock serverLock = instance.getLock(key);
serverLock.lock();

final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
public void run() {
final ILock lock = client.getLock(key);
lock.lock();
lock.unlock();
latch.countDown();
}
}).start();

Thread.sleep(invocationTimeoutMillis * 2);
keyOwner.getLifecycleService().terminate();

assertTrue(serverLock.isLocked());
assertTrue(serverLock.isLockedByCurrentThread());
assertTrue(serverLock.tryLock());
serverLock.unlock();
serverLock.unlock();
assertOpenEventually(latch);
}

@Test
public void testKeyOwnerShutsDown_afterInvocationTimeout() throws Exception {
HazelcastInstance keyOwner = factory.newHazelcastInstance();
HazelcastInstance instance = factory.newHazelcastInstance();
warmUpPartitions(keyOwner, instance);

ClientConfig clientConfig = new ClientConfig();
long invocationTimeoutMillis = 1000;
clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis)));
final HazelcastInstance client = factory.newHazelcastClient(clientConfig);

final String key = generateKeyOwnedBy(keyOwner);
ILock serverLock = instance.getLock(key);
serverLock.lock();

final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
public void run() {
final ILock lock = client.getLock(key);
lock.lock();
lock.unlock();
latch.countDown();
}
}).start();

Thread.sleep(invocationTimeoutMillis * 2);
keyOwner.shutdown();

assertTrue(serverLock.isLocked());
assertTrue(serverLock.isLockedByCurrentThread());
assertTrue(serverLock.tryLock());
serverLock.unlock();
serverLock.unlock();
assertOpenEventually(latch);
}
}
Expand Up @@ -345,7 +345,7 @@ public void run() {
assertTrue(lock1.tryLock());
lock1.unlock();
lock1.unlock();
assertOpenEventually(latch, 10);
assertOpenEventually(latch);
}

@Test(timeout = 100000)
Expand Down

0 comments on commit 79fac3c

Please sign in to comment.