From 53933cf4eb892102f2aba043e99b402e5cb847f3 Mon Sep 17 00:00:00 2001 From: sancar Date: Thu, 9 Aug 2018 14:08:39 +0300 Subject: [PATCH] Fix lock timeout behaviour when key owner is gone When a client waiting on lock to get it, if the member that it is waiting on shuts down/terminated , client retries it on new owner of the lock. If a client waits for longer than client.invocation.timeout.seconds, when exception came from server, it is wrapped inside OperationTimeoutException and thrown to user. This pr changes this behaviour so that waiting lock operations (map.lock/trylock lock.lock/trylock condition.await) operations will not throw OperationTimeoutException but retry the operation. fixes https://github.com/hazelcast/hazelcast/issues/13551 (cherry picked from commit 79fac3ce15bd0572cd62f8830f37fb2e097b4d16) --- .../client/proxy/ClientConditionProxy.java | 2 +- .../client/proxy/ClientLockProxy.java | 7 +- .../client/proxy/ClientMapProxy.java | 16 +++- .../proxy/PartitionSpecificClientProxy.java | 26 +++++++ .../client/spi/impl/ClientInvocation.java | 8 +- .../hazelcast/client/lock/ClientLockTest.java | 78 ++++++++++++++++++- .../concurrent/lock/LockAdvancedTest.java | 2 +- 7 files changed, 130 insertions(+), 9 deletions(-) diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientConditionProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientConditionProxy.java index 9d9ceccd8b51..57e456200828 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientConditionProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientConditionProxy.java @@ -86,7 +86,7 @@ private boolean doAwait(long time, TimeUnit unit, long threadId) throws Interrup final long timeoutInMillis = unit.toMillis(time); ClientMessage request = ConditionAwaitCodec .encodeRequest(conditionId, threadId, timeoutInMillis, name, referenceIdGenerator.getNextReferenceId()); - ClientMessage response = invokeOnPartition(request); + ClientMessage response = invokeOnPartition(request, Long.MAX_VALUE); return ConditionAwaitCodec.decodeResponse(response).response; } diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientLockProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientLockProxy.java index e799173f7cb3..8372474fd0ac 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientLockProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientLockProxy.java @@ -113,14 +113,14 @@ public ICondition newCondition(String name) { public void lock() { ClientMessage request = LockLockCodec .encodeRequest(name, -1, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId()); - invokeOnPartition(request); + invokeOnPartition(request, Long.MAX_VALUE); } @Override public void lockInterruptibly() throws InterruptedException { ClientMessage request = LockLockCodec .encodeRequest(name, -1, ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId()); - invokeOnPartitionInterruptibly(request); + invokeOnPartitionInterruptibly(request, Long.MAX_VALUE); } @Override @@ -145,7 +145,8 @@ public boolean tryLock(long timeout, TimeUnit unit, long leaseTime, TimeUnit lea long threadId = ThreadUtil.getThreadId(); ClientMessage request = LockTryLockCodec .encodeRequest(name, threadId, leaseTimeInMillis, timeoutInMillis, referenceIdGenerator.getNextReferenceId()); - LockTryLockCodec.ResponseParameters resultParameters = LockTryLockCodec.decodeResponse(invokeOnPartition(request)); + ClientMessage clientMessage = invokeOnPartition(request, Long.MAX_VALUE); + LockTryLockCodec.ResponseParameters resultParameters = LockTryLockCodec.decodeResponse(clientMessage); return resultParameters.response; } diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java index 47579714e289..693be0dc5ccf 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/ClientMapProxy.java @@ -623,7 +623,19 @@ public void lock(K key, long leaseTime, TimeUnit timeUnit) { Data keyData = toData(key); ClientMessage request = MapLockCodec.encodeRequest(name, keyData, getThreadId(), getTimeInMillis(leaseTime, timeUnit), lockReferenceIdGenerator.getNextReferenceId()); - invoke(request, keyData); + invoke(request, keyData, Long.MAX_VALUE); + } + + private T invoke(ClientMessage clientMessage, Object key, long invocationTimeoutSeconds) { + final int partitionId = getContext().getPartitionService().getPartitionId(key); + try { + ClientInvocation clientInvocation = new ClientInvocation(getClient(), clientMessage, getName(), partitionId); + clientInvocation.setInvocationTimeoutMillis(invocationTimeoutSeconds); + final Future future = clientInvocation.invoke(); + return (T) future.get(); + } catch (Exception e) { + throw rethrow(e); + } } @Override @@ -660,7 +672,7 @@ public boolean tryLock(K key, long time, TimeUnit timeunit, long leaseTime, Time ClientMessage request = MapTryLockCodec.encodeRequest(name, keyData, getThreadId(), leaseTimeMillis, timeoutMillis, lockReferenceIdGenerator.getNextReferenceId()); - ClientMessage response = invoke(request, keyData); + ClientMessage response = invoke(request, keyData, Long.MAX_VALUE); MapTryLockCodec.ResponseParameters resultParameters = MapTryLockCodec.decodeResponse(response); return resultParameters.response; } diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java index a4d636394552..8c3e7c026ef2 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/proxy/PartitionSpecificClientProxy.java @@ -24,6 +24,9 @@ import com.hazelcast.client.spi.impl.ClientInvocationFuture; import com.hazelcast.client.util.ClientDelegatingFuture; import com.hazelcast.partition.strategy.StringPartitioningStrategy; +import com.hazelcast.util.ExceptionUtil; + +import java.util.concurrent.Future; import static com.hazelcast.util.ExceptionUtil.rethrow; @@ -62,4 +65,27 @@ protected ClientDelegatingFuture invokeOnPartitionAsync(ClientMessage cli throw rethrow(e); } } + + protected T invokeOnPartition(ClientMessage clientMessage, long invocationTimeoutSeconds) { + try { + ClientInvocation clientInvocation = new ClientInvocation(getClient(), clientMessage, getName(), partitionId); + clientInvocation.setInvocationTimeoutMillis(invocationTimeoutSeconds); + final Future future = clientInvocation.invoke(); + return (T) future.get(); + } catch (Exception e) { + throw rethrow(e); + } + } + + protected T invokeOnPartitionInterruptibly(ClientMessage clientMessage, + long invocationTimeoutSeconds) throws InterruptedException { + try { + ClientInvocation clientInvocation = new ClientInvocation(getClient(), clientMessage, getName(), partitionId); + clientInvocation.setInvocationTimeoutMillis(invocationTimeoutSeconds); + final Future future = clientInvocation.invoke(); + return (T) future.get(); + } catch (Exception e) { + throw ExceptionUtil.rethrowAllowInterrupted(e); + } + } } diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java index 5a8427122495..d6eb054ef509 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/spi/impl/ClientInvocation.java @@ -76,6 +76,7 @@ public class ClientInvocation implements Runnable { private boolean bypassHeartbeatCheck; private EventHandler handler; private volatile long invokeCount; + private volatile long invocationTimeoutMillis; protected ClientInvocation(HazelcastClientInstanceImpl client, ClientMessage clientMessage, @@ -98,6 +99,7 @@ protected ClientInvocation(HazelcastClientInstanceImpl client, this.callIdSequence = client.getCallIdSequence(); this.clientInvocationFuture = new ClientInvocationFuture(this, executionService, clientMessage, logger, callIdSequence); + this.invocationTimeoutMillis = invocationService.getInvocationTimeoutMillis(); } /** @@ -196,6 +198,10 @@ private void retry() { } } + public void setInvocationTimeoutMillis(long invocationTimeoutMillis) { + this.invocationTimeoutMillis = invocationTimeoutMillis; + } + public void notify(ClientMessage clientMessage) { if (clientMessage == null) { throw new IllegalArgumentException("response can't be null"); @@ -225,7 +231,7 @@ public void notifyException(Throwable exception) { } long timePassed = System.currentTimeMillis() - startTimeMillis; - if (timePassed > invocationService.getInvocationTimeoutMillis()) { + if (timePassed > invocationTimeoutMillis) { if (logger.isFinestEnabled()) { logger.finest("Exception will not be retried because invocation timed out", exception); } diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/lock/ClientLockTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/lock/ClientLockTest.java index 35adcc39bbd5..d54ab0562320 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/lock/ClientLockTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/lock/ClientLockTest.java @@ -16,6 +16,8 @@ package com.hazelcast.client.lock; +import com.hazelcast.client.config.ClientConfig; +import com.hazelcast.client.spi.properties.ClientProperty; import com.hazelcast.client.test.TestHazelcastFactory; import com.hazelcast.config.Config; import com.hazelcast.core.HazelcastInstance; @@ -65,7 +67,7 @@ public void run() { } } }.start(); - assertTrue(latch.await(5, TimeUnit.SECONDS)); + assertOpenEventually(latch); lock.forceUnlock(); } @@ -362,4 +364,78 @@ public void run() { assertTrue("Lock should have been released after lease expires", lock.tryLock(2, TimeUnit.MINUTES)); } + + @Test + public void testKeyOwnerTerminates_afterInvocationTimeout() throws Exception { + HazelcastInstance keyOwner = factory.newHazelcastInstance(); + HazelcastInstance instance = factory.newHazelcastInstance(); + warmUpPartitions(keyOwner, instance); + + ClientConfig clientConfig = new ClientConfig(); + long invocationTimeoutMillis = 1000; + clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(), + String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis))); + final HazelcastInstance client = factory.newHazelcastClient(clientConfig); + + final String key = generateKeyOwnedBy(keyOwner); + ILock serverLock = instance.getLock(key); + serverLock.lock(); + + final CountDownLatch latch = new CountDownLatch(1); + new Thread(new Runnable() { + public void run() { + final ILock lock = client.getLock(key); + lock.lock(); + lock.unlock(); + latch.countDown(); + } + }).start(); + + Thread.sleep(invocationTimeoutMillis * 2); + keyOwner.getLifecycleService().terminate(); + + assertTrue(serverLock.isLocked()); + assertTrue(serverLock.isLockedByCurrentThread()); + assertTrue(serverLock.tryLock()); + serverLock.unlock(); + serverLock.unlock(); + assertOpenEventually(latch); + } + + @Test + public void testKeyOwnerShutsDown_afterInvocationTimeout() throws Exception { + HazelcastInstance keyOwner = factory.newHazelcastInstance(); + HazelcastInstance instance = factory.newHazelcastInstance(); + warmUpPartitions(keyOwner, instance); + + ClientConfig clientConfig = new ClientConfig(); + long invocationTimeoutMillis = 1000; + clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(), + String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis))); + final HazelcastInstance client = factory.newHazelcastClient(clientConfig); + + final String key = generateKeyOwnedBy(keyOwner); + ILock serverLock = instance.getLock(key); + serverLock.lock(); + + final CountDownLatch latch = new CountDownLatch(1); + new Thread(new Runnable() { + public void run() { + final ILock lock = client.getLock(key); + lock.lock(); + lock.unlock(); + latch.countDown(); + } + }).start(); + + Thread.sleep(invocationTimeoutMillis * 2); + keyOwner.shutdown(); + + assertTrue(serverLock.isLocked()); + assertTrue(serverLock.isLockedByCurrentThread()); + assertTrue(serverLock.tryLock()); + serverLock.unlock(); + serverLock.unlock(); + assertOpenEventually(latch); + } } diff --git a/hazelcast/src/test/java/com/hazelcast/concurrent/lock/LockAdvancedTest.java b/hazelcast/src/test/java/com/hazelcast/concurrent/lock/LockAdvancedTest.java index 57f1890a0012..e549ea6473fc 100644 --- a/hazelcast/src/test/java/com/hazelcast/concurrent/lock/LockAdvancedTest.java +++ b/hazelcast/src/test/java/com/hazelcast/concurrent/lock/LockAdvancedTest.java @@ -345,7 +345,7 @@ public void run() { assertTrue(lock1.tryLock()); lock1.unlock(); lock1.unlock(); - assertOpenEventually(latch, 10); + assertOpenEventually(latch); } @Test(timeout = 100000)