Bulk processor retries indefinitely on failure #1278

raiRaiyan opened this issue Feb 4, 2020 · 7 comments · May be fixed by #1661

raiRaiyan commented Feb 4, 2020

Please use the following questions as a guideline to help me answer
your issue/question without further inquiry. Thank you.

Which version of Elastic are you using?

  • elastic.v7 (for Elasticsearch 7.x)
  • elastic.v6 (for Elasticsearch 6.x)
  • elastic.v5 (for Elasticsearch 5.x)
  • elastic.v3 (for Elasticsearch 2.x)
  • elastic.v2 (for Elasticsearch 1.x)

Please describe the expected behavior

This issue is similar to #1247 but with a bigger scope.

When an individual request in a bulk processor fails, it should not be retried once the backoff returns false.

Please describe the actual behavior

When the backoff returns false and an individual request has an error, the failing request is added back to the bulk processor before the commit function returns. This causes indefinite retries.

The following code in bulk_processor.go is responsible for this behaviour.

// commitFunc will commit bulk requests and, on failure, be retried
// via exponential backoff
commitFunc := func() error {
	var err error
	// Save requests because they will be reset in service.Do
	reqs := w.service.requests
	res, err = w.service.Do(ctx)
	if err == nil {
		// Overall bulk request was OK. But each bulk response item also has a status
		if w.p.retryItemStatusCodes != nil && len(w.p.retryItemStatusCodes) > 0 {
			// Check res.Items since some might be soft failures
			if res.Items != nil && res.Errors {
				// res.Items will be 1 to 1 with reqs in same order
				for i, item := range res.Items {
					for _, result := range item {
						if _, found := w.p.retryItemStatusCodes[result.Status]; found {
							// Here the failing request is added back to the
							// processor regardless of the backoff flag.
							w.service.Add(reqs[i])
							if err == nil {
								err = ErrBulkItemRetry
							}
						}
					}
				}
			}
		}
	}
	return err
}

Any steps to reproduce the behavior?

One easy way to recreate this is to retry on client-side errors and send the wrong data type for a field in the index.
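
A minimal reproduction sketch along those lines (the index name "products", the field mapping, and the backoff values are assumptions for illustration, not taken from a real setup):

import (
	"context"
	"time"

	"github.com/olivere/elastic/v7"
)

// reproduce configures a processor that treats client-side 400s as retryable
// and then indexes a document whose field type conflicts with the mapping,
// so the same item fails on every attempt.
func reproduce(client *elastic.Client) error {
	p, err := client.BulkProcessor().
		Name("repro-worker").
		Workers(1).
		// Adding 400 to the retryable item status codes makes a permanent
		// mapping error look like a "temporary" failure to the processor.
		RetryItemStatusCodes(400, 408, 429, 503, 507).
		Backoff(elastic.NewExponentialBackoff(50*time.Millisecond, 2*time.Second)).
		Do(context.Background())
	if err != nil {
		return err
	}
	defer p.Close()

	// Assuming "price" is mapped as a numeric field, this item is rejected on
	// every attempt and keeps being re-added to the worker's queue.
	p.Add(elastic.NewBulkIndexRequest().
		Index("products").
		Doc(map[string]interface{}{"price": "not-a-number"}))

	return p.Flush()
}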

rwynn commented Feb 14, 2020

Hi @raiRaiyan, I believe I contributed the code that you referenced.

The library will check the status on each response line item, as shown above, before adding back the request corresponding to that line item. The default list of status codes this applies to is

defaultRetryItemStatusCodes = []int{408, 429, 503, 507}

if _, found := w.p.retryItemStatusCodes[result.Status]; found {
// add back
}

These correspond to a set of status codes I looked up which could be temporary errors, e.g. 429 Too Many Requests. I would be surprised if a wrong data type caused one of those response statuses to be returned, but I would be interested to find out if that's wrong.

You can override this slice of status codes to add back with the RetryItemStatusCodes method on the bulk processor.
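
As a hedged example of that override when building the processor (the processor name and the narrowed set of codes below are illustrative):

p, err := client.BulkProcessor().
	Name("indexer").
	RetryItemStatusCodes(429, 503). // replaces the default {408, 429, 503, 507}
	Do(context.Background())
if err != nil {
	// handle setup error
}
defer p.Close()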

@raiRaiyan

@rwynn Thank you for the reply. The code works as expected when the requests succeed after the initial failure, but the issue is the infinite retries when the requests never succeed. The wrong data type was just an example of how to recreate the issue.

My understanding is that the backoff function defines how the requests are retried and can control when to stop retrying by returning false from its Next function. But this is not the case: the commitFunc keeps adding the failed requests (provided they belong to the configured list of status codes) back to the worker, regardless of the output of the backoff function.

I want the processor to stop retrying once the exponential backoff reaches its maximum and to discard the failing request, but currently that doesn't seem to happen.
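
To illustrate the contract being relied on here, a Backoff stops the retry loop by returning false from Next. The wrapper type below is hypothetical, not part of the library, and assumes the time and github.com/olivere/elastic/v7 imports:

// cappedBackoff is a hypothetical Backoff that gives up after maxRetries
// attempts; elastic.NewExponentialBackoff behaves similarly once the computed
// wait would exceed its maximum timeout.
type cappedBackoff struct {
	inner      elastic.Backoff
	maxRetries int
}

func (b cappedBackoff) Next(retry int) (time.Duration, bool) {
	if retry > b.maxRetries {
		// Returning false tells RetryNotify to stop; the complaint above is
		// that items already re-added by commitFunc survive this point.
		return 0, false
	}
	return b.inner.Next(retry)
}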

rwynn commented Feb 16, 2020

@raiRaiyan I think one way to get the behavior you are expecting would be to add a 4th argument to RetryNotify, defined here, which is a reset function.

The reset function would be called when all the retries have been exhausted without success and it would clear the queue of requests.

So, I'm thinking something like this...

func RetryNotify(operation Operation, b Backoff, notify Notify, reset ResetFunc) error {
	var err error
	var wait time.Duration
	var retry bool
	var n int

	for {
		if err = operation(); err == nil {
			return nil
		}

		n++
		wait, retry = b.Next(n)
		if !retry {
			if reset != nil {
				reset()
			}
			return err
		}

		if notify != nil {
			notify(err)
		}

		time.Sleep(wait)
	}
}

The reset function passed in would be a function that closes over BulkService and calls Reset on it to clear the line items.
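
A sketch of what the call site might look like under this proposal (the extra closure argument is only proposed here and does not exist in the released API):

// Proposed usage inside the worker's commit path: once retries are
// exhausted, drop whatever commitFunc added back to the service queue.
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc, func() {
	w.service.Reset()
})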

rwynn commented Feb 16, 2020

To clarify further: without inspecting the response line items in the commit function, you get a scenario where the overall bulk request succeeds but the line items contain soft (temporary) failures. If you don't inspect those and return an error for them, RetryNotify thinks the operation() call succeeded and does not initiate a retry. Furthermore, without the "add back" logic you actually lose those line items, since the Do function calls Reset if the overall response was OK.

rwynn commented Feb 16, 2020

A much more direct solution would be to simply add the following line after the call to RetryNotify.

err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.service.Reset()

Since all the retries happen inside RetryNotify.

@raiRaiyan

Yes. Resetting the service queue after the RetryNotify makes sense. I was thinking of passing a flag to the commitFunc to control whether or not the request is added back, but just resetting the queue is enough and the overhead is very minimal.

olivere commented Feb 17, 2020

@rwynn @raiRaiyan I'm very busy with my day job currently, so I'd be more than happy to get a PR to be merged for the next release. Thanks for your commitment.

rafaeljusto added a commit to rafaeljusto/elastic that referenced this issue Feb 2, 2023
When all retries are exhausted, the worker's internal request buffer needs to be cleared in failure scenarios. That is required because the commitFunc (and consequently the underlying BulkService.Do call) doesn't reset it when an error happens. Without clearing the internal buffer, the worker will keep sending the same requests on subsequent rounds of execution.

Kudos for this solution goes to @rwynn and @raiRaiyan.

Resolves olivere#1278