From c8c193fa9cb32af2de2ffd347b77ea67fc2fd420 Mon Sep 17 00:00:00 2001 From: sancar Date: Tue, 14 Aug 2018 16:44:54 +0300 Subject: [PATCH] Server should not close clients when client only listens Client was not sending Ping to server when last read time is up-to-date. When server constantly pushes events to client, client does not send any ping to server. And consequently, server closes client because it does not send any ping. Client logic has changed so that it send pings if there is no outgoing packages recently, rarther than looking at incoming packages. fixes https://github.com/hazelcast/hazelcast/issues/13576 --- .../connection/nio/HeartbeatManager.java | 2 +- .../ClientRegressionWithMockNetworkTest.java | 54 +++++++++++++++++++ 2 files changed, 55 insertions(+), 1 deletion(-) diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java index 405267d61390..e666de4f36f9 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java @@ -91,7 +91,7 @@ private void checkConnection(long now, final ClientConnection connection) { } } - if (now - connection.lastReadTimeMillis() > heartbeatInterval) { + if (now - connection.lastWriteTimeMillis() > heartbeatInterval) { ClientMessage request = ClientPingCodec.encodeRequest(); ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection); clientInvocation.invokeUrgent(); diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java index 7400a164c0c0..75082ece2606 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java @@ -26,6 +26,8 @@ import com.hazelcast.config.ListenerConfig; import com.hazelcast.config.NearCacheConfig; import com.hazelcast.config.SerializationConfig; +import com.hazelcast.core.Client; +import com.hazelcast.core.ClientListener; import com.hazelcast.core.DistributedObject; import com.hazelcast.core.EntryAdapter; import com.hazelcast.core.EntryEvent; @@ -33,8 +35,11 @@ import com.hazelcast.core.IExecutorService; import com.hazelcast.core.ILock; import com.hazelcast.core.IMap; +import com.hazelcast.core.ITopic; import com.hazelcast.core.LifecycleEvent; import com.hazelcast.core.LifecycleListener; +import com.hazelcast.core.Message; +import com.hazelcast.core.MessageListener; import com.hazelcast.logging.Logger; import com.hazelcast.map.MapInterceptor; import com.hazelcast.map.listener.MapListener; @@ -46,12 +51,14 @@ import com.hazelcast.nio.serialization.PortableReader; import com.hazelcast.nio.serialization.PortableWriter; import com.hazelcast.security.UsernamePasswordCredentials; +import com.hazelcast.spi.properties.GroupProperty; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.annotation.NightlyTest; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.annotation.SlowTest; import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -65,6 +72,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT; import static com.hazelcast.core.LifecycleEvent.LifecycleState; @@ -835,4 +843,50 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() { String key = generateKeyOwnedBy(h2); map.get(key); } + + + @Test + @Category(SlowTest.class) + public void testServerShouldNotCloseClientWhenClientOnlyListening() { + Config config = new Config(); + int clientHeartbeatSeconds = 8; + config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds)); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config); + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000"); + HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig); + HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config); + warmUpPartitions(instance, instance2); + String key = generateKeyOwnedBy(instance2); + + final AtomicBoolean isClientDisconnected = new AtomicBoolean(); + instance.getClientService().addClientListener(new ClientListener() { + @Override + public void clientConnected(Client client) { + + } + + @Override + public void clientDisconnected(Client client) { + isClientDisconnected.set(true); + } + }); + + ITopic topic = client.getTopic(key); + MessageListener listener = new MessageListener() { + public void onMessage(Message message) { + } + }; + String id = topic.addMessageListener(listener); + + ITopic instanceTopic = instance.getTopic(key); + long begin = System.currentTimeMillis(); + + while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) { + instanceTopic.publish("message"); + } + + topic.removeMessageListener(id); + assertFalse(isClientDisconnected.get()); + } }