Skip to content

Commit

Permalink
Merge branch 'agg-top-metrics' of git://github.com/mbalabin/elastic i…
Browse files Browse the repository at this point in the history
…nto mbalabin-agg-top-metrics
  • Loading branch information
olivere committed Jun 16, 2021
2 parents 113a1e5 + 61ec6da commit 86f7a1b
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 1 deletion.
2 changes: 1 addition & 1 deletion README.md
Expand Up @@ -196,7 +196,7 @@ Here are a few tips on how to get used to Elastic:
- [x] Sum
- [ ] T-test (X-pack)
- [x] Top Hits
- [ ] Top metrics (X-pack)
- [x] Top metrics (X-pack)
- [x] Value Count
- [x] Weighted avg
- Bucket Aggregations
Expand Down
29 changes: 29 additions & 0 deletions search_aggs.go
Expand Up @@ -840,6 +840,22 @@ func (a Aggregations) ScriptedMetric(name string) (*AggregationScriptedMetric, b
return nil, false
}

// TopMetrics returns top metrics aggregation results.
// See https://www.elastic.co/guide/en/elasticsearch/reference/7.0/search-aggregations-metrics-top-metrics.html
//for details
func (a Aggregations) TopMetrics(name string) (*AggregationTopMetricsItems, bool) {
if raw, found := a[name]; found {
agg := new(AggregationTopMetricsItems)
if raw == nil {
return agg, true
}
if err := json.Unmarshal(raw, agg); err == nil {
return agg, true
}
}
return nil, false
}

// -- Single value metric --

// AggregationValueMetric is a single-value metric, returned e.g. by a
Expand Down Expand Up @@ -1804,3 +1820,16 @@ func (a *AggregationScriptedMetric) UnmarshalJSON(data []byte) error {
a.Aggregations = aggs
return nil
}

// AggregationTopMetricsItems is the value returned by the top metrics aggregation
type AggregationTopMetricsItems struct {
Aggregations

Top []AggregationTopMetricsItem `json:"top"`
}

// AggregationTopMetricsItem is a set of metrics returned for the top document or documents
type AggregationTopMetricsItem struct {
Sort []interface{} `json:"sort"` // sort information
Metrics map[string]interface{} `json:"metrics"` // returned metrics
}
79 changes: 79 additions & 0 deletions search_aggs_metrics_top_metrics.go
@@ -0,0 +1,79 @@
package elastic

import "errors"

// TopMetricsAggregation selects metrics from the document with the largest or smallest "sort" value.
// top_metrics is fairly similar to top_hits in spirit but because it is more limited it is able to do
// its job using less memory and is often faster.
//
// See: https://www.elastic.co/guide/en/elasticsearch/reference/7.0/search-aggregations-metrics-top-metrics.html
type TopMetricsAggregation struct {
fields []string
sorter Sorter
size int
}

func NewTopMetricsAggregation() *TopMetricsAggregation {
return &TopMetricsAggregation{}
}

// Field adds a field to run aggregation against.
func (a *TopMetricsAggregation) Field(field string) *TopMetricsAggregation {
a.fields = append(a.fields, field)
return a
}

// Sort adds a sort order.
func (a *TopMetricsAggregation) Sort(field string, ascending bool) *TopMetricsAggregation {
a.sorter = SortInfo{Field: field, Ascending: ascending}
return a
}

// SortWithInfo adds a sort order.
func (a *TopMetricsAggregation) SortWithInfo(info SortInfo) *TopMetricsAggregation {
a.sorter = info
return a
}

// SortBy adds a sort order.
func (a *TopMetricsAggregation) SortBy(sorter Sorter) *TopMetricsAggregation {
a.sorter = sorter
return a
}

// Size sets the number of top documents returned by the aggregation. The default size is 1.
func (a *TopMetricsAggregation) Size(size int) *TopMetricsAggregation {
a.size = size
return a
}

func (a *TopMetricsAggregation) Source() (interface{}, error) {
params := make(map[string]interface{})

if len(a.fields) == 0 {
return nil, errors.New("field list is required for the top metrics aggregation")
}
metrics := make([]interface{}, len(a.fields))
for idx, field := range a.fields {
metrics[idx] = map[string]string{"field": field}
}
params["metrics"] = metrics

if a.sorter == nil {
return nil, errors.New("sorter is required for the top metrics aggregation")
}
sortSource, err := a.sorter.Source()
if err != nil {
return nil, err
}
params["sort"] = sortSource

if a.size > 1 {
params["size"] = a.size
}

source := map[string]interface{}{
"top_metrics": params,
}
return source, nil
}
84 changes: 84 additions & 0 deletions search_aggs_metrics_top_metrics_test.go
@@ -0,0 +1,84 @@
package elastic

import (
"encoding/json"
"testing"
)

func TestTopMetricsAggregation(t *testing.T) {
agg := NewTopMetricsAggregation().
Sort("f1", false).
Field("a").
Field("b").
Size(3)
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_metrics":{"metrics":[{"field":"a"},{"field":"b"}],"size":3,"sort":{"f1":{"order":"desc"}}}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTopMetricsAggregation_SortBy(t *testing.T) {
agg := NewTopMetricsAggregation().
SortBy(SortByDoc{}).
Field("a")
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_metrics":{"metrics":[{"field":"a"}],"sort":"_doc"}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTopMetricsAggregation_SortWithInfo(t *testing.T) {
agg := NewTopMetricsAggregation().
SortWithInfo(SortInfo{Field: "f2", Ascending: true, UnmappedType: "int"}).
Field("b")
src, err := agg.Source()
if err != nil {
t.Fatal(err)
}
data, err := json.Marshal(src)
if err != nil {
t.Fatalf("marshaling to JSON failed: %v", err)
}
got := string(data)
expected := `{"top_metrics":{"metrics":[{"field":"b"}],"sort":{"f2":{"order":"asc","unmapped_type":"int"}}}}`
if got != expected {
t.Errorf("expected\n%s\n,got:\n%s", expected, got)
}
}

func TestTopMetricsAggregation_FailNoSorter(t *testing.T) {
agg := NewTopMetricsAggregation().
Field("a").
Field("b")
_, err := agg.Source()
if err == nil || err.Error() != "sorter is required for the top metrics aggregation" {
t.Fatal(err)
}
}

func TestTopMetricsAggregation_FailNoFields(t *testing.T) {
agg := NewTopMetricsAggregation().
Sort("f1", false)
_, err := agg.Source()
if err == nil || err.Error() != "field list is required for the top metrics aggregation" {
t.Fatal(err)
}
}
41 changes: 41 additions & 0 deletions search_aggs_test.go
Expand Up @@ -116,6 +116,11 @@ func TestAggs(t *testing.T) {
geoHashAgg := NewGeoHashGridAggregation().Field("location").Precision(5)
geoCentroidAgg := NewGeoCentroidAggregation().Field("location")
geoTileAgg := NewGeoTileGridAggregation().Field("location")
topMetricsAgg := NewTopMetricsAggregation().
Field("user").
Field("retweets").
Sort("retweets", false).
Size(2)

// Run query
builder := client.Search().Index(testIndexName).Query(all).Pretty(true)
Expand Down Expand Up @@ -154,6 +159,7 @@ func TestAggs(t *testing.T) {
builder = builder.Aggregation("geohashed", geoHashAgg)
builder = builder.Aggregation("centroid", geoCentroidAgg)
builder = builder.Aggregation("geotile-grid", geoTileAgg)
builder = builder.Aggregation("top-metrics", topMetricsAgg)

// Unnamed filters
countByUserAgg := NewFiltersAggregation().
Expand Down Expand Up @@ -1357,6 +1363,41 @@ func TestAggs(t *testing.T) {
t.Fatalf("expected movingFn = %v, have %v", want, have)
}
}

// top metrics aggregation
{
agg, found := agg.TopMetrics("top-metrics")
if !found {
t.Fatalf("expected %v; got: %v", true, false)
}
if agg == nil {
t.Fatal("expected != nil; got: nil")
}

if want, have := 2, len(agg.Top); want != have {
t.Fatalf("expected %d top results, have %d", want, have)
}

if want, have := "olivere", agg.Top[0].Metrics["user"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(108), agg.Top[0].Metrics["retweets"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(108), agg.Top[0].Sort[0]; want != have {
t.Fatalf("expected %v sort value, have %v", want, have)
}

if want, have := "sandrae", agg.Top[1].Metrics["user"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(12), agg.Top[1].Metrics["retweets"]; want != have {
t.Fatalf("expected %v top user, have %v", want, have)
}
if want, have := float64(12), agg.Top[1].Sort[0]; want != have {
t.Fatalf("expected %v sort value, have %v", want, have)
}
}
}

// TestAggsCompositeIntegration is an integration test for the Composite aggregation.
Expand Down

0 comments on commit 86f7a1b

Please sign in to comment.