Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for AutoAckOff #578

Merged
merged 5 commits into from Oct 18, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 9 additions & 0 deletions options.go
Expand Up @@ -104,6 +104,7 @@ type ClientOptions struct {
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
Dialer *net.Dialer
CustomOpenConnectionFn OpenConnectionFunc
AutoAckDisabled bool
}

// NewClientOptions will create a new ClientClientOptions type with some
Expand Down Expand Up @@ -147,6 +148,7 @@ func NewClientOptions() *ClientOptions {
WebsocketOptions: &WebsocketOptions{},
Dialer: &net.Dialer{Timeout: 30 * time.Second},
CustomOpenConnectionFn: nil,
AutoAckDisabled: false,
}
return o
}
Expand Down Expand Up @@ -446,3 +448,10 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon
}
return o
}

// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
// By default it is set to false. Setting it to true will disable the auto-ack globally.
func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions {
o.AutoAckDisabled = autoAckDisabled
return o
}
12 changes: 9 additions & 3 deletions router.go
Expand Up @@ -186,7 +186,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
wg.Add(1)
go func() {
hd(client, m)
m.Ack()
if client.options.AutoAckDisabled {
shivamkm07 marked this conversation as resolved.
Show resolved Hide resolved
m.Ack()
}
wg.Done()
}()
}
Expand All @@ -201,7 +203,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
wg.Add(1)
go func() {
r.defaultHandler(client, m)
m.Ack()
if client.options.AutoAckDisabled {
m.Ack()
}
wg.Done()
}()
}
Expand All @@ -212,7 +216,9 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
r.RUnlock()
for _, handler := range handlers {
handler(client, m)
m.Ack()
if client.options.AutoAckDisabled {
m.Ack()
}
}
// DEBUG.Println(ROU, "matchAndDispatch handled message")
}
Expand Down