Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Top Metrics aggregation #1500

Merged
merged 3 commits into from Jun 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -196,7 +196,7 @@ Here are a few tips on how to get used to Elastic:
- [x] Sum
- [ ] T-test (X-pack)
- [x] Top Hits
- [ ] Top metrics (X-pack)
- [x] Top metrics (X-pack)
- [x] Value Count
- [x] Weighted avg
- Bucket Aggregations
Expand Down
29 changes: 29 additions & 0 deletions search_aggs.go
Expand Up @@ -840,6 +840,22 @@ func (a Aggregations) ScriptedMetric(name string) (*AggregationScriptedMetric, b
return nil, false
}

// TopMetrics returns top metrics aggregation results.
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/search-aggregations-metrics-top-metrics.html
//for details
func (a Aggregations) TopMetrics(name string) (*AggregationTopMetricsItems, bool) {
if raw, found := a[name]; found {
agg := new(AggregationTopMetricsItems)
if raw == nil {
return agg, true
}
if err := json.Unmarshal(raw, agg); err == nil {
return agg, true
}
}
return nil, false
}

// -- Single value metric --

// AggregationValueMetric is a single-value metric, returned e.g. by a
Expand Down Expand Up @@ -1804,3 +1820,16 @@ func (a *AggregationScriptedMetric) UnmarshalJSON(data []byte) error {
a.Aggregations = aggs
return nil
}

// AggregationTopMetricsItems is the value returned by the top metrics aggregation
type AggregationTopMetricsItems struct {
Aggregations

Top []AggregationTopMetricsItem `json:"top"`
}

// AggregationTopMetricsItem is a set of metrics returned for the top document or documents
type AggregationTopMetricsItem struct {
Sort []interface{} `json:"sort"` // sort information
Metrics map[string]interface{} `json:"metrics"` // returned metrics
}
79 changes: 79 additions & 0 deletions search_aggs_metrics_top_metrics.go
@@ -0,0 +1,79 @@
package elastic

import "errors"

// TopMetricsAggregation selects metrics from the document with the largest or smallest "sort" value.
// top_metrics is fairly similar to top_hits in spirit but because it is more limited it is able to do
// its job using less memory and is often faster.
//
// See: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/search-aggregations-metrics-top-metrics.html
type TopMetricsAggregation struct {
fields []string
sorter Sorter
size int
}

func NewTopMetricsAggregation() *TopMetricsAggregation {
return &TopMetricsAggregation{}
}

// Field adds a field to run aggregation against.
func (a *TopMetricsAggregation) Field(field string) *TopMetricsAggregation {
a.fields = append(a.fields, field)
return a
}

// Sort adds a sort order.
func (a *TopMetricsAggregation) Sort(field string, ascending bool) *TopMetricsAggregation {
a.sorter = SortInfo{Field: field, Ascending: ascending}
return a
}

// SortWithInfo adds a sort order.
func (a *TopMetricsAggregation) SortWithInfo(info SortInfo) *TopMetricsAggregation {
a.sorter = info
return a
}

// SortBy adds a sort order.
func (a *TopMetricsAggregation) SortBy(sorter Sorter) *TopMetricsAggregation {
a.sorter = sorter
return a
}

// Size sets the number of top documents returned by the aggregation. The default size is 1.
func (a *TopMetricsAggregation) Size(size int) *TopMetricsAggregation {
a.size = size
return a
}

func (a *TopMetricsAggregation) Source() (interface{}, error) {
params := make(map[string]interface{})

if len(a.fields) == 0 {
return nil, errors.New("field list is required for the top metrics aggregation")
}
metrics := make([]interface{}, len(a.fields))
for idx, field := range a.fields {
metrics[idx] = map[string]string{"field": field}
}
params["metrics"] = metrics

if a.sorter == nil {
return nil, errors.New("sorter is required for the top metrics aggregation")
}
sortSource, err := a.sorter.Source()
if err != nil {
return nil, err
}
params["sort"] = sortSource

if a.size > 1 {
params["size"] = a.size
}

source := map[string]interface{}{
"top_metrics": params,
}
return source, nil
}
84 changes: 84 additions & 0 deletions search_aggs_metrics_top_metrics_test.go
@@ -0,0 +1,84 @@
package elastic

import (
"encoding/json"
"testing"
)

func TestTopMetricsAggregation(t *testing.T) {
agg := NewTopMetricsAggregation().
Sort("f1", false).
Field("a").
Field("b").
Size(3)
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_metrics":{"metrics":[{"field":"a"},{"field":"b"}],"size":3,"sort":{"f1":{"order":"desc"}}}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTopMetricsAggregation_SortBy(t *testing.T) {
agg := NewTopMetricsAggregation().
SortBy(SortByDoc{}).
Field("a")
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_metrics":{"metrics":[{"field":"a"}],"sort":"_doc"}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTopMetricsAggregation_SortWithInfo(t *testing.T) {
agg := NewTopMetricsAggregation().
SortWithInfo(SortInfo{Field: "f2", Ascending: true, UnmappedType: "int"}).
Field("b")
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_metrics":{"metrics":[{"field":"b"}],"sort":{"f2":{"order":"asc","unmapped_type":"int"}}}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTopMetricsAggregation_FailNoSorter(t *testing.T) {
agg := NewTopMetricsAggregation().
Field("a").
Field("b")
_, err := agg.Source()
if err == nil || err.Error() != "sorter is required for the top metrics aggregation" {
t.Fatal(err)
}
}

func TestTopMetricsAggregation_FailNoFields(t *testing.T) {
agg := NewTopMetricsAggregation().
Sort("f1", false)
_, err := agg.Source()
if err == nil || err.Error() != "field list is required for the top metrics aggregation" {
t.Fatal(err)
}
}
41 changes: 41 additions & 0 deletions search_aggs_test.go
Expand Up @@ -116,6 +116,11 @@ func TestAggs(t *testing.T) {
geoHashAgg := NewGeoHashGridAggregation().Field("location").Precision(5)
geoCentroidAgg := NewGeoCentroidAggregation().Field("location")
geoTileAgg := NewGeoTileGridAggregation().Field("location")
topMetricsAgg := NewTopMetricsAggregation().
Field("user").
Field("retweets").
Sort("retweets", false).
Size(2)

// Run query
builder := client.Search().Index(testIndexName).Query(all).Pretty(true)
Expand Down Expand Up @@ -154,6 +159,7 @@ func TestAggs(t *testing.T) {
builder = builder.Aggregation("geohashed", geoHashAgg)
builder = builder.Aggregation("centroid", geoCentroidAgg)
builder = builder.Aggregation("geotile-grid", geoTileAgg)
builder = builder.Aggregation("top-metrics", topMetricsAgg)

// Unnamed filters
countByUserAgg := NewFiltersAggregation().
Expand Down Expand Up @@ -1357,6 +1363,41 @@ func TestAggs(t *testing.T) {
t.Fatalf("expected movingFn = %v, have %v", want, have)
}
}

// top metrics aggregation
{
agg, found := agg.TopMetrics("top-metrics")
if !found {
t.Fatalf("expected %v; got: %v", true, false)
}
if agg == nil {
t.Fatal("expected != nil; got: nil")
}

if want, have := 2, len(agg.Top); want != have {
t.Fatalf("expected %d top results, have %d", want, have)
}

if want, have := "olivere", agg.Top[0].Metrics["user"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(108), agg.Top[0].Metrics["retweets"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(108), agg.Top[0].Sort[0]; want != have {
t.Fatalf("expected %v sort value, have %v", want, have)
}

if want, have := "sandrae", agg.Top[1].Metrics["user"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(12), agg.Top[1].Metrics["retweets"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(12), agg.Top[1].Sort[0]; want != have {
t.Fatalf("expected %v sort value, have %v", want, have)
}
}
}

// TestAggsCompositeIntegration is an integration test for the Composite aggregation.
Expand Down