Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][io]add metrics to elastic search sink #20498

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public class ElasticSearchClient implements AutoCloseable {
};

private ElasticSearchConfig config;
private ElasticSearchMetrics metrics;
private RestClient client;
private SinkContext sinkContext;
private final RandomExponentialRetry backoffRetry;
Expand All @@ -58,18 +59,9 @@ public class ElasticSearchClient implements AutoCloseable {
final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
private final IndexNameFormatter indexNameFormatter;

// sink metrics
public static final String METRICS_TOTAL_INCOMING = "_elasticsearch_total_incoming_";
public static final String METRICS_TOTAL_SUCCESS = "_elasticsearch_total_success_";
public static final String METRICS_TOTAL_DELETE = "_elasticsearch_total_delete_";
public static final String METRICS_TOTAL_FAILURE = "_elasticsearch_total_failure_";
public static final String METRICS_TOTAL_SKIP = "_elasticsearch_total_skip_";
public static final String METRICS_TOTAL_MALFORMED_IGNORE = "_elasticsearch_total_malformed_ignore_";
public static final String METRICS_TOTAL_NULLVALUE_IGNORE = "_elasticsearch_total_nullvalue_ignore_";

public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, SinkContext sinkContext) {
public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, ElasticSearchMetrics metrics) {
this.config = elasticSearchConfig;
this.sinkContext = sinkContext;
this.metrics = metrics;
if (this.config.getIndexName() != null) {
this.indexNameFormatter = new IndexNameFormatter(this.config.getIndexName());
} else {
Expand All @@ -90,8 +82,8 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
record.fail();
checkForIrrecoverableError(record, result);
} else {
incrementCounter(METRICS_TOTAL_SUCCESS, index);
record.ack();
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, index);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you already did the work, wouldn't you prefer to go all the way?
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, index); ---> metrics.recordRecordIndexedSuccess()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually it should be incremented by 1. The counter is incremented alongside the message acknowledgement inside a loop, once per record.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My point is about changing the structure of the methods, not whether it's incremented by 1 or not :)

}
}
}
Expand All @@ -101,21 +93,15 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
log.warn("Bulk request id={} failed:", executionId, throwable);
for (BulkProcessor.BulkOperationRequest operation: bulkOperationList) {
final Record record = operation.getPulsarRecord();
incrementCounter(METRICS_TOTAL_FAILURE, 1);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't get it.
You switched to 'event based' methodology in ElasticSearchMetrics for recordRecordIndexedSuccess(), why not everywhere? This should be IMO metrics.recordRecordFailure()

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How do you differentiate between retry failures and complete failures?
This method is called each time an attempt has failed.

}
}
};
this.backoffRetry = new RandomExponentialRetry(elasticSearchConfig.getMaxRetryTimeInSec());
this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
}

public void incrementCounter(String counter, double value) {
if (sinkContext != null && counter != null) {
sinkContext.recordMetric(counter, value);
}
}

void failed(Exception e) {
if (irrecoverableError.compareAndSet(null, e)) {
log.error("Irrecoverable error:", e);
Expand All @@ -135,19 +121,19 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
for (String error : MALFORMED_ERRORS) {
if (errorCause.contains(error)) {
isMalformed = true;
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
switch (config.getMalformedDocAction()) {
case IGNORE:
incrementCounter(METRICS_TOTAL_MALFORMED_IGNORE, 1);
metrics.incrementCounter(ElasticSearchMetrics.MALFORMED_IGNORE, 1);
break;
case WARN:
incrementCounter(METRICS_TOTAL_SKIP, 1);
metrics.incrementCounter(ElasticSearchMetrics.WARN, 1);
log.warn("Ignoring malformed document index={} id={}",
result.getIndex(),
result.getDocumentId(),
error);
break;
case FAIL:
incrementCounter(METRICS_TOTAL_FAILURE, 1);
log.error("Failure due to the malformed document index={} id={}",
result.getIndex(),
result.getDocumentId(),
Expand All @@ -161,7 +147,7 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
log.warn("Bulk request failed, message id=[{}] index={} error={}",
record.getMessage()
.map(m -> {
incrementCounter(METRICS_TOTAL_FAILURE, 1);
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
return m.getMessageId().toString();
})
.orElse(""),
Expand All @@ -186,6 +172,7 @@ public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Excep
client.getBulkProcessor().appendIndexRequest(bulkIndexRequest);
} catch (Exception e) {
log.debug("index failed id=" + idAndDoc.getLeft(), e);
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
record.fail();
throw e;
}
Expand All @@ -210,15 +197,16 @@ public boolean indexDocument(Record<GenericObject> record, Pair<String, String>
final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource);
if (createdOrUpdated) {
record.ack();
incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, 1);
} else {
record.fail();
incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
}
return createdOrUpdated;
} catch (final Exception ex) {
log.error("index failed id=" + idAndDoc.getLeft(), ex);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw ex;
}
}
Expand All @@ -239,6 +227,7 @@ public void bulkDelete(Record<GenericObject> record, String id) throws Exception
} catch (Exception e) {
log.debug("delete failed id: {}", id, e);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw e;
}
}
Expand All @@ -258,13 +247,16 @@ public boolean deleteDocument(Record<GenericObject> record, String id) throws Ex
final boolean deleted = client.deleteDocument(indexName, id);
if (deleted) {
record.ack();
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, 1);
} else {
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
}
return deleted;
} catch (final Exception ex) {
log.debug("index failed id: {}", id, ex);
record.fail();
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw ex;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.io.elasticsearch;

import org.apache.pulsar.io.core.SinkContext;

/*
* Metrics class for ElasticSearchSink
*/
public class ElasticSearchMetrics {

private SinkContext sinkContext;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Place below static variables.

// sink metrics
public static final String INCOMING = "_elasticsearch_incoming_";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

static final goes before private variables

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed


// INCOMING = SUCCESS + FAILURE + SKIP + NULLVALUE_IGNORE
public static final String SUCCESS = "_elasticsearch_success_";

// DELETE is an attempt to delete a document by id
public static final String DELETE = "elasticsearch_delete_";
public static final String FAILURE = "elasticsearch_failure_";
public static final String SKIP = "elasticsearch_skip_";
public static final String WARN = "elasticsearch_warn_";
public static final String MALFORMED_IGNORE = "elasticsearch_malformed_ignore_";
public static final String NULLVALUE_IGNORE = "elasticsearch_nullvalue_ignore_";

/**
 * Creates a metrics recorder that forwards counter updates to the given sink context.
 *
 * @param sinkContext the context whose {@code recordMetric} is called by
 *                    {@code incrementCounter}
 */
// NOTE(review): the tests in this change construct this class with a null context;
// confirm that incrementCounter still tolerates null before enforcing non-null here.
public ElasticSearchMetrics(SinkContext sinkContext) {
this.sinkContext = sinkContext;
}

public void incrementCounter(String counter, double value) {
if (sinkContext != null) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Why can sinkContext be null at all?
  2. Not here for sure. Put in ctor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed the if evaluation

this.sinkContext.recordMetric(counter, value);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.core.annotations.Connector;
import org.apache.pulsar.io.core.annotations.IOType;
import org.opensearch.index.engine.Engine.Delete;

@Connector(
name = "elastic_search",
Expand All @@ -65,6 +66,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {

private ElasticSearchConfig elasticSearchConfig;
private ElasticSearchClient elasticsearchClient;
private ElasticSearchMetrics metrics;
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper sortedObjectMapper;
private List<String> primaryFields = null;
Expand All @@ -75,7 +77,8 @@ public class ElasticSearchSink implements Sink<GenericObject> {
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
elasticSearchConfig.validate();
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig, sinkContext);
metrics = new ElasticSearchMetrics(sinkContext);
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig, metrics);
if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
primaryFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
}
Expand Down Expand Up @@ -109,7 +112,7 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {

@Override
public void write(Record<GenericObject> record) throws Exception {
this.elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_INCOMING, 1);
metrics.incrementCounter(ElasticSearchMetrics.INCOMING, 1);
if (!elasticsearchClient.isFailed()) {
Pair<String, String> idAndDoc = extractIdAndDocument(record);
try {
Expand All @@ -120,22 +123,21 @@ public void write(Record<GenericObject> record) throws Exception {
switch (elasticSearchConfig.getNullValueAction()) {
case DELETE:
if (idAndDoc.getLeft() != null) {
metrics.incrementCounter(ElasticSearchMetrics.DELETE, 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bulkDelete or deleteDocument can fail and increment the failure counter, but you increment this counter anyway, which is double counting.

Copy link
Contributor Author

@zzzming zzzming Jun 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@asafm I changed to DELETE_ATTEMPT to count the number of attempts.

I'm having trouble separating DELETE from index operations in the bulk path. This requires more code changes. Can I do this in another PR? We do need a delete_attempt count to help track whether, and how many, delete-document requests are made.

Would you help me to add the DELETE counter in another PR?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok. Ping me in the PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't you prefer to first fix the code to have everything you need and only then add the metrics?

if (elasticSearchConfig.isBulkEnabled()) {
elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_DELETE, 1);
}
} else {
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SKIP, 1);
metrics.incrementCounter(ElasticSearchMetrics.SKIP, 1);
}
break;
case IGNORE:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_NULLVALUE_IGNORE, 1);
metrics.incrementCounter(ElasticSearchMetrics.NULLVALUE_IGNORE, 1);
break;
case FAIL:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
elasticsearchClient.failed(
new PulsarClientException.InvalidMessageException("Unexpected null message value"));
throw elasticsearchClient.irrecoverableError.get();
Expand All @@ -148,32 +150,32 @@ public void write(Record<GenericObject> record) throws Exception {
}
}
} catch (JsonProcessingException jsonProcessingException) {
// this is from non-bulk action
// a generic failure counter should have been incremented
switch (elasticSearchConfig.getMalformedDocAction()) {
case IGNORE:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_MALFORMED_IGNORE, 1);
metrics.incrementCounter(ElasticSearchMetrics.MALFORMED_IGNORE, 1);
break;
case WARN:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
metrics.incrementCounter(ElasticSearchMetrics.WARN, 1);
log.warn("Ignoring malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
elasticsearchClient.failed(jsonProcessingException);
throw jsonProcessingException;
case FAIL:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
log.error("Malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
elasticsearchClient.failed(jsonProcessingException);
throw jsonProcessingException;
}
} catch (Exception e) {
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
log.error("write error for {} {}:", idAndDoc.getLeft(), idAndDoc.getRight(), e);
throw e;
}
} else {
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
throw new IllegalStateException("Elasticsearch client is in FAILED status");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,16 +78,17 @@ public void testBasicAuth() throws Exception {
config.setIndexName(indexName);
config.setMaxRetries(1);
config.setBulkEnabled(true);
ElasticSearchMetrics metrics = new ElasticSearchMetrics(null);
// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setPassword(ELASTICPWD);

try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
ensureCalls(client, indexName);
}
}
Expand All @@ -104,24 +105,25 @@ public void testTokenAuth() throws Exception {
config.setBulkEnabled(true);


ElasticSearchMetrics metrics = new ElasticSearchMetrics(null);
config.setPassword(ELASTICPWD);
String token;
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
token = createAuthToken(client, "elastic", ELASTICPWD);
}

config.setUsername(null);
config.setPassword(null);

// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setToken(token);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
ensureCalls(client, indexName);
}
}
Expand All @@ -137,24 +139,25 @@ public void testApiKey() throws Exception {
config.setMaxRetries(1);
config.setBulkEnabled(true);

ElasticSearchMetrics metrics = new ElasticSearchMetrics(null);
config.setPassword(ELASTICPWD);
String apiKey;
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
apiKey = createApiKey(client);
}

config.setUsername(null);
config.setPassword(null);

// ensure auth is needed
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
expectThrows(ElasticSearchConnectionException.class, () -> {
client.createIndexIfNeeded(indexName);
});
}

config.setApiKey(apiKey);
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
ensureCalls(client, indexName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,8 @@ public void testSslDisableCertificateValidation() throws IOException {
}

private void testClientWithConfig(ElasticSearchConfig config) throws IOException {
try (ElasticSearchClient client = new ElasticSearchClient(config);) {
ElasticSearchMetrics metrics = new ElasticSearchMetrics(null);
try (ElasticSearchClient client = new ElasticSearchClient(config, metrics);) {
testIndexExists(client);
}
}
Expand Down