Skip to content

Commit

Permalink
check lastwrite time instead of lastread time to ping
Browse files Browse the repository at this point in the history
  • Loading branch information
SaitTalhaNisanci committed Sep 24, 2018
1 parent d4a66ee commit 4a7a619
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 1 deletion.
3 changes: 2 additions & 1 deletion internal/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,13 @@ func (hbs *heartBeatService) start() {
func (hbs *heartBeatService) heartBeat() {
for _, connection := range hbs.client.ConnectionManager.getActiveConnections() {
timeSinceLastRead := time.Since(connection.lastRead.Load().(time.Time))
timeSinceLastWrite := time.Since(connection.lastWrite.Load().(time.Time))
if timeSinceLastRead > hbs.heartBeatTimeout {
if connection.heartBeating {
hbs.HeartbeatStopped(connection)
}
}
if timeSinceLastRead > hbs.heartBeatInterval {
if timeSinceLastWrite > hbs.heartBeatInterval {
connection.lastHeartbeatRequested.Store(time.Now())
request := proto.ClientPingEncodeRequest()
sentInvocation := hbs.client.InvocationService.invokeOnConnection(request, connection)
Expand Down
62 changes: 62 additions & 0 deletions test/heartbeat_config.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
<hazelcast xsi:schemaLocation="http://www.hazelcast.com/schema/config hazelcast-config-3.9.xsd"
xmlns="http://www.hazelcast.com/schema/config"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
<properties>
<property name="hazelcast.client.max.no.heartbeat.seconds">8</property>
</properties>
<group>
<name>dev</name>
<password>dev-pass</password>
</group>
<management-center enabled="false">http://localhost:8080/mancenter</management-center>
<network>
<port auto-increment="true" port-count="100">5701</port>
<outbound-ports>
<!--
Allowed port range when connecting to other nodes.
0 or * means use system provided port.
-->
<ports>0</ports>
</outbound-ports>
<join>
<multicast enabled="true">
<multicast-group>224.7.7.7</multicast-group>
<multicast-port>54327</multicast-port>
</multicast>
<tcp-ip enabled="false">
<interface>127.0.0.1</interface>
</tcp-ip>
</join>
<public-address>127.0.0.1</public-address>
<ssl enabled="false"/>
<socket-interceptor enabled="false"/>
</network>
<serialization>
<data-serializable-factories>
<data-serializable-factory factory-id="66">com.hazelcast.client.test.IdentifiedFactory
</data-serializable-factory>
</data-serializable-factories>
<portable-factories>
<portable-factory factory-id="666">com.hazelcast.client.test.PortableFactory
</portable-factory>
</portable-factories>
</serialization>

<queue name="ClientQueueTest*">
<!--
Maximum size of the queue. When a JVM's local queue size reaches the maximum,
all put/offer operations will get blocked until the queue size
of the JVM goes down below the maximum.
Any integer between 0 and Integer.MAX_VALUE. 0 means
Integer.MAX_VALUE. Default is 0.
-->
<max-size>6</max-size>
</queue>
<ringbuffer name="ClientRingbufferTest*">
<capacity>10</capacity>
</ringbuffer>
<ringbuffer name="ClientRingbufferTestWithTTL*">
<capacity>10</capacity>
<time-to-live-seconds>180</time-to-live-seconds>
</ringbuffer>
</hazelcast>
49 changes: 49 additions & 0 deletions test/heartbeat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,11 @@ import (
"sync"
"testing"

"time"

"github.com/hazelcast/hazelcast-go-client"
"github.com/hazelcast/hazelcast-go-client/config/property"
"github.com/hazelcast/hazelcast-go-client/core"
"github.com/hazelcast/hazelcast-go-client/internal"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -59,3 +62,49 @@ func TestHeartbeatStoppedForConnection(t *testing.T) {
client.Shutdown()
remoteController.ShutdownCluster(cluster.ID)
}

func TestServerShouldNotCloseClientWhenClientOnlyListening(t *testing.T) {
heartbeatConfig, _ := Read("heartbeat_config.xml")
cluster, _ = remoteController.CreateCluster("", heartbeatConfig)
defer remoteController.ShutdownCluster(cluster.ID)
remoteController.StartMember(cluster.ID)

lifecycleListener := lifecycleListener2{collector: make([]string, 0)}
config := hazelcast.NewConfig()
config.AddLifecycleListener(&lifecycleListener)
config.SetProperty(property.HeartbeatInterval.Name(), "1000")
client, _ := hazelcast.NewClientWithConfig(config)
defer client.Shutdown()

client2, _ := hazelcast.NewClient()
defer client2.Shutdown()
topicName := "topicName"
topic, _ := client.GetTopic(topicName)
listener := &topicMessageListener{}
topic.AddMessageListener(listener)
topic2, _ := client2.GetTopic(topicName)
begin := time.Now()
for time.Since(begin) < 16*time.Second {
topic2.Publish("message")
}
assert.Equal(t, len(lifecycleListener.collector), 0)

}

type topicMessageListener struct {
}

func (l *topicMessageListener) OnMessage(message core.Message) error {
return nil
}

type lifecycleListener2 struct {
collector []string
}

func (l *lifecycleListener2) LifecycleStateChanged(newState string) {
if newState == core.LifecycleStateDisconnected {
l.collector = append(l.collector, newState)

}
}

0 comments on commit 4a7a619

Please sign in to comment.