Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Server should not close clients when client only listens #13577

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -91,7 +91,7 @@ private void checkConnection(long now, final ClientConnection connection) {
}
}

if (now - connection.lastReadTimeMillis() > heartbeatInterval) {
if (now - connection.lastWriteTimeMillis() > heartbeatInterval) {
ClientMessage request = ClientPingCodec.encodeRequest();
ClientInvocation clientInvocation = new ClientInvocation(client, request, null, connection);
clientInvocation.invokeUrgent();
Expand Down
Expand Up @@ -26,15 +26,20 @@
import com.hazelcast.config.ListenerConfig;
import com.hazelcast.config.NearCacheConfig;
import com.hazelcast.config.SerializationConfig;
import com.hazelcast.core.Client;
import com.hazelcast.core.ClientListener;
import com.hazelcast.core.DistributedObject;
import com.hazelcast.core.EntryAdapter;
import com.hazelcast.core.EntryEvent;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.IExecutorService;
import com.hazelcast.core.ILock;
import com.hazelcast.core.IMap;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.LifecycleEvent;
import com.hazelcast.core.LifecycleListener;
import com.hazelcast.core.Message;
import com.hazelcast.core.MessageListener;
import com.hazelcast.logging.Logger;
import com.hazelcast.map.MapInterceptor;
import com.hazelcast.map.listener.MapListener;
Expand All @@ -46,12 +51,14 @@
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
import com.hazelcast.security.UsernamePasswordCredentials;
import com.hazelcast.spi.properties.GroupProperty;
import com.hazelcast.test.AssertTask;
import com.hazelcast.test.HazelcastParallelClassRunner;
import com.hazelcast.test.HazelcastTestSupport;
import com.hazelcast.test.annotation.NightlyTest;
import com.hazelcast.test.annotation.ParallelTest;
import com.hazelcast.test.annotation.QuickTest;
import com.hazelcast.test.annotation.SlowTest;
import org.junit.After;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -65,6 +72,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import static com.hazelcast.config.EvictionConfig.MaxSizePolicy.ENTRY_COUNT;
import static com.hazelcast.core.LifecycleEvent.LifecycleState;
Expand Down Expand Up @@ -835,4 +843,50 @@ public void testMemberAddedWithListeners_thenCheckOperationsNotHanging() {
String key = generateKeyOwnedBy(h2);
map.get(key);
}


@Test
@Category(SlowTest.class)
public void testServerShouldNotCloseClientWhenClientOnlyListening() {
Config config = new Config();
int clientHeartbeatSeconds = 8;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why 8 secs and not a shorter duration? This seems to cause it to be a slow test.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When it is shorter it could fail when environment is slow. That is why I wanted to keep it longer.

config.setProperty(GroupProperty.CLIENT_HEARTBEAT_TIMEOUT_SECONDS.getName(), String.valueOf(clientHeartbeatSeconds));
HazelcastInstance instance = hazelcastFactory.newHazelcastInstance(config);
ClientConfig clientConfig = new ClientConfig();
clientConfig.setProperty(ClientProperty.HEARTBEAT_INTERVAL.getName(), "1000");
HazelcastInstance client = hazelcastFactory.newHazelcastClient(clientConfig);
HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance(config);
warmUpPartitions(instance, instance2);
String key = generateKeyOwnedBy(instance2);

final AtomicBoolean isClientDisconnected = new AtomicBoolean();
instance.getClientService().addClientListener(new ClientListener() {
@Override
public void clientConnected(Client client) {

}

@Override
public void clientDisconnected(Client client) {
isClientDisconnected.set(true);
}
});

ITopic topic = client.getTopic(key);
MessageListener listener = new MessageListener() {
public void onMessage(Message message) {
}
};
String id = topic.addMessageListener(listener);

ITopic<Object> instanceTopic = instance.getTopic(key);
long begin = System.currentTimeMillis();

while (System.currentTimeMillis() - begin < TimeUnit.SECONDS.toMillis(clientHeartbeatSeconds * 2)) {
instanceTopic.publish("message");
}

topic.removeMessageListener(id);
assertFalse(isClientDisconnected.get());
}
}