KafkaJSProtocolError: The broker received an out of order sequence number #598

Closed
Delapouite opened this issue Dec 13, 2019 · 38 comments

@Delapouite

Hi

I have a single producer:

const producer = kafka.producer({
  allowAutoTopicCreation: false,
  idempotent: true,
  maxInFlightRequests: 1
});

As you can see, I'm using the idempotent feature introduced in #203. I'm also forcing maxInFlightRequests to 1.

It works fine when my producer sends messages to the Kafka server at a slow pace. By that I mean no requests are stored in the RequestQueue.

But during spikes in activity, messages are sometimes automatically sent in a batch.
Kafka then returns the following error:

ERROR [ReplicaManager broker=0] Error processing append operation on partition TOPIC (kafka.server.ReplicaManager)
kafka_1                 | org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 74: 158 (incoming seq. number), 156 (current end sequence number)

In this bug case, we have a jump from 156 to 158, because 2 messages were sent in the batch.

According to the Kafka broker code, it expects a jump of only 1 unit, so 156 to 157:

https://github.com/apache/kafka/blob/fecb977b257888e2022c1b1e04dd7bf03e18720c/core/src/main/scala/kafka/log/ProducerStateManager.scala#L241-L251
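
For reference, the broker-side check behind that exception boils down to something like the following JavaScript paraphrase (a sketch of the linked Scala logic, not kafkajs code; the names checkSequence and currentEndSeq are mine):

// Rough JS paraphrase of the broker-side sequence validation linked above
// (illustrative only; checkSequence/currentEndSeq are my own names).
const INT_32_MAX_VALUE = Math.pow(2, 31) - 1

const inSequence = (lastSeq, nextSeq) =>
  nextSeq === lastSeq + 1 || (nextSeq === 0 && lastSeq === INT_32_MAX_VALUE)

const checkSequence = (currentEndSeq, incomingFirstSeq) => {
  if (!inSequence(currentEndSeq, incomingFirstSeq)) {
    // e.g. incoming 158 vs current end 156 -> out of order
    throw new Error(
      `Out of order sequence number: ${incomingFirstSeq} (incoming seq. number), ${currentEndSeq} (current end sequence number)`
    )
  }
}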

My current understanding/intuition is that the firstSequence number should always be incremented by 1, and not by the number of messages present in the batch request.

Which roughly means that the updateSequence method of the EOSManager should not take an increment param but should always increment by 1 only (see the sketch after the quoted code below).

/**
 * Update the sequence for a given Topic-Partition.
 *
 * Do nothing if not yet initialized (not idempotent)
 * @param {string} topic
 * @param {string} partition
 * @param {number} increment
 */
updateSequence(topic, partition, increment) {
  if (!eosManager.isInitialized()) {
    return
  }

  const previous = eosManager.getSequence(topic, partition)
  let sequence = previous + increment

  // Sequence is defined as Int32 in the Record Batch,
  // so theoretically should need to rotate here
  if (sequence >= INT_32_MAX_VALUE) {
    logger.debug(
      `Sequence for ${topic} ${partition} exceeds max value (${sequence}). Rotating to 0.`
    )
    sequence = 0
  }

  producerSequence[topic][partition] = sequence
},
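
To make the proposal concrete, here is a rough sketch of the change described above (an untested illustration of the idea only; whether incrementing by 1 is really the right semantics is exactly the open question):

// Sketch of the proposed change only (not a tested patch).
// Instead of advancing by the number of messages in the batch,
// the sequence would always advance by exactly 1:
updateSequence(topic, partition) {
  if (!eosManager.isInitialized()) {
    return
  }

  const previous = eosManager.getSequence(topic, partition)
  let sequence = previous + 1

  // Same Int32 rotation as in the original implementation
  if (sequence >= INT_32_MAX_VALUE) {
    sequence = 0
  }

  producerSequence[topic][partition] = sequence
},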

I'm still quite a noob on this codebase, so my reasoning is probably flawed.

@ianwsperber, as you're the main implementer of the EOS code, what's your take on this? Thanks a lot.

@Delapouite
Author

Here's a minimal reproducible case:

const { Kafka, logLevel } = require('kafkajs')

const delay = Number(process.argv[2]) || 100
console.log({ delay })

const kafka = new Kafka({
	clientId: 'test-app',
	brokers: ['localhost:9092'],
})

const createProducer = async () => {
	const producer = kafka.producer({
		allowAutoTopicCreation: false,
		idempotent: true,
		maxInFlightRequests: 1,
	})
	await producer.connect()
	producer.logger().setLogLevel(logLevel.INFO)
	return producer
}

const sendMessages = async (producer) => {
	setInterval(async () => {
		const messageId = Math.random()
		console.log(`sending message ${messageId}`)

		try {
			await producer.send({
				topic: 'test-topic',
				messages: [
					{ value: `Hello KafkaJS user! ${messageId}` },
				],
			})
			console.log(`confirmed message ${messageId}`)
		} catch (err) {
			console.error(`crash for message ${messageId}`)
			console.error(err)
			process.exit(1)
		}
	}, delay)
}

createProducer()
	.then(sendMessages).catch((err) => {
		console.error(err)
		process.exit(1)
	})

If this code runs with a "large delay" between requests (like the default 100ms), everything is fine:

node index.js

But if you run this code with a small delay, the OUT_OF_ORDER_SEQUENCE_NUMBER error is raised.
Here I deliberately set the delay to an extreme 1ms on my laptop, but on the real server the value that triggers it is often higher:

node index.js 1

Here's the log output:

{ delay: 1 }
sending message 0.9237417995495543
sending message 0.32402946325370396
confirmed message 0.9237417995495543
sending message 0.9860404572033681
confirmed message 0.32402946325370396
sending message 0.06510684418463164
confirmed message 0.9860404572033681
sending message 0.599302604904195
confirmed message 0.06510684418463164
sending message 0.5404080025200682
confirmed message 0.599302604904195
sending message 0.22869052896543063
confirmed message 0.5404080025200682
sending message 0.29642055944336443
confirmed message 0.22869052896543063
confirmed message 0.29642055944336443
sending message 0.6128450568370936
{"level":"ERROR","timestamp":"2019-12-15T11:09:15.513Z","logger":"kafkajs","message":"[Connection] Response Produce(key: 0, version: 5)","broker":"localhost:9092","clientId":"test-app","error":"The broker received an out of order sequence number","correlationId":12,"size":58}
{"level":"ERROR","timestamp":"2019-12-15T11:09:15.514Z","logger":"kafkajs","message":"[Producer] The broker received an out of order sequence number","retryCount":0,"retryTime":355}
crash for message 0.6128450568370936
KafkaJSProtocolError: The broker received an out of order sequence number
    at createErrorFromCode (/home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/protocol/error.js:536:10)
    at Object.parse (/home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/protocol/requests/produce/v3/response.js:47:11)
    at Connection.send (/home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/network/connection.js:306:35)
    at processTicksAndRejections (internal/process/task_queues.js:97:5)
    at async Broker.produce (/home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/broker/index.js:252:12)
    at async /home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/producer/sendMessages.js:93:28
    at async Promise.all (index 0)
    at async makeRequests (/home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/producer/sendMessages.js:125:9)
    at async /home/delapouite/code/lab/kafka-crash/node_modules/kafkajs/src/producer/messageProducer.js:61:16
    at async Timeout._onTimeout (/home/delapouite/code/lab/kafka-crash/index.js:28:4) {
  name: 'KafkaJSProtocolError',
  retriable: false,
  helpUrl: undefined,
  type: 'OUT_OF_ORDER_SEQUENCE_NUMBER',
  code: 45
}

Error in the Kafka logs:

kafka_1      | [2019-12-15 11:09:15,513] ERROR [ReplicaManager broker=1001] Error processing append operation on partition test-topic-0 (kafka.server.ReplicaManager)
kafka_1      | org.apache.kafka.common.errors.OutOfOrderSequenceException: Out of order sequence number for producerId 68 at offset 8176 in partition test-topic-0: 8 (incoming seq. number), 6 (current end sequence number)

As soon as 2 consecutive "confirmed message" log lines appear, the next send crashes.

In the end, I don't think the problem is about "incrementing by one" in updateSequence(), as I said in the first comment of this thread. It's more that when a request has been parked in the RequestQueue, it should not call updateSequence() at all.
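
One way to picture it (purely illustrative; createBatchRequest and buildProduceRequest are hypothetical names, not the real kafkajs internals): the sequence attached to a batch has to be fixed at the moment the batch is built, and the counter advanced exactly once per batch, so that a second batch built while the first one is still parked in the RequestQueue neither reuses nor skips values:

// Purely illustrative sketch, not the real kafkajs internals.
// eosManager.getSequence/updateSequence are as in the snippet quoted earlier;
// createBatchRequest and buildProduceRequest are hypothetical helpers.
const createBatchRequest = (topic, partition, messages) => {
  // Snapshot the sequence when the batch is created, before the request
  // is (possibly) parked in the RequestQueue...
  const firstSequence = eosManager.getSequence(topic, partition)

  // ...and advance the counter right away, so the next batch built while
  // this one is still queued gets the following range instead of
  // re-reading (or re-advancing) the sequence again at send time.
  eosManager.updateSequence(topic, partition, messages.length)

  return buildProduceRequest({ topic, partition, messages, firstSequence })
}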

@Delapouite
Author

I've registered on the Slack channel and found some other users who faced the same issue in the past. I'm adding links here so they can be contacted later if progress is made:

by Vykimo https://app.slack.com/client/TF528A1BJ/CF6RFPF6K/thread/CF6RFPF6K-1556120666.002100
by Arpit https://app.slack.com/client/TF528A1BJ/CF6RFPF6K/thread/CF6RFPF6K-1556536043.003600
by Dave Bohl https://app.slack.com/client/TF528A1BJ/CF6RFPF6K/thread/CF6RFPF6K-1569971941.056400

@tulios
Owner

tulios commented Dec 18, 2019

Hi @Delapouite, thanks for the detailed investigation. Most of the team is already on vacation, so bear with me 😄

We implemented transactions right after the feature came out, so we might have gotten some things wrong. I will try to dig up the docs around it to confirm a few things. Can you make the changes you proposed locally and try that? I could also help you land a PR.

@iammathew

iammathew commented Feb 7, 2020

Hey @Delapouite @tulios,
we're facing the same issue on a heavy-traffic edge Kafka installation. Is there a solution for this already in place? Or any updates?

@Delapouite
Author

No solutions found on my side yet. I had to disable idempotency for the moment.

@bpetty-formfast

No solutions found on my side yet. I had to disable idempotency for the moment.

Thank you. We just started using KafkaJS and began seeing this issue. Disabling idempotency got us through. I noticed that the producer is pretty much dead after receiving this error.
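
Since the error is non-retriable (retriable: false in the stack trace above) and the producer's local sequence state no longer matches the broker's, one blunt workaround is to recreate the producer when it happens, which gives it a fresh producerId and resets its sequence numbers. A minimal sketch, assuming an async context and a createProducer() factory like the one in the repro script above:

// Minimal recovery sketch (assumes an async context and the createProducer()
// factory from the repro script above). On the non-retriable sequence error,
// throw away the old producer and build a new one, which gets a new
// producerId and therefore starts its sequence numbers over.
let producer = await createProducer()

const sendWithRecovery = async (record) => {
  try {
    await producer.send(record)
  } catch (err) {
    if (err.type === 'OUT_OF_ORDER_SEQUENCE_NUMBER') {
      await producer.disconnect()
      producer = await createProducer()
      await producer.send(record) // retry once with the fresh producer
    } else {
      throw err
    }
  }
}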

@cabelitos

cabelitos commented Jun 18, 2020

Yeah, this is also affecting us, which is kinda bad, since the only solution right now is to disable idempotency. :/
I tried to understand what's happening, but no progress so far.

@ArnaudovSt

Any progress on that issue?

@Nevon
Collaborator

Nevon commented Jun 22, 2020

Any progress on that issue?

Not as far as I'm aware, but we are open to contributions if anyone is willing to invest some time into it.

@dearsaturn
Contributor

dearsaturn commented Aug 26, 2020

I was able to circumvent this issue with some naive throttling:

import {Producer, Kafka, CompressionTypes, ProducerConfig} from 'kafkajs';
import {IAction, IQueuedRecord} from './types';
import {createActionMessage} from './create_action_message';
import {TopicAdministrator} from './topic_administrator';
import {isKafkaJSProtocolError} from './type_guard';
import Bluebird from 'bluebird';
import pino from 'pino';
import uuid from 'uuid';

export class ThrottledProducer {
    public recordsSent = 0;

    private producer: Producer;
    private topicAdministrator: TopicAdministrator;
    private isConnected: boolean = false;
    private intervalTimeout: NodeJS.Timeout;
    private createdTopics = new Set<string>();
    private recordQueue: IQueuedRecord[] = [];
    private isFlushing = false;
    private logger: pino.Logger;

    constructor(
        protected kafka: Kafka,
        protected producerConfig: Omit<
            ProducerConfig,
            'allowAutoTopicCreation' | 'maxInFlightRequests' | 'idempotent'
        > & {maxOutgoingBatchSize?: number; flushIntervalMs?: number} = {
            maxOutgoingBatchSize: 10000,
            flushIntervalMs: 1000
        },
        topicAdministrator?: TopicAdministrator,
        logger?: pino.Logger
    ) {
        this.topicAdministrator = topicAdministrator || new TopicAdministrator(kafka);
        this.logger = logger
            ? logger.child({class: 'KafkaSagasThrottledProducer'})
            : pino().child({class: 'KafkaSagasThrottledProducer'});
        this.createProducer();
    }

    // tslint:disable-next-line: cyclomatic-complexity
    public putAction = async <Action extends IAction>(action: Action) => {
        if (!this.isConnected) {
            throw new Error('You must .connect before producing actions');
        }

        if (!this.createdTopics.has(action.topic)) {
            this.logger.debug({topic: action.topic}, 'Creating topic');
            await this.topicAdministrator.createTopic(action.topic);
            this.createdTopics.add(action.topic);
        }

        return new Promise<void>((resolve, reject) => {
            this.recordQueue = [
                ...this.recordQueue,
                {
                    resolve,
                    reject,
                    record: {
                        topic: action.topic,
                        messages: [createActionMessage({action})]
                    }
                }
            ];

            return;
        });
    };

    public connect = async () => {
        if (this.isConnected) {
            return;
        }

        const flushIntervalMs = this.producerConfig.flushIntervalMs || 1000;

        this.logger.debug('Connecting producer');
        await this.producer.connect();
        this.logger.debug('Connected producer');

        this.logger.debug({flushIntervalMs}, 'Creating flush interval');
        this.intervalTimeout = setInterval(this.flush, flushIntervalMs);
        this.logger.debug('Created flush interval');

        this.isConnected = true;
    };

    public disconnect = async () => {
        if (!this.isConnected) {
            return;
        }

        this.logger.debug('Disconnecting');
        clearInterval(this.intervalTimeout);

        await this.producer.disconnect();
        this.logger.debug('Disconnected');
        this.isConnected = false;
    };

    private createProducer = () => {
        this.logger.debug('Creating a new producer');
        this.producer = this.kafka.producer({
            maxInFlightRequests: 1,
            idempotent: true,
            ...this.producerConfig
        });
        this.logger.debug('Created a new producer');
    };

    // tslint:disable-next-line: cyclomatic-complexity
    private flush = async (
        retryRecords?: IQueuedRecord[],
        retryCounter = 0,
        retryBatchId?: string
    ) => {
        if (!retryRecords && this.isFlushing) {
            return;
        }

        if (retryCounter) {
            /** Wait for a max of 30 seconds before retrying */
            const retryDelay = Math.min(retryCounter * 1000, 30000);
            this.logger.debug({retryDelay}, 'Waiting before attempting retry');
            await Bluebird.delay(retryDelay);
        }

        /**
         * Ensures that if the interval call ends up being concurrent due to latency in sendBatch,
         * unintentionally overlapping cycles are deferred to the next interval.
         */
        this.isFlushing = true;

        const batchSize = this.producerConfig.maxOutgoingBatchSize || 1000;
        const outgoingRecords = retryRecords || this.recordQueue.slice(0, batchSize);
        // Only consume from the queue when this is not a retry; otherwise queued
        // records would be dropped here without ever being sent.
        if (!retryRecords) {
            this.recordQueue = this.recordQueue.slice(batchSize);
        }
        const batchId = retryBatchId || uuid.v4();

        if (!outgoingRecords.length) {
            this.logger.debug({batchId}, 'No records to flush');
            this.isFlushing = false;
            return;
        }

        this.logger.debug(
            {
                remaining: this.recordQueue.length,
                records: outgoingRecords.length,
                batchId
            },
            'Flushing queue'
        );

        try {
            await this.producer.sendBatch({
                topicMessages: outgoingRecords.map(({record}) => record),
                acks: -1,
                compression: CompressionTypes.GZIP
            });

            this.recordsSent += outgoingRecords.length;
            this.logger.debug({batchId}, 'Flushed queue');
            outgoingRecords.map(({resolve}) => resolve());
            this.isFlushing = false;
            return;
        } catch (error) {
            /**
             * If for some reason this producer is no longer recognized by the broker,
             * create a new producer.
             */
            if (isKafkaJSProtocolError(error) && error.type === 'UNKNOWN_PRODUCER_ID') {
                await this.producer.disconnect();
                this.createProducer();
                await this.producer.connect();
                this.logger.debug(
                    {batchId},
                    'Retrying failed flush attempt due to UNKNOWN_PRODUCER_ID'
                );
                await this.flush(outgoingRecords, retryCounter + 1, batchId);
                return;
            }

            outgoingRecords.map(({reject}) => reject(error));
            this.isFlushing = false;
            return;
        }
    };
}

@jducoeur

Also hitting this issue. (Using Kafka Streams, when trying to pre-seed a KTable.) I'm going to try the throttle approach to get past this immediate hurdle, but this is a serious problem -- if idempotent mode falls over under load, that calls its usefulness into question...

@matthewadams

matthewadams commented Nov 20, 2020

👀 seeing with latest kafkajs

@Va1

Va1 commented Jan 8, 2021

experiencing the same issue in idempotent mode

@joaosouzaminu

I've tested the implementation made in #1050, and it seems to solve this issue. But the PR still hasn't been reviewed/merged since March, even though it's a pretty simple change. Could we give it some attention, @tulios @Nevon?

@t-d-d
Contributor

t-d-d commented Apr 30, 2021

This is with me.

I think the proposed fix is a step forward, but it still has issues with failure cases.

I will progress it.

@denis-mludek

Hi! Got the same issue; disabling idempotent for now. If #1050 solves this, it would be really great! 👏 @t-d-d

@amalcaraz

Any estimated date for this to be released?

@aikoven

aikoven commented Jul 30, 2021

I can confirm that #1050 fixes this issue in the normal situation, but if the broker restarts, these errors start happening again until the producer is restarted.

@t-d-d
Contributor

t-d-d commented Jul 31, 2021

@aikoven was this with the latest changes pushed to #1050?

@aikoven

aikoven commented Jul 31, 2021

Oh, my fork is 11 days old, and I see newer commits in your branch. Will check once more, thanks.

@aikoven

aikoven commented Aug 12, 2021

@t-d-d we have updated to commit e36f248 and we still start getting these errors after a while.

@aikoven

aikoven commented Aug 12, 2021

In the kafkajs logs, I see reconnects to the broker prior to this error. The broker did not restart.

There are hundreds of log lines

Cluster has disconnected, reconnecting: Closed connection

followed by reconnect, authentication, and hundreds of

Request Metadata(key: 3, version: 6)

then hundreds of

Response Metadata(key: 3, version: 6)

Then there are hundreds of produce requests (they seem to have been buffered during the disconnect)

Request Produce(key: 0, version: 7)

Some of these produce requests were successful, but others resulted in

Response Produce(key: 0, version: 7)
  error: "The broker received an out of order sequence number"

@t-d-d
Contributor

t-d-d commented Aug 12, 2021 via email

@aikoven

aikoven commented Aug 13, 2021

About 1-2 produce calls per second per instance, with 50 instances in total. There are 3 brokers. The topic has 20 partitions and 3 replicas.

@t-d-d
Contributor

t-d-d commented Aug 14, 2021

@aikoven Could you try this code? It fixes the issues that I'm aware of. #1172

@aikoven

aikoven commented Aug 14, 2021

Sure, thanks. Will post back when I have any results.

@anlauren

Hello @aikoven, did you manage to get any results?

We are experiencing the same problem as everybody above, and the way the code runs, we really need exactly-once delivery :/

@aikoven

aikoven commented Aug 25, 2021

@anlauren I'm on vacation right now; I will return to this issue sometime next week.

@aikoven

aikoven commented Sep 6, 2021

@t-d-d after upgrading to the latest code from #1172 and running for a few days, I don't see the "out of order" errors anymore.

However, there is another issue: after some time we start getting a very high rate of created connections. I collected the logs spanning one second, and it says Connecting 157 times. It seems like some sort of race condition while reconnecting that leads to many parallel connections.

@aikoven

aikoven commented Sep 8, 2021

However, there is another issue: after some time we start getting a very high rate of created connections. I collected the logs spanning one second, and it says Connecting 157 times. It seems like some sort of race condition while reconnecting that leads to many parallel connections.

It seems that this bug was present in older versions of kafkajs.

@justineyster

justineyster commented Nov 3, 2021

@t-d-d @tulios @aikoven What's the status on this? I see that #1050 and #1172 both address this issue. The former has been approved and is awaiting merge. Would really love to see this fixed! I'm sure I'm not the only one. 🙂

@aikoven

aikoven commented Nov 4, 2021

I'm using my own fork that includes the fixes from both of these PRs. Haven't seen any issues for quite a while.

@justineyster

Gotcha. Note that I solved my own issue by using transactions as described here: https://kafka.js.org/docs/transactions
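
For anyone else going that route, the transactional producer usage from those docs looks roughly like this (a sketch based on the linked documentation, inside an async function; the transactionalId and topic are placeholders):

// Sketch based on https://kafka.js.org/docs/transactions
// (transactionalId and topic name are placeholders).
const producer = kafka.producer({
  transactionalId: 'my-transactional-producer',
  maxInFlightRequests: 1,
  idempotent: true,
})

await producer.connect()

const transaction = await producer.transaction()
try {
  await transaction.send({
    topic: 'test-topic',
    messages: [{ value: 'Hello KafkaJS user!' }],
  })
  await transaction.commit()
} catch (err) {
  await transaction.abort()
  throw err
}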

@GabrielBarrosAS

Hello everyone, I was facing this same problem with the following producer configuration:

const producer = kafka.producer({
    allowAutoTopicCreation: true,
    idempotent: true,
})

I basically had to check which of my objects had gone 1 minute without making contact and send each of their payloads through Kafka as follows:

try {

   await producer.connect()

   await producer.send({
       topic: 'inatividade',
       acks: -1,
       messages: [
           { value: JSON.stringify(element) }
       ],
   })
} catch (error) {
  console.log(error)
}

However, only the first object was sent, because I was performing this task inside a forEach. The error only went away and the objects were sent correctly after I removed idempotent: true from my producer configuration.
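
For what it's worth, part of the problem in that pattern is that an async callback inside forEach is never awaited, so all the sends run concurrently against a single idempotent producer. A sketch of a sequential alternative that keeps idempotent: true (elements stands in for the list of inactive objects described above):

// Sketch only: send the inactive objects one at a time instead of firing
// them all concurrently from a forEach callback. `elements` stands in for
// the list of objects described above.
await producer.connect()

for (const element of elements) {
  await producer.send({
    topic: 'inatividade',
    acks: -1,
    messages: [{ value: JSON.stringify(element) }],
  })
}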

@eliw00d
Contributor

eliw00d commented Dec 14, 2021

It sounds like forks using the open PRs do not have issues. Is there any chance we can get those merged and a new version released? Or is there something holding them back?

aikoven referenced this issue in aikoven/kafkajs Jan 16, 2022
@Neustradamus

Dear all,

I wish you a Happy New Year 2022!

@tulios: It is time to solve this issue upstream!
Since 18 Dec 2019, the holidays are finished, no?

@aikoven has made a fork with the 2 current PRs from @t-d-d: #1050 + #1172

Thanks in advance.

@Nevon
Collaborator

Nevon commented Feb 9, 2022

Just as planned, now that the holidays are over, #1050 and #1172 have been merged (@tulios cleverly did not specify a year). This functionality is now available in the 1.17.0-beta.1 version. It would be extremely helpful if you could try out this version and let me know if it works as expected.

@Nevon Nevon closed this as completed Feb 9, 2022
@justineyster

@Nevon It worked! Thank you very much! This is a huge help.
