diff --git a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java index 405267d61390..e666de4f36f9 100644 --- a/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java +++ b/hazelcast-client/src/main/java/com/hazelcast/client/connection/nio/HeartbeatManager.java @@ -91,7 +91,7 @@ private void checkConnection(long now, final ClientConnection connection) { } } - if (now - connection.lastReadTimeMillis() > heartbeatInterval) { + if (now - connection.lastWriteTimeMillis() > heartbeatInterval) { ClientMessage request = ClientPingCodec.encodeRequest(); ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection); clientInvocation.invokeUrgent(); diff --git a/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java b/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java index 7400a164c0c0..75082ece2606 100644 --- a/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java +++ b/hazelcast-client/src/test/java/com/hazelcast/client/ClientRegressionWithMockNetworkTest.java @@ -26,6 +26,8 @@ import com.hazelcast.config.ListenerConfig; import com.hazelcast.config.NearCacheConfig; import com.hazelcast.config.SerializationConfig; +import com.hazelcast.core.Client; +import com.hazelcast.core.ClientListener; import com.hazelcast.core.DistributedObject; import com.hazelcast.core.EntryAdapter; import com.hazelcast.core.EntryEvent; @@ -33,8 +35,11 @@ import com.hazelcast.core.IExecutorService; import com.hazelcast.core.ILock; import com.hazelcast.core.IMap; +import com.hazelcast.core.ITopic; import com.hazelcast.core.LifecycleEvent; import com.hazelcast.core.LifecycleListener; +import com.hazelcast.core.Message; +import com.hazelcast.core.MessageListener; import com.hazelcast.logging.Logger; import com.hazelcast.map.MapInterceptor; import com.hazelcast.map.listener.MapListener; @@ -46,12 +51,14 @@ import com.hazelcast.nio.serialization.PortableReader; import com.hazelcast.nio.serialization.PortableWriter; import com.hazelcast.security.UsernamePasswordCredentials; +import com.hazelcast.spi.properties.GroupProperty; import com.hazelcast.test.AssertTask; import com.hazelcast.test.HazelcastParallelClassRunner; import com.hazelcast.test.HazelcastTestSupport; import com.hazelcast.test.annotation.NightlyTest; import com.hazelcast.test.annotation.ParallelTest; import com.hazelcast.test.annotation.QuickTest; +import com.hazelcast.test.annotation.SlowTest; import org.junit.After; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -65,6 +72,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT; import static com.hazelcast.core.LifecycleEvent.LifecycleState; @@ -835,4 +843,50 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() { String key = generateKeyOwnedBy(h2); map.get(key); } + + + @Test + @Category(SlowTest.class) + public void testServerShouldNotCloseClientWhenClientOnlyListening() { + Config config = new Config(); + int clientHeartbeatSeconds = 8; + config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds)); + HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config); + ClientConfig clientConfig = new ClientConfig(); + clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000"); + HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig); + HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config); + warmUpPartitions(instance, instance2); + String key = generateKeyOwnedBy(instance2); + + final AtomicBoolean isClientDisconnected = new AtomicBoolean(); + instance.getClientService().addClientListener(new ClientListener() { + @Override + public void clientConnected(Client client) { + + } + + @Override + public void clientDisconnected(Client client) { + isClientDisconnected.set(true); + } + }); + + ITopic topic = client.getTopic(key); + MessageListener listener = new MessageListener() { + public void onMessage(Message message) { + } + }; + String id = topic.addMessageListener(listener); + + ITopic instanceTopic = instance.getTopic(key); + long begin = System.currentTimeMillis(); + + while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) { + instanceTopic.publish("message"); + } + + topic.removeMessageListener(id); + assertFalse(isClientDisconnected.get()); + } }