From e28a1db639f8e9d2bb406a7f935848f22f5235fa Mon Sep 17 00:00:00 2001 From: shivam Date: Thu, 6 Jan 2022 18:34:09 +0530 Subject: [PATCH 1/5] Support for AutoAckOff Signed-off-by: shivam --- message.go | 12 ++++++++++++ router.go | 12 +++++++++--- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/message.go b/message.go index 35b463f..9bf23e6 100644 --- a/message.go +++ b/message.go @@ -36,6 +36,8 @@ type Message interface { MessageID() uint16 Payload() []byte Ack() + NoAutoAck() bool + AutoAckOff() } type message struct { @@ -47,6 +49,7 @@ type message struct { payload []byte once sync.Once ack func() + noautoack bool } func (m *message) Duplicate() bool { @@ -77,6 +80,14 @@ func (m *message) Ack() { m.once.Do(m.ack) } +func (m *message) NoAutoAck() bool { + return m.noautoack +} + +func (m *message) AutoAckOff() { + m.noautoack = true +} + func messageFromPublish(p *packets.PublishPacket, ack func()) Message { return &message{ duplicate: p.Dup, @@ -86,6 +97,7 @@ func messageFromPublish(p *packets.PublishPacket, ack func()) Message { messageID: p.MessageID, payload: p.Payload, ack: ack, + noautoack: false, } } diff --git a/router.go b/router.go index bfaef61..4c6ca62 100644 --- a/router.go +++ b/router.go @@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - m.Ack() + if !m.NoAutoAck() { + m.Ack() + } wg.Done() }() } @@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - m.Ack() + if !m.NoAutoAck() { + m.Ack() + } wg.Done() }() } @@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - m.Ack() + if !m.NoAutoAck() { + m.Ack() + } } // DEBUG.Println(ROU, "matchAndDispatch handled message") } From 7b1c0eb039eb9101c6757e9c50a32f3a5d626a2e Mon Sep 17 00:00:00 2001 From: shivam Date: Thu, 13 Oct 2022 13:56:54 +0530 Subject: [PATCH 2/5] Revert "Support for AutoAckOff" This reverts commit e28a1db639f8e9d2bb406a7f935848f22f5235fa. --- message.go | 12 ------------ router.go | 12 +++--------- 2 files changed, 3 insertions(+), 21 deletions(-) diff --git a/message.go b/message.go index 9bf23e6..35b463f 100644 --- a/message.go +++ b/message.go @@ -36,8 +36,6 @@ type Message interface { MessageID() uint16 Payload() []byte Ack() - NoAutoAck() bool - AutoAckOff() } type message struct { @@ -49,7 +47,6 @@ type message struct { payload []byte once sync.Once ack func() - noautoack bool } func (m *message) Duplicate() bool { @@ -80,14 +77,6 @@ func (m *message) Ack() { m.once.Do(m.ack) } -func (m *message) NoAutoAck() bool { - return m.noautoack -} - -func (m *message) AutoAckOff() { - m.noautoack = true -} - func messageFromPublish(p *packets.PublishPacket, ack func()) Message { return &message{ duplicate: p.Dup, @@ -97,7 +86,6 @@ func messageFromPublish(p *packets.PublishPacket, ack func()) Message { messageID: p.MessageID, payload: p.Payload, ack: ack, - noautoack: false, } } diff --git a/router.go b/router.go index 4c6ca62..bfaef61 100644 --- a/router.go +++ b/router.go @@ -186,9 +186,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - if !m.NoAutoAck() { - m.Ack() - } + m.Ack() wg.Done() }() } @@ -203,9 +201,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - if !m.NoAutoAck() { - m.Ack() - } + m.Ack() wg.Done() }() } @@ -216,9 +212,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - if !m.NoAutoAck() { - m.Ack() - } + m.Ack() } // DEBUG.Println(ROU, "matchAndDispatch handled message") } From 5c775c25b18c79d40a0e7c3b6d21fced0829b528 Mon Sep 17 00:00:00 2001 From: shivam Date: Thu, 13 Oct 2022 14:03:59 +0530 Subject: [PATCH 3/5] Adding AutoAck option in ClientOptions Signed-off-by: shivam --- options.go | 9 +++++++++ router.go | 12 +++++++++--- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/options.go b/options.go index b87b503..a01223d 100644 --- a/options.go +++ b/options.go @@ -104,6 +104,7 @@ type ClientOptions struct { MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer CustomOpenConnectionFn OpenConnectionFunc + AutoAck bool } // NewClientOptions will create a new ClientClientOptions type with some @@ -147,6 +148,7 @@ func NewClientOptions() *ClientOptions { WebsocketOptions: &WebsocketOptions{}, Dialer: &net.Dialer{Timeout: 30 * time.Second}, CustomOpenConnectionFn: nil, + AutoAck: true, } return o } @@ -446,3 +448,10 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon } return o } + +// SetAutoAck enables or disables the Automated Acking of Messages received by the handler. +// By default it is set to true. Setting it to false will disable the auto-ack globally. +func (o *ClientOptions) SetAutoAck(autoAck bool) *ClientOptions { + o.AutoAck = autoAck + return o +} diff --git a/router.go b/router.go index bfaef61..15ba381 100644 --- a/router.go +++ b/router.go @@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - m.Ack() + if client.options.AutoAck { + m.Ack() + } wg.Done() }() } @@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - m.Ack() + if client.options.AutoAck { + m.Ack() + } wg.Done() }() } @@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - m.Ack() + if client.options.AutoAck { + m.Ack() + } } // DEBUG.Println(ROU, "matchAndDispatch handled message") } From 35f45ad7ef34dcca0a18a44ee7aeb481f99f7354 Mon Sep 17 00:00:00 2001 From: shivam Date: Fri, 14 Oct 2022 15:57:10 +0530 Subject: [PATCH 4/5] Changing AutoAck -> AutoAckDisabled Signed-off-by: shivam --- options.go | 12 ++++++------ router.go | 6 +++--- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/options.go b/options.go index a01223d..5aaa7d9 100644 --- a/options.go +++ b/options.go @@ -104,7 +104,7 @@ type ClientOptions struct { MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming Dialer *net.Dialer CustomOpenConnectionFn OpenConnectionFunc - AutoAck bool + AutoAckDisabled bool } // NewClientOptions will create a new ClientClientOptions type with some @@ -148,7 +148,7 @@ func NewClientOptions() *ClientOptions { WebsocketOptions: &WebsocketOptions{}, Dialer: &net.Dialer{Timeout: 30 * time.Second}, CustomOpenConnectionFn: nil, - AutoAck: true, + AutoAckDisabled: false, } return o } @@ -449,9 +449,9 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon return o } -// SetAutoAck enables or disables the Automated Acking of Messages received by the handler. -// By default it is set to true. Setting it to false will disable the auto-ack globally. -func (o *ClientOptions) SetAutoAck(autoAck bool) *ClientOptions { - o.AutoAck = autoAck +// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler. +// By default it is set to false. Setting it to true will disable the auto-ack globally. +func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions { + o.AutoAckDisabled = autoAckDisabled return o } diff --git a/router.go b/router.go index 15ba381..088302d 100644 --- a/router.go +++ b/router.go @@ -186,7 +186,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - if client.options.AutoAck { + if client.options.AutoAckDisabled { m.Ack() } wg.Done() @@ -203,7 +203,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - if client.options.AutoAck { + if client.options.AutoAckDisabled { m.Ack() } wg.Done() @@ -216,7 +216,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - if client.options.AutoAck { + if client.options.AutoAckDisabled { m.Ack() } } From 25189541faa53812d1c0be06c379863e04d578ec Mon Sep 17 00:00:00 2001 From: shivam Date: Tue, 18 Oct 2022 10:43:02 +0530 Subject: [PATCH 5/5] Ack when AutoAckDisabled is off Signed-off-by: shivam --- router.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/router.go b/router.go index 088302d..bd05a0c 100644 --- a/router.go +++ b/router.go @@ -186,7 +186,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - if client.options.AutoAckDisabled { + if !client.options.AutoAckDisabled { m.Ack() } wg.Done() @@ -203,7 +203,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - if client.options.AutoAckDisabled { + if !client.options.AutoAckDisabled { m.Ack() } wg.Done() @@ -216,7 +216,7 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - if client.options.AutoAckDisabled { + if !client.options.AutoAckDisabled { m.Ack() } }