High memory usage with the kafka.consumer #1171

Open
4 tasks done
yanmxa opened this issue Apr 10, 2024 · 0 comments

yanmxa commented Apr 10, 2024

Description

I'm running a service that uses kafka.Consumer to poll messages from the brokers. Over time, kafka._Cfunc_GoBytes (inline) accounts for more and more of the process's memory.


How to reproduce

(pprof) top
Showing nodes accounting for 402.72MB, 97.44% of 413.28MB total
Dropped 83 nodes (cum <= 2.07MB)
Showing top 10 nodes out of 13
      flat  flat%   sum%        cum   cum%
  402.72MB 97.44% 97.44%   402.72MB 97.44%  github.com/confluentinc/confluent-kafka-go/v2/kafka._Cfunc_GoBytes (inline)
         0     0% 97.44%   402.72MB 97.44%  github.com/cloudevents/sdk-go/v2/client.(*ceClient).StartReceiver
         0     0% 97.44%   402.72MB 97.44%  github.com/confluentinc/confluent-kafka-go/v2/kafka.(*Consumer).Poll (inline)
         0     0% 97.44%   402.72MB 97.44%  github.com/confluentinc/confluent-kafka-go/v2/kafka.(*handle).eventPoll
         0     0% 97.44%   402.72MB 97.44%  github.com/confluentinc/confluent-kafka-go/v2/kafka.(*handle).newMessageFromGlueMsg
         0     0% 97.44%   402.72MB 97.44%  github.com/confluentinc/confluent-kafka-go/v2/kafka.(*handle).setupMessageFromC
         0     0% 97.44%   402.72MB 97.44%  github.com/confluentinc/confluent-kafka-go/v2/kafka.(*handle).setupMessageFromC.func1
         0     0% 97.44%   402.72MB 97.44%  github.com/stolostron/multicluster-global-hub/pkg/transport/consumer.(*GenericConsumer).Start
         0     0% 97.44%   402.72MB 97.44%  github.com/stolostron/multicluster-global-hub/pkg/transport/kafka_confluent.(*Protocol).OpenInbound
         0     0% 97.44%     3.91MB  0.95%  runtime.doInit

Code:

         .          .    219:
         .          .    220:   for {
         .          .    221:           select {
         .          .    222:           case <-p.consumerCtx.Done():
         .          .    223:                   return p.consumerCtx.Err()
         .          .    224:           default:
         .   402.72MB    225:                   ev := p.consumer.Poll(100)
         .          .    226:                   if ev == nil {
         .          .    227:                           continue
         .          .    228:                   }
         .          .    229:                   switch e := ev.(type) {
         .          .    230:                   case *kafka.Message:
(pprof) list eventPoll
Total: 412.40MB
ROUTINE ======================== github.com/confluentinc/confluent-kafka-go/v2/kafka.(*handle).eventPoll in mod/github.com/confluentinc/confluent-kafka-go/v2@v2.3.0/kafka/event.go
         0   402.72MB (flat, cum) 97.65% of Total
         .          .    153:func (h *handle) eventPoll(channel chan Event, timeoutMs int, maxEvents int, termChan chan bool) (Event, bool) {
         .          .    154:
         .          .    155:   var prevRkev *C.rd_kafka_event_t
         .          .    156:   term := false
         .          .    157:
         .          .    158:   var retval Event
         .          .    159:
         .          .    160:   if channel == nil {
         .          .    161:           maxEvents = 1
         .          .    162:   }
         .          .    163:out:
         .          .    164:   for evcnt := 0; evcnt < maxEvents; evcnt++ {
         .          .    165:           var evtype C.rd_kafka_event_type_t
         .          .    166:           var gMsg C.glue_msg_t
         .          .    167:           gMsg.want_hdrs = C.int8_t(bool2cint(h.msgFields.Headers))
         .          .    168:           rkev := C._rk_queue_poll(h.rkq, C.int(timeoutMs), &evtype, &gMsg, prevRkev)
         .          .    169:           prevRkev = rkev
         .          .    170:           timeoutMs = 0
         .          .    171:
         .          .    172:           retval = nil
         .          .    173:
         .          .    174:           switch evtype {
         .          .    175:           case C.RD_KAFKA_EVENT_FETCH:
         .          .    176:                   // Consumer fetch event, new message.
         .          .    177:                   // Extracted into temporary gMsg for optimization
         .   402.72MB    178:                   retval = h.newMessageFromGlueMsg(&gMsg)
         .          .    179:
         .          .    180:           case C.RD_KAFKA_EVENT_REBALANCE:
         .          .    181:                   // Consumer rebalance event
         .          .    182:                   retval = h.c.handleRebalanceEvent(channel, rkev)
         .          .    183:
(pprof) list setupMessageFromC
Total: 412.40MB
ROUTINE ======================== github.com/confluentinc/confluent-kafka-go/v2/kafka.(*handle).setupMessageFromC in mod/github.com/confluentinc/confluent-kafka-go/v2@v2.3.0/kafka/message.go
         0   402.72MB (flat, cum) 97.65% of Total
         .          .    139:func (h *handle) setupMessageFromC(msg *Message, cmsg *C.rd_kafka_message_t) {
         .          .    140:   if cmsg.rkt != nil {
         .          .    141:           topic := h.getTopicNameFromRkt(cmsg.rkt)
         .          .    142:           msg.TopicPartition.Topic = &topic
         .          .    143:   }
         .          .    144:   msg.TopicPartition.Partition = int32(cmsg.partition)
         .          .    145:   if cmsg.payload != nil && h.msgFields.Value {
         .   402.72MB    146:           msg.Value = C.GoBytes(unsafe.Pointer(cmsg.payload), C.int(cmsg.len))
         .          .    147:   }
         .          .    148:   if cmsg.key != nil && h.msgFields.Key {
         .          .    149:           msg.Key = C.GoBytes(unsafe.Pointer(cmsg.key), C.int(cmsg.key_len))
         .          .    150:   }
         .          .    151:   if h.msgFields.Headers {
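
The listing above shows msg.Value being filled by C.GoBytes, which copies the C payload into Go-managed memory. A heap profile attributes that memory to the allocation site, so kafka._Cfunc_GoBytes appears at the top even when the code keeping the bytes alive is elsewhere in the application. Whether that is what happens in this service cannot be determined from the issue. The sketch below is only an illustration of one common way such a copy stays reachable: keeping a small sub-slice of the payload. The message type and both helper functions are hypothetical stand-ins, not part of confluent-kafka-go.

```go
package main

import "fmt"

// message is a hypothetical stand-in for kafka.Message; only Value matters.
type message struct {
	Value []byte
}

// pinPrefix returns a sub-slice of the payload. It shares the payload's
// backing array, so the entire payload stays reachable while it is held.
func pinPrefix(m *message, n int) []byte {
	return m.Value[:n]
}

// copyPrefix copies the first n bytes into a fresh allocation, so the
// original payload can be collected once nothing else refers to it.
func copyPrefix(m *message, n int) []byte {
	return append([]byte(nil), m.Value[:n]...)
}

func main() {
	// 1 MiB payload, standing in for a value copied out by C.GoBytes.
	m := &message{Value: make([]byte, 1<<20)}

	pinned := pinPrefix(m, 8)
	copied := copyPrefix(m, 8)

	// Both slices have length 8, but only the pinned one still spans
	// the full 1 MiB backing array.
	fmt.Println("pinned len/cap:", len(pinned), cap(pinned))
	fmt.Println("copied len/cap:", len(copied), cap(copied))
}
```

If the profile keeps growing, looking for long-lived maps, caches, channels, or slices that hold *kafka.Message values (or sub-slices of their Value or Key) is a reasonable next step.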

Checklist

Please provide the following information:

  • confluent-kafka-go and librdkafka version: confluent-kafka-go v2.3.0, librdkafka v2.3.0
  • Apache Kafka broker version: 3.6.0
  • Client configuration: ConfigMap{...}
"bootstrap.servers":       kafkaConfig.BootstrapServer,
"socket.keepalive.enable": "true",
// silence spontaneous disconnection logs, kafka recovers by itself.
"log.connection.close": "false",
// https://github.com/confluentinc/librdkafka/issues/4349
"ssl.endpoint.identification.algorithm": "none",
"go.events.channel.size":                1000,
"enable.auto.commit": "true",
"auto.offset.reset": "earliest",
"group.id": consumerID,
"client.id": consumerID,
  • Critical issue
yanmxa changed the title from "High memory usage with the kafka.consumer" to "High memory usage with the kafka.consumer" on Apr 10, 2024
milindl self-assigned this on May 9, 2024