[improve][io] add metrics to elastic search sink #20498

Closed · wants to merge 13 commits
@@ -67,15 +67,24 @@ public class ElasticSearchSink implements Sink<GenericObject> {
private ElasticSearchClient elasticsearchClient;
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper sortedObjectMapper;
private SinkContext sinkContext;
private List<String> primaryFields = null;
private final Pattern nonPrintableCharactersPattern = Pattern.compile("[\\p{C}]");
private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();

// Counter names reported through SinkContext.recordMetric()
public static final String METRICS_TOTAL_INCOMING = "_elasticsearch_total_incoming_";
public static final String METRICS_TOTAL_SUCCESS = "_elasticsearch_total_success_";
public static final String METRICS_TOTAL_DELETE = "_elasticsearch_total_delete_";
public static final String METRICS_TOTAL_FAILURE = "_elasticsearch_total_failure_";
public static final String METRICS_TOTAL_SKIP = "_elasticsearch_total_skip_";

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
elasticSearchConfig = ElasticSearchConfig.load(config, sinkContext);
elasticSearchConfig.validate();
elasticsearchClient = new ElasticSearchClient(elasticSearchConfig);
this.sinkContext = sinkContext;
if (!Strings.isNullOrEmpty(elasticSearchConfig.getPrimaryFields())) {
primaryFields = Arrays.asList(elasticSearchConfig.getPrimaryFields().split(","));
}
@@ -107,8 +116,15 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {
this.elasticsearchClient = elasticsearchClient;
}

private void incrementCounter(String counter, double value) {
if (sinkContext != null && counter != null) {
sinkContext.recordMetric(counter, value);
}
}

@Override
public void write(Record<GenericObject> record) throws Exception {
incrementCounter(METRICS_TOTAL_INCOMING, 1);
if (!elasticsearchClient.isFailed()) {
Pair<String, String> idAndDoc = extractIdAndDocument(record);
try {
@@ -124,11 +140,17 @@ public void write(Record<GenericObject> record) throws Exception {
} else {
elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
}
incrementCounter(METRICS_TOTAL_SUCCESS, 1);
incrementCounter(METRICS_TOTAL_DELETE, 1);
} else {
incrementCounter(METRICS_TOTAL_SKIP, 1);
}
break;
case IGNORE:
incrementCounter(METRICS_TOTAL_SKIP, 1);
break;
case FAIL:
incrementCounter(METRICS_TOTAL_FAILURE, 1);
elasticsearchClient.failed(
new PulsarClientException.InvalidMessageException("Unexpected null message value"));
throw elasticsearchClient.irrecoverableError.get();
Expand All @@ -139,29 +161,35 @@ public void write(Record<GenericObject> record) throws Exception {
} else {
elasticsearchClient.indexDocument(record, idAndDoc);
}
incrementCounter(METRICS_TOTAL_SUCCESS, 1);
Member:
I guess the only concern about the metrics is that when bulk indexing is used, the result is probably asynchronous. @nicoloboschi is this the case?
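
For context, a rough illustration of the concern: with bulk indexing, write() returns before Elasticsearch responds, so counters incremented inline only count submissions. A bulk-completion callback along these lines (names are illustrative, not the connector's actual API) could record the real outcome:

// Hypothetical callback, for illustration only: record outcome metrics when
// the bulk response arrives rather than when records are submitted.
void onBulkResult(List<Record<GenericObject>> batch, boolean succeeded) {
    incrementCounter(succeeded ? METRICS_TOTAL_SUCCESS : METRICS_TOTAL_FAILURE, batch.size());
}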

Member:
Fantastic point. Note also that indexDocument can fail without throwing an exception:

final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource);
if (createdOrUpdated) {
record.ack();
} else {
record.fail();
}
return createdOrUpdated;
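
Since indexDocument reports success through its boolean return value (as quoted above), the sink-side counters could branch on it rather than assuming success. A minimal sketch of that idea, not the PR's code:

// Sketch: use the boolean result of indexDocument so the success/failure
// counters reflect the actual outcome of the index request.
final boolean createdOrUpdated = elasticsearchClient.indexDocument(record, idAndDoc);
incrementCounter(createdOrUpdated ? METRICS_TOTAL_SUCCESS : METRICS_TOTAL_FAILURE, 1);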

Member:
Perhaps we can integrate with this interface?

/**
* Acknowledge that this record is fully processed.
*/
default void ack() {
}
/**
* To indicate that this record has failed to be processed.
*/
default void fail() {
}

Member:
It almost seems like we could get a generic metrics collector for acks on all sinks.
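
As a rough sketch of that idea (a hypothetical class, not part of this PR or the Pulsar API): a wrapper Record could turn ack/fail callbacks into metrics, which would also cover asynchronous bulk results:

// Hypothetical wrapper, for illustration: counts acks and fails as metrics.
// Other accessors (getKey(), getMessage(), ...) would need delegating too.
class MetricsRecord<T> implements Record<T> {
    private final Record<T> delegate;
    private final SinkContext sinkContext;

    MetricsRecord(Record<T> delegate, SinkContext sinkContext) {
        this.delegate = delegate;
        this.sinkContext = sinkContext;
    }

    @Override
    public T getValue() {
        return delegate.getValue();
    }

    @Override
    public void ack() {
        sinkContext.recordMetric(ElasticSearchSink.METRICS_TOTAL_SUCCESS, 1);
        delegate.ack();
    }

    @Override
    public void fail() {
        sinkContext.recordMetric(ElasticSearchSink.METRICS_TOTAL_FAILURE, 1);
        delegate.fail();
    }
}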

Member:
We don't need the generic solution in this PR, of course, but we do need to handle the async behavior of writing to ES.

Contributor Author:
@michaeljmarshall @lhotari
Added metrics to bulk request case

}
} catch (JsonProcessingException jsonProcessingException) {
switch (elasticSearchConfig.getMalformedDocAction()) {
case IGNORE:
incrementCounter(METRICS_TOTAL_SKIP, 1);
Member:
It seems like it would be valuable to differentiate this case from the other SKIP case since this one is a failure case. What do you think?

Member:
I think you might have misunderstood my question @zzzming. I think this case is different than the other skip, so we should have different metric names.

Contributor Author (zzzming, Jun 5, 2023):
In the second commit, does it make sense to change this to a new metric? I added an IGNORE counter. I agree it should not use the SKIP metrics.

    case IGNORE:
        incrementCounter(METRICS_TOTAL_IGNORE, 1);

Member:
There are still two calls to incrementCounter(METRICS_TOTAL_IGNORE, 1); in what seem like two different scenarios. I am suggesting they need to be differentiated.
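
One way to differentiate the two scenarios discussed above would be a separate counter for the malformed-document path (the name below is hypothetical, not something this PR defines):

// Hypothetical counter, for illustration: malformed documents that are ignored
// get their own metric instead of sharing the null-value SKIP/IGNORE counter.
public static final String METRICS_TOTAL_MALFORMED_IGNORE = "_elasticsearch_total_malformed_ignore_";

// in the JsonProcessingException handler:
    case IGNORE:
        incrementCounter(METRICS_TOTAL_MALFORMED_IGNORE, 1);
        break;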

break;
case WARN:
incrementCounter(METRICS_TOTAL_FAILURE, 1);
log.warn("Ignoring malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
elasticsearchClient.failed(jsonProcessingException);
throw jsonProcessingException;
case FAIL:
incrementCounter(METRICS_TOTAL_FAILURE, 1);
log.error("Malformed document messageId={}",
record.getMessage().map(Message::getMessageId).orElse(null),
jsonProcessingException);
elasticsearchClient.failed(jsonProcessingException);
throw jsonProcessingException;
}
} catch (Exception e) {
incrementCounter(METRICS_TOTAL_FAILURE, 1);
log.error("write error for {} {}:", idAndDoc.getLeft(), idAndDoc.getRight(), e);
throw e;
}
} else {
incrementCounter(METRICS_TOTAL_FAILURE, 1);
throw new IllegalStateException("Elasticsearch client is in FAILED status");
}
}
@@ -277,7 +305,7 @@ public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) t
}
doc = sanitizeValue(doc);
return Pair.of(id, doc);
} else {
final byte[] data = record
.getMessage()
.orElseThrow(() -> new IllegalArgumentException("Record does not carry message information"))