Skip to content

Commit

Permalink
Add metrics command syntax (#108115)
Browse files Browse the repository at this point in the history
This PR introduces a METRICS command in ESQL for timeseries indices. This 
PR only adds the metrics syntax and translates it into a pair of EsIndex
and Aggregate logical plans. Subsequent pull requests will introduce new
logical and physical plans for handling time-series aggregations.

Some examples of the METRICS command:

METRICS tsdb
METRICS tsdb max(cpu) BY host
METRICS pods load=avg(cpu), writes=max(rate(indexing_requests)) BY pod | SORT pod
  • Loading branch information
dnhatn committed May 11, 2024
1 parent 64ef2f4 commit 6028232
Show file tree
Hide file tree
Showing 16 changed files with 2,541 additions and 1,942 deletions.
Expand Up @@ -8,9 +8,17 @@
package org.elasticsearch.xpack.esql.action;

import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.mapper.DateFieldMapper;
import org.elasticsearch.xpack.esql.EsqlTestUtils;
import org.elasticsearch.xpack.esql.plugin.QueryPragmas;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class TimeSeriesIT extends AbstractEsqlIntegTestCase {

Expand All @@ -37,6 +45,48 @@ public void testEmpty() {
"type=long,time_series_metric=gauge"
)
.get();
run("FROM pods | LIMIT 1").close();
run("METRICS pods | LIMIT 1").close();
}

public void testSimpleMetrics() {
Settings settings = Settings.builder().put("mode", "time_series").putList("routing_path", List.of("pod")).build();
client().admin()
.indices()
.prepareCreate("pods")
.setSettings(settings)
.setMapping(
"@timestamp",
"type=date",
"pod",
"type=keyword,time_series_dimension=true",
"cpu",
"type=double,time_series_metric=gauge"
)
.get();
List<String> pods = List.of("p1", "p2", "p3");
long startTime = DateFieldMapper.DEFAULT_DATE_TIME_FORMATTER.parseMillis("2024-04-15T00:00:00Z");
int numDocs = between(10, 10);
Map<String, List<Integer>> cpus = new HashMap<>();
for (int i = 0; i < numDocs; i++) {
String pod = randomFrom(pods);
int cpu = randomIntBetween(0, 100);
cpus.computeIfAbsent(pod, k -> new ArrayList<>()).add(cpu);
long timestamp = startTime + (1000L * i);
client().prepareIndex("pods").setSource("@timestamp", timestamp, "pod", pod, "cpu", cpu).get();
}
List<String> sortedGroups = cpus.keySet().stream().sorted().toList();
client().admin().indices().prepareRefresh("pods").get();
try (EsqlQueryResponse resp = run("METRICS pods load=avg(cpu) BY pod | SORT pod")) {
List<List<Object>> rows = EsqlTestUtils.getValuesList(resp);
assertThat(rows, hasSize(sortedGroups.size()));
for (int i = 0; i < rows.size(); i++) {
List<Object> r = rows.get(i);
String pod = (String) r.get(1);
assertThat(pod, equalTo(sortedGroups.get(i)));
List<Integer> values = cpus.get(pod);
double avg = values.stream().mapToDouble(n -> n).sum() / values.size();
assertThat((double) r.get(0), equalTo(avg));
}
}
}
}
77 changes: 70 additions & 7 deletions x-pack/plugin/esql/src/main/antlr/EsqlBaseLexer.g4
Expand Up @@ -11,6 +11,7 @@ INLINESTATS : 'inlinestats' -> pushMode(EXPRESSION_MODE);
KEEP : 'keep' -> pushMode(PROJECT_MODE);
LIMIT : 'limit' -> pushMode(EXPRESSION_MODE);
META : 'meta' -> pushMode(META_MODE);
METRICS : 'metrics' -> pushMode(METRICS_MODE);
MV_EXPAND : 'mv_expand' -> pushMode(MVEXPAND_MODE);
RENAME : 'rename' -> pushMode(RENAME_MODE);
ROW : 'row' -> pushMode(EXPRESSION_MODE);
Expand All @@ -31,6 +32,16 @@ MULTILINE_COMMENT
WS
: [ \r\n\t]+ -> channel(HIDDEN)
;

fragment INDEX_UNQUOTED_IDENTIFIER_PART
: ~[=`|,[\]/ \t\r\n]
| '/' ~[*/] // allow single / but not followed by another / or * which would start a comment
;

INDEX_UNQUOTED_IDENTIFIER
: INDEX_UNQUOTED_IDENTIFIER_PART+
;

//
// Explain
//
Expand Down Expand Up @@ -192,13 +203,8 @@ FROM_QUOTED_STRING : QUOTED_STRING -> type(QUOTED_STRING);
OPTIONS : 'options';
METADATA : 'metadata';

fragment FROM_UNQUOTED_IDENTIFIER_PART
: ~[=`|,[\]/ \t\r\n]
| '/' ~[*/] // allow single / but not followed by another / or * which would start a comment
;

FROM_UNQUOTED_IDENTIFIER
: FROM_UNQUOTED_IDENTIFIER_PART+
FROM_INDEX_UNQUOTED_IDENTIFIER
: INDEX_UNQUOTED_IDENTIFIER -> type(INDEX_UNQUOTED_IDENTIFIER)
;

FROM_LINE_COMMENT
Expand Down Expand Up @@ -424,3 +430,60 @@ SETTING_WS
: WS -> channel(HIDDEN)
;
//
// METRICS command
//
mode METRICS_MODE;
METRICS_PIPE : PIPE -> type(PIPE), popMode;
METRICS_INDEX_UNQUOTED_IDENTIFIER
: INDEX_UNQUOTED_IDENTIFIER -> type(INDEX_UNQUOTED_IDENTIFIER), popMode, pushMode(CLOSING_METRICS_MODE)
;
METRICS_LINE_COMMENT
: LINE_COMMENT -> channel(HIDDEN)
;
METRICS_MULTILINE_COMMENT
: MULTILINE_COMMENT -> channel(HIDDEN)
;
METRICS_WS
: WS -> channel(HIDDEN)
;
// TODO: remove this workaround mode - see https://github.com/elastic/elasticsearch/issues/108528
mode CLOSING_METRICS_MODE;
CLOSING_METRICS_COMMA
: COMMA -> type(COMMA), popMode, pushMode(METRICS_MODE)
;
CLOSING_METRICS_LINE_COMMENT
: LINE_COMMENT -> channel(HIDDEN)
;
CLOSING_METRICS_MULTILINE_COMMENT
: MULTILINE_COMMENT -> channel(HIDDEN)
;
CLOSING_METRICS_WS
: WS -> channel(HIDDEN)
;
CLOSING_METRICS_QUOTED_IDENTIFIER
: QUOTED_IDENTIFIER -> popMode, pushMode(EXPRESSION_MODE), type(QUOTED_IDENTIFIER)
;
CLOSING_METRICS_UNQUOTED_IDENTIFIER
:UNQUOTED_IDENTIFIER -> popMode, pushMode(EXPRESSION_MODE), type(UNQUOTED_IDENTIFIER)
;
CLOSING_METRICS_BY
:BY -> popMode, pushMode(EXPRESSION_MODE), type(BY)
;
CLOSING_METRICS_PIPE
: PIPE -> type(PIPE), popMode
;

0 comments on commit 6028232

Please sign in to comment.