Skip to content

Commit

Permalink
Merge pull request #13578 from sancar/fix/serverClosesListeningClient…
Browse files Browse the repository at this point in the history
…/maint3.x

Server should not close clients when client only listens
  • Loading branch information
mustafa sancar koyunlu committed Aug 28, 2018
2 parents 65d4f9b + ec5ab10 commit faf6dbf
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Expand Up @@ -99,7 +99,7 @@ private void checkConnection(long now, final ClientConnection connection) {
}
}

if (now - connection.lastReadTimeMillis() > heartbeatInterval) {
if (now - connection.lastWriteTimeMillis() > heartbeatInterval) {
ClientMessage request = ClientPingCodec.encodeRequest();
final ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection);
clientInvocation.setBypassHeartbeatCheck(true);
Expand Down
Expand Up @@ -26,15 +26,20 @@
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.config.NearCacheConfig;
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientListener;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.listener.MapListener;
Expand All @@ -46,12 +51,14 @@
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.SlowTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -65,6 +72,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT;
import static com.hazelcast.core.LifecycleEvent.LifecycleState;
Expand Down Expand Up @@ -835,4 +843,50 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() {
String key = generateKeyOwnedBy(h2);
map.get(key);
}


@Test
@Category(SlowTest.class)
public void testServerShouldNotCloseClientWhenClientOnlyListening() {
Config config = new Config();
int clientHeartbeatSeconds = 8;
config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds));
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config);
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000");
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig);
HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config);
warmUpPartitions(instance, instance2);
String key = generateKeyOwnedBy(instance2);

final AtomicBoolean isClientDisconnected = new AtomicBoolean();
instance.getClientService().addClientListener(new ClientListener() {
@Override
public void clientConnected(Client client) {

}

@Override
public void clientDisconnected(Client client) {
isClientDisconnected.set(true);
}
});

ITopic topic = client.getTopic(key);
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
}
};
String id = topic.addMessageListener(listener);

ITopic<Object> instanceTopic = instance.getTopic(key);
long begin = System.currentTimeMillis();

while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) {
instanceTopic.publish("message");
}

topic.removeMessageListener(id);
assertFalse(isClientDisconnected.get());
}
}

0 comments on commit faf6dbf

Please sign in to comment.