diff --git a/message.go b/message.go index 35b463f8..9bf23e68 100644 --- a/message.go +++ b/message.go @@ -36,6 +36,8 @@ type Message interface { MessageID() uint16 Payload() []byte Ack() + NoAutoAck() bool + AutoAckOff() } type message struct { @@ -47,6 +49,7 @@ type message struct { payload []byte once sync.Once ack func() + noautoack bool } func (m *message) Duplicate() bool { @@ -77,6 +80,14 @@ func (m *message) Ack() { m.once.Do(m.ack) } +func (m *message) NoAutoAck() bool { + return m.noautoack +} + +func (m *message) AutoAckOff() { + m.noautoack = true +} + func messageFromPublish(p *packets.PublishPacket, ack func()) Message { return &message{ duplicate: p.Dup, @@ -86,6 +97,7 @@ func messageFromPublish(p *packets.PublishPacket, ack func()) Message { messageID: p.MessageID, payload: p.Payload, ack: ack, + noautoack: false, } } diff --git a/router.go b/router.go index bfaef617..4c6ca620 100644 --- a/router.go +++ b/router.go @@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { hd(client, m) - m.Ack() + if !m.NoAutoAck() { + m.Ack() + } wg.Done() }() } @@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order wg.Add(1) go func() { r.defaultHandler(client, m) - m.Ack() + if !m.NoAutoAck() { + m.Ack() + } wg.Done() }() } @@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order r.RUnlock() for _, handler := range handlers { handler(client, m) - m.Ack() + if !m.NoAutoAck() { + m.Ack() + } } // DEBUG.Println(ROU, "matchAndDispatch handled message") }