Skip to content

Commit

Permalink
ClientReauthOperation should be retryable
Browse files Browse the repository at this point in the history
ClientReauthOperation is used with `invokeOnStableClusterSerial`
This requires for the operation should be safely retyable.

Before ClientReauthOperation was always throwing exception because
it was expecting to the new correlation id is always greater.

We are allowing it to be equal or greater than to last set
correlation id to make the operation safely retryable.

fixes https://github.com/hazelcast/hazelcast/issues/13758

(cherry picked from commit 6e2e1dc)
  • Loading branch information
sancar committed Sep 18, 2018
1 parent 9b4e978 commit 6fafe52
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 3 deletions.
Expand Up @@ -18,9 +18,12 @@

import com.hazelcast.client.config.ClientConfig;
import com.hazelcast.client.impl.ClientEngineImpl;
import com.hazelcast.client.impl.operations.ClientReAuthOperation;
import com.hazelcast.client.test.TestHazelcastFactory;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.nio.Address;
import com.hazelcast.spi.impl.operationservice.InternalOperationService;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
Expand All @@ -34,6 +37,8 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicReference;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -109,6 +114,23 @@ public void run() throws Exception {
});
}

@Test
public void test_ClientReAuthOperation_retry() throws ExecutionException, InterruptedException {
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance();
InternalOperationService operationService = getHazelcastInstanceImpl(instance).node.nodeEngine.getOperationService();

Address address = instance.getCluster().getLocalMember().getAddress();
ClientReAuthOperation reAuthOperation = new ClientReAuthOperation("clientUUId", 1);
Future<Object> future = operationService.invokeOnTarget(ClientEngineImpl.SERVICE_NAME, reAuthOperation, address);
future.get();

//retrying ClientReAuthOperation with same parameters, should not throw exception
ClientReAuthOperation reAuthOperation2 = new ClientReAuthOperation("clientUUId", 1);
Future<Object> future2 = operationService.invokeOnTarget(ClientEngineImpl.SERVICE_NAME, reAuthOperation2, address);
future2.get();

}

@Test
public void test_clientOwnedByAlreadyConnectedSecondMember_afterFirstOwnerDies() {
final HazelcastInstance instance1 = hazelcastFactory.newHazelcastInstance();
Expand Down
Expand Up @@ -408,7 +408,7 @@ public boolean trySetLastAuthenticationCorrelationId(String clientUuid, long new
AtomicLong lastCorrelationId = ConcurrencyUtil.getOrPutIfAbsent(lastAuthenticationCorrelationIds,
clientUuid,
LAST_AUTH_CORRELATION_ID_CONSTRUCTOR_FUNC);
return ConcurrencyUtil.setIfGreaterThan(lastCorrelationId, newCorrelationId);
return ConcurrencyUtil.setIfEqualOrGreaterThan(lastCorrelationId, newCorrelationId);
}

public String addOwnershipMapping(String clientUuid, String ownerUuid) {
Expand Down
Expand Up @@ -48,10 +48,10 @@ public static <E> void setMax(E obj, AtomicLongFieldUpdater<E> updater, long val
}
}

public static boolean setIfGreaterThan(AtomicLong oldValue, long newValue) {
public static boolean setIfEqualOrGreaterThan(AtomicLong oldValue, long newValue) {
while (true) {
long local = oldValue.get();
if (newValue <= local) {
if (newValue < local) {
return false;
}
if (oldValue.compareAndSet(local, newValue)) {
Expand Down
Expand Up @@ -27,9 +27,12 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastParallelClassRunner.class)
@Category({QuickTest.class, ParallelTest.class})
Expand Down Expand Up @@ -93,6 +96,13 @@ public void testGetOrPutSynchronized_whenMutexFactoryIsNull_thenThrowException()
ConcurrencyUtil.getOrPutSynchronized(map, 5, factory, constructorFunction);
}

@Test
public void testSetIfEqualOrGreaterThan() {
assertTrue(ConcurrencyUtil.setIfEqualOrGreaterThan(new AtomicLong(1), 1));
assertTrue(ConcurrencyUtil.setIfEqualOrGreaterThan(new AtomicLong(1), 2));
assertFalse(ConcurrencyUtil.setIfEqualOrGreaterThan(new AtomicLong(2), 1));
}

@Test
public void testGetOrPutIfAbsent() {
int result = ConcurrencyUtil.getOrPutIfAbsent(map, 5, constructorFunction);
Expand Down

0 comments on commit 6fafe52

Please sign in to comment.