Elasticsearch Sink

Sink that indexes documents into Elasticsearch.

This sink supports indexing JSON documents only. It consumes data from an input destination and indexes it into Elasticsearch. The input can be a plain JSON string or a java.util.Map that represents the JSON. It also accepts the Elasticsearch-provided XContentBuilder, although this is a rare case, since the middleware is unlikely to keep records as XContentBuilder. That form is supported mainly for direct invocation of the consumer.
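As a minimal illustration of the two common payload forms mentioned above, the sketch below builds the same document once as a plain JSON string and once as a java.util.Map. The class and method names (PayloadShapes, asJsonString, asMap) are hypothetical and exist only for this example; they are not part of the sink.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class PayloadShapes {

    // The document as a plain JSON string.
    static String asJsonString() {
        return "{\"foo\":\"bar\"}";
    }

    // The same document as a java.util.Map that represents the JSON.
    static Map<String, Object> asMap() {
        Map<String, Object> doc = new LinkedHashMap<>();
        doc.put("foo", "bar");
        return doc;
    }

    public static void main(String[] args) {
        System.out.println(asJsonString());
        System.out.println(asMap());
    }
}
```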

Options

The Elasticsearch sink has the following options:

elasticsearch.consumer.async

Indicates whether the indexing operation is asynchronous. By default, indexing is done synchronously. (Boolean, default: false)

elasticsearch.consumer.batch-size

Number of items to index in each request. For values greater than 1, the bulk indexing API is used. (Integer, default: 1)

elasticsearch.consumer.group-timeout

Timeout in milliseconds after which a message group is flushed when bulk indexing is active. It defaults to -1, meaning that idle message groups are never flushed automatically. (Long, default: -1)

elasticsearch.consumer.id

The id of the document to index. If set, the INDEX_ID header value overrides this property on a per message basis. (Expression, default: <none>)

elasticsearch.consumer.index

Name of the index. If set, the INDEX_NAME header value overrides this property on a per message basis. (String, default: <none>)

elasticsearch.consumer.routing

Indicates the shard to route to. If not provided, Elasticsearch will default to a hash of the document id. (String, default: <none>)

elasticsearch.consumer.timeout-seconds

Timeout, in seconds, for the shard to become available. If not set, the Elasticsearch client's own default of 1 minute applies. (Long, default: 0)
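The id and index options above both say that a message header, when present, overrides the configured property for that message. The sketch below illustrates that precedence rule only; it is not the sink's actual code. The class and method names (HeaderOverrideSketch, resolve) and the "audit" index are hypothetical, the header key is taken literally from the option description, and expression evaluation for the id option is left out.

```java
import java.util.Map;

public class HeaderOverrideSketch {

    // Returns the header value when the message carries it,
    // otherwise falls back to the configured property value.
    static String resolve(Map<String, Object> headers, String headerName, String configuredValue) {
        Object fromHeader = headers.get(headerName);
        return fromHeader != null ? fromHeader.toString() : configuredValue;
    }

    public static void main(String[] args) {
        // Stands in for the value of elasticsearch.consumer.index.
        String configuredIndex = "testing";

        // No header on the message: the configured index is used.
        System.out.println(resolve(Map.of(), "INDEX_NAME", configuredIndex));

        // INDEX_NAME header present: it wins, for this message only.
        System.out.println(resolve(Map.of("INDEX_NAME", "audit"), "INDEX_NAME", configuredIndex));
    }
}
```

Under this rule, the configured index acts as a default, and individual messages can still be routed to a different index by setting the header.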

Examples of running this sink

  1. From the folder elasticsearch-sink: ./mvnw clean package

  2. cd apps

  3. cd into the generated app for your binder (Kafka or RabbitMQ)

  4. ./mvnw clean package

  5. Make sure that Elasticsearch is running. For example, you can run it as a Docker container with the following command: docker run -d --name es762 -p 9200:9200 -e "discovery.type=single-node" elasticsearch:7.6.2

  6. Start the middleware (Kafka or RabbitMQ) if it is not already running.

  7. java -jar target/elasticsearch-sink-<kafka|rabbit>-3.0.0-SNAPSHOT.jar --spring.cloud.stream.bindings.input.destination=els-in --elasticsearch.consumer.index=testing

  8. Send some JSON data to the middleware destination. For example: {"foo":"bar"}

  9. Verify that the data is indexed: curl localhost:9200/testing/_search
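
The verification in the last step can also be done from Java. The sketch below issues the same search request as the curl command, using the JDK's built-in HTTP client (Java 11 or later). It assumes Elasticsearch is reachable at http://localhost:9200 and that the index is named testing, as in the steps above; the class and method names (SearchCheck, buildSearchRequest) are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SearchCheck {

    // Builds a GET request for <baseUrl>/<index>/_search.
    static HttpRequest buildSearchRequest(String baseUrl, String index) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/" + index + "/_search"))
                .GET()
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();

        // Assumes the single-node container from step 5 is listening on port 9200.
        HttpResponse<String> response = client.send(
                buildSearchRequest("http://localhost:9200", "testing"),
                HttpResponse.BodyHandlers.ofString());

        // Print the HTTP status and the raw JSON search response.
        System.out.println(response.statusCode());
        System.out.println(response.body());
    }
}
```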