Skip to content

Commit

Permalink
Merge pull request #13689 from sancar/followUpTo/lockTimeoutsWhenKeyo…
Browse files Browse the repository at this point in the history
…nerGone/master

Fix one more lock timeout problem
  • Loading branch information
mustafa sancar koyunlu committed Sep 19, 2018
2 parents 1767c1c + abcbc88 commit fd73a92
Show file tree
Hide file tree
Showing 3 changed files with 282 additions and 76 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ public void lock(long leaseTime, TimeUnit timeUnit) {
checkPositive(leaseTime, "leaseTime should be positive");
ClientMessage request = LockLockCodec.encodeRequest(name,
getTimeInMillis(leaseTime, timeUnit), ThreadUtil.getThreadId(), referenceIdGenerator.getNextReferenceId());
invokeOnPartition(request);
invokeOnPartition(request, Long.MAX_VALUE);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.client.lock;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.ClientTestSupport;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.test.HazelcastParallelParametersRunnerFactory;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.util.function.BiConsumer;
import com.hazelcast.util.function.Consumer;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static java.util.Arrays.asList;
import static org.junit.Assert.assertTrue;


@RunWith(Parameterized.class)
@Parameterized.UseParametersRunnerFactory(HazelcastParallelParametersRunnerFactory.class)
@Category({QuickTest.class, ParallelTest.class})
public class ClientLockRetryWhenOwnerDiesTest extends ClientTestSupport {

private static long leaseTime = 120;
private static long waitTime = 120;

static class Shutdown implements Consumer<HazelcastInstance> {
@Override
public void accept(HazelcastInstance hazelcastInstance) {
hazelcastInstance.shutdown();
}

@Override
public String toString() {
return "Shutdown{}";
}
}

static class Terminate implements Consumer<HazelcastInstance> {
@Override
public void accept(HazelcastInstance hazelcastInstance) {
hazelcastInstance.getLifecycleService().terminate();
}

@Override
public String toString() {
return "Terminate{}";
}
}

static class LockLock implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
ILock lock = client.getLock(key);
lock.lock();
lock.unlock();
}

@Override
public String toString() {
return "LockLock{}";
}
}

static class LockLockLease implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
ILock lock = client.getLock(key);
lock.lock(leaseTime, TimeUnit.SECONDS);
lock.unlock();
}

@Override
public String toString() {
return "LockLockLease{}";
}
}

static class LockTryLockTimeout implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
ILock lock = client.getLock(key);
try {
lock.tryLock(waitTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}

@Override
public String toString() {
return "LockTryLockTimeout{}";
}
}

static class LockTryLockTimeoutLease implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
ILock lock = client.getLock(key);
try {
lock.tryLock(waitTime, TimeUnit.SECONDS, leaseTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
lock.unlock();
}

@Override
public String toString() {
return "LockTryLockTimeoutLease{}";
}
}

static class MapLock implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
IMap map = client.getMap(key);
map.lock(key);
map.unlock(key);
}

@Override
public String toString() {
return "MapLock{}";
}
}

static class MapLockLease implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
IMap map = client.getMap(key);
map.lock(key, leaseTime, TimeUnit.SECONDS);
map.unlock(key);
}
}

static class MapTryLockTimeout implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
IMap map = client.getMap(key);
try {
map.tryLock(key, waitTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.unlock(key);
}

@Override
public String toString() {
return "MapTryLockTimeout{}";
}
}

static class MapTryLockTimeoutLease implements BiConsumer<HazelcastInstance, String> {

@Override
public void accept(HazelcastInstance client, String key) {
IMap map = client.getMap(key);
try {
map.tryLock(key, waitTime, TimeUnit.SECONDS, leaseTime, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.unlock(key);
}

@Override
public String toString() {
return "MapTryLockTimeoutLease{}";
}
}


@Parameterized.Parameters(name = "closePolicy:{0}, lockPolicy:{1}")
public static Collection<Object[]> parameters() {
List<Consumer<HazelcastInstance>> closePolicies = asList(new Shutdown(), new Terminate());
List<BiConsumer<HazelcastInstance, String>> lockPolicies =
asList(new LockLock(), new LockLockLease(), new LockTryLockTimeout(), new LockTryLockTimeoutLease(),
new MapLock(), new MapLockLease(), new MapTryLockTimeout(), new MapTryLockTimeoutLease());

Collection<Object[]> objects = new LinkedList<Object[]>();
for (Consumer<HazelcastInstance> closePolicy : closePolicies) {
for (BiConsumer<HazelcastInstance, String> lockPolicy : lockPolicies) {
objects.add(new Object[]{closePolicy, lockPolicy});
}
}

return objects;
}

@Parameterized.Parameter
public Consumer<HazelcastInstance> closePolicy;

@Parameterized.Parameter(1)
public BiConsumer<HazelcastInstance, String> lockPolicy;


private TestHazelcastFactory factory = new TestHazelcastFactory();

@After
public void teardown() {
factory.terminateAll();
}

@Test
public void testKeyOwnerCloses_afterInvocationTimeout() throws Exception {
HazelcastInstance keyOwner = factory.newHazelcastInstance();
HazelcastInstance instance = factory.newHazelcastInstance();
warmUpPartitions(keyOwner, instance);

ClientConfig clientConfig = new ClientConfig();
long invocationTimeoutMillis = 1000;
clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis)));
final HazelcastInstance client = factory.newHazelcastClient(clientConfig);

final String key = generateKeyOwnedBy(keyOwner);
ILock serverLock = client.getLock(key);
serverLock.lock();

makeSureConnectedToServers(client, 2);

final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
public void run() {
lockPolicy.accept(client, key);
latch.countDown();
}
}).start();

Thread.sleep(invocationTimeoutMillis * 2);
closePolicy.accept(keyOwner);

assertTrue(serverLock.isLocked());
assertTrue(serverLock.isLockedByCurrentThread());
assertTrue(serverLock.tryLock());
serverLock.unlock();
serverLock.unlock();
assertOpenEventually(latch);
}

}


Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@

package com.hazelcast.client.lock;

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.spi.properties.ClientProperty;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.config.Config;
import com.hazelcast.core.HazelcastInstance;
Expand Down Expand Up @@ -365,77 +363,4 @@ public void run() {
assertTrue("Lock should have been released after lease expires", lock.tryLock(2, TimeUnit.MINUTES));
}

@Test
public void testKeyOwnerTerminates_afterInvocationTimeout() throws Exception {
HazelcastInstance keyOwner = factory.newHazelcastInstance();
HazelcastInstance instance = factory.newHazelcastInstance();
warmUpPartitions(keyOwner, instance);

ClientConfig clientConfig = new ClientConfig();
long invocationTimeoutMillis = 1000;
clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis)));
final HazelcastInstance client = factory.newHazelcastClient(clientConfig);

final String key = generateKeyOwnedBy(keyOwner);
ILock serverLock = instance.getLock(key);
serverLock.lock();

final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
public void run() {
final ILock lock = client.getLock(key);
lock.lock();
lock.unlock();
latch.countDown();
}
}).start();

Thread.sleep(invocationTimeoutMillis * 2);
keyOwner.getLifecycleService().terminate();

assertTrue(serverLock.isLocked());
assertTrue(serverLock.isLockedByCurrentThread());
assertTrue(serverLock.tryLock());
serverLock.unlock();
serverLock.unlock();
assertOpenEventually(latch);
}

@Test
public void testKeyOwnerShutsDown_afterInvocationTimeout() throws Exception {
HazelcastInstance keyOwner = factory.newHazelcastInstance();
HazelcastInstance instance = factory.newHazelcastInstance();
warmUpPartitions(keyOwner, instance);

ClientConfig clientConfig = new ClientConfig();
long invocationTimeoutMillis = 1000;
clientConfig.setProperty(ClientProperty.INVOCATION_TIMEOUT_SECONDS.getName(),
String.valueOf(TimeUnit.MILLISECONDS.toSeconds(invocationTimeoutMillis)));
final HazelcastInstance client = factory.newHazelcastClient(clientConfig);

final String key = generateKeyOwnedBy(keyOwner);
ILock serverLock = instance.getLock(key);
serverLock.lock();

final CountDownLatch latch = new CountDownLatch(1);
new Thread(new Runnable() {
public void run() {
final ILock lock = client.getLock(key);
lock.lock();
lock.unlock();
latch.countDown();
}
}).start();

Thread.sleep(invocationTimeoutMillis * 2);
keyOwner.shutdown();

assertTrue(serverLock.isLocked());
assertTrue(serverLock.isLockedByCurrentThread());
assertTrue(serverLock.tryLock());
serverLock.unlock();
serverLock.unlock();
assertOpenEventually(latch);
}
}

0 comments on commit fd73a92

Please sign in to comment.