This repository has been archived by the owner on Jan 8, 2020. It is now read-only.
I'm prototyping a worker that uses PartitionConsumers to ensure there is a goroutine running for each subscribed partition. A minor annoyance is that I need to pass a reference to the top-level Consumer to the workers in order to mark offsets. If a version of MarkOffset were exposed on PartitionConsumer, it would remove that need.
It looks like there is already a MarkOffset(int64, string) on PartitionConsumer, but it's not exposed as part of the interface and it seems to expect msg.Offset + 1, which could cause confusion if exposed as is.
Rough sketch of what I'm doing
func main() {
	consumer, _ := cluster.NewConsumer(...)
	go func() {
		// Start one worker goroutine per subscribed partition.
		for part := range consumer.Partitions() {
			go worker(consumer, part)
		}
	}()
}

func worker(consumer *cluster.Consumer, part cluster.PartitionConsumer) {
	for msg := range part.Messages() {
		// process
		// If only this could be part.MarkOffset(msg, "")
		consumer.MarkOffset(msg, "")
	}
}
I'm also not sure if this approach is strictly necessary. So if I'm overthinking it I would appreciate some course correction.
I'm fine with calling part.MarkOffset(msg.Offset, ""); my only concern is that consumer.MarkOffset(msg, "") calls part.MarkOffset(msg.Offset + 1, "") (see here).
Is msg.Offset + 1 correct in this case? Is MarkOffset marking the specified offset as completed or is it marking the next offset as where to resume from?