
Processing of missed submission event #6

Open
BadalZ opened this issue Oct 26, 2018 · 4 comments
Comments

@BadalZ
Contributor

BadalZ commented Oct 26, 2018

When the scanner app is down or unavailable and a submission arrives, that submission never gets processed for the AV scan.

We can fix this by storing the timestamp of the last processed message. When the scanner comes back up, it can resume processing messages from where it left off.

Other approach suggestions are welcome.
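The checkpoint idea above can be sketched in plain JavaScript. This is only a toy model: the in-memory `store` and the `processPending` helper are hypothetical stand-ins for persistent storage and the real scan pipeline.

```javascript
// Toy sketch of timestamp checkpointing: persist the timestamp of the last
// processed message, and on restart replay everything newer than it.
// `store` stands in for real persistent storage (e.g. a database row).
const store = { lastProcessedAt: 0 }

function processPending (messages) {
  const pending = messages.filter((m) => m.timestamp > store.lastProcessedAt)
  for (const m of pending) {
    // the real processor would run the AV scan on `m` here
    store.lastProcessedAt = m.timestamp
  }
  return pending.length
}

// Messages 1-2 arrive while the scanner is up...
processPending([{ timestamp: 1 }, { timestamp: 2 }])
// ...the scanner restarts; messages 3-4 arrived meanwhile and are not lost.
const replayed = processPending([
  { timestamp: 1 }, { timestamp: 2 }, { timestamp: 3 }, { timestamp: 4 }
])
console.log(replayed) // 2
```

Only the two messages newer than the stored timestamp are replayed; the already-processed ones are skipped.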

@sharathkumaranbu
Contributor

@BadalZ I have already informed @callmekatootie about this issue. We are using the Kafka simple consumer in our code, and the scenario quoted above is a problem with the simple consumer. We need to use the Kafka group consumer, which will fix the issue we are facing.

Cc @cwdcwd

@cwdcwd

cwdcwd commented Oct 26, 2018

How does the group consumer work?

@callmekatootie
Member

callmekatootie commented Oct 26, 2018

@cwdcwd Pasting Sharath's response from an earlier discussion:

- The Simple Consumer uses a client-side offset, while the Group Consumer registers its offset in Zookeeper.
- Say the application is running and processing messages, and is then brought down for 5 minutes. When it restarts, the messages received during those 5 minutes are lost with the Simple Consumer.
- The Group Consumer, on the other hand, reads the last offset from Zookeeper and resumes processing from that point.
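The offset semantics described above can be illustrated with a small model. This is not real Kafka: the array stands in for the topic log and the `zookeeper` object for the committed-offset store.

```javascript
// Toy model of the offset difference. A group-consumer-style restart resumes
// from the offset committed in Zookeeper; a simple-consumer-style restart
// that only tracked its offset in memory starts at the log's tail and skips
// everything that arrived while it was down.
const log = ['m0', 'm1', 'm2', 'm3', 'm4'] // messages on the topic
const zookeeper = { committedOffset: 2 }   // offset committed before the outage

// m2-m4 arrived during the outage. On restart:
const groupStyle = log.slice(zookeeper.committedOffset) // resumes from Zookeeper
const simpleStyle = log.slice(log.length)               // in-memory offset lost; starts at the tail
console.log(groupStyle)  // ['m2', 'm3', 'm4']
console.log(simpleStyle) // []
```

The group-style restart replays the three messages missed during the outage; the simple-style restart sees nothing.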

@sharathkumaranbu
Contributor

I actually researched this further.

With the Simple Consumer we can also commit the offset and fetch it back from Zookeeper, but there are some tricks. At startup we need to know the topic name and partition number in order to fetch the last committed offset. Since we have only one partition for now, we can configure the partition number via a config variable.

// inside the consumer's message handler; `messageSet`, `topic` and
// `partition` come from the handler's arguments, `m` is one message
return Promise.each(messageSet, (m) => {
  const messageJSON = JSON.parse(m.message.value.toString('utf8'))
  return co(function * () {
    yield ProcessorService.processScan(messageJSON)
  })
    // commit the offset in Zookeeper once the scan succeeds
    .then(() => consumer.commitOffset({ topic, partition, offset: m.offset }))
    .catch((err) => logger.error(err))
})

We are actually committing the offset in Zookeeper, but the catch is that every consumer uses the default groupId when committing, which conflicts with other processors consuming the same topic.

We need to use a different groupId for each processor so that each application processes messages independently. A few tweaks are also required in the code to process the messages missed during maintenance with the Simple Consumer, assuming we know the partition number at startup.
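The groupId conflict can be shown with a toy offset store keyed the way Zookeeper keys committed offsets. The `Map`, the helper functions, and the group names (`avscan-processor`, `review-processor`) are all hypothetical, for illustration only.

```javascript
// Toy model of why processors need distinct groupIds: committed offsets are
// keyed by (groupId, topic), so two processors sharing a groupId overwrite
// each other's progress on the same topic.
const offsets = new Map() // stands in for Zookeeper's offset store

const commit = (groupId, topic, offset) => offsets.set(`${groupId}/${topic}`, offset)
const fetch = (groupId, topic) => offsets.get(`${groupId}/${topic}`) || 0

// Two processors sharing the default groupId clobber each other:
commit('default', 'submissions', 5) // avscan processor commits
commit('default', 'submissions', 9) // another processor overwrites it
console.log(fetch('default', 'submissions')) // 9 — avscan's progress is lost

// Distinct groupIds keep each processor's offset independent:
commit('avscan-processor', 'submissions', 5)
commit('review-processor', 'submissions', 9)
console.log(fetch('avscan-processor', 'submissions')) // 5
```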

Also, please note that the Simple Consumer works only with a single partition. If there are multiple partitions, it will consume only one of them and all the other partitions will remain unconsumed.

Please let me know whether this makes sense, whether I have the go-ahead to make the fixes (I would recommend applying the same set of fixes to all the processors in hand), and which direction to take (either Simple Consumer only, or Group Consumer).

cc @cwdcwd @callmekatootie @BadalZ
