Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing heartbeat resume condition #13684

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -97,6 +97,12 @@ private void checkConnection(long now, final ClientConnection connection) {
connection.onHeartbeatFailed();
fireHeartbeatStopped(connection);
}
} else {
if (!connection.isHeartBeating()) {
logger.warning("Heartbeat is back to healthy for the connection: " + connection);
connection.onHeartbeatResumed();
fireHeartbeatResumed(connection);
}
}

if (now - connection.lastWriteTimeMillis() > heartbeatInterval) {
Expand All @@ -119,12 +125,6 @@ public void onFailure(Throwable t) {
}
}
});
} else {
if (!connection.isHeartBeating()) {
logger.warning("Heartbeat is back to healthy for the connection: " + connection);
connection.onHeartbeatResumed();
fireHeartbeatResumed(connection);
}
}
}

Expand Down
Expand Up @@ -103,7 +103,7 @@ public void heartbeatStopped(Connection connection) {
}

@Test
public void testHeartbeatResumedEvent() throws InterruptedException {
public void testHeartbeatResumedEvent() {
hazelcastFactory.newHazelcastInstance();
HazelcastInstance client = hazelcastFactory.newHazelcastClient(getClientConfig());
final HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance();
Expand All @@ -116,22 +116,25 @@ public void testHeartbeatResumedEvent() throws InterruptedException {
HazelcastClientInstanceImpl clientImpl = getHazelcastClientInstanceImpl(client);
final ClientConnectionManager connectionManager = clientImpl.getConnectionManager();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicLong stoppedCount = new AtomicLong();
final AtomicLong resumedCount = new AtomicLong();
connectionManager.addConnectionHeartbeatListener(new ConnectionHeartbeatListener() {
@Override
public void heartbeatResumed(Connection connection) {
assertEquals(instance2.getCluster().getLocalMember().getAddress(), connection.getEndPoint());
countDownLatch.countDown();
resumedCount.incrementAndGet();
}

@Override
public void heartbeatStopped(Connection connection) {
stoppedCount.incrementAndGet();
}
});

assertTrueEventually(new AssertTask() {
@Override
public void run()
throws Exception {
public void run() {
assertNotNull(connectionManager.getActiveConnection(instance2.getCluster().getLocalMember().getAddress()));
}
});
Expand All @@ -141,6 +144,8 @@ public void run()
unblockMessagesFromInstance(instance2, client);

assertOpenEventually(countDownLatch);
assertEquals(1, resumedCount.get());
assertEquals(1, stoppedCount.get());
}

@Test
Expand Down