You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I create a kafka consumer group and use Assign method for consuming specific topic with specific partition. If this topic has 7 partition, I will create 7 consumer group and assign specific partition and offset for it and each consumer group is running in isolated goroutine.
This consumer group per goroutine was working fine, after I scale partition to eight. Each consumer will consume first offset, then stop consume next offset:
We can see above log, fetch topic SOL_TWD_OUT [1] request always fetches 5215904 offset, It should fetch 5215905 offset, Although log say MaxOffset is 5215904, but our kafka dashboard present max offset is greater than 5215904.
I also found that current leader epoch is -1, which smaller thane leader epoch. Is it mean that consumer might get wrong metadata so that get wrong max offset
%7|1705555004.030|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [7]: not joining broker (next broker kafka-production-1.kafka-svc.production.svc.cluster.local:9093/1)
%7|1705555004.030|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [8]: not joining broker (next broker kafka-production-2.kafka-svc.production.svc.cluster.local:9093/2)
%7|1705555004.030|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [0]: leaving broker (0 messages in xmitq, next broker kafka-production-0.kafka-svc.production.svc.cluster.local:9093/0, rktp 0x7fb288c4c050)
%7|1705555004.030|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [0] from active list (6 entries, opv 0): leaving
%7|1705555004.030|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
%7|1705555004.030|TOPBRK|rdkafka#producer-4| [thrd:kafka-production-0.kafka-svc.production.svc.cluster.local:9093/]: kafka-production-0.kafka-svc.production.svc.cluster.local:9093/0: Topic SOL_TWD_OUT [0]: joining broker (rktp 0x7fb288c4c050, 0 message(s) queued)
%7|1705555004.030|TOPBRK|rdkafka#producer-4| [thrd:kafka-production-2.kafka-svc.production.svc.cluster.local:9093/]: kafka-production-2.kafka-svc.production.svc.cluster.local:9093/2: Topic SOL_TWD_OUT [8]: joining broker (rktp 0x7fb288ba6560, 0 message(s) queued)
%7|1705555004.030|FETCHADD|rdkafka#producer-4| [thrd:kafka-production-0.kafka-svc.production.svc.cluster.local:9093/]: kafka-production-0.kafka-svc.production.svc.cluster.local:9093/0: Added SOL_TWD_OUT [0] to active list (1 entries, opv 0, 0 messages queued): joining
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd:kafka-production-0.kafka-svc.production.svc.cluster.local:9093/]: Broadcasting state change
%7|1705555004.030|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [1]: leaving broker (0 messages in xmitq, next broker kafka-production-1.kafka-svc.production.svc.cluster.local:9093/1, rktp 0x7fb288ba8050)
%7|1705555004.031|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [1] from active list (5 entries, opv 0): leaving
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
%7|1705555004.031|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [2]: leaving broker (0 messages in xmitq, next broker kafka-production-2.kafka-svc.production.svc.cluster.local:9093/2, rktp 0x7fb288ba8590)
%7|1705555004.031|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [2] from active list (4 entries, opv 0): leaving
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
%7|1705555004.031|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [3]: leaving broker (0 messages in xmitq, next broker kafka-production-0.kafka-svc.production.svc.cluster.local:9093/0, rktp 0x7fb288ba8ad0)
%7|1705555004.031|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [3] from active list (3 entries, opv 0): leaving
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
%7|1705555004.031|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [4]: leaving broker (0 messages in xmitq, next broker kafka-production-1.kafka-svc.production.svc.cluster.local:9093/1, rktp 0x7fb288ba7010)
%7|1705555004.031|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [4] from active list (2 entries, opv 0): leaving
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
%7|1705555004.031|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [5]: leaving broker (0 messages in xmitq, next broker kafka-production-2.kafka-svc.production.svc.cluster.local:9093/2, rktp 0x7fb288ba7550)
%7|1705555004.031|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [5] from active list (1 entries, opv 0): leaving
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
%7|1705555004.031|TOPBRK|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Topic SOL_TWD_OUT [6]: leaving broker (0 messages in xmitq, next broker kafka-production-0.kafka-svc.production.svc.cluster.local:9093/0, rktp 0x7fb288ba7a90)
%7|1705555004.031|FETCHADD|rdkafka#producer-4| [thrd::0/internal]: :0/internal: Removed SOL_TWD_OUT [6] from active list (0 entries, opv 0): leaving
%7|1705555004.031|BROADCAST|rdkafka#producer-4| [thrd::0/internal]: Broadcasting state change
According to above log, consumer is leaving from active list after consumer connected.
confluent-kafka-go version : github.com/confluentinc/confluent-kafka-go/v2 v2.1.0
confluent-kafka-go and librdkafka version (LibraryVersion()):
Apache Kafka broker version:
Client configuration: ConfigMap{...}
Operating system:
Provide client logs (with "debug": ".." as necessary)
Provide broker log excerpts
Critical issue
The text was updated successfully, but these errors were encountered:
vx416
changed the title
[Question] Kafka consumer group Assign TopicPartition
[Question] Kafka consumer group stop consuming new message and present wrong max offset
Jan 26, 2024
vx416
changed the title
[Question] Kafka consumer group stop consuming new message and present wrong max offset
[BUG] Kafka consumer group stop consuming new message and present wrong max offset
Jan 26, 2024
vx416
changed the title
[BUG] Kafka consumer group stop consuming new message and present wrong max offset
[BUG] Kafka consumer group stop consuming new message and present wrong max offset and leader epoch -1, confluent-kafka-go v2.1.0
Jan 27, 2024
vx416
changed the title
[BUG] Kafka consumer group stop consuming new message and present wrong max offset and leader epoch -1, confluent-kafka-go v2.1.0
[BUG] Kafka consumer stop consuming new message, and present wrong max offset and leader epoch -1, confluent-kafka-go v2.1.0
Jan 27, 2024
Description
I create a kafka consumer group and use Assign method for consuming specific topic with specific partition. If this topic has 7 partition, I will create 7 consumer group and assign specific partition and offset for it and each consumer group is running in isolated goroutine.
This consumer group per goroutine was working fine, after I scale partition to eight. Each consumer will consume first offset, then stop consume next offset:
We can see above log, fetch topic SOL_TWD_OUT [1] request always fetches 5215904 offset, It should fetch 5215905 offset, Although log say MaxOffset is 5215904, but our kafka dashboard present max offset is greater than 5215904.
I also found that current leader epoch is -1, which smaller thane leader epoch. Is it mean that consumer might get wrong metadata so that get wrong max offset
According to above log, consumer is leaving from active list after consumer connected.
confluent-kafka-go version : github.com/confluentinc/confluent-kafka-go/v2 v2.1.0
Apache Kafka broker version : 3.2.1 with java8
clientCofnig :
socket.timeout.ms: "180000"
session.timeout.ms: "2000"
max.poll.interval.ms: "300000"
enable.auto.commit: true
enable.auto.offset.store: false
os: alpine3.16
How to reproduce
Checklist
Please provide the following information:
LibraryVersion()
):ConfigMap{...}
"debug": ".."
as necessary)The text was updated successfully, but these errors were encountered: