
[improve][io]add metrics to elastic search sink #20498

Closed
wants to merge 13 commits
@@ -33,6 +33,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.functions.api.Record;
import org.apache.pulsar.io.core.SinkContext;
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor;
import org.apache.pulsar.io.elasticsearch.client.RestClient;
import org.apache.pulsar.io.elasticsearch.client.RestClientFactory;
@@ -48,6 +49,7 @@ public class ElasticSearchClient implements AutoCloseable {

private ElasticSearchConfig config;
private RestClient client;
private SinkContext sinkContext;
private final RandomExponentialRetry backoffRetry;

final Set<String> indexCache = new HashSet<>();
@@ -56,8 +58,18 @@ public class ElasticSearchClient implements AutoCloseable {
final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
private final IndexNameFormatter indexNameFormatter;

public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
// sink metrics
public static final String METRICS_TOTAL_INCOMING = "_elasticsearch_total_incoming_";
Contributor:

  1. The metric name goes as a label into a Prometheus Summary collector. Why the _ prefix?
  2. I would remove _total_ everywhere. This is just a label in a Prometheus Summary, so this ends up exported as multiple quantiles and sum and count. So maybe let's just express what this counts.

Contributor Author:

I followed the same pattern as KinesisSink.java. I suppose there is a prefix to be added to this metric name, and the leading underscore acts as a delimiter.
I can remove the _total.

Contributor:

You can double-check me - run the connector and check the /metrics output. If I'm right, all I'm saying is that the previous code shouldn't be used as an example.
Removing the total only covers the 2nd point. Look at point 1 as well.

Contributor Author:

Ok. The leading underscore is removed. Thank you

Member:

  1. The metric name goes as a label into a Prometheus Summary collector. Why the _ prefix?

Great point @asafm. I had assumed these were metric names, not label names. Here is the generation code, and it confirms that the name is in fact used as a label:

    this.userMetricsSummary = collectorRegistry.registerIfNotExist(
            prefix + ComponentStatsManager.USER_METRIC_PREFIX,
            Summary.build()
                    .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
                    .help("User defined metric.")
                    .labelNames(userMetricsLabelNames)
                    .quantile(0.5, 0.01)
                    .quantile(0.9, 0.01)
                    .quantile(0.99, 0.01)
                    .quantile(0.999, 0.01)
                    .create());

We should remove the leading and trailing _.

Contributor Author:

Removed both the leading and trailing underscores.
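
For reference, a minimal sketch of how a connector-side counter feeds into that Summary through the sink context, assuming the constants keep no surrounding underscores (the constant name and value below are illustrative, not necessarily the ones that land in this PR):

    // illustrative constant, with the leading/trailing underscores dropped
    public static final String METRICS_SUCCESS = "elasticsearch_success";

    // SinkContext.recordMetric(String, double) feeds the value into the
    // user-metric Summary registered above; the string becomes a label value,
    // not a standalone metric name
    sinkContext.recordMetric(METRICS_SUCCESS, 1);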

public static final String METRICS_TOTAL_SUCCESS = "_elasticsearch_total_success_";
public static final String METRICS_TOTAL_DELETE = "_elasticsearch_total_delete_";
public static final String METRICS_TOTAL_FAILURE = "_elasticsearch_total_failure_";
public static final String METRICS_TOTAL_SKIP = "_elasticsearch_total_skip_";
public static final String METRICS_TOTAL_MALFORMED_IGNORE = "_elasticsearch_total_malformed_ignore_";
public static final String METRICS_TOTAL_NULLVALUE_IGNORE = "_elasticsearch_total_nullvalue_ignore_";

public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, SinkContext sinkContext) {
this.config = elasticSearchConfig;
this.sinkContext = sinkContext;
if (this.config.getIndexName() != null) {
this.indexNameFormatter = new IndexNameFormatter(this.config.getIndexName());
} else {
@@ -78,6 +90,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
record.fail();
checkForIrrecoverableError(record, result);
} else {
incrementCounter(METRICS_TOTAL_SUCCESS, index);
Contributor:

Aren't you supposed to increment failures as well above?

Contributor Author:

The failure and skip counters are incremented in the checkForIrrecoverableError() method because there are different cases to cover there.

record.ack();
}
}
@@ -88,6 +101,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
log.warn("Bulk request id={} failed:", executionId, throwable);
for (BulkProcessor.BulkOperationRequest operation: bulkOperationList) {
final Record record = operation.getPulsarRecord();
incrementCounter(METRICS_TOTAL_FAILURE, 1);
record.fail();
}
}
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
}

public void incrementCounter(String counter, double value) {
Contributor:

why public?

Contributor Author:

Because it will be called by ElasticSearchSink.java too

Contributor:

I wrote below - IMO it's a design smell.

Contributor Author:

Moved the function to the metrics class as you suggested.

if (sinkContext != null && counter != null) {
Contributor:

What's with the if? Why would both ever be null?

Contributor Author:

Again, the null check on sinkContext also exists in KinesisSink.java. I will remove the counter null check.

Contributor:

Can you please explain what the relation of sinkContext != null is to KinesisSink in this method/line?

Contributor Author:

I removed the if evaluation.

sinkContext.recordMetric(counter, value);
}
}

void failed(Exception e) {
if (irrecoverableError.compareAndSet(null, e)) {
log.error("Irrecoverable error:", e);
@@ -117,14 +137,17 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
isMalformed = true;
switch (config.getMalformedDocAction()) {
case IGNORE:
incrementCounter(METRICS_TOTAL_MALFORMED_IGNORE, 1);
break;
case WARN:
incrementCounter(METRICS_TOTAL_SKIP, 1);
log.warn("Ignoring malformed document index={} id={}",
result.getIndex(),
result.getDocumentId(),
error);
break;
case FAIL:
incrementCounter(METRICS_TOTAL_FAILURE, 1);
Contributor:

For me, failed == not indexed, whether the reason was malformed or not. So we need to decide how to differentiate.
Maybe: es_total_failures, es_skipped_failures

Also, I wouldn't differentiate between IGNORE and WARN. I don't see a good reason for it.

Contributor Author:

In our case, we need to track the different ignored and skipped cases. I'm going to make a change to count them all as failures, but also peg the ignore and skip counters.
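
A rough sketch of the counting scheme described above, where every document that is not indexed pegs the overall failure counter plus a reason-specific one (a hypothetical helper; the constant names follow the current diff and may still be renamed per the earlier naming discussion):

    // sketch only: count every non-indexed document as a failure and also
    // record why it was not indexed (malformed-ignore, skip, ...)
    private void countNotIndexed(String reasonCounter) {
        incrementCounter(METRICS_TOTAL_FAILURE, 1);
        incrementCounter(reasonCounter, 1);
    }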

log.error("Failure due to the malformed document index={} id={}",
result.getIndex(),
result.getDocumentId(),
@@ -137,7 +160,10 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes
if (!isMalformed) {
log.warn("Bulk request failed, message id=[{}] index={} error={}",
record.getMessage()
.map(m -> m.getMessageId().toString())
.map(m -> {
incrementCounter(METRICS_TOTAL_FAILURE, 1);
return m.getMessageId().toString();
})
.orElse(""),
result.getIndex(), result.getError());
}
@@ -184,8 +210,10 @@ public boolean indexDocument(Record<GenericObject> record, Pair<String, String>
final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource);
if (createdOrUpdated) {
record.ack();
incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
} else {
record.fail();
incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
}
return createdOrUpdated;
} catch (final Exception ex) {
@@ -75,7 +75,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
elasticSearchConfig.validate();
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig, sinkContext);
if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
primaryFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
}
@@ -109,6 +109,7 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {

@Override
public void write(Record<GenericObject> record) throws Exception {
this.elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_INCOMING, 1);
Contributor:

This doesn't make much sense - let's have this metric (actually a constant) in ElasticSearchSink and just call sinkContext.recordMetric from there.
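
A minimal sketch of that suggestion, assuming ElasticSearchSink keeps a reference to the SinkContext handed to open() (the field and constant names here are illustrative):

    // in ElasticSearchSink: the constant lives in the sink, not in the client
    public static final String METRICS_INCOMING = "elasticsearch_incoming";

    @Override
    public void write(Record<GenericObject> record) throws Exception {
        // record the incoming event directly on the sink context,
        // without routing it through ElasticSearchClient
        sinkContext.recordMetric(METRICS_INCOMING, 1);
        // ... existing write() logic continues here
    }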

if (!elasticsearchClient.isFailed()) {
Pair<String, String> idAndDoc = extractIdAndDocument(record);
try {
@@ -123,12 +124,18 @@ public void write(Record<GenericObject> record) throws Exception {
elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
Contributor:

This needs to be cleaner. If two classes use this, let's extract it into an ElasticSearchSinkMetrics class, define the constants there, and record all those events there.
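
A minimal sketch of the ElasticSearchSinkMetrics helper being suggested here; the class shape and constant names are assumptions drawn from this comment, not code from the PR:

    import org.apache.pulsar.io.core.SinkContext;

    public class ElasticSearchSinkMetrics {

        // metric names shared by ElasticSearchSink and ElasticSearchClient
        public static final String METRICS_INCOMING = "elasticsearch_incoming";
        public static final String METRICS_SUCCESS = "elasticsearch_success";
        public static final String METRICS_DELETE = "elasticsearch_delete";
        public static final String METRICS_FAILURE = "elasticsearch_failure";

        private final SinkContext sinkContext;

        public ElasticSearchSinkMetrics(SinkContext sinkContext) {
            this.sinkContext = sinkContext;
        }

        // single entry point both classes use to record metric events
        public void incrementCounter(String counter, double value) {
            sinkContext.recordMetric(counter, value);
        }
    }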

elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_DELETE, 1);
}
} else {
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SKIP, 1);
}
break;
case IGNORE:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_NULLVALUE_IGNORE, 1);
break;
case FAIL:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
Contributor:

In the elasticsearchClient.failed() call below, you also introduced increments, no? Won't it double count?

Contributor Author:

An exception is thrown in this code block. The counter increments below are for different cases; they are invoked on a different code path.

Contributor:

You're correct. That's a super strange method :)

elasticsearchClient.failed(
new PulsarClientException.InvalidMessageException("Unexpected null message value"));
throw elasticsearchClient.irrecoverableError.get();
@@ -143,25 +150,30 @@ public void write(Record<GenericObject> record) throws Exception {
} catch (JsonProcessingException jsonProcessingException) {
switch (elasticSearchConfig.getMalformedDocAction()) {
case IGNORE:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_MALFORMED_IGNORE, 1);
break;
case WARN:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
log.warn("Ignoring malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
elasticsearchClient.failed(jsonProcessingException);
throw jsonProcessingException;
case FAIL:
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
log.error("Malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
elasticsearchClient.failed(jsonProcessingException);
throw jsonProcessingException;
}
} catch (Exception e) {
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
log.error("write error for {} {}:", idAndDoc.getLeft(), idAndDoc.getRight(), e);
throw e;
}
} else {
elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
throw new IllegalStateException("Elasticsearch client is in FAILED status");
}
}
@@ -277,7 +289,7 @@ public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) t
}
doc = sanitizeValue(doc);
return Pair.of(id, doc);
} else {
} else {
final byte[] data = record
.getMessage()
.orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))