Completely disabling backoff #1651

Open

itiulkanov opened this issue Sep 21, 2022 · 0 comments
Please use the following questions as a guideline to help me answer
your issue/question without further inquiry. Thank you.

Which version of Elastic are you using?

[x] elastic.v7 (for Elasticsearch 7.x)

Please describe the expected behavior

Once backoff is disabled, we expect all data sent to the disabled (unavailable) cluster to be discarded, so that we only process the cluster's response in the after callback and store all failed values in the DLQ. Instead, the data sent while ES is down is kept and submitted once the connection to the ES cluster is restored.

Please describe the actual behavior

We have a constant flow of values that are stored in several ES clusters. We added code that is supposed to handle the failure of one of the ES clusters by sending the affected values to a dead letter queue (DLQ) and reading them back once the cluster is restored. The issue we are having now is duplicate records. Even without a backoff policy defined (or with something like StopBackoff{}), once the ES cluster is restored, BulkProcessor sends all the values it received from the source into ES at the same time as we start processing the DLQ. This means that all the records ingested by the app while the ES cluster is off simply pile up on top of each other instead of being discarded after the "after" callback has been called.
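For reference, our after callback looks roughly like the sketch below (simplified; sendToDLQ stands in for our real DLQ writer, and the signature matches elastic.BulkAfterFunc): on a whole-batch error it queues every request of the commit, otherwise it queues only the items ES rejected.

// Simplified sketch of our callback; sendToDLQ is a stand-in for our
// real DLQ writer.
func after(executionID int64, requests []elastic.BulkableRequest, response *elastic.BulkResponse, err error) {
	if err != nil {
		// The whole commit failed (e.g. the cluster is unreachable),
		// so every request of this batch goes to the DLQ for replay.
		for _, req := range requests {
			sendToDLQ(req.String()) // BulkableRequest embeds fmt.Stringer
		}
		return
	}
	if response == nil {
		return
	}
	// The commit went through; re-queue only the rejected items.
	for _, item := range response.Failed() {
		sendToDLQ(item.Id)
	}
}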
We've tried to overcome the issue by setting up StopBackoff like this:

processor, err = client.BulkProcessor().
	Workers(o.Workers).                              // concurrent workers
	BulkActions(o.BatchSize).                        // commit after this many requests
	BulkSize(o.BatchBytes).                          // commit after this many bytes
	FlushInterval(o.FlushInterval).                  // commit at least this often
	RetryItemStatusCodes(o.RetryItemStatusCodes...). // item-level retry codes
	Backoff(elastic.StopBackoff{}).                  // never retry a failed commit
	Stats(o.WantStats).
	After(after). // call "after" after every commit
	Do(o.Ctx)

But that didn't help: it seems to only generate one error after another without discarding anything.
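Looking at the library source, StopBackoff only tells the retry loop to give up on the current commit; it says nothing about the buffered requests. If I read it correctly, it boils down to this:

// From the library's backoff.go, as far as I can tell: StopBackoff
// reports "no wait, no retry" on every call, so the commit fails fast.
func (b StopBackoff) Next(retry int) (time.Duration, bool) {
	return 0, false
}

Since a worker only resets its buffer after a successful commit, the batch survives the failed commit and is re-sent on the next flush, which would explain the duplicates we see.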

This behavior would be fine if we were processing a small volume of messages, but when a lot of messages are held in memory, the app eventually crashes and all the stored messages are lost.

Any steps to reproduce the behavior?

  1. Set up the BulkProcessor with StopBackoff as the backoff policy.
  2. Disable the ES cluster while the BulkProcessor app keeps running and ingesting messages from the source (Kafka).
  3. Send some values to the app's ingestion channel.
  4. Enable the ES cluster.
  5. All the values sent to the BulkProcessor while the ES cluster was disabled can be seen in the ES cluster.

Suggested solutions

I see that records are cleared in only one place, where s.Reset() is called, and that part of the code never seems to be reached while the cluster is off. Is there a way to clear out the records once the after callback has finished? Or could a setting be added to allow this? A sketch of what I have in mind follows.
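Something like the following would cover our use case (the DiscardOnCommitFailure name is purely hypothetical and not in the library today):

// Hypothetical API sketch: an option that makes each worker drop its
// buffered requests once "after" has run, regardless of whether the
// commit succeeded.
processor, err = client.BulkProcessor().
	Workers(o.Workers).
	Backoff(elastic.StopBackoff{}).
	DiscardOnCommitFailure(true). // hypothetical setting
	After(after).
	Do(o.Ctx)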
