From 9bb70fa34594b21caaf03808d82f0999c1421c91 Mon Sep 17 00:00:00 2001 From: sancar Date: Tue, 14 Aug 2018 16:44:54 +0300 Subject: [PATCH] Server should not close clients when client only listens Client was not sending Ping to server when last read time is up-to-date. When server constantly pushes events to client, client does not send any ping to server. And consequently, server closes client because it does not send any ping. Client logic has changed so that it send pings if there is no outgoing packages recently, rarther than looking at incoming packages. fixes https://github.com/hazelcast/hazelcast/issues/13576 --- .../connection/nio/HeartbeatManager.java | 2 +- .../ClientRegressionWithMockNetworkTest.java | 49 +++++++++++++++++++ 2 files changed, 50 insertions(+), 1 deletion(-) diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java index 405267d613902..e666de4f36f9e 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java @@ -91,7 +91,7 @@ private void checkConnection(long now, final ClientConnection connection) { } } - if (now - connection.lastReadTimeMillis() > heartbeatInterval) { + if (now - connection.lastWriteTimeMillis() > heartbeatInterval) { ClientMessage request = ClientPingCodec.encodeRequest(); ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection); clientInvocation.invokeUrgent(); diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java index 7400a164c0c05..ac1667fc1030c 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java @@ -33,8 +33,11 @@ import com.hazelcast.core.IExecutorService; import com.hazelcast.core.ILock; import com.hazelcast.core.IMap; +import com.hazelcast.core.ITopic; import com.hazelcast.core.LifecycleEvent; import com.hazelcast.core.LifecycleListener; +import com.hazelcast.core.Message; +import com.hazelcast.core.MessageListener; import com.hazelcast.logging.Logger; import com.hazelcast.map.MapInterceptor; import com.hazelcast.map.listener.MapListener; @@ -46,12 +49,14 @@ import com.hazelcast.nio.serialization.PortableReader; import com.hazelcast.nio.serialization.PortableWriter; import com.hazelcast.security.UsernamePasswordCredentials; +import com.hazelcast.spi.properties.GroupProperty; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.annotation.NightlyTest; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.annotation.SlowTest; import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -65,6 +70,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT; import static com.hazelcast.core.LifecycleEvent.LifecycleState; @@ -835,4 +841,47 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() { String key = generateKeyOwnedBy(h2); map.get(key); } + + + @Test + @Category(SlowTest.class) + public void testServerShouldNotCloseClientWhenClientOnlyListening() { + Config config = new Config(); + int clientHeartbeatSeconds = 8; + config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds)); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config); + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000"); + HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig); + HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config); + warmUpPartitions(instance, instance2); + String key = generateKeyOwnedBy(instance2); + + + ITopic topic = client.getTopic(key); + final AtomicLong receiveCount = new AtomicLong(); + MessageListener listener = new MessageListener() { + public void onMessage(Message message) { + receiveCount.incrementAndGet(); + } + }; + String id = topic.addMessageListener(listener); + + ITopic instanceTopic = instance.getTopic(key); + long begin = System.currentTimeMillis(); + + final AtomicLong publishCount = new AtomicLong(); + while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) { + publishCount.incrementAndGet(); + instanceTopic.publish("message"); + } + + assertTrueEventually(new AssertTask() { + @Override + public void run() { + assertEquals(publishCount.get(), receiveCount.get()); + } + }); + assertTrue(topic.removeMessageListener(id)); + } }