Add back pressure to BulkProcessor
This is a backport of #698 from v6
(9297f94).
olivere committed Feb 13, 2018
1 parent 8e15c58 commit 3d6edfd
Showing 5 changed files with 216 additions and 12 deletions.
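In short: after a bulk commit fails with a connection-level error, each worker now blocks until a health check finds an active node again. Because the processor's request channel is unbuffered, BulkProcessor.Add blocks too, so producers are pushed back instead of filling an unbounded queue. A caller-side sketch of the v5 API touched here; the client URL, index name "app-logs", and the document are illustrative, not part of the commit:

// Sketch only: what the new back pressure means for a caller of the v5 API.
package main

import (
	"context"
	"log"
	"time"

	elastic "gopkg.in/olivere/elastic.v5"
)

func main() {
	client, err := elastic.NewClient(elastic.SetURL("http://localhost:9200"))
	if err != nil {
		log.Fatal(err)
	}

	p, err := elastic.NewBulkProcessorService(client).
		Name("example-processor").
		Workers(2).
		FlushInterval(time.Second).
		Do(context.Background())
	if err != nil {
		log.Fatal(err)
	}
	defer p.Close()

	// If Elasticsearch becomes unreachable, Add now blocks until a worker
	// finds an active connection again, instead of queueing without bound.
	doc := map[string]interface{}{"msg": "hello", "@timestamp": time.Now()}
	p.Add(elastic.NewBulkIndexRequest().Index("app-logs").Type("doc").Doc(doc))
}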
1 change: 1 addition & 0 deletions CONTRIBUTORS
@@ -101,6 +101,7 @@ Pete C [@peteclark-ft](https://github.com/peteclark-ft)
Radoslaw Wesolowski [r--w](https://github.com/r--w)
Roman Colohanin [@zuzmic](https://github.com/zuzmic)
Ryan Schmukler [@rschmukler](https://github.com/rschmukler)
Ryan Wynn [@rwynn](https://github.com/rwynn)
Sacheendra talluri [@sacheendra](https://github.com/sacheendra)
Sean DuBois [@Sean-Der](https://github.com/Sean-Der)
Shalin LK [@shalinlk](https://github.com/shalinlk)
62 changes: 57 additions & 5 deletions bulk_processor.go
@@ -6,6 +6,7 @@ package elastic

import (
"context"
"net"
"sync"
"sync/atomic"
"time"
@@ -121,7 +122,7 @@ func (s *BulkProcessorService) Stats(wantStats bool) *BulkProcessorService {
return s
}

// Set the backoff strategy to use for errors
// Backoff sets the backoff strategy to use for errors
func (s *BulkProcessorService) Backoff(backoff Backoff) *BulkProcessorService {
s.backoff = backoff
return s
@@ -248,6 +249,8 @@ type BulkProcessor struct {

statsMu sync.Mutex // guards the following block
stats *BulkProcessorStats

stopReconnC chan struct{} // channel to signal that reconnection attempts should stop
}

func newBulkProcessor(
@@ -293,6 +296,7 @@ func (p *BulkProcessor) Start(ctx context.Context) error {
p.requestsC = make(chan BulkableRequest)
p.executionId = 0
p.stats = newBulkProcessorStats(p.numWorkers)
p.stopReconnC = make(chan struct{})

// Create and start up workers.
p.workers = make([]*bulkWorker, p.numWorkers)
@@ -331,6 +335,12 @@ func (p *BulkProcessor) Close() error {
return nil
}

// Tell connection checkers to stop
if p.stopReconnC != nil {
close(p.stopReconnC)
p.stopReconnC = nil
}

// Stop flusher (if enabled)
if p.flusherStopC != nil {
p.flusherStopC <- struct{}{}
@@ -436,29 +446,42 @@ func (w *bulkWorker) work(ctx context.Context) {

var stop bool
for !stop {
var err error
select {
case req, open := <-w.p.requestsC:
if open {
// Received a new request
w.service.Add(req)
if w.commitRequired() {
w.commit(ctx) // TODO swallow errors here?
err = w.commit(ctx)
}
} else {
// Channel closed: Stop.
stop = true
if w.service.NumberOfActions() > 0 {
w.commit(ctx) // TODO swallow errors here?
err = w.commit(ctx)
}
}

case <-w.flushC:
// Commit outstanding requests
if w.service.NumberOfActions() > 0 {
w.commit(ctx) // TODO swallow errors here?
err = w.commit(ctx)
}
w.flushAckC <- struct{}{}
}
if !stop && err != nil {
waitForActive := func() {
// Add back pressure to prevent Add calls from filling up the request queue
ready := make(chan struct{})
go w.waitForActiveConnection(ready)
<-ready
}
if _, ok := err.(net.Error); ok {
waitForActive()
} else if IsConnErr(err) {
waitForActive()
}
}
}
}

@@ -511,6 +534,35 @@ func (w *bulkWorker) commit(ctx context.Context) error {
return err
}

func (w *bulkWorker) waitForActiveConnection(ready chan<- struct{}) {
defer close(ready)

t := time.NewTicker(5 * time.Second)
defer t.Stop()

client := w.p.c
stopReconnC := w.p.stopReconnC
w.p.c.errorf("elastic: bulk processor %q is waiting for an active connection", w.p.name)

// Loop until a health check finds at least one active connection or stopReconnC is closed
for {
select {
case _, ok := <-stopReconnC:
if !ok {
w.p.c.errorf("elastic: bulk processor %q active connection check interrupted", w.p.name)
return
}
case <-t.C:
client.healthcheck(3*time.Second, true)
if client.mustActiveConn() == nil {
// Found an active connection; return and let the
// deferred close(ready) unblock the waiting worker
return
}
}
}
}

func (w *bulkWorker) updateStats(res *BulkResponse) {
// Update stats
if res != nil {
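Taken together, the worker blocks on a ready channel that waitForActiveConnection closes once a health check succeeds, while Close() closes stopReconnC to broadcast cancellation to every waiting worker at once. A self-contained sketch of that pattern, assuming illustrative names (waitForReady, healthy, stopC) that are not part of the library:

// Sketch of the ready-channel plus stop-channel pattern used above.
package main

import (
	"fmt"
	"time"
)

// waitForReady polls healthy() until it reports true or stopC is closed,
// then closes ready so a caller blocked on <-ready can resume.
func waitForReady(healthy func() bool, stopC <-chan struct{}, ready chan<- struct{}) {
	defer close(ready)
	t := time.NewTicker(50 * time.Millisecond)
	defer t.Stop()
	for {
		select {
		case <-stopC:
			// A closed channel unblocks every receiver, so a single
			// close(stopC) cancels all waiting workers at once.
			return
		case <-t.C:
			if healthy() {
				return
			}
		}
	}
}

func main() {
	stopC := make(chan struct{})
	start := time.Now()
	healthy := func() bool { return time.Since(start) > 200*time.Millisecond }

	// A worker that just hit a connection error: block until ready.
	ready := make(chan struct{})
	go waitForReady(healthy, stopC, ready)
	<-ready // back pressure: no new requests are consumed while blocked here
	fmt.Println("healthy again after", time.Since(start).Round(time.Millisecond))
}

Closing a channel, rather than sending on it, is what lets one Close() call release any number of blocked workers.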
8 changes: 1 addition & 7 deletions client.go
@@ -26,7 +26,7 @@ import (

const (
// Version is the current version of Elastic.
Version = "5.0.62"
Version = "5.0.63"

// DefaultURL is the default endpoint of Elasticsearch on the local machine.
// It is used e.g. when initializing a new Client without a specific URL.
@@ -1842,9 +1842,3 @@ func (c *Client) WaitForGreenStatus(timeout string) error {
func (c *Client) WaitForYellowStatus(timeout string) error {
return c.WaitForStatus("yellow", timeout)
}

// IsConnError unwraps the given error value and checks if it is equal to
// elastic.ErrNoClient.
func IsConnErr(err error) bool {
return errors.Cause(err) == ErrNoClient
}
8 changes: 8 additions & 0 deletions errors.go
@@ -9,6 +9,8 @@ import (
"fmt"
"io/ioutil"
"net/http"

"github.com/pkg/errors"
)

// checkResponse will return an error if the request/response indicates
@@ -89,6 +91,12 @@ func (e *Error) Error() string {
}
}

// IsConnErr returns true if the error indicates that Elastic could not
// find an Elasticsearch host to connect to.
func IsConnErr(err error) bool {
return err == ErrNoClient || errors.Cause(err) == ErrNoClient
}

// IsNotFound returns true if the given error indicates that Elasticsearch
// returned HTTP status 404. The err parameter can be of type *elastic.Error,
// elastic.Error, *http.Response or int (indicating the HTTP status code).
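The relocated IsConnErr now accepts wrapped errors as well: errors.Cause from github.com/pkg/errors walks the causer chain back to the sentinel. A minimal sketch of that behavior, using a stand-in sentinel rather than the real elastic.ErrNoClient:

package main

import (
	"fmt"

	"github.com/pkg/errors"
)

// Stand-in for elastic.ErrNoClient, for illustration only.
var ErrNoClient = errors.New("no Elasticsearch node available")

func IsConnErr(err error) bool {
	return err == ErrNoClient || errors.Cause(err) == ErrNoClient
}

func main() {
	wrapped := errors.Wrap(ErrNoClient, "bulk commit failed")
	fmt.Println(IsConnErr(ErrNoClient))           // true: direct match
	fmt.Println(IsConnErr(wrapped))               // true: Cause unwraps to the sentinel
	fmt.Println(IsConnErr(errors.New("timeout"))) // false
}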
149 changes: 149 additions & 0 deletions recipes/bulk_processor/main.go
@@ -0,0 +1,149 @@
// Copyright 2012-present Oliver Eilhard. All rights reserved.
// Use of this source code is governed by a MIT-license.
// See http://olivere.mit-license.org/license.txt for details.

// BulkProcessor runs a bulk processing job that fills an index
// given certain criteria like flush interval etc.
//
// Example
//
// bulk_processor -url=http://127.0.0.1:9200/bulk-processor-test?sniff=false -n=100000 -flush-interval=1s
//
package main

import (
"context"
"flag"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"

"github.com/google/uuid"

elastic "gopkg.in/olivere/elastic.v5"
"gopkg.in/olivere/elastic.v5/config"
)

func main() {
var (
url = flag.String("url", "http://localhost:9200/bulk-processor-test", "Elasticsearch URL")
numWorkers = flag.Int("num-workers", 4, "Number of workers")
n = flag.Int64("n", -1, "Number of documents to process (-1 for unlimited)")
flushInterval = flag.Duration("flush-interval", 1*time.Second, "Flush interval")
bulkActions = flag.Int("bulk-actions", 0, "Number of bulk actions before committing")
bulkSize = flag.Int("bulk-size", 0, "Size of bulk requests before committing")
)
flag.Parse()
log.SetFlags(0)

rand.Seed(time.Now().UnixNano())

// Parse configuration from URL
cfg, err := config.Parse(*url)
if err != nil {
log.Fatal(err)
}

// Create an Elasticsearch client from the parsed config
client, err := elastic.NewClientFromConfig(cfg)
if err != nil {
log.Fatal(err)
}

// Drop old index
exists, err := client.IndexExists(cfg.Index).Do(context.Background())
if err != nil {
log.Fatal(err)
}
if exists {
_, err = client.DeleteIndex(cfg.Index).Do(context.Background())
if err != nil {
log.Fatal(err)
}
}

// Create processor
bulkp := elastic.NewBulkProcessorService(client).
Name("bulk-test-processor").
Stats(true).
Backoff(elastic.StopBackoff{}).
FlushInterval(*flushInterval).
Workers(*numWorkers)
if *bulkActions > 0 {
bulkp = bulkp.BulkActions(*bulkActions)
}
if *bulkSize > 0 {
bulkp = bulkp.BulkSize(*bulkSize)
}
p, err := bulkp.Do(context.Background())
if err != nil {
log.Fatal(err)
}

var created int64
errc := make(chan error, 1)
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
<-c
errc <- nil
}()

go func() {
defer func() {
if err := p.Close(); err != nil {
errc <- err
}
}()

type Doc struct {
Timestamp time.Time `json:"@timestamp"`
}

for {
current := atomic.AddInt64(&created, 1)
if *n > 0 && current >= *n {
errc <- nil
return
}
r := elastic.NewBulkIndexRequest().
Index(cfg.Index).
Type("doc").
Id(uuid.New().String()).
Doc(Doc{Timestamp: time.Now()})
p.Add(r)

time.Sleep(time.Duration(rand.Intn(1000)) * time.Microsecond)
}
}()

go func() {
t := time.NewTicker(1 * time.Second)
defer t.Stop()
for range t.C {
stats := p.Stats()
written := atomic.LoadInt64(&created)
var queued int64
for _, w := range stats.Workers {
queued += w.Queued
}
fmt.Printf("Queued=%5d Written=%8d Succeeded=%8d Failed=%8d Comitted=%6d Flushed=%6d\n",
queued,
written,
stats.Succeeded,
stats.Failed,
stats.Committed,
stats.Flushed,
)
}
}()

if err := <-errc; err != nil {
log.Fatal(err)
}
}
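One way to see the back pressure from this recipe (an expectation based on the code above, not something the commit states): stop Elasticsearch while it runs. The workers should block in waitForActiveConnection, and because the processor's request channel is unbuffered, p.Add blocks as well, so the Written counter stalls rather than the queue growing without bound. Indexing resumes once a node is reachable again.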
