Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offset is going ahead and we are missing message with lag #1276

Open
NitinHsharma opened this issue Mar 20, 2024 · 3 comments
Open

Offset is going ahead and we are missing message with lag #1276

NitinHsharma opened this issue Mar 20, 2024 · 3 comments
Labels

Comments

@NitinHsharma
Copy link

We are using ReadMessage function with Consumer group, which works pretty good. but sometime one of the partition offset is getting set ahead of it commited message/s so those in between messages are getting stuck in kafka. No reader is able to get those message until we are restarting the pods basically forcefully rebalancing the consumer group.

Below are the basic code which we are using to consume the messages

l := log.New(log.Writer(), "kafka reader: ", log.LstdFlags)
Reader = kafka.NewReader(kafka.ReaderConfig{
		Brokers: kafkaDetails.BrokerAddress,
		Topic:   kafkaDetails.Topic,
		Dialer:  readerDialer,
		Logger:  l,
		GroupID: kafkaDetails.GroupID,
	})
msg, err := Reader.ReadMessage(ctx)
	if err != nil {
		logger.Log.Error("could not read message " + err.Error())
		return nil, err
	}
	return msg.Value, nil

Below are the logs for the same

kafka reader: 2024/03/20 08:25:56 stopped heartbeat for group MY_GROUP_NAME
kafka reader: 2024/03/20 08:25:56 stopped commit for group MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:02 no messages received from kafka within the allocated time for partition 0 of MY_TOPIC_NAME at offset 113: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:23 joined group MY_GROUP_NAME as member MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101 in generation 35
kafka reader: 2024/03/20 08:26:23 selected as leader for group, MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 using 'range' balancer to assign group, MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-g2q2r (github.com/segmentio/kafka-go)-058267e0-bc25-4458-96a2-2deb51ede6d4/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xxqtp (github.com/segmentio/kafka-go)-e2de8250-d309-4e75-b53f-4fe301d9f4be/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-ttfs4 (github.com/segmentio/kafka-go)-993b3b28-30dc-4178-8193-2dc0ae1504d0/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-zdl5b (github.com/segmentio/kafka-go)-2a68df5f-0923-4ef0-8799-04d59f5262db/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found member: MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xqw56 (github.com/segmentio/kafka-go)-7000ed30-619c-4083-bc3b-5292fbcdc21f/[]byte(nil)
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/0
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/5
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/1
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/4
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/2
kafka reader: 2024/03/20 08:26:23 found topic/partition: MY_TOPIC_NAME/3
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-g2q2r (github.com/segmentio/kafka-go)-058267e0-bc25-4458-96a2-2deb51ede6d4/MY_TOPIC_NAME/[0]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101/MY_TOPIC_NAME/[5]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-ttfs4 (github.com/segmentio/kafka-go)-993b3b28-30dc-4178-8193-2dc0ae1504d0/MY_TOPIC_NAME/[1]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xqw56 (github.com/segmentio/kafka-go)-7000ed30-619c-4083-bc3b-5292fbcdc21f/MY_TOPIC_NAME/[4]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-xxqtp (github.com/segmentio/kafka-go)-e2de8250-d309-4e75-b53f-4fe301d9f4be/MY_TOPIC_NAME/[2]
kafka reader: 2024/03/20 08:26:23 assigned member/topic/partitions MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-zdl5b (github.com/segmentio/kafka-go)-2a68df5f-0923-4ef0-8799-04d59f5262db/MY_TOPIC_NAME/[3]
kafka reader: 2024/03/20 08:26:23 joinGroup succeeded for response, MY_GROUP_NAME.  generationID=35, memberID=MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101
kafka reader: 2024/03/20 08:26:23 Joined group MY_GROUP_NAME as member MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101 in generation 35
kafka reader: 2024/03/20 08:26:23 Syncing 6 assignments for generation 35 as member MY_SERVICE_NAME@jcx-go-sms-bulk-processor-59d7566c58-l8dcm (github.com/segmentio/kafka-go)-593a5f49-89a8-4a0d-bbed-a1aa1ed6b101
kafka reader: 2024/03/20 08:26:23 sync group finished for group, MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 started heartbeat for group, MY_GROUP_NAME [3s]
kafka reader: 2024/03/20 08:26:23 subscribed to topics and partitions: map[{topic:MY_TOPIC_NAME partition:5}:78]
kafka reader: 2024/03/20 08:26:23 initializing kafka reader for partition 5 of MY_TOPIC_NAME starting at offset 78
kafka reader: 2024/03/20 08:26:23 started commit for group MY_GROUP_NAME
kafka reader: 2024/03/20 08:26:23 the kafka reader for partition 5 of MY_TOPIC_NAME is seeking to offset 78
kafka reader: 2024/03/20 08:26:32 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:41 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:50 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:26:59 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:08 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:17 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 78: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:23 committed offsets for group MY_GROUP_NAME: 
	topic: MY_TOPIC_NAME
		partition 5: 79
kafka reader: 2024/03/20 08:27:32 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:41 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:50 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:27:59 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:08 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:17 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:26 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:35 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 79: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:46 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:28:55 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:04 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:13 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:22 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:31 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:40 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:49 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:29:58 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:07 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:16 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:25 no messages received from kafka within the allocated time for partition 5 of MY_TOPIC_NAME at offset 80: [7] Request Timed Out: the request exceeded the user-specified time limit in the request
kafka reader: 2024/03/20 08:30:

Now if you see the logs at the end, it library has commited 79 offset on partition no 5 but somehow it moved to 80. which is causing this lag at kafka with 1 message.

@ahmedyusef9
Copy link

Hi any suggested solution for now?

@ahmedyusef9
Copy link

ahmedyusef9 commented Mar 26, 2024

actually for now, fetch and commit manually

go func() {
		for {

			msg, err := ki.Reader.FetchMessage(context.Background())
			if err != nil {
				vlog.Errorf("Error reading message: %v", err)
				continue
			}
			// TO-DO

			// Commit the message after processing
            if err := ki.Reader.CommitMessages(context.Background(), msg); err != nil {
                vlog.Errorf("Error committing message: %v", err)
            }
		}
	}()

@shcw
Copy link

shcw commented May 10, 2024

I am also experiencing the same issue and I am unaware of its cause

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

3 participants