Skip to content

Commit

Permalink
Mark shard failures caused by unsupported aggregations or queries aga…
Browse files Browse the repository at this point in the history
…inst rolled up data so Kibana can identify them (elastic#89252)

When an unsupported aggregation is executed on an
`aggregate_metric_double` field, an error with a specific
`type` field is generated. This allows clients like Kibana
to identify these errors and handle them properly.

In addition, queries that use a date histogram aggregation
with `calendar_interval` on rollup indices now fail.
Date histogram aggregations are executed on rollup indices
only when they use `fixed_interval` and a `UTC` timezone.
  • Loading branch information
salvatore-campagna committed Aug 22, 2022
1 parent 91d2db2 commit 4b92e1d
Show file tree
Hide file tree
Showing 10 changed files with 533 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchException;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.UnsupportedAggregationOnRollupIndex;
import org.elasticsearch.transport.TcpTransport;
import org.elasticsearch.xcontent.ParseField;
import org.elasticsearch.xcontent.ToXContentFragment;
Expand Down Expand Up @@ -1578,6 +1579,12 @@ private enum ElasticsearchExceptionHandle {
HealthNodeNotDiscoveredException::new,
166,
Version.V_8_5_0
),
UNSUPPORTED_AGGREGATION_ON_DOWNSAMPLED_FIELD_EXCEPTION(
UnsupportedAggregationOnRollupIndex.class,
UnsupportedAggregationOnRollupIndex::new,
167,
Version.V_8_5_0
);

final Class<? extends ElasticsearchException> exceptionClass;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.elasticsearch.cluster.routing.allocation.IndexMetadataUpdater;
import org.elasticsearch.cluster.routing.allocation.decider.DiskThresholdDecider;
import org.elasticsearch.cluster.routing.allocation.decider.ShardsLimitAllocationDecider;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent;
Expand Down Expand Up @@ -127,6 +128,16 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
EnumSet.of(ClusterBlockLevel.WRITE)
);

// TODO: refactor this method after adding more rollup metadata
/**
 * Tells whether this index is a rollup index, based on two index settings.
 *
 * The index counts as a rollup index only when the rollup source name setting is
 * present and non-empty and the rollup status setting equals the name of
 * {@code RollupTaskStatus.SUCCESS}, compared case-insensitively using {@link Locale#ROOT}.
 *
 * @return {@code true} if both conditions hold, {@code false} otherwise
 */
public boolean isRollupIndex() {
    final String sourceIndex = settings.get(IndexMetadata.INDEX_ROLLUP_SOURCE_NAME_KEY);
    final String indexRollupStatus = settings.get(IndexMetadata.INDEX_ROLLUP_STATUS_KEY);
    // A missing status can never be a successful rollup. Checking it explicitly keeps the
    // comparison below String-to-String instead of relying on String.equals(enum) being false.
    if (Strings.isNullOrEmpty(sourceIndex) || indexRollupStatus == null) {
        return false;
    }
    final String successStatus = IndexMetadata.RollupTaskStatus.SUCCESS.name().toLowerCase(Locale.ROOT);
    return successStatus.equals(indexRollupStatus.toLowerCase(Locale.ROOT));
}

public enum State implements Writeable {
OPEN((byte) 0),
CLOSE((byte) 1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0 and the Server Side Public License, v 1; you may not use this file except
* in compliance with, at your election, the Elastic License 2.0 or the Server
* Side Public License, v 1.
*/

package org.elasticsearch.search.aggregations;

import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.rest.RestStatus;

import java.io.IOException;

// NOTE: the name of this class is part of a contract with Kibana which uses it to detect specific
// errors while running aggregations on rollup indices.
/**
* Thrown when executing an aggregation on a time series index field whose type is not supported.
* Downsampling uses specific types while aggregating some fields (like 'aggregate_metric_double').
* Such field types do not support some aggregations.
*/
public class UnsupportedAggregationOnRollupIndex extends AggregationExecutionException {

/**
* Creates the exception with the given message.
*
* @param msg the message describing the unsupported aggregation; passed unchanged to the superclass
*/
public UnsupportedAggregationOnRollupIndex(final String msg) {
super(msg);
}

/**
* Creates the exception from a {@link StreamInput} by delegating to the superclass stream constructor.
*
* @param in the stream handed to the superclass constructor
* @throws IOException declared because the superclass constructor may throw it
*/
public UnsupportedAggregationOnRollupIndex(final StreamInput in) throws IOException {
super(in);
}

/**
* Reports this failure as a bad request.
*
* @return always {@link RestStatus#BAD_REQUEST}
*/
@Override
public RestStatus status() {
return RestStatus.BAD_REQUEST;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -417,9 +417,28 @@ protected ValuesSourceAggregatorFactory innerBuild(
AggregatorFactory parent,
AggregatorFactories.Builder subFactoriesBuilder
) throws IOException {
DateHistogramAggregationSupplier aggregatorSupplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, config);
final DateIntervalWrapper.IntervalTypeEnum dateHistogramIntervalType = dateHistogramInterval.getIntervalType();

if (context.getIndexSettings().getIndexMetadata().isRollupIndex()
&& DateIntervalWrapper.IntervalTypeEnum.CALENDAR.equals(dateHistogramIntervalType)) {
throw new IllegalArgumentException(
config.getDescription()
+ " is not supported for aggregation ["
+ getName()
+ "] with interval type ["
+ dateHistogramIntervalType.getPreferredName()
+ "]"
);
}

final ZoneId tz = timeZone();
if (context.getIndexSettings().getIndexMetadata().isRollupIndex() && tz != null && ZoneId.of("UTC").equals(tz) == false) {
throw new IllegalArgumentException(
config.getDescription() + " is not supported for aggregation [" + getName() + "] with timezone [" + tz + "]"
);
}

DateHistogramAggregationSupplier aggregatorSupplier = context.getValuesSourceRegistry().getAggregator(REGISTRY_KEY, config);
final Rounding rounding = dateHistogramInterval.createRounding(tz, offset);

LongBounds roundedBounds = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,13 +169,23 @@ public <T> T getAggregator(RegistryKey<T> registryKey, ValuesSourceConfig values
@SuppressWarnings("unchecked")
T supplier = (T) aggregatorRegistry.get(registryKey).get(valuesSourceConfig.valueSourceType());
if (supplier == null) {
throw new IllegalArgumentException(
valuesSourceConfig.getDescription() + " is not supported for aggregation [" + registryKey.getName() + "]"
);
final RuntimeException unmappedException = valuesSourceConfig.valueSourceType()
.getUnregisteredException(
valuesSourceConfig.getDescription() + " is not supported for aggregation [" + registryKey.getName() + "]"
);
assert unmappedException != null
: "Value source type ["
+ valuesSourceConfig.valueSourceType()
+ "] did not return a valid exception for aggregation ["
+ registryKey.getName()
+ "]";
throw unmappedException;
}
return supplier;
}
throw new AggregationExecutionException("Unregistered Aggregation [" + registryKey.getName() + "]");
throw new AggregationExecutionException(
"Unregistered Aggregation [" + (registryKey != null ? registryKey.getName() : "unknown aggregation") + "]"
);
}

public AggregationUsageService getUsageService() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,15 @@ default DocValueFormat getFormatter(String format, ZoneId tz) {
* @return the name of the Values Source Type
*/
String typeName();

/**
* Returns the exception to throw in case the registry (type, aggregator) entry
* is not registered.
*
* @param message the message for the exception
* @return the exception to throw
*/
default RuntimeException getUnregisteredException(String message) {
// Default behaviour: wrap the message, unchanged, in a plain IllegalArgumentException.
// Implementations can override this to return a more specific exception type.
return new IllegalArgumentException(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
import org.elasticsearch.search.SearchParseException;
import org.elasticsearch.search.SearchShardTarget;
import org.elasticsearch.search.aggregations.MultiBucketConsumerService;
import org.elasticsearch.search.aggregations.UnsupportedAggregationOnRollupIndex;
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.snapshots.Snapshot;
import org.elasticsearch.snapshots.SnapshotException;
Expand Down Expand Up @@ -831,6 +832,7 @@ public void testIds() {
ids.put(164, VersionConflictException.class);
ids.put(165, SnapshotNameAlreadyInUseException.class);
ids.put(166, HealthNodeNotDiscoveredException.class);
ids.put(167, UnsupportedAggregationOnRollupIndex.class);

Map<Class<? extends ElasticsearchException>, Integer> reverse = new HashMap<>();
for (Map.Entry<Integer, Class<? extends ElasticsearchException>> entry : ids.entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.elasticsearch.script.AggregationScript;
import org.elasticsearch.search.DocValueFormat;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.UnsupportedAggregationOnRollupIndex;
import org.elasticsearch.search.aggregations.support.AggregationContext;
import org.elasticsearch.search.aggregations.support.FieldContext;
import org.elasticsearch.search.aggregations.support.ValueType;
Expand All @@ -22,6 +23,12 @@
public enum AggregateMetricsValuesSourceType implements ValuesSourceType {

AGGREGATE_METRIC() {

@Override
public RuntimeException getUnregisteredException(String message) {
// Return the dedicated exception type instead of a generic one so that clients
// such as Kibana can recognise this failure by its error type.
return new UnsupportedAggregationOnRollupIndex(message);
}

@Override
public ValuesSource getEmpty() {
throw new IllegalArgumentException("Can't deal with unmapped AggregateMetricsValuesSource type " + this.value());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
setup:
- skip:
version: " - 8.4.99"
reason: "rollup: unsupported aggregations errors added in 8.5.0"

- do:
indices.create:
index: test
body:
settings:
number_of_shards: 1
number_of_replicas: 0
index:
mode: time_series
routing_path: [ uid ]
time_series:
start_time: 2021-04-28T00:00:00Z
end_time: 2021-04-29T00:00:00Z
mappings:
properties:
"@timestamp":
type: date
uid:
type: keyword
time_series_dimension: true
total_memory_used:
type: aggregate_metric_double
metrics: [ min, max ]
default_metric: min
- do:
bulk:
refresh: true
index: test
body:
- '{ "index": {} }'
- '{ "@timestamp": "2021-04-28T18:50:00Z", "uid": "001", "total_memory_used": { "min": 99198, "max": 106780 } }'
- '{ "index": {} }'
- '{ "@timestamp": "2021-04-28T18:55:00Z", "uid": "002", "total_memory_used": { "min": 102334, "max": 110450 } }'
- '{ "index": {} }'
- '{ "@timestamp": "2021-04-28T18:50:00Z", "uid": "003", "total_memory_used": { "min": 98012, "max": 109009 } }'
- '{ "index": {} }'
- '{ "@timestamp": "2021-04-28T18:55:00Z", "uid": "004", "total_memory_used": { "min": 101990, "max": 120770 } }'

---
"Histogram aggregation on aggregate_metric_double field":
- skip:
version: " - 8.4.99"
reason: "rollup: unsupported aggregations errors added in 8.5.0"

- do:
catch: bad_request
search:
index: test
body:
size: 0
aggs:
total_memory_used_histogram:
histogram:
field: total_memory_used
interval: 100000


- match: { status: 400 }
# NOTE: the type of error is part of a contract with Kibana which uses it to detect specific
# errors while running aggregations on rollup indices.
- match: { error.root_cause.0.type: unsupported_aggregation_on_rollup_index }
- match: { error.root_cause.0.reason: "Field [total_memory_used] of type [aggregate_metric_double] is not supported for aggregation [histogram]" }

---
"Range aggregation on aggregate_metric_double field":
- skip:
version: " - 8.4.99"
reason: "rollup: unsupported aggregations errors added in 8.5.0"

- do:
catch: bad_request
search:
index: test
body:
size: 0
aggs:
tx_range:
range:
field: total_memory_used
ranges:
-
from: 0.0
to: 200.0
-
from: 201.0
to: 400.0
-
from: 401.0

- match: { status: 400 }
# NOTE: the type of error is part of a contract with Kibana which uses it to detect specific
# errors while running aggregations on rollup indices.
- match: { error.root_cause.0.type: unsupported_aggregation_on_rollup_index }
- match: { error.root_cause.0.reason: "Field [total_memory_used] of type [aggregate_metric_double] is not supported for aggregation [range]" }

---
"Cardinality aggregation on aggregate_metric_double field":
- skip:
version: " - 8.4.99"
reason: "rollup: unsupported aggregations errors added in 8.5.0"

- do:
catch: bad_request
search:
index: test
body:
size: 0
aggs:
tx_cardinality:
cardinality:
field: total_memory_used

- match: { status: 400 }
# NOTE: the type of error is part of a contract with Kibana which uses it to detect specific
# errors while running aggregations on rollup indices.
- match: { error.root_cause.0.type: unsupported_aggregation_on_rollup_index }
- match: { error.root_cause.0.reason: "Field [total_memory_used] of type [aggregate_metric_double] is not supported for aggregation [cardinality]" }

---
"Percentiles aggregation on aggregate_metric_double field":
- skip:
version: " - 8.4.99"
reason: "rollup: unsupported aggregations errors added in 8.5.0"

- do:
catch: bad_request
search:
index: test
body:
size: 0
aggs:
tx_percentile:
percentiles:
field: total_memory_used
percents: [90, 95, 99]

- match: { status: 400 }
# NOTE: the type of error is part of a contract with Kibana which uses it to detect specific
# errors while running aggregations on rollup indices.
- match: { error.root_cause.0.type: unsupported_aggregation_on_rollup_index }
- match: { error.root_cause.0.reason: "Field [total_memory_used] of type [aggregate_metric_double] is not supported for aggregation [percentiles]" }

---
"Top-metrics aggregation on aggregate_metric_double field":
- skip:
version: " - 8.4.99"
reason: "rollup: unsupported aggregations errors added in 8.5.0"

- do:
catch: bad_request
search:
index: test
body:
size: 0
aggs:
top_total_memory_used:
top_metrics:
metrics:
field: total_memory_used
sort:
uid: desc
size: 2


- match: { status: 400 }
# NOTE: the type of error is part of a contract with Kibana which uses it to detect specific
# errors while running aggregations on rollup indices.
- match: { error.root_cause.0.type: unsupported_aggregation_on_rollup_index }
- match: { error.root_cause.0.reason: "Field [total_memory_used] of type [aggregate_metric_double] is not supported for aggregation [top_metrics]" }

0 comments on commit 4b92e1d

Please sign in to comment.