diff --git a/src/HeartbeatService.ts b/src/HeartbeatService.ts index f13287277..cfdd255b2 100644 --- a/src/HeartbeatService.ts +++ b/src/HeartbeatService.ts @@ -71,14 +71,14 @@ export class Heartbeat { for (const address in estConnections) { if (estConnections[address]) { const conn = estConnections[address]; - const timeSinceLastRead = new Date().getTime() - conn.getLastRead(); - if (timeSinceLastRead > this.heartbeatTimeout) { + const now = Date.now(); + if (now - conn.getLastReadTimeMillis() > this.heartbeatTimeout) { if (conn.isHeartbeating()) { conn.setHeartbeating(false); this.onHeartbeatStopped(conn); } } - if (timeSinceLastRead > this.heartbeatInterval) { + if (now - conn.getLastWriteTimeMillis() > this.heartbeatInterval) { const req = ClientPingCodec.encodeRequest(); this.client.getInvocationService().invokeOnConnection(conn, req) .catch((error) => { diff --git a/src/invocation/ClientConnection.ts b/src/invocation/ClientConnection.ts index c42afeeb9..3df38c35a 100644 --- a/src/invocation/ClientConnection.ts +++ b/src/invocation/ClientConnection.ts @@ -26,7 +26,8 @@ import {DeferredPromise} from '../Util'; export class ClientConnection { private address: Address; private readonly localAddress: Address; - private lastRead: number; + private lastReadTimeMillis: number; + private lastWriteTimeMillis: number; private heartbeating = true; private client: HazelcastClient; private readBuffer: Buffer; @@ -43,7 +44,7 @@ export class ClientConnection { this.address = address; this.localAddress = new Address(socket.localAddress, socket.localPort); this.readBuffer = new Buffer(0); - this.lastRead = 0; + this.lastReadTimeMillis = 0; this.closedTime = 0; this.connectedServerVersionString = null; this.connectedServerVersion = BuildInfo.UNKNOWN_VERSION_ID; @@ -76,6 +77,7 @@ export class ClientConnection { if (err) { deferred.reject(new IOError(err)); } else { + this.lastWriteTimeMillis = Date.now(); deferred.resolve(); } }); @@ -126,8 +128,12 @@ export class ClientConnection { return this.startTime; } - getLastRead(): number { - return this.lastRead; + getLastReadTimeMillis(): number { + return this.lastReadTimeMillis; + } + + getLastWriteTimeMillis(): number { + return this.lastWriteTimeMillis; } toString(): string { @@ -140,7 +146,7 @@ export class ClientConnection { */ registerResponseCallback(callback: Function): void { this.socket.on('data', (buffer: Buffer) => { - this.lastRead = new Date().getTime(); + this.lastReadTimeMillis = new Date().getTime(); this.readBuffer = Buffer.concat([this.readBuffer, buffer], this.readBuffer.length + buffer.length); while (this.readBuffer.length >= BitsUtil.INT_SIZE_IN_BYTES) { const frameSize = this.readBuffer.readInt32LE(0); diff --git a/test/heartbeat/HeartbeatFromClientTest.js b/test/heartbeat/HeartbeatFromClientTest.js new file mode 100644 index 000000000..1b6ee0af9 --- /dev/null +++ b/test/heartbeat/HeartbeatFromClientTest.js @@ -0,0 +1,92 @@ +/* + * Copyright (c) 2008-2018, Hazelcast, Inc. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +var RC = require('../RC'); +var HazelcastClient = require('../../').Client; +var expect = require('chai').expect; +var Config = require('../../').Config; +var Util = require('../Util'); +var fs = require('fs'); + +describe('Heartbeat', function () { + this.timeout(30000); + + var cluster; + + beforeEach(function () { + var serverConfig = fs.readFileSync(__dirname + '/short-heartbeat.xml', 'utf8'); + return RC.createCluster(null, serverConfig).then(function (resp) { + cluster = resp; + }); + }); + + afterEach(function () { + return RC.shutdownCluster(cluster.id); + }); + + it('client sends heartbeat periodically even when server continuously pushes messages', function () { + var MAP_NAME = 'testmap'; + var member; + var client1; + var client2; + var connectionClosedEventCount = 0; + + var mapFromClient1; + var mapFromClient2; + var pushTask; + + var clientConfig = new Config.ClientConfig(); + clientConfig.properties['hazelcast.client.heartbeat.interval'] = 1000; + return RC.startMember(cluster.id).then(function (m) { + member = m; + return HazelcastClient.newHazelcastClient(clientConfig); + }).then(function (c) { + client1 = c; + client1.getConnectionManager().on('connectionClosed', function () { + connectionClosedEventCount++; + }); + return HazelcastClient.newHazelcastClient(clientConfig); + }).then(function (c) { + client2 = c; + return client1.getMap(MAP_NAME); + }).then(function (m) { + mapFromClient1 = m; + return mapFromClient1.addEntryListener({ + added: function () { + //no-op + }, + updated: function () { + //no-op + } + }) + }).then(function () { + return client2.getMap(MAP_NAME); + }).then(function (m) { + var counter = 0; + mapFromClient2 = m; + pushTask = setInterval(function () { + mapFromClient2.put('testkey', counter++); + }, 1000); + return Util.promiseLater(15000, function () {}); + }).then(function () { + clearInterval(pushTask); + expect(connectionClosedEventCount).to.equal(0); + }); + }); +}); + + + diff --git a/test/HeartbeatTest.js b/test/heartbeat/HeartbeatFromServerTest.js similarity index 88% rename from test/HeartbeatTest.js rename to test/heartbeat/HeartbeatFromServerTest.js index 0778dd9cf..2bbecc62b 100644 --- a/test/HeartbeatTest.js +++ b/test/heartbeat/HeartbeatFromServerTest.js @@ -14,12 +14,12 @@ * limitations under the License. */ -var RC = require('./RC'); -var HazelcastClient = require('../.').Client; -var expect = require('chai').expect; -var Config = require('../.').Config; -var Util = require('./Util'); -var Address = require('../.').Address; +var RC = require('../RC'); +var HazelcastClient = require('../../').Client; +var Config = require('../../').Config; +var Util = require('../Util'); +var DeferredPromise = require('../../lib/Util').DeferredPromise; +var Address = require('../../').Address; describe('Heartbeat', function () { this.timeout(50000); @@ -38,6 +38,7 @@ describe('Heartbeat', function () { it('Heartbeat stopped fired when second member stops heartbeating', function (done) { var client; + var memberAddedPromise = new DeferredPromise(); RC.startMember(cluster.id).then(function () { var cfg = new Config.ClientConfig(); cfg.properties['hazelcast.client.heartbeat.interval'] = 500; @@ -50,8 +51,9 @@ describe('Heartbeat', function () { memberAdded: function (membershipEvent) { var address = new Address(membershipEvent.member.address.host, membershipEvent.member.address.port); warmUpConnectionToAddressWithRetry(client, address); + memberAddedPromise.resolve(); } - } + }; client.clusterService.addMembershipListener(membershipListener); client.heartbeat.addListener({ @@ -63,7 +65,9 @@ describe('Heartbeat', function () { }).then(function () { return RC.startMember(cluster.id); }).then(function (member2) { - simulateHeartbeatLost(client, member2.host + ':' + member2.port, 2000); + return memberAddedPromise.promise.then(function () { + simulateHeartbeatLost(client, member2.host + ':' + member2.port, 2000); + }) }).catch(done); }); @@ -85,7 +89,7 @@ describe('Heartbeat', function () { simulateHeartbeatLost(client, membershipEvent.member.address, 2000); }).catch(done); } - } + }; client.clusterService.addMembershipListener(membershipListener); client.heartbeat.addListener({ @@ -124,7 +128,7 @@ describe('Heartbeat', function () { }); function simulateHeartbeatLost(client, address, timeout) { - client.connectionManager.establishedConnections[address].lastRead = client.connectionManager.establishedConnections[address].lastRead - timeout; + client.connectionManager.establishedConnections[address].lastReadTimeMillis = client.connectionManager.establishedConnections[address].getLastReadTimeMillis() - timeout; } function warmUpConnectionToAddressWithRetry(client, address, retryCount) { diff --git a/test/heartbeat/short-heartbeat.xml b/test/heartbeat/short-heartbeat.xml new file mode 100644 index 000000000..c667fa2d3 --- /dev/null +++ b/test/heartbeat/short-heartbeat.xml @@ -0,0 +1,23 @@ + + + + + 8 + +