From f836e2435242d9dd8e607ec0c05fc751e851b521 Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Mon, 30 Aug 2021 17:27:21 +0200 Subject: [PATCH] Add runtime fields This commit adds runtime fields/mappings to the client. See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/runtime.html for details. Close #1527 --- field_caps.go | 32 ++++++--- field_caps_test.go | 10 +-- runtime_mappings.go | 16 +++++ runtime_mappings_test.go | 148 +++++++++++++++++++++++++++++++++++++++ search.go | 45 +++++++++++- search_source.go | 17 ++++- search_source_test.go | 42 +++++++++++ search_test.go | 50 ++++++++++++- 8 files changed, 341 insertions(+), 19 deletions(-) create mode 100644 runtime_mappings.go create mode 100644 runtime_mappings_test.go diff --git a/field_caps.go b/field_caps.go index b84a1f13f..4b1aa92d0 100644 --- a/field_caps.go +++ b/field_caps.go @@ -32,6 +32,7 @@ type FieldCapsService struct { expandWildcards string fields []string ignoreUnavailable *bool + includeUnmapped *bool bodyJson interface{} bodyString string } @@ -117,6 +118,12 @@ func (s *FieldCapsService) IgnoreUnavailable(ignoreUnavailable bool) *FieldCapsS return s } +// IncludeUnmapped specifies whether unmapped fields whould be included in the response. +func (s *FieldCapsService) IncludeUnmapped(includeUnmapped bool) *FieldCapsService { + s.includeUnmapped = &includeUnmapped + return s +} + // BodyJson is documented as: Field json objects containing the name and optionally a range to filter out indices result, that have results outside the defined bounds. func (s *FieldCapsService) BodyJson(body interface{}) *FieldCapsService { s.bodyJson = body @@ -160,7 +167,7 @@ func (s *FieldCapsService) buildURL() (string, url.Values, error) { params.Set("filter_path", strings.Join(s.filterPath, ",")) } if s.allowNoIndices != nil { - params.Set("allow_no_indices", fmt.Sprintf("%v", *s.allowNoIndices)) + params.Set("allow_no_indices", fmt.Sprint(*s.allowNoIndices)) } if s.expandWildcards != "" { params.Set("expand_wildcards", s.expandWildcards) @@ -169,7 +176,10 @@ func (s *FieldCapsService) buildURL() (string, url.Values, error) { params.Set("fields", strings.Join(s.fields, ",")) } if s.ignoreUnavailable != nil { - params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable)) + params.Set("ignore_unavailable", fmt.Sprint(*s.ignoreUnavailable)) + } + if s.includeUnmapped != nil { + params.Set("include_unmapped", fmt.Sprint(*s.includeUnmapped)) } return path, params, nil } @@ -231,7 +241,9 @@ func (s *FieldCapsService) Do(ctx context.Context) (*FieldCapsResponse, error) { // FieldCapsRequest can be used to set up the body to be used in the // Field Capabilities API. type FieldCapsRequest struct { - Fields []string `json:"fields"` + Fields []string `json:"fields"` // list of fields to retrieve + IndexFilter Query `json:"index_filter,omitempty"` + RuntimeMappings RuntimeMappings `json:"runtime_mappings,omitempty"` } // -- Response -- @@ -248,10 +260,12 @@ type FieldCapsType map[string]FieldCaps // type -> caps // FieldCaps contains capabilities of an individual field. type FieldCaps struct { - Type string `json:"type"` - Searchable bool `json:"searchable"` - Aggregatable bool `json:"aggregatable"` - Indices []string `json:"indices,omitempty"` - NonSearchableIndices []string `json:"non_searchable_indices,omitempty"` - NonAggregatableIndices []string `json:"non_aggregatable_indices,omitempty"` + Type string `json:"type"` + MetadataField bool `json:"metadata_field"` + Searchable bool `json:"searchable"` + Aggregatable bool `json:"aggregatable"` + Indices []string `json:"indices,omitempty"` + NonSearchableIndices []string `json:"non_searchable_indices,omitempty"` + NonAggregatableIndices []string `json:"non_aggregatable_indices,omitempty"` + Meta map[string]interface{} `json:"meta,omitempty"` } diff --git a/field_caps_test.go b/field_caps_test.go index b64a04842..58a34b6a0 100644 --- a/field_caps_test.go +++ b/field_caps_test.go @@ -96,25 +96,25 @@ func TestFieldCapsResponse(t *testing.T) { "failed": 0 }, "fields": { - "rating": { + "rating": { "long": { "searchable": true, "aggregatable": false, "indices": ["index1", "index2"], - "non_aggregatable_indices": ["index1"] + "non_aggregatable_indices": ["index1"] }, "keyword": { "searchable": false, "aggregatable": true, "indices": ["index3", "index4"], - "non_searchable_indices": ["index4"] + "non_searchable_indices": ["index4"] } }, - "title": { + "title": { "text": { "searchable": true, "aggregatable": false - + } } } diff --git a/runtime_mappings.go b/runtime_mappings.go new file mode 100644 index 000000000..28eafc00c --- /dev/null +++ b/runtime_mappings.go @@ -0,0 +1,16 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +// RuntimeMappings specify fields that are evaluated at query time. +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/7.14/runtime.html +// for details. +type RuntimeMappings map[string]interface{} + +// Source deserializes the runtime mappings. +func (m *RuntimeMappings) Source() (interface{}, error) { + return m, nil +} diff --git a/runtime_mappings_test.go b/runtime_mappings_test.go new file mode 100644 index 000000000..fa0ac181c --- /dev/null +++ b/runtime_mappings_test.go @@ -0,0 +1,148 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "context" + "encoding/json" + "testing" + "time" +) + +func TestRuntimeMappingsSource(t *testing.T) { + rm := RuntimeMappings{ + "day_of_week": map[string]interface{}{ + "type": "keyword", + }, + } + src, err := rm.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatal(err) + } + expected := `{"day_of_week":{"type":"keyword"}}` + if want, have := expected, string(data); want != have { + t.Fatalf("want %s, have %s", want, have) + } +} + +func TestRuntimeMappings(t *testing.T) { + client := setupTestClient(t) //, SetTraceLog(log.New(os.Stdout, "", 0))) + + ctx := context.Background() + indexName := testIndexName + + // Create index + createIndex, err := client.CreateIndex(indexName).Do(ctx) + if err != nil { + t.Fatal(err) + } + if createIndex == nil { + t.Errorf("expected result to be != nil; got: %v", createIndex) + } + + mapping := `{ + "dynamic": "runtime", + "properties": { + "@timestamp": { + "type":"date" + } + }, + "runtime": { + "day_of_week": { + "type": "keyword", + "script": { + "source": "emit(doc['@timestamp'].value.dayOfWeekEnum.getDisplayName(TextStyle.FULL, Locale.ROOT))" + } + } + } + }` + type Doc struct { + Timestamp time.Time `json:"@timestamp"` + } + type DynamicDoc struct { + Timestamp time.Time `json:"@timestamp"` + DayOfWeek string `json:"day_of_week"` + } + + // Create mapping + putResp, err := client.PutMapping(). + Index(indexName). + BodyString(mapping). + Do(ctx) + if err != nil { + t.Fatalf("expected put mapping to succeed; got: %v", err) + } + if putResp == nil { + t.Fatalf("expected put mapping response; got: %v", putResp) + } + if !putResp.Acknowledged { + t.Fatalf("expected put mapping ack; got: %v", putResp.Acknowledged) + } + + // Add a document + timestamp := time.Date(2021, 1, 17, 23, 24, 25, 26, time.UTC) + indexResult, err := client.Index(). + Index(indexName). + Id("1"). + BodyJson(&Doc{ + Timestamp: timestamp, + }). + Refresh("wait_for"). + Do(ctx) + if err != nil { + t.Fatal(err) + } + if indexResult == nil { + t.Errorf("expected result to be != nil; got: %v", indexResult) + } + + // Execute a search to check for runtime fields + searchResp, err := client.Search(indexName). + Query(NewMatchAllQuery()). + DocvalueFields("@timestamp", "day_of_week"). + Do(ctx) + if err != nil { + t.Fatal(err) + } + if searchResp == nil { + t.Errorf("expected result to be != nil; got: %v", searchResp) + } + if want, have := int64(1), searchResp.TotalHits(); want != have { + t.Fatalf("expected %d search hits, got %d", want, have) + } + + // The hit should not have the "day_of_week" + hit := searchResp.Hits.Hits[0] + var doc DynamicDoc + if err := json.Unmarshal(hit.Source, &doc); err != nil { + t.Fatalf("unable to deserialize hit: %v", err) + } + if want, have := timestamp, doc.Timestamp; want != have { + t.Fatalf("expected timestamp=%v, got %v", want, have) + } + if want, have := "", doc.DayOfWeek; want != have { + t.Fatalf("expected day_of_week=%q, got %q", want, have) + } + + // The fields should include a "day_of_week" of ["Sunday"] + dayOfWeekIntfSlice, ok := hit.Fields["day_of_week"].([]interface{}) + if !ok { + t.Fatalf("expected a slice of strings, got %T", hit.Fields["day_of_week"]) + } + if want, have := 1, len(dayOfWeekIntfSlice); want != have { + t.Fatalf("expected a slice of size %d, have %d", want, have) + } + dayOfWeek, ok := dayOfWeekIntfSlice[0].(string) + if !ok { + t.Fatalf("expected an element of string, got %T", dayOfWeekIntfSlice[0]) + } + if want, have := "Sunday", dayOfWeek; want != have { + t.Fatalf("expected day_of_week=%q, have %q", want, have) + } +} diff --git a/search.go b/search.go index 6e0b476ac..2be5517f8 100644 --- a/search.go +++ b/search.go @@ -159,6 +159,12 @@ func (s *SearchService) PointInTime(pointInTime *PointInTime) *SearchService { return s } +// RuntimeMappings specifies optional runtime mappings. +func (s *SearchService) RuntimeMappings(runtimeMappings RuntimeMappings) *SearchService { + s.searchSource = s.searchSource.RuntimeMappings(runtimeMappings) + return s +} + // TimeoutInMillis sets the timeout in milliseconds. func (s *SearchService) TimeoutInMillis(timeoutInMillis int) *SearchService { s.searchSource = s.searchSource.TimeoutInMillis(timeoutInMillis) @@ -764,7 +770,7 @@ type SearchHit struct { Sort []interface{} `json:"sort,omitempty"` // sort information Highlight SearchHitHighlight `json:"highlight,omitempty"` // highlighter information Source json.RawMessage `json:"_source,omitempty"` // stored document source - Fields map[string]interface{} `json:"fields,omitempty"` // returned (stored) fields + Fields SearchHitFields `json:"fields,omitempty"` // returned (stored) fields Explanation *SearchExplanation `json:"_explanation,omitempty"` // explains how the score was computed MatchedQueries []string `json:"matched_queries,omitempty"` // matched queries InnerHits map[string]*SearchHitInnerHits `json:"inner_hits,omitempty"` // inner hits with ES >= 1.5.0 @@ -777,6 +783,43 @@ type SearchHit struct { // MatchedFilters } +// SearchHitFields helps to simplify resolving slices of specific types. +type SearchHitFields map[string]interface{} + +// Strings returns a slice of strings for the given field, if there is any +// such field in the hit. The method ignores elements that are not of type +// string. +func (f SearchHitFields) Strings(fieldName string) ([]string, bool) { + slice, ok := f[fieldName].([]interface{}) + if !ok { + return nil, false + } + results := make([]string, 0, len(slice)) + for _, item := range slice { + if v, ok := item.(string); ok { + results = append(results, v) + } + } + return results, true +} + +// Float64s returns a slice of float64's for the given field, if there is any +// such field in the hit. The method ignores elements that are not of +// type float64. +func (f SearchHitFields) Float64s(fieldName string) ([]float64, bool) { + slice, ok := f[fieldName].([]interface{}) + if !ok { + return nil, false + } + results := make([]float64, 0, len(slice)) + for _, item := range slice { + if v, ok := item.(float64); ok { + results = append(results, v) + } + } + return results, true +} + // SearchHitInnerHits is used for inner hits. type SearchHitInnerHits struct { Hits *SearchHits `json:"hits,omitempty"` diff --git a/search_source.go b/search_source.go index 7578af5b2..7f6826471 100644 --- a/search_source.go +++ b/search_source.go @@ -42,7 +42,8 @@ type SearchSource struct { collapse *CollapseBuilder // collapse profile bool // profile // TODO extBuilders []SearchExtBuilder // ext - pointInTime *PointInTime // pit + pointInTime *PointInTime // pit + runtimeMappings RuntimeMappings } // NewSearchSource initializes a new SearchSource. @@ -375,6 +376,12 @@ func (s *SearchSource) PointInTime(pointInTime *PointInTime) *SearchSource { return s } +// RuntimeMappings specifies optional runtime mappings. +func (s *SearchSource) RuntimeMappings(runtimeMappings RuntimeMappings) *SearchSource { + s.runtimeMappings = runtimeMappings + return s +} + // Source returns the serializable JSON for the source builder. func (s *SearchSource) Source() (interface{}, error) { source := make(map[string]interface{}) @@ -614,6 +621,14 @@ func (s *SearchSource) Source() (interface{}, error) { source["pit"] = src } + if s.runtimeMappings != nil { + src, err := s.runtimeMappings.Source() + if err != nil { + return nil, err + } + source["runtime_mappings"] = src + } + return source, nil } diff --git a/search_source_test.go b/search_source_test.go index bc0b97b1b..4b2852ac7 100644 --- a/search_source_test.go +++ b/search_source_test.go @@ -316,3 +316,45 @@ func TestSearchSourceSeqNoAndPrimaryTerm(t *testing.T) { t.Errorf("expected\n%s\n,got:\n%s", expected, got) } } + +func TestSearchSourcePointInTime(t *testing.T) { + matchAllQ := NewMatchAllQuery() + builder := NewSearchSource().Query(matchAllQ).PointInTime( + NewPointInTime("pit_id", "2m"), + ) + src, err := builder.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"pit":{"id":"pit_id","keep_alive":"2m"},"query":{"match_all":{}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} + +func TestSearchSourceRuntimeMappings(t *testing.T) { + matchAllQ := NewMatchAllQuery() + builder := NewSearchSource().Query(matchAllQ).RuntimeMappings(RuntimeMappings{ + "day_of_week": map[string]interface{}{ + "type": "keyword", + }, + }) + src, err := builder.Source() + if err != nil { + t.Fatal(err) + } + data, err := json.Marshal(src) + if err != nil { + t.Fatalf("marshaling to JSON failed: %v", err) + } + got := string(data) + expected := `{"query":{"match_all":{}},"runtime_mappings":{"day_of_week":{"type":"keyword"}}}` + if got != expected { + t.Errorf("expected\n%s\n,got:\n%s", expected, got) + } +} diff --git a/search_test.go b/search_test.go index 1513551c9..b6d67a8d2 100644 --- a/search_test.go +++ b/search_test.go @@ -477,9 +477,14 @@ func TestSearchSpecificFields(t *testing.T) { // client := setupTestClientAndCreateIndexAndLog(t, SetTraceLog(log.New(os.Stdout, "", 0))) client := setupTestClientAndCreateIndex(t) - tweet1 := tweet{User: "olivere", Message: "Welcome to Golang and Elasticsearch."} - tweet2 := tweet{User: "olivere", Message: "Another unrelated topic."} - tweet3 := tweet{User: "sandrae", Message: "Cycling is fun."} + tweet1 := tweet{User: "olivere", Retweets: 1, Message: "Welcome to Golang and Elasticsearch."} + tweet2 := tweet{User: "olivere", Retweets: 2, Message: "Another unrelated topic."} + tweet3 := tweet{User: "sandrae", Retweets: 3, Message: "Cycling is fun."} + tweets := []tweet{ + tweet1, + tweet2, + tweet3, + } // Add all documents _, err := client.Index().Index(testIndexName).Id("1").BodyJson(&tweet1).Do(context.TODO()) @@ -508,6 +513,8 @@ func TestSearchSpecificFields(t *testing.T) { Index(testIndexName). Query(all). StoredFields("message"). + DocvalueFields("retweets"). + Sort("_id", true). Do(context.TODO()) if err != nil { t.Fatal(err) @@ -522,6 +529,7 @@ func TestSearchSpecificFields(t *testing.T) { t.Errorf("expected len(SearchResult.Hits.Hits) = %d; got %d", 3, len(searchResult.Hits.Hits)) } + // Manually inspect the fields for _, hit := range searchResult.Hits.Hits { if hit.Index != testIndexName { t.Errorf("expected SearchResult.Hits.Hit.Index = %q; got %q", testIndexName, hit.Index) @@ -551,6 +559,42 @@ func TestSearchSpecificFields(t *testing.T) { t.Errorf("expected a message; got: %q", message) } } + + // With the new helper method for fields + for i, hit := range searchResult.Hits.Hits { + // Field: message + items, ok := hit.Fields.Strings("message") + if !ok { + t.Fatalf("expected SearchResult.Hits.Hit.Fields[%s] to be found", "message") + } + if want, have := 1, len(items); want != have { + t.Fatalf("expected a field with %d entries; got %d", want, have) + } + if want, have := tweets[i].Message, items[0]; want != have { + t.Fatalf("expected message[%d]=%q; got %q", i, want, have) + } + + // Field: retweets + retweets, ok := hit.Fields.Float64s("retweets") + if !ok { + t.Fatalf("expected SearchResult.Hits.Hit.Fields[%s] to be found", "retweets") + } + if want, have := 1, len(retweets); want != have { + t.Fatalf("expected a field with %d entries; got %d", want, have) + } + if want, have := tweets[i].Retweets, int(retweets[0]); want != have { + t.Fatalf("expected retweets[%d]=%q; got %q", i, want, have) + } + + // Field should not exist + numbers, ok := hit.Fields.Float64s("score") + if ok { + t.Fatalf("expected SearchResult.Hits.Hit.Fields[%s] to NOT be found", "numbers") + } + if numbers != nil { + t.Fatalf("expected no field %q; got %+v", "numbers", numbers) + } + } } func TestSearchExplain(t *testing.T) {