Skip to content
Mitar edited this page Jan 31, 2022 · 18 revisions

Bulk Processor

A Bulk Processor is a service that can be started to receive bulk requests and commit them to Elasticsearch in the background. It is similar to the BulkProcessor in the Java Client API but has some conceptual differences.

Setup

The Bulk Processor is created by a service, just like any other service in Elastic.

client, err := elastic.NewClient()
if err != nil { ... }

// Setup a bulk processor
service := client.BulkProcessor().Name("MyBackgroundWorker-1")

The service is responsible for setting up and starting the bulk processor. In the example above, we set just one property: The name of the bulk processor to be created. There are some other parameters you can use, e.g. the number of workers or thresholds that describe when a list of bulk requests will be committed (see below for a complete list). The service is basically a builder pattern.

To finally have a started bulk processor you can send bulk requests to, use the Do method of the service. With other services, the Do methods executes a request. With the Bulk Processor service, it starts a Bulk Processor. Here's a typical example:

client, err := elastic.NewClient()
if err != nil { ... }

// Setup a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do(context.Background())
if err != nil { ... }

Notice that the Do method actually spins up some goroutines. If you want to safely clean up, you need to call Close on the bulk processor. You can pass a context.Context into the Do func. Notice that in contrast to other services, this context is used by the workers in a long-running manner. It will be passed into e.g. the Bulk API calls.

The bulk processor implements the io.Closer interface, so you can eventually wrap it with other resources that need cleanup when your application stops. Here's an example of starting and stopping:

client, err := elastic.NewClient()
if err != nil { ... }

// Setup a bulk processor
p, err := client.BulkProcessor().Name("MyBackgroundWorker-1").Workers(2).Do(context.Background())
if err != nil { ... }

// ... Do some work here

// Stop the bulk processor and do some cleanup
err := p.Close()
if err != nil { ... }

Indexing

You can add bulk requests to the bulk processor by using its Add function. Add accepts a BulkableRequest, so it's either BulkIndexRequest (for indexing), BulkUpdateRequest (for updating), or BulkDeleteRequest (for deleting).

// Say we want to index a tweet
t := Tweet{User: "telexico", Message: "Elasticsearch is great."}

// Create a bulk index request
// NOTICE: It is important to set the index and type here!
r := elastic.NewBulkIndexRequest().Index("twitter").Type("tweet").Id("12345").Doc(t)

// Add the request r to the processor p
p.Add(r)

Notice how we set the index and type on the request level. We need to do this with every request sent to a bulk processor; otherwise it won't be able to tell Elasticsearch how to index the document.

Thresholds

When you add a new request via Add, the request is not automatically committed to Elasticsearch. The whole idea of a bulk processor is to gather requests and finally send them to Elasticsearch in a batch.

Now, when does bulk processor send these batches? There are 3 parameters that you can control:

  1. When the batch exceeds a certain number.
  2. When the batch exceeds a certain size (in bytes).
  3. When the batch exceeds a certain timeout.

To specify the threshold for "number of requests", use the BulkActions(int) function on the bulk processor service. To specify the threshold for the "size of the requests", use the BulkSize(int) function. To specify an automatic flush, use the FlushInterval(time.Duration) function.

You can combine all of the three options:

client, err := elastic.NewClient()
if err != nil { ... }

// Setup a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Workers(2).
    BulkActions(1000).              // commit if # requests >= 1000
    BulkSize(2 << 20).              // commit if size of requests >= 2 MB
    FlushInterval(30*time.Second).  // commit every 30s
    Do(context.Background())
if err != nil { ... }

Manual flush

Automatically committing bulk requests based on a policy is all fine, especially for long-running background tasks. However, sometimes you need to write e.g. a migration process that needs to commit all requests before the program exits. While the Close call on the bulk processor ensures that, there's a second method of ensuring all data is sent to Elasticsearch: Flush.

Here's an example that manually asks all workers on the bulk processor to flush its data to Elasticsearch:

client, err := elastic.NewClient()
if err != nil { ... }

// Setup a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Workers(2).
    Do(context.Background())
if err != nil { ... }

// ... Do some work here ...

// Ask workers to commit all requests
err = p.Flush()
if err != nil { ... }

Notice that Flush is synchronous: It waits until all workers acknowledged that requests have been written.

Before/After callbacks

To get more control about how bulk processor sends requests to Elasticsearch and processes its responses, you can use the Before and After callbacks.

The Before callback is a function that gets a sequential execution identifier and the list of bulk requests that will be sent to Elasticsearch. As its name implies, it gets called before sending the requests to Elasticsearch.

The After callback is a function that gets a sequential execution identifier, the list of bulk requests that we've tried to send, the response from Elasticsearch, and an error (which can be nil). The After callback is called after the requests have been sent to Elasticsearch.

What is the purpose of the Before and After callbacks? Well, first you can collect some statistics on throughput or use it for logging. But more importantly, you can use the callbacks to handle accordingly when things go wrong. For example, if your Elasticsearch cluster goes down, the bulk processor will try to commit all your requests. If they cannot be processed and you do nothing, requests bulk up and--eventually--the system might crash. So a good way to find out that Elasticsearch has a problem is that you set up an After callback and watch the error parameter. If it is not nil or response.Errors is true, something's gone wrong, and you should probably throttle or even stop passing more requests to the bulk processor.

Error handling

Starting a background process like a bulk processor is straightforward as long as things go smooth. It gets much harder when errors occur.

As we saw in the last chapter, error handling and throttling can be done with the After callback. This section describes what happens when errors occur and what options you have to build resilient systems.

First of all, bulk processor retries on failure using exponential backoff. This means that when a worker fails to commit a list of requests to Elasticsearch, it will automatically retry. First the worker will choose a small retry interval of say 500ms. The retry interval will increase exponentially with every failed attempt. This is why it's called exponential backoff.

If bulk processor fails many many times, it will eventually give up. This is when the After callback gets triggered. If the After callback passes a non-nil error parameter or when response.Errors is true, you should be warned that there's something going wrong. However, bulk processor has no way of knowing what's the correct way to handle the problem. It needs the caller to decide an appropriate solution. E.g. one application is indexing log files and could, without further ado, stop processing and simply restart a new bulk processor later. Another application has critical data and might decide to stop passing requests to bulk processor until the Elasticsearch cluster is up again. There is no promise from bulk processor other than "we try our best to put your data into Elasticsearch".

Stats

If you're the logging, tracing, and statistics person, we got you covered. If you ask bulk processor to collect statistics, you can later retrieve them.

// Create a client
client, err := elastic.NewClient()
if err != nil { ... }

// Setup a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Stats(true).                   // enable collecting stats
    Do(context.Background())
if err != nil { ... }

// ... Do some work here ...

// Get a snapshot of stats (always blank if not enabled--see above)
stats := p.Stats()

fmt.Printf("Number of times flush has been invoked: %d\n", stats.Flushed)
fmt.Printf("Number of times workers committed reqs: %d\n", stats.Committed)
fmt.Printf("Number of requests indexed            : %d\n", stats.Indexed)
fmt.Printf("Number of requests reported as created: %d\n", stats.Created)
fmt.Printf("Number of requests reported as updated: %d\n", stats.Updated)
fmt.Printf("Number of requests reported as success: %d\n", stats.Succeeded)
fmt.Printf("Number of requests reported as failed : %d\n", stats.Failed)

for i, w := range stats.Workers {
  fmt.Printf("Worker %d: Number of requests queued: %d\n", i, w.Queued)
  fmt.Printf("           Last response time       : %v\n", w.LastDuration)
}

List of options

Here's the list of options that can be passed into BulkProcessorService to create a BulkProcessor:

  • Before: A callback that will be invoked before a commit to Elasticsearch.
  • After: A callback that will be invoked after a commit to Elasticsearch.
  • Name: An (optional) name you can give this bulk processor. This is helpful if you set up different bulk processors in your application and you want to e.g. print statistics.
  • Workers: The number of workers that are able to receive bulk requests and eventually commit them to Elasticsearch. The default is 1.
  • BulkActions: The number of requests that can be enqueued before the bulk processor decides to commit. The default is 1000.
  • BulkSize: The number of bytes that the bulk requests can take up before the bulk processor decides to commit. The default is 5MB.
  • FlushInterval: A time.Duration after which bulk requests are committed, regardless of other tresholds. Set to zero to disable. By default, this is disabled.
  • Stats: A flag that indicates whether the bulk processor should collect stats while running. You need to set this to true if you want to get a snapshot of stats later. By default, this is disabled.

Here's an example of all settings:

// Setup a bulk processor
p, err := client.BulkProcessor().
    Name("MyBackgroundWorker-1").
    Before(beforeCallback).         // func to call before commits
    After(afterCallback).           // func to call after commits
    Workers(4).                     // number of workers
    BulkActions(1000).              // commit if # requests >= 1000
    BulkSize(2 << 20).              // commit if size of requests >= 2 MB
    FlushInterval(30*time.Second).  // commit every 30s
    Stats(true).                    // collect stats
    Do(context.Background())
if err != nil { ... }

FAQ

Why does Bulk Processor lose requests when my application quits?

You are responsible for flushing/closing the bulk processor before your application quits. Otherwise you might lose a batch of documents.

Use e.g. the following pattern to ensure that the bulk processor flushes all documents before the application quits.

p, err := elastic.NewBulkProcessor()...Do(context.Background())
if err != nil {
  return err
}
defer p.Close()  // Close flushes documents

Why does bulk processor stats return failed documents?

The Failed property of the bulk processor indicates the number of bulk requests returned as "failed" from Elasticsearch. There is nothing special in bulk processor, and this may happen if you use the Bulk API outside of bulk processor. You must handle these cases in your client. Some applications might want to re-add to the bulk processor, some want to trigger an event: That's application-specific.

Short: The application is responsible for handling those cases.

Example usage

See this gist for a working example of a failure resilient process that uses BulkProcessor under the hood.