jetstream: slow consumer #888
-
Hi, I have an application that has to do some heavy processing on every message received. I created a JetStream subscription to be sure not to miss any message and send a [...]. However, I still get [...]. What should I do to prevent those errors? Here is the logic of my code:
func main() {
	nc, err := nats.Connect(url,
		nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
			logger.Log("nats", "disconnected", "reason", err)
		}),
		nats.ReconnectHandler(func(nc *nats.Conn) {
			logger.Log("nats", "reconnected")
		}),
	)
	js, err := nc.JetStream(nats.PublishAsyncMaxPending(256)) //nolint:gomnd
	_, err = js.AddStream(&nats.StreamConfig{
		Name:     "BARN",
		Subjects: []string{"BARN.>"},
	})
	msgs := make(chan *nats.Msg)
	js.ChanQueueSubscribe(subj, queue, msgs,
		nats.Durable(queue),
		nats.MaxAckPending(2048),
		nats.AckWait(3*keepaliveDelay/2), // add some margin
	)
	for m := range msgs {
		// keepalive to inform nats of "InProgress"
		sendNak := make(chan bool)
		go keepalive(m, sendNak)
		// long processing
		time.Sleep(5 * keepaliveDelay)
		// everything was fine
		close(sendNak)
		err = m.Ack()
	}
}

func keepalive(m *nats.Msg, sendNak <-chan bool) {
	for {
		select {
		case <-time.After(keepaliveDelay):
			_ = m.InProgress()
		case nak := <-sendNak:
			// when something is wrong, but might work later
			if nak {
				time.Sleep(nakDelay)
				_ = m.Nak()
			}
			return
		}
	}
}
Replies: 2 comments 3 replies
-
The channel is unbuffered, so it is prone to dropping messages. I suggest using a buffered channel with a larger size, such as 65536 or more for JetStream, depending on the type of traffic.
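To illustrate why the buffer size matters, here is a minimal stdlib-only sketch. It assumes the client hands messages to the subscription channel with a non-blocking send and drops them when the channel cannot accept them immediately. The `deliver` helper and the burst size are hypothetical names chosen for this illustration and are not part of nats.go.

```go
package main

import "fmt"

// deliver mimics a client that hands messages to a channel with a
// non-blocking send: when the channel cannot accept a message right away,
// the message is counted as dropped instead of waiting for the receiver.
// (Hypothetical helper for illustration only.)
func deliver(ch chan int, n int) (dropped int) {
	for i := 0; i < n; i++ {
		select {
		case ch <- i:
			// accepted: either buffered or taken by a waiting receiver
		default:
			// nobody is receiving and there is no free buffer slot
			dropped++
		}
	}
	return dropped
}

func main() {
	const burst = 10 // assumed burst arriving while the worker is busy

	unbuffered := make(chan int)  // like make(chan *nats.Msg) in the question
	buffered := make(chan int, 8) // room for 8 messages to queue up

	fmt.Println("unbuffered dropped:", deliver(unbuffered, burst)) // 10
	fmt.Println("buffered dropped:", deliver(buffered, burst))     // 2
}
```

In this model nothing is receiving while the burst arrives, which is roughly the situation while the worker is in its long processing step. The buffer therefore only helps if it is at least as large as the number of messages that can arrive before the worker returns to the channel.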
-
To avoid being treated as a slow consumer, I adjusted the nats.MaxAckPending(1) setting, as hinted by @derekcollison 👍
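For anyone wondering why this helps, here is a toy model of the ack-pending window, written with the standard library only. It assumes the server never has more than the configured number of delivered-but-unacknowledged messages outstanding, and that the worker acks one message per step. `peakInFlight` is a hypothetical helper, not a nats.go function.

```go
package main

import "fmt"

// peakInFlight simulates a server that delivers `total` messages but never
// allows more than maxAckPending of them to be unacknowledged at once.
// The consumer acks one message per step. It returns the largest number of
// delivered-but-unacked messages observed. (Hypothetical helper.)
func peakInFlight(total, maxAckPending int) int {
	delivered, acked, peak := 0, 0, 0
	for acked < total {
		// Server side: deliver while the ack-pending window has room.
		for delivered < total && delivered-acked < maxAckPending {
			delivered++
		}
		if inFlight := delivered - acked; inFlight > peak {
			peak = inFlight
		}
		// Consumer side: finish processing one message and ack it.
		acked++
	}
	return peak
}

func main() {
	// With a window of 2048, a slow worker can have up to 2048 messages
	// pushed at it while it is still busy with the first one.
	fmt.Println("peak with MaxAckPending(2048):", peakInFlight(5000, 2048)) // 2048

	// With a window of 1, the next message is only delivered after the ack.
	fmt.Println("peak with MaxAckPending(1):", peakInFlight(5000, 1)) // 1
}
```

The trade-off in this model is throughput: with a window of 1, each subscriber handles one message at a time, so parallelism has to come from running more members of the queue group.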