Skip to content

Commit

Permalink
Server should not close clients when client only listens
Browse files Browse the repository at this point in the history
Client was not sending Ping to server when last read time is
up-to-date. When server constantly pushes events to client,
client does not send any ping to server. And consequently,
server closes client because it does not send any ping.

Client logic has changed so that it send pings if there is no
outgoing packages recently, rarther than looking at incoming
packages.

fixes hazelcast#13576

(cherry picked from commit 9bb70fa)
  • Loading branch information
sancar committed Aug 15, 2018
1 parent bee5a96 commit ec5ab10
Show file tree
Hide file tree
Showing 2 changed files with 55 additions and 1 deletion.
Expand Up @@ -99,7 +99,7 @@ private void checkConnection(long now, final ClientConnection connection) {
}
}

if (now - connection.lastReadTimeMillis() > heartbeatInterval) {
if (now - connection.lastWriteTimeMillis() > heartbeatInterval) {
ClientMessage request = ClientPingCodec.encodeRequest();
final ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection);
clientInvocation.setBypassHeartbeatCheck(true);
Expand Down
Expand Up @@ -26,15 +26,20 @@
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.config.NearCacheConfig;
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientListener;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.listener.MapListener;
Expand All @@ -46,12 +51,14 @@
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.SlowTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -65,6 +72,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT;
import static com.hazelcast.core.LifecycleEvent.LifecycleState;
Expand Down Expand Up @@ -835,4 +843,50 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() {
String key = generateKeyOwnedBy(h2);
map.get(key);
}


@Test
@Category(SlowTest.class)
public void testServerShouldNotCloseClientWhenClientOnlyListening() {
Config config = new Config();
int clientHeartbeatSeconds = 8;
config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds));
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config);
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000");
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig);
HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config);
warmUpPartitions(instance, instance2);
String key = generateKeyOwnedBy(instance2);

final AtomicBoolean isClientDisconnected = new AtomicBoolean();
instance.getClientService().addClientListener(new ClientListener() {
@Override
public void clientConnected(Client client) {

}

@Override
public void clientDisconnected(Client client) {
isClientDisconnected.set(true);
}
});

ITopic topic = client.getTopic(key);
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
}
};
String id = topic.addMessageListener(listener);

ITopic<Object> instanceTopic = instance.getTopic(key);
long begin = System.currentTimeMillis();

while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) {
instanceTopic.publish("message");
}

topic.removeMessageListener(id);
assertFalse(isClientDisconnected.get());
}
}

0 comments on commit ec5ab10

Please sign in to comment.