-
Notifications
You must be signed in to change notification settings - Fork 3.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][io]add metrics to elastic search sink #20498
Changes from all commits
e886a89
e3309d1
3eeb7b5
cedaed0
90e90a1
2b29c53
e59871c
81d8033
3b95212
11743cd
381293c
1f4329b
616ad1a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ | |
import org.apache.commons.lang3.tuple.Pair; | ||
import org.apache.pulsar.client.api.schema.GenericObject; | ||
import org.apache.pulsar.functions.api.Record; | ||
import org.apache.pulsar.io.core.SinkContext; | ||
import org.apache.pulsar.io.elasticsearch.client.BulkProcessor; | ||
import org.apache.pulsar.io.elasticsearch.client.RestClient; | ||
import org.apache.pulsar.io.elasticsearch.client.RestClientFactory; | ||
|
@@ -47,7 +48,9 @@ public class ElasticSearchClient implements AutoCloseable { | |
}; | ||
|
||
private ElasticSearchConfig config; | ||
private ElasticSearchMetrics metrics; | ||
private RestClient client; | ||
private SinkContext sinkContext; | ||
private final RandomExponentialRetry backoffRetry; | ||
|
||
final Set<String> indexCache = new HashSet<>(); | ||
|
@@ -56,8 +59,9 @@ public class ElasticSearchClient implements AutoCloseable { | |
final AtomicReference<Exception> irrecoverableError = new AtomicReference<>(); | ||
private final IndexNameFormatter indexNameFormatter; | ||
|
||
public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig) { | ||
public ElasticSearchClient(ElasticSearchConfig elasticSearchConfig, ElasticSearchMetrics metrics) { | ||
this.config = elasticSearchConfig; | ||
this.metrics = metrics; | ||
if (this.config.getIndexName() != null) { | ||
this.indexNameFormatter = new IndexNameFormatter(this.config.getIndexName()); | ||
} else { | ||
|
@@ -79,6 +83,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest> | |
checkForIrrecoverableError(record, result); | ||
} else { | ||
record.ack(); | ||
metrics.recordRecordIndexedSuccess(); | ||
} | ||
} | ||
} | ||
|
@@ -89,6 +94,7 @@ public void afterBulk(long executionId, List<BulkProcessor.BulkOperationRequest> | |
for (BulkProcessor.BulkOperationRequest operation: bulkOperationList) { | ||
final Record record = operation.getPulsarRecord(); | ||
record.fail(); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How do you differentiate between retry failures to complete failures? |
||
} | ||
} | ||
}; | ||
|
@@ -115,10 +121,13 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes | |
for (String error : MALFORMED_ERRORS) { | ||
if (errorCause.contains(error)) { | ||
isMalformed = true; | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
switch (config.getMalformedDocAction()) { | ||
case IGNORE: | ||
metrics.incrementCounter(ElasticSearchMetrics.MALFORMED_IGNORE, 1); | ||
break; | ||
case WARN: | ||
metrics.incrementCounter(ElasticSearchMetrics.WARN, 1); | ||
log.warn("Ignoring malformed document index={} id={}", | ||
result.getIndex(), | ||
result.getDocumentId(), | ||
|
@@ -137,7 +146,10 @@ void checkForIrrecoverableError(Record<?> record, BulkProcessor.BulkOperationRes | |
if (!isMalformed) { | ||
log.warn("Bulk request failed, message id=[{}] index={} error={}", | ||
record.getMessage() | ||
.map(m -> m.getMessageId().toString()) | ||
.map(m -> { | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
return m.getMessageId().toString(); | ||
}) | ||
.orElse(""), | ||
result.getIndex(), result.getError()); | ||
} | ||
|
@@ -160,6 +172,7 @@ public void bulkIndex(Record record, Pair<String, String> idAndDoc) throws Excep | |
client.getBulkProcessor().appendIndexRequest(bulkIndexRequest); | ||
} catch (Exception e) { | ||
log.debug("index failed id=" + idAndDoc.getLeft(), e); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
record.fail(); | ||
throw e; | ||
} | ||
|
@@ -184,13 +197,16 @@ public boolean indexDocument(Record<GenericObject> record, Pair<String, String> | |
final boolean createdOrUpdated = client.indexDocument(indexName, documentId, documentSource); | ||
if (createdOrUpdated) { | ||
record.ack(); | ||
metrics.recordRecordIndexedSuccess(); | ||
} else { | ||
record.fail(); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
} | ||
return createdOrUpdated; | ||
} catch (final Exception ex) { | ||
log.error("index failed id=" + idAndDoc.getLeft(), ex); | ||
record.fail(); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
throw ex; | ||
} | ||
} | ||
|
@@ -211,6 +227,7 @@ public void bulkDelete(Record<GenericObject> record, String id) throws Exception | |
} catch (Exception e) { | ||
log.debug("delete failed id: {}", id, e); | ||
record.fail(); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
throw e; | ||
} | ||
} | ||
|
@@ -230,13 +247,16 @@ public boolean deleteDocument(Record<GenericObject> record, String id) throws Ex | |
final boolean deleted = client.deleteDocument(indexName, id); | ||
if (deleted) { | ||
record.ack(); | ||
metrics.incrementCounter(ElasticSearchMetrics.SUCCESS, 1); | ||
} else { | ||
record.fail(); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
} | ||
return deleted; | ||
} catch (final Exception ex) { | ||
log.debug("index failed id: {}", id, ex); | ||
record.fail(); | ||
metrics.incrementCounter(ElasticSearchMetrics.FAILURE, 1); | ||
throw ex; | ||
} | ||
} | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
package org.apache.pulsar.io.elasticsearch; | ||
|
||
import org.apache.pulsar.io.core.SinkContext; | ||
|
||
/* | ||
* Metrics class for ElasticSearchSink | ||
*/ | ||
public class ElasticSearchMetrics { | ||
|
||
private SinkContext sinkContext; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. place below static varibles |
||
// sink metrics | ||
public static final String INCOMING = "_elasticsearch_incoming"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the leading underscore? |
||
|
||
// INCOMING = SUCCESS + FAILURE + SKIP + NULLVALUE_IGNORE | ||
public static final String SUCCESS = "_elasticsearch_success"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. why the leading underscore? |
||
|
||
// DELETE_ATTEMPT is an attempt to delete a document by id | ||
// TODO: add delete success metrics, currently it's difficult to separate delete and index from the bulk operations | ||
public static final String DELETE_ATTEMPT = "elasticsearch_delete_attempt"; | ||
|
||
public static final String FAILURE = "elasticsearch_failure"; | ||
public static final String SKIP = "elasticsearch_skip"; | ||
public static final String WARN = "elasticsearch_warn"; | ||
public static final String MALFORMED_IGNORE = "elasticsearch_malformed_ignore"; | ||
public static final String NULLVALUE_IGNORE = "elasticsearch_nullvalue_ignore"; | ||
|
||
public ElasticSearchMetrics(SinkContext sinkContext) { | ||
this.sinkContext = sinkContext; | ||
} | ||
|
||
public void incrementCounter(String counter, double value) { | ||
this.sinkContext.recordMetric(counter, value); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
} | ||
|
||
public void recordRecordIndexedSuccess() { | ||
incrementCounter(SUCCESS, 1); | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't get it.
You switched to 'event based' methodology in
ElasticSearchMetrics
forrecordRecordIndexedSuccess()
, why not everywhere? This should be IMOmetrics.recordRecordFailure()