Skip to content

Commit

Permalink
Fix: Bulk processor retries indefinitely on failure
Browse files Browse the repository at this point in the history
When all retries are exhausted, the worker's internal requests buffer needs to be
cleared in failure scenarios. That is required because the commitFunc (and
consequently the underlying BulkService.Do call) doesn't reset it when some error
happens. Without clearing the internal buffer the worker will continue sending
the same requests on the following rounds of execution.

Kudos for this solution goes to @rwynn and @raiRaiyan.

Resolves olivere#1278
  • Loading branch information
rafaeljusto committed Feb 2, 2023
1 parent 29ee989 commit 91b8a1a
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 0 deletions.
8 changes: 8 additions & 0 deletions bulk_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -580,6 +580,14 @@ func (w *bulkWorker) commit(ctx context.Context) error {
err := RetryNotify(commitFunc, w.p.backoff, notifyFunc)
w.updateStats(res)
if err != nil {
// After all retry attempts clear the requests for the next round. This is
// important when backoff is disabled or limited to a number of rounds, and
// the aftermath is a failure, because the commitFunc (and consequently the
// underlying BulkService.Do call) will not clear the requests from the
// worker. Without this the same requests will be used on the next round of
// execution of the worker.
w.service.Reset()

w.p.c.errorf("elastic: bulk processor %q failed: %v", w.p.name, err)
}

Expand Down
65 changes: 65 additions & 0 deletions bulk_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"context"
"fmt"
"math/rand"
"net/http"
"net/http/httptest"
"reflect"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -335,6 +337,60 @@ func TestBulkProcessorFlush(t *testing.T) {
}
}

// TestBulkWorker_commit_clearFailedRequests verifies that a worker drops its
// buffered requests once every retry attempt has failed, so that a later flush
// does not re-send them. The stub HTTP client answers every call with 429,
// which — combined with StopBackoff — makes the first commit exhaust retries.
func TestBulkWorker_commit_clearFailedRequests(t *testing.T) {
	srv := httptest.NewServer(http.HandlerFunc(func(http.ResponseWriter, *http.Request) {}))
	defer srv.Close()

	client, err := NewClient(
		SetURL(srv.URL),
		SetSniff(false),
		SetHealthcheck(false),
		SetHttpClient(tooManyHTTPClient{}),
	)
	if err != nil {
		t.Fatal(err)
	}

	// afterCount tracks how many times the After callback ran: the first run
	// must see all 10 queued requests, every later run must see none.
	var afterCount int
	afterFn := BulkAfterFunc(func(executionID int64, requests []BulkableRequest, response *BulkResponse, err error) {
		afterCount++
		switch {
		case afterCount == 1:
			if got := len(requests); got != 10 {
				t.Errorf("expected 10 requests; got: %d", got)
			}
		case len(requests) > 0:
			t.Errorf("expected 0 requests; got: %d", len(requests))
		}
	})

	proc, err := NewBulkProcessorService(client).
		BulkActions(-1).
		BulkSize(-1).
		FlushInterval(0).
		RetryItemStatusCodes().
		Backoff(StopBackoff{}).
		After(afterFn).
		Do(context.Background())
	if err != nil {
		t.Fatal(err)
	}

	for i := 0; i < 10; i++ {
		proc.Add(NewBulkIndexRequest())
	}

	// The first flush sends (and fails) the 10 buffered requests.
	if err := proc.Flush(); err != nil {
		t.Fatal(err)
	}
	// The second flush must find an empty buffer, even though the first failed.
	if err := proc.Flush(); err != nil {
		t.Fatal(err)
	}
	if err := proc.Close(); err != nil {
		t.Fatal(err)
	}
}

// -- Helper --

func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {
Expand Down Expand Up @@ -427,3 +483,12 @@ func testBulkProcessor(t *testing.T, numDocs int, svc *BulkProcessorService) {
t.Fatalf("expected %d documents; got: %d", numDocs, count)
}
}

type tooManyHTTPClient struct {
}

func (t tooManyHTTPClient) Do(r *http.Request) (*http.Response, error) {
recorder := httptest.NewRecorder()
recorder.WriteHeader(http.StatusTooManyRequests)
return recorder.Result(), nil
}

0 comments on commit 91b8a1a

Please sign in to comment.