When consuming data, I process messages asynchronously, and after processing each one I mark it with StoreMessage().

The problem: suppose there are 10 messages [1,2,3,4,5,6,7,8,9,10]. Processing message 5 blocks on business logic and takes a long time. Because processing is asynchronous, message 10 finishes in the meantime and is successfully marked with StoreMessage(). If I now stop the program and restart it, the stored offset is 10 (since message 10 was committed successfully), so message 5 is lost.

How should this situation be resolved?

(Translated from Chinese.)
How to reproduce
Checklist
Please provide the following information:
confluent-kafka-go and librdkafka version (LibraryVersion()):
Apache Kafka broker version:
Client configuration: ConfigMap{...}
Operating system:
Provide client logs (with "debug": ".." as necessary)
Provide broker log excerpts
Critical issue
Unfortunately, there is no built-in way to handle this situation. It has to be resolved on the application side, for example by maintaining a map from offsets to booleans and advancing the stored offset only once all prior offsets have been processed. Consumption could block if the map exceeds a certain size, to keep it from growing unboundedly.

There is a proposed KIP that would handle this on the broker side in the future, but even if it is implemented that may take a long time, so I'd suggest not waiting for it.
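A minimal sketch of the map-based approach described above, in Go. Names like `offsetTracker` and `MarkDone` are illustrative, not part of confluent-kafka-go: the tracker records completed offsets and reports a new safe-to-store offset only when every earlier offset has also finished, so an out-of-order completion (like message 10 finishing before message 5) never advances the stored position past an unprocessed message.

```go
package main

import (
	"fmt"
	"sync"
)

// offsetTracker records which offsets have finished processing and
// advances a "safe point": the smallest offset that has NOT yet been
// processed. Only offsets below the safe point may be stored/committed.
type offsetTracker struct {
	mu       sync.Mutex
	nextSafe int64          // smallest offset still being waited on
	done     map[int64]bool // finished offsets at or above nextSafe
}

func newOffsetTracker(start int64) *offsetTracker {
	return &offsetTracker{nextSafe: start, done: make(map[int64]bool)}
}

// MarkDone records that offset has finished processing. If the safe
// point advanced, it returns the new safe point (the next offset to
// consume, suitable for storing); otherwise it returns -1.
// The application could also block consumption when len(done) exceeds
// a bound, to keep the map from growing too large.
func (t *offsetTracker) MarkDone(offset int64) int64 {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.done[offset] = true
	advanced := false
	for t.done[t.nextSafe] {
		delete(t.done, t.nextSafe)
		t.nextSafe++
		advanced = true
	}
	if !advanced {
		return -1
	}
	return t.nextSafe
}

func main() {
	tr := newOffsetTracker(1)
	// Completions arrive out of order, as they would with async handlers.
	for _, off := range []int64{5, 1, 3, 2, 4} {
		fmt.Printf("done %d -> safe to store %d\n", off, tr.MarkDone(off))
	}
	// done 5 -> safe to store -1   (1 not yet processed)
	// done 1 -> safe to store 2
	// done 3 -> safe to store -1   (2 not yet processed)
	// done 2 -> safe to store 4
	// done 4 -> safe to store 6    (5 was already done)
}
```

On each non-negative return, the application would store that offset (e.g. via the consumer's offset-store API) instead of storing each message as it completes; a restart then resumes at the oldest unprocessed message rather than skipping it.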