Skip to content

Commit

Permalink
Server should not close clients when client only listens
Browse files Browse the repository at this point in the history
Client was not sending Ping to server when last read time is
up-to-date. When server constantly pushes events to client,
client does not send any ping to server. And consequently,
server closes client because it does not send any ping.

Client logic has changed so that it send pings if there is no
outgoing packages recently, rarther than looking at incoming
packages.

fixes hazelcast#13576
  • Loading branch information
sancar committed Aug 14, 2018
1 parent 732af03 commit 9bb70fa
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 1 deletion.
Expand Up @@ -91,7 +91,7 @@ private void checkConnection(long now, final ClientConnection connection) {
}
}

if (now - connection.lastReadTimeMillis() > heartbeatInterval) {
if (now - connection.lastWriteTimeMillis() > heartbeatInterval) {
ClientMessage request = ClientPingCodec.encodeRequest();
ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection);
clientInvocation.invokeUrgent();
Expand Down
Expand Up @@ -33,8 +33,11 @@
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.listener.MapListener;
Expand All @@ -46,12 +49,14 @@
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.SlowTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -65,6 +70,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT;
import static com.hazelcast.core.LifecycleEvent.LifecycleState;
Expand Down Expand Up @@ -835,4 +841,47 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() {
String key = generateKeyOwnedBy(h2);
map.get(key);
}


@Test
@Category(SlowTest.class)
public void testServerShouldNotCloseClientWhenClientOnlyListening() {
Config config = new Config();
int clientHeartbeatSeconds = 8;
config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds));
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config);
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000");
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig);
HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config);
warmUpPartitions(instance, instance2);
String key = generateKeyOwnedBy(instance2);


ITopic topic = client.getTopic(key);
final AtomicLong receiveCount = new AtomicLong();
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
receiveCount.incrementAndGet();
}
};
String id = topic.addMessageListener(listener);

ITopic<Object> instanceTopic = instance.getTopic(key);
long begin = System.currentTimeMillis();

final AtomicLong publishCount = new AtomicLong();
while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) {
publishCount.incrementAndGet();
instanceTopic.publish("message");
}

assertTrueEventually(new AssertTask() {
@Override
public void run() {
assertEquals(publishCount.get(), receiveCount.get());
}
});
assertTrue(topic.removeMessageListener(id));
}
}

0 comments on commit 9bb70fa

Please sign in to comment.