[improve][io]add metrics to elastic search sink #20498
Conversation
@zzzming Please add the following content to your PR description and select a checkbox:
Thanks for your contribution @zzzming! I left some comments.
```java
    }
} catch (JsonProcessingException jsonProcessingException) {
    switch (elasticSearchConfig.getMalformedDocAction()) {
        case IGNORE:
            incrementCounter(METRICS_TOTAL_SKIP, 1);
```
It seems like it would be valuable to differentiate this case from the other SKIP
case since this one is a failure case. What do you think?
I think you might have misunderstood my question @zzzming. I think this case is different than the other skip, so we should have different metric names.
In the second commit, does it make sense to change this to a new metric? I added an IGNORE counter. I agree it should not use the SKIP metrics.
```java
case IGNORE:
    incrementCounter(METRICS_TOTAL_IGNORE, 1);
```
There are still two calls to `incrementCounter(METRICS_TOTAL_IGNORE, 1);` in what seem like two different scenarios. I am suggesting they need to be differentiated.
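A minimal sketch of the differentiation being suggested (the counter names, the enum, and the helper class are assumptions for illustration, not the PR's actual identifiers):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: give the malformed-document IGNORE case its own counter
// so it is not conflated with the ordinary null-value SKIP case.
public class MalformedActionSketch {
    enum MalformedDocAction { IGNORE, WARN, FAIL }

    static final String METRICS_SKIP = "elasticsearch_skip";                          // ordinary skip
    static final String METRICS_MALFORMED_IGNORE = "elasticsearch_malformed_ignore";  // failure-ish skip
    static final String METRICS_FAILURE = "elasticsearch_failure";

    final Map<String, Double> counters = new HashMap<>();

    void incrementCounter(String counter, double value) {
        counters.merge(counter, value, Double::sum);
    }

    // Called from the JsonProcessingException catch block.
    void onMalformedDocument(MalformedDocAction action) {
        switch (action) {
            case IGNORE:
                incrementCounter(METRICS_MALFORMED_IGNORE, 1);
                break;
            case WARN:
            case FAIL:
                incrementCounter(METRICS_FAILURE, 1);
                break;
        }
    }
}
```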
+1
This will be helpful in debugging workloads
LGTM. Good work @zzzming
```diff
@@ -139,29 +163,35 @@ public void write(Record<GenericObject> record) throws Exception {
        } else {
            elasticsearchClient.indexDocument(record, idAndDoc);
        }
        incrementCounter(METRICS_TOTAL_SUCCESS, 1);
```
I guess the only concern about the metrics is that when bulk indexing is used, the result is probably asynchronous. @nicoloboschi is this the case?
Fantastic point. Note also that `indexDocument` can fail without throwing an exception:
Lines 184 to 190 in 82237d3
```java
final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource);
if (createdOrUpdated) {
    record.ack();
} else {
    record.fail();
}
return createdOrUpdated;
```
Perhaps we can integrate with this interface?
pulsar/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Record.java
Lines 106 to 116 in 82237d3
```java
/**
 * Acknowledge that this record is fully processed.
 */
default void ack() {
}

/**
 * To indicate that this record has failed to be processed.
 */
default void fail() {
}
```
It almost seems like we could get a generic metrics collector for acks on all sinks.
We don't need the generic solution in this PR, of course, but we do need to handle the async behavior of writing to ES.
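To make the async concern concrete, here is a hedged sketch (class and counter names are assumptions, not the connector's real API) of counting outcomes in the bulk listener callback rather than at submit time:

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: with bulk indexing, write() only enqueues the operation; the real
// outcome is only known in the listener callback, so that is where the
// success/failure counters belong.
public class BulkOutcomeSketch {
    public static class BulkOperationResult {
        final boolean failed;
        public BulkOperationResult(boolean failed) { this.failed = failed; }
    }

    final Map<String, Double> counters = new ConcurrentHashMap<>();

    void incrementCounter(String counter, double value) {
        counters.merge(counter, value, Double::sum);
    }

    // Modeled loosely on a bulk listener's afterBulk: called once per completed
    // bulk request with the per-operation results.
    public void afterBulk(List<BulkOperationResult> results) {
        for (BulkOperationResult r : results) {
            incrementCounter(r.failed ? "elasticsearch_failure" : "elasticsearch_success", 1);
        }
    }
}
```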
@michaeljmarshall @lhotari
Added metrics to bulk request case
Sorry for the late change requests, but in order for these metrics to provide correct insight, I think we need to properly handle bulk indexing and index failures before we can merge this.
I'll also note that it'd be interesting to expose the duration of index requests, but that would belong in another PR.
Moved the metrics inside the indexDocument() function. I will raise another PR to track the index request duration.
```diff
@@ -78,6 +90,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
            record.fail();
            checkForIrrecoverableError(record, result);
        } else {
            incrementCounter(METRICS_TOTAL_SUCCESS, index);
```
Aren't you supposed to increment failures as well above?
The counter increments for the failure and skip cases are in the checkForIrrecoverableError() method, because there are different cases to cover.
```diff
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
        this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
    }

    public void incrementCounter(String counter, double value) {
```
why public?
Because it will be called by ElasticSearchSink.java too
I wrote below - IMO it's a design smell.
Moved the function to the metrics class as you suggested.
```diff
@@ -56,8 +58,18 @@ public class ElasticSearchClient implements AutoCloseable {
    final AtomicReference<Exception> irrecoverableError = new AtomicReference<>();
    private final IndexNameFormatter indexNameFormatter;

    public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) {
    // sink metrics
    public static final String METRICS_TOTAL_INCOMING = "_elasticsearch_total_incoming_";
```
- The metric name goes as a label into a Prometheus Summary collector. Why the `_` prefix?
- I would remove `_total_` everywhere. This is just a label in a Prometheus Summary, so this ends up exported as multiple quantiles plus sum and count. So maybe let's just express what this counts.
I followed the same pattern from KinesisSink.java. I suppose there is a prefix added to this metric name; the leading underscore is a delimiter.
I can remove the `_total`.
You can double-check me - run the connector and check the /metrics output. If I'm right, all I'm saying is previous code shouldn't be used as an example.
Removing the total is only the 2nd point. Look at point 1 as well.
Ok. The leading underscore is removed. Thank you
> The metric name goes as a label into a Prometheus Summary collector. Why the `_` prefix?
Great point @asafm. I had assumed these were metrics names, not label names. Here is the generation code and it supports that it is in fact a label name:
pulsar/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
Lines 212 to 222 in b1822ed
```java
this.userMetricsSummary = collectorRegistry.registerIfNotExist(
        prefix + ComponentStatsManager.USER_METRIC_PREFIX,
        Summary.build()
                .name(prefix + ComponentStatsManager.USER_METRIC_PREFIX)
                .help("User defined metric.")
                .labelNames(userMetricsLabelNames)
                .quantile(0.5, 0.01)
                .quantile(0.9, 0.01)
                .quantile(0.99, 0.01)
                .quantile(0.999, 0.01)
                .create());
```
We should remove the `_` prefix and suffix.
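A plain-Java model of the point above (no Prometheus dependency; the class and map are illustrative stand-ins): every user metric is recorded under one shared Summary, and the string passed to recordMetric becomes a label value on it, so underscores in that string buy nothing.

```java
import java.util.HashMap;
import java.util.Map;

// Model: all user metrics share one collector; the name passed in is only a
// label value, not an exported Prometheus metric name.
public class UserMetricLabelSketch {
    // label value -> running sum, mimicking the Summary's per-label child
    final Map<String, Double> children = new HashMap<>();

    void recordMetric(String metricName, double value) {
        // "_foo_" vs "foo" would only change this label text, nothing else.
        children.merge(metricName, value, Double::sum);
    }
}
```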
Removed both the leading and trailing underscores.
```diff
@@ -96,6 +110,12 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
        this.client = retry(() -> RestClientFactory.createClient(config, bulkListener), -1, "client creation");
    }

    public void incrementCounter(String counter, double value) {
        if (sinkContext != null && counter != null) {
```
What's with the if? Why would both ever be null?
Again, the null check on sinkContext exists in KinesisSink.java as well. I will remove the counter evaluation part.
Can you please explain the relation of `sinkContext != null` to `KinesisSink` in this method/line?
I removed the if evaluation.
```java
            log.warn("Ignoring malformed document index={} id={}",
                    result.getIndex(),
                    result.getDocumentId(),
                    error);
            break;
        case FAIL:
            incrementCounter(METRICS_TOTAL_FAILURE, 1);
```
For me failed == have not been indexed, whether the reason was malformed or not. So we need to decide how to differentiate.
Maybe: es_total_failures, es_skipped_failures
Also, I wouldn't differentiate between IGNORE and WARN. Don't see a good reason for it.
In our case, we need to track the different ignored and skipped cases. I'm going to make a change to count them all as failures but peg the ignores and skips too.
```diff
@@ -109,6 +109,7 @@ void setElasticsearchClient(ElasticSearchClient elasticsearchClient) {

    @Override
    public void write(Record<GenericObject> record) throws Exception {
        this.elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_INCOMING, 1);
```
This doesn't make much sense - let's have this metric (actually constant) in ElasticSearchSink and just call sinkContext.record from there.
```diff
@@ -123,12 +124,18 @@ public void write(Record<GenericObject> record) throws Exception {
            elasticsearchClient.bulkDelete(record, idAndDoc.getLeft());
        } else {
            elasticsearchClient.deleteDocument(record, idAndDoc.getLeft());
            elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_SUCCESS, 1);
```
This needs to be cleaner. If you have two classes using this, then let's extract it to ElasticSearchSinkMetrics, define the constants there, and record all those events.
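A sketch of the extraction being suggested (the class name, constants, and the reduced SinkContext interface are assumptions): one metrics class owns the metric-name constants and wraps recordMetric, so neither the sink nor the client reaches into the other.

```java
// Sketch of the suggested extraction. SinkContext here is a minimal stand-in
// for org.apache.pulsar.io.core.SinkContext, reduced to the one method used.
public class ElasticSearchSinkMetricsSketch {
    public interface SinkContext {
        void recordMetric(String metricName, double value);
    }

    public static final String INCOMING = "elasticsearch_incoming";
    public static final String SUCCESS = "elasticsearch_success";
    public static final String FAILURE = "elasticsearch_failure";

    private final SinkContext sinkContext;

    public ElasticSearchSinkMetricsSketch(SinkContext sinkContext) {
        this.sinkContext = sinkContext;
    }

    public void incrementCounter(String counter, double value) {
        sinkContext.recordMetric(counter, value);
    }
}
```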
```java
            break;
        case FAIL:
            elasticsearchClient.incrementCounter(ElasticSearchClient.METRICS_TOTAL_FAILURE, 1);
```
In the `elasticsearchClient.failed(` you call below, you also introduced increments, no? Won't it double count?
An exception is thrown in this code block. The counter increments below are for different cases; they are invoked in a different code path.
You're correct. That's a super strange method :)
@mattisonchao Thanks for pinging me.
I'll review again once this is ready for review.
You have no tests to see that your metrics are correct
```java
record.ack();
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, index);
```
If you already did the work, wouldn't you prefer to go all the way? `metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, index);` ---> `metrics.recordRecordIndexedSuccess()`?
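The "go all the way" shape might look like this sketch (method and metric names are assumptions): callers report events, and the metrics class decides counter names and deltas.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of the event-based API being asked for: no caller passes metric
// names or deltas; each outcome has its own recording method.
public class EventMetricsSketch {
    final Map<String, Double> counters = new HashMap<>();

    public void recordRecordIndexedSuccess() {
        counters.merge("elasticsearch_success", 1.0, Double::sum);
    }

    public void recordRecordFailure() {
        counters.merge("elasticsearch_failure", 1.0, Double::sum);
    }
}
```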
Actually it should be incremented by 1. The counter pegging happens alongside message acknowledgement inside an iteration.
My point is changing the structure of the methods, not if it's by 1 or not :)
```java
private SinkContext sinkContext;
// sink metrics
public static final String INCOMING = "_elasticsearch_incoming_";
```
`static final` fields go before private variables.
fixed
```java
}

public void incrementCounter(String counter, double value) {
    if (sinkContext != null) {
```
- Why can sinkContext be null at all?
- Not here for sure. Put in ctor.
removed the if evaluation
```diff
@@ -120,22 +123,21 @@ public void write(Record<GenericObject> record) throws Exception {
        switch (elasticSearchConfig.getNullValueAction()) {
            case DELETE:
                if (idAndDoc.getLeft() != null) {
                    metrics.incrementCounter(ElasticSearchMetrics.DELETE, 1);
```
`bulkDelete` or `deleteDocument` can fail and increment failure, but you increase it anyway - double counting.
@asafm I changed to DELETE_ATTEMPT to count the number of attempts.
I'm having a problem separating DELETE from index in the bulk operation. That requires more code changes. Can I do this in another PR? We do need a delete_attempt count to help track whether, and how many, document delete requests there are.
Would you help me add the DELETE counter in another PR?
Ok. Ping me in the PR.
Don't you prefer to first fix the code to have everything you need and only then add the metrics?
Can you ping me to review once you take care of my comment "My point is changing the structure of the methods, not if it's by 1 or not :)"?
@asafm added the function per your review comment.
I haven't reviewed most of it yet, because I'm still trying to figure out how retry works.
Also, if you have 0 tests for your code, you have no idea whether what you wrote is working and counting properly. Not to mention future maintainers will be super afraid to touch it, since they might break it without the protection of an e2e test or something similar.
```diff
@@ -89,6 +94,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest>
        for (BulkProcessor.BulkOperationRequest operation: bulkOperationList) {
            final Record record = operation.getPulsarRecord();
            record.fail();
            metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1);
```
I don't get it. You switched to an 'event based' methodology in ElasticSearchMetrics for recordRecordIndexedSuccess(), so why not everywhere? This should IMO be metrics.recordRecordFailure().
How do you differentiate between retry failures and complete failures?
This method is called each time an attempt has failed.
```java
 */
public class ElasticSearchMetrics {

    private SinkContext sinkContext;
```
Place below static variables.
```java
private SinkContext sinkContext;
// sink metrics
public static final String INCOMING = "_elasticsearch_incoming";
```
why the leading underscore?
```java
public static final String INCOMING = "_elasticsearch_incoming";

// INCOMING = SUCCESS + FAILURE + SKIP + NULLVALUE_IGNORE
public static final String SUCCESS = "_elasticsearch_success";
```
why the leading underscore?
```java
}

public void incrementCounter(String counter, double value) {
    this.sinkContext.recordMetric(counter, value);
```
`this` is redundant
The PR had no activity for 30 days; marked with the Stale label.
Closed as stale and conflicted.
Motivation
Add metrics to the Elasticsearch sink.
Modifications
Add metrics to the Elasticsearch sink.
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
(example:)
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete
Matching PR in forked repository
PR in forked repository: