Skip to content

Commit

Permalink
Merge pull request #13684 from sancar/followUpTo/serverClosesListenin…
Browse files Browse the repository at this point in the history
…gClient/maint3.x

Fixing heartbeat resume condition
  • Loading branch information
mustafa sancar koyunlu committed Sep 4, 2018
2 parents b9cb772 + 23af8bf commit af34a2c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 9 deletions.
Expand Up @@ -97,6 +97,12 @@ private void checkConnection(long now, final ClientConnection connection) {
connection.onHeartbeatFailed();
fireHeartbeatStopped(connection);
}
} else {
if (!connection.isHeartBeating()) {
logger.warning("Heartbeat is back to healthy for the connection: " + connection);
connection.onHeartbeatResumed();
fireHeartbeatResumed(connection);
}
}

if (now - connection.lastWriteTimeMillis() > heartbeatInterval) {
Expand All @@ -119,12 +125,6 @@ public void onFailure(Throwable t) {
}
}
});
} else {
if (!connection.isHeartBeating()) {
logger.warning("Heartbeat is back to healthy for the connection: " + connection);
connection.onHeartbeatResumed();
fireHeartbeatResumed(connection);
}
}
}

Expand Down
Expand Up @@ -103,7 +103,7 @@ public void heartbeatStopped(Connection connection) {
}

@Test
public void testHeartbeatResumedEvent() throws InterruptedException {
public void testHeartbeatResumedEvent() {
hazelcastFactory.newHazelcastInstance();
HazelcastInstance client = hazelcastFactory.newHazelcastClient(getClientConfig());
final HazelcastInstance instance2 = hazelcastFactory.newHazelcastInstance();
Expand All @@ -116,22 +116,25 @@ public void testHeartbeatResumedEvent() throws InterruptedException {
HazelcastClientInstanceImpl clientImpl = getHazelcastClientInstanceImpl(client);
final ClientConnectionManager connectionManager = clientImpl.getConnectionManager();
final CountDownLatch countDownLatch = new CountDownLatch(1);
final AtomicLong stoppedCount = new AtomicLong();
final AtomicLong resumedCount = new AtomicLong();
connectionManager.addConnectionHeartbeatListener(new ConnectionHeartbeatListener() {
@Override
public void heartbeatResumed(Connection connection) {
assertEquals(instance2.getCluster().getLocalMember().getAddress(), connection.getEndPoint());
countDownLatch.countDown();
resumedCount.incrementAndGet();
}

@Override
public void heartbeatStopped(Connection connection) {
stoppedCount.incrementAndGet();
}
});

assertTrueEventually(new AssertTask() {
@Override
public void run()
throws Exception {
public void run() {
assertNotNull(connectionManager.getActiveConnection(instance2.getCluster().getLocalMember().getAddress()));
}
});
Expand All @@ -141,6 +144,8 @@ public void run()
unblockMessagesFromInstance(instance2, client);

assertOpenEventually(countDownLatch);
assertEquals(1, resumedCount.get());
assertEquals(1, stoppedCount.get());
}

@Test
Expand Down

0 comments on commit af34a2c

Please sign in to comment.