Skip to content

Commit

Permalink
Fixes client not sending pings when server continuosly pushes messages
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafaiman committed Jan 3, 2019
1 parent 77ac934 commit 11945e1
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 18 deletions.
6 changes: 3 additions & 3 deletions src/HeartbeatService.ts
Expand Up @@ -71,14 +71,14 @@ export class Heartbeat {
for (const address in estConnections) {
if (estConnections[address]) {
const conn = estConnections[address];
const timeSinceLastRead = new Date().getTime() - conn.getLastRead();
if (timeSinceLastRead > this.heartbeatTimeout) {
const now = Date.now();
if (now - conn.getLastReadTimeMillis() > this.heartbeatTimeout) {
if (conn.isHeartbeating()) {
conn.setHeartbeating(false);
this.onHeartbeatStopped(conn);
}
}
if (timeSinceLastRead > this.heartbeatInterval) {
if (now - conn.getLastWriteTimeMillis() > this.heartbeatInterval) {
const req = ClientPingCodec.encodeRequest();
this.client.getInvocationService().invokeOnConnection(conn, req)
.catch((error) => {
Expand Down
16 changes: 11 additions & 5 deletions src/invocation/ClientConnection.ts
Expand Up @@ -26,7 +26,8 @@ import {DeferredPromise} from '../Util';
export class ClientConnection {
private address: Address;
private readonly localAddress: Address;
private lastRead: number;
private lastReadTimeMillis: number;
private lastWriteTimeMillis: number;
private heartbeating = true;
private client: HazelcastClient;
private readBuffer: Buffer;
Expand All @@ -43,7 +44,7 @@ export class ClientConnection {
this.address = address;
this.localAddress = new Address(socket.localAddress, socket.localPort);
this.readBuffer = new Buffer(0);
this.lastRead = 0;
this.lastReadTimeMillis = 0;
this.closedTime = 0;
this.connectedServerVersionString = null;
this.connectedServerVersion = BuildInfo.UNKNOWN_VERSION_ID;
Expand Down Expand Up @@ -76,6 +77,7 @@ export class ClientConnection {
if (err) {
deferred.reject(new IOError(err));
} else {
this.lastWriteTimeMillis = Date.now();
deferred.resolve();
}
});
Expand Down Expand Up @@ -126,8 +128,12 @@ export class ClientConnection {
return this.startTime;
}

getLastRead(): number {
return this.lastRead;
getLastReadTimeMillis(): number {
return this.lastReadTimeMillis;
}

getLastWriteTimeMillis(): number {
return this.lastWriteTimeMillis;
}

toString(): string {
Expand All @@ -140,7 +146,7 @@ export class ClientConnection {
*/
registerResponseCallback(callback: Function): void {
this.socket.on('data', (buffer: Buffer) => {
this.lastRead = new Date().getTime();
this.lastReadTimeMillis = new Date().getTime();
this.readBuffer = Buffer.concat([this.readBuffer, buffer], this.readBuffer.length + buffer.length);
while (this.readBuffer.length >= BitsUtil.INT_SIZE_IN_BYTES) {
const frameSize = this.readBuffer.readInt32LE(0);
Expand Down
92 changes: 92 additions & 0 deletions test/heartbeat/HeartbeatFromClientTest.js
@@ -0,0 +1,92 @@
/*
* Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

var RC = require('../RC');
var HazelcastClient = require('../../').Client;
var expect = require('chai').expect;
var Config = require('../../').Config;
var Util = require('../Util');
var fs = require('fs');

describe('Heartbeat', function () {
this.timeout(30000);

var cluster;

beforeEach(function () {
var serverConfig = fs.readFileSync(__dirname + '/short-heartbeat.xml', 'utf8');
return RC.createCluster(null, serverConfig).then(function (resp) {
cluster = resp;
});
});

afterEach(function () {
return RC.shutdownCluster(cluster.id);
});

it('client sends heartbeat periodically even when server continuously pushes messages', function () {
var MAP_NAME = 'testmap';
var member;
var client1;
var client2;
var connectionClosedEventCount = 0;

var mapFromClient1;
var mapFromClient2;
var pushTask;

var clientConfig = new Config.ClientConfig();
clientConfig.properties['hazelcast.client.heartbeat.interval'] = 1000;
return RC.startMember(cluster.id).then(function (m) {
member = m;
return HazelcastClient.newHazelcastClient(clientConfig);
}).then(function (c) {
client1 = c;
client1.getConnectionManager().on('connectionClosed', function () {
connectionClosedEventCount++;
});
return HazelcastClient.newHazelcastClient(clientConfig);
}).then(function (c) {
client2 = c;
return client1.getMap(MAP_NAME);
}).then(function (m) {
mapFromClient1 = m;
return mapFromClient1.addEntryListener({
added: function () {
//no-op
},
updated: function () {
//no-op
}
})
}).then(function () {
return client2.getMap(MAP_NAME);
}).then(function (m) {
var counter = 0;
mapFromClient2 = m;
pushTask = setInterval(function () {
mapFromClient2.put('testkey', counter++);
}, 1000);
return Util.promiseLater(15000, function () {});
}).then(function () {
clearInterval(pushTask);
expect(connectionClosedEventCount).to.equal(0);
});
});
});



24 changes: 14 additions & 10 deletions test/HeartbeatTest.js → test/heartbeat/HeartbeatFromServerTest.js
Expand Up @@ -14,12 +14,12 @@
* limitations under the License.
*/

var RC = require('./RC');
var HazelcastClient = require('../.').Client;
var expect = require('chai').expect;
var Config = require('../.').Config;
var Util = require('./Util');
var Address = require('../.').Address;
var RC = require('../RC');
var HazelcastClient = require('../../').Client;
var Config = require('../../').Config;
var Util = require('../Util');
var DeferredPromise = require('../../lib/Util').DeferredPromise;
var Address = require('../../').Address;

describe('Heartbeat', function () {
this.timeout(50000);
Expand All @@ -38,6 +38,7 @@ describe('Heartbeat', function () {

it('Heartbeat stopped fired when second member stops heartbeating', function (done) {
var client;
var memberAddedPromise = new DeferredPromise();
RC.startMember(cluster.id).then(function () {
var cfg = new Config.ClientConfig();
cfg.properties['hazelcast.client.heartbeat.interval'] = 500;
Expand All @@ -50,8 +51,9 @@ describe('Heartbeat', function () {
memberAdded: function (membershipEvent) {
var address = new Address(membershipEvent.member.address.host, membershipEvent.member.address.port);
warmUpConnectionToAddressWithRetry(client, address);
memberAddedPromise.resolve();
}
}
};

client.clusterService.addMembershipListener(membershipListener);
client.heartbeat.addListener({
Expand All @@ -63,7 +65,9 @@ describe('Heartbeat', function () {
}).then(function () {
return RC.startMember(cluster.id);
}).then(function (member2) {
simulateHeartbeatLost(client, member2.host + ':' + member2.port, 2000);
return memberAddedPromise.promise.then(function () {
simulateHeartbeatLost(client, member2.host + ':' + member2.port, 2000);
})
}).catch(done);
});

Expand All @@ -85,7 +89,7 @@ describe('Heartbeat', function () {
simulateHeartbeatLost(client, membershipEvent.member.address, 2000);
}).catch(done);
}
}
};

client.clusterService.addMembershipListener(membershipListener);
client.heartbeat.addListener({
Expand Down Expand Up @@ -124,7 +128,7 @@ describe('Heartbeat', function () {
});

function simulateHeartbeatLost(client, address, timeout) {
client.connectionManager.establishedConnections[address].lastRead = client.connectionManager.establishedConnections[address].lastRead - timeout;
client.connectionManager.establishedConnections[address].lastReadTimeMillis = client.connectionManager.establishedConnections[address].getLastReadTimeMillis() - timeout;
}

function warmUpConnectionToAddressWithRetry(client, address, retryCount) {
Expand Down
23 changes: 23 additions & 0 deletions test/heartbeat/short-heartbeat.xml
@@ -0,0 +1,23 @@
<!--
~ Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->

<hazelcast xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.11.xsd"
xmlns="http://www.hazelcast.com/schema/config">
<properties>
<property name="hazelcast.client.max.no.heartbeat.seconds">8</property>
</properties>
</hazelcast>

0 comments on commit 11945e1

Please sign in to comment.