From 71cbef98489618baebba612c15d5b86caeaf7842 Mon Sep 17 00:00:00 2001 From: Oliver Eilhard Date: Mon, 12 Apr 2021 14:26:18 +0200 Subject: [PATCH] Add Point In Time API (#1480) This commit adds the Point in Time API as described here: https://www.elastic.co/guide/en/elasticsearch/reference/7.x/point-in-time-api.html Close #1433 --- client.go | 10 ++ pit.go | 36 +++++++ pit_close.go | 176 +++++++++++++++++++++++++++++++++ pit_open.go | 247 ++++++++++++++++++++++++++++++++++++++++++++++ pit_test.go | 93 +++++++++++++++++ search.go | 8 ++ search_request.go | 7 ++ search_source.go | 17 ++++ 8 files changed, 594 insertions(+) create mode 100644 pit.go create mode 100644 pit_close.go create mode 100644 pit_open.go create mode 100644 pit_test.go diff --git a/client.go b/client.go index 25d863c4b..fdf453993 100644 --- a/client.go +++ b/client.go @@ -1636,6 +1636,16 @@ func (c *Client) ClearScroll(scrollIds ...string) *ClearScrollService { return NewClearScrollService(c).ScrollId(scrollIds...) } +// OpenPointInTime opens a new Point in Time. +func (c *Client) OpenPointInTime(indices ...string) *OpenPointInTimeService { + return NewOpenPointInTimeService(c).Index(indices...) +} + +// ClosePointInTime closes an existing Point in Time. +func (c *Client) ClosePointInTime(id string) *ClosePointInTimeService { + return NewClosePointInTimeService(c).ID(id) +} + // -- Indices APIs -- // CreateIndex returns a service to create a new index. diff --git a/pit.go b/pit.go new file mode 100644 index 000000000..22871309c --- /dev/null +++ b/pit.go @@ -0,0 +1,36 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +// PointInTime is a lightweight view into the state of the data that existed +// when initiated. It can be created with OpenPointInTime API and be used +// when searching, e.g. in Search API or with SearchSource. +type PointInTime struct { + // Id that uniquely identifies the point in time, as created with the + // OpenPointInTime API. + Id string `json:"id,omitempty"` + // KeepAlive is the time for which this specific PointInTime will be + // kept alive by Elasticsearch. + KeepAlive string `json:"keep_alive,omitempty"` +} + +// NewPointInTime creates a new PointInTime. +func NewPointInTime(id, keepAlive string) *PointInTime { + return &PointInTime{ + Id: id, + KeepAlive: keepAlive, + } +} + +// Source generates the JSON serializable fragment for the PointInTime. +func (pit *PointInTime) Source() (interface{}, error) { + if pit == nil { + return nil, nil + } + return map[string]interface{}{ + "id": pit.Id, + "keep_alive": pit.KeepAlive, + }, nil +} diff --git a/pit_close.go b/pit_close.go new file mode 100644 index 000000000..86515a605 --- /dev/null +++ b/pit_close.go @@ -0,0 +1,176 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" +) + +// ClosePointInTimeService removes a point in time. +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/point-in-time-api.html +// for details. +type ClosePointInTimeService struct { + client *Client + + pretty *bool // pretty format the returned JSON response + human *bool // return human readable values for statistics + errorTrace *bool // include the stack trace of returned errors + filterPath []string // list of filters used to reduce the response + headers http.Header // custom request-level HTTP headers + + id string + bodyJson interface{} + bodyString string +} + +// NewClosePointInTimeService creates a new ClosePointInTimeService. +func NewClosePointInTimeService(client *Client) *ClosePointInTimeService { + return &ClosePointInTimeService{ + client: client, + } +} + +// Pretty tells Elasticsearch whether to return a formatted JSON response. +func (s *ClosePointInTimeService) Pretty(pretty bool) *ClosePointInTimeService { + s.pretty = &pretty + return s +} + +// Human specifies whether human readable values should be returned in +// the JSON response, e.g. "7.5mb". +func (s *ClosePointInTimeService) Human(human bool) *ClosePointInTimeService { + s.human = &human + return s +} + +// ErrorTrace specifies whether to include the stack trace of returned errors. +func (s *ClosePointInTimeService) ErrorTrace(errorTrace bool) *ClosePointInTimeService { + s.errorTrace = &errorTrace + return s +} + +// FilterPath specifies a list of filters used to reduce the response. +func (s *ClosePointInTimeService) FilterPath(filterPath ...string) *ClosePointInTimeService { + s.filterPath = filterPath + return s +} + +// Header adds a header to the request. +func (s *ClosePointInTimeService) Header(name string, value string) *ClosePointInTimeService { + if s.headers == nil { + s.headers = http.Header{} + } + s.headers.Add(name, value) + return s +} + +// Headers specifies the headers of the request. +func (s *ClosePointInTimeService) Headers(headers http.Header) *ClosePointInTimeService { + s.headers = headers + return s +} + +// ID to close. +func (s *ClosePointInTimeService) ID(id string) *ClosePointInTimeService { + s.id = id + return s +} + +// BodyJson is the document as a serializable JSON interface. +func (s *ClosePointInTimeService) BodyJson(body interface{}) *ClosePointInTimeService { + s.bodyJson = body + return s +} + +// BodyString is the document encoded as a string. +func (s *ClosePointInTimeService) BodyString(body string) *ClosePointInTimeService { + s.bodyString = body + return s +} + +// buildURL builds the URL for the operation. +func (s *ClosePointInTimeService) buildURL() (string, string, url.Values, error) { + var ( + method = "DELETE" + path = "/_pit" + ) + + // Add query string parameters + params := url.Values{} + if v := s.pretty; v != nil { + params.Set("pretty", fmt.Sprint(*v)) + } + if v := s.human; v != nil { + params.Set("human", fmt.Sprint(*v)) + } + if v := s.errorTrace; v != nil { + params.Set("error_trace", fmt.Sprint(*v)) + } + if len(s.filterPath) > 0 { + params.Set("filter_path", strings.Join(s.filterPath, ",")) + } + return method, path, params, nil +} + +// Validate checks if the operation is valid. +func (s *ClosePointInTimeService) Validate() error { + return nil +} + +// Do executes the operation. +func (s *ClosePointInTimeService) Do(ctx context.Context) (*ClosePointInTimeResponse, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // Get URL for request + method, path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Setup HTTP request body + var body interface{} + if s.id != "" { + body = map[string]interface{}{ + "id": s.id, + } + } else if s.bodyJson != nil { + body = s.bodyJson + } else { + body = s.bodyString + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ + Method: method, + Path: path, + Params: params, + Body: body, + Headers: s.headers, + }) + if err != nil { + return nil, err + } + + // Return operation response + ret := new(ClosePointInTimeResponse) + if err := s.client.decoder.Decode(res.Body, ret); err != nil { + return nil, err + } + return ret, nil +} + +// ClosePointInTimeResponse is the result of closing a point in time. +type ClosePointInTimeResponse struct { + Succeeded bool `json:"succeeded,omitempty"` + NumFreed int `json:"num_freed,omitempty"` +} diff --git a/pit_open.go b/pit_open.go new file mode 100644 index 000000000..847a4d237 --- /dev/null +++ b/pit_open.go @@ -0,0 +1,247 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "context" + "fmt" + "net/http" + "net/url" + "strings" + + "github.com/olivere/elastic/v7/uritemplates" +) + +// OpenPointInTimeService opens a point in time that can be used in subsequent +// searches. +// +// See https://www.elastic.co/guide/en/elasticsearch/reference/7.x/point-in-time-api.html +// for details. +type OpenPointInTimeService struct { + client *Client + + pretty *bool // pretty format the returned JSON response + human *bool // return human readable values for statistics + errorTrace *bool // include the stack trace of returned errors + filterPath []string // list of filters used to reduce the response + headers http.Header // custom request-level HTTP headers + + index []string + preference string + routing string + ignoreUnavailable *bool + expandWildcards string + keepAlive string + bodyJson interface{} + bodyString string +} + +// NewOpenPointInTimeService creates a new OpenPointInTimeService. +func NewOpenPointInTimeService(client *Client) *OpenPointInTimeService { + return &OpenPointInTimeService{ + client: client, + } +} + +// Pretty tells Elasticsearch whether to return a formatted JSON response. +func (s *OpenPointInTimeService) Pretty(pretty bool) *OpenPointInTimeService { + s.pretty = &pretty + return s +} + +// Human specifies whether human readable values should be returned in +// the JSON response, e.g. "7.5mb". +func (s *OpenPointInTimeService) Human(human bool) *OpenPointInTimeService { + s.human = &human + return s +} + +// ErrorTrace specifies whether to include the stack trace of returned errors. +func (s *OpenPointInTimeService) ErrorTrace(errorTrace bool) *OpenPointInTimeService { + s.errorTrace = &errorTrace + return s +} + +// FilterPath specifies a list of filters used to reduce the response. +func (s *OpenPointInTimeService) FilterPath(filterPath ...string) *OpenPointInTimeService { + s.filterPath = filterPath + return s +} + +// Header adds a header to the request. +func (s *OpenPointInTimeService) Header(name string, value string) *OpenPointInTimeService { + if s.headers == nil { + s.headers = http.Header{} + } + s.headers.Add(name, value) + return s +} + +// Headers specifies the headers of the request. +func (s *OpenPointInTimeService) Headers(headers http.Header) *OpenPointInTimeService { + s.headers = headers + return s +} + +// Preference specifies the node or shard the operation should be performed on. +func (s *OpenPointInTimeService) Preference(preference string) *OpenPointInTimeService { + s.preference = preference + return s +} + +// Index is the name of the index (or indices). +func (s *OpenPointInTimeService) Index(index ...string) *OpenPointInTimeService { + s.index = index + return s +} + +// Routing is a specific routing value. +func (s *OpenPointInTimeService) Routing(routing string) *OpenPointInTimeService { + s.routing = routing + return s +} + +// IgnoreUnavailable indicates whether specified concrete indices should be +// ignored when unavailable (missing or closed). +func (s *OpenPointInTimeService) IgnoreUnavailable(ignoreUnavailable bool) *OpenPointInTimeService { + s.ignoreUnavailable = &ignoreUnavailable + return s +} + +// ExpandWildcards indicates whether to expand wildcard expression to +// concrete indices that are open, closed or both. +func (s *OpenPointInTimeService) ExpandWildcards(expandWildcards string) *OpenPointInTimeService { + s.expandWildcards = expandWildcards + return s +} + +// KeepAlive indicates the specific time to live for the point in time. +func (s *OpenPointInTimeService) KeepAlive(keepAlive string) *OpenPointInTimeService { + s.keepAlive = keepAlive + return s +} + +// BodyJson is the document as a serializable JSON interface. +func (s *OpenPointInTimeService) BodyJson(body interface{}) *OpenPointInTimeService { + s.bodyJson = body + return s +} + +// BodyString is the document encoded as a string. +func (s *OpenPointInTimeService) BodyString(body string) *OpenPointInTimeService { + s.bodyString = body + return s +} + +// buildURL builds the URL for the operation. +func (s *OpenPointInTimeService) buildURL() (string, string, url.Values, error) { + var err error + var method, path string + + if len(s.index) > 0 { + method = "POST" + path, err = uritemplates.Expand("/{index}/_pit", map[string]string{ + "index": strings.Join(s.index, ","), + }) + } else { + method = "POST" + path = "/_pit" + } + if err != nil { + return "", "", url.Values{}, err + } + + // Add query string parameters + params := url.Values{} + if v := s.pretty; v != nil { + params.Set("pretty", fmt.Sprint(*v)) + } + if v := s.human; v != nil { + params.Set("human", fmt.Sprint(*v)) + } + if v := s.errorTrace; v != nil { + params.Set("error_trace", fmt.Sprint(*v)) + } + if len(s.filterPath) > 0 { + params.Set("filter_path", strings.Join(s.filterPath, ",")) + } + if s.preference != "" { + params.Set("preference", s.preference) + } + if s.routing != "" { + params.Set("routing", s.routing) + } + if s.ignoreUnavailable != nil { + params.Set("ignore_unavailable", fmt.Sprintf("%v", *s.ignoreUnavailable)) + } + if s.expandWildcards != "" { + params.Set("expand_wildcards", s.expandWildcards) + } + if s.keepAlive != "" { + params.Set("keep_alive", s.keepAlive) + } + return method, path, params, nil +} + +// Validate checks if the operation is valid. +func (s *OpenPointInTimeService) Validate() error { + var invalid []string + if len(s.index) == 0 { + invalid = append(invalid, "Index") + } + if s.keepAlive == "" { + invalid = append(invalid, "KeepAlive") + } + if len(invalid) > 0 { + return fmt.Errorf("missing required fields: %v", invalid) + } + return nil +} + +// Do executes the operation. +func (s *OpenPointInTimeService) Do(ctx context.Context) (*OpenPointInTimeResponse, error) { + // Check pre-conditions + if err := s.Validate(); err != nil { + return nil, err + } + + // Get URL for request + method, path, params, err := s.buildURL() + if err != nil { + return nil, err + } + + // Setup HTTP request body + var body interface{} + if s.bodyJson != nil { + body = s.bodyJson + } else { + body = s.bodyString + } + + // Get HTTP response + res, err := s.client.PerformRequest(ctx, PerformRequestOptions{ + Method: method, + Path: path, + Params: params, + Body: body, + Headers: s.headers, + }) + if err != nil { + return nil, err + } + + // Return operation response + ret := new(OpenPointInTimeResponse) + if err := s.client.decoder.Decode(res.Body, ret); err != nil { + return nil, err + } + return ret, nil +} + +// OpenPointInTimeResponse is the result of opening a point in time. +type OpenPointInTimeResponse struct { + Id string `json:"id,omitempty"` +} diff --git a/pit_test.go b/pit_test.go new file mode 100644 index 000000000..6e521fbc4 --- /dev/null +++ b/pit_test.go @@ -0,0 +1,93 @@ +// Copyright 2012-present Oliver Eilhard. All rights reserved. +// Use of this source code is governed by a MIT-license. +// See http://olivere.mit-license.org/license.txt for details. + +package elastic + +import ( + "context" + "testing" +) + +func TestPointInTimeOpenAndClose(t *testing.T) { + client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0))) + + // Create a Point In Time + openResp, err := client.OpenPointInTime(testIndexName). + KeepAlive("1m"). + Pretty(true). + Do(context.Background()) + if err != nil { + t.Fatal(err) + } + if openResp == nil { + t.Fatal("expected non-nil Point In Time") + } + if openResp.Id == "" { + t.Fatal("expected non-blank Point In Time ID") + } + + // Close the Point in Time + closeResp, err := client.ClosePointInTime(openResp.Id).Pretty(true).Do(context.Background()) + if err != nil { + t.Fatal(err) + } + if closeResp == nil { + t.Fatal("expected non-nil Point In Time") + } + if want, have := true, closeResp.Succeeded; want != have { + t.Fatalf("want Succeeded=%v, have %v", want, have) + } + if want, have := 1, closeResp.NumFreed; want != have { + t.Fatalf("want NumFreed=%v, have %v", want, have) + } +} + +func TestPointInTimeLifecycle(t *testing.T) { + client := setupTestClientAndCreateIndexAndAddDocs(t) //, SetTraceLog(log.New(os.Stdout, "", 0))) + + // Create a Point In Time + pitResp, err := client.OpenPointInTime(). + Index(testIndexName). + KeepAlive("1m"). + Pretty(true). + Do(context.Background()) + if err != nil { + t.Fatal(err) + } + if pitResp == nil { + t.Fatal("expected non-nil Point In Time") + } + if pitResp.Id == "" { + t.Fatal("expected non-blank Point In Time ID") + } + + // We remove the documents here, but will be able to still search with + // the PIT previously created + _, err = client.DeleteByQuery(testIndexName). + Query(NewMatchAllQuery()). + Refresh("true"). + Do(context.Background()) + if err != nil { + t.Fatal(err) + } + + // Search with the Point in Time ID + searchResult, err := client.Search(). + // Index(testIndexName). // <-- you may not use indices with PointInTime! + Query(NewMatchAllQuery()). + PointInTime(NewPointInTime(pitResp.Id, "1m")). + Size(100). + Pretty(true). + Do(context.TODO()) + if err != nil { + t.Fatal(err) + } + if searchResult.Hits == nil { + t.Errorf("expected SearchResult.Hits != nil; got nil") + } + if got, want := searchResult.TotalHits(), int64(3); got != want { + t.Errorf("expected SearchResult.TotalHits() = %d; got %d", want, got) + } + +} diff --git a/search.go b/search.go index 127e2b638..f47fdf5cb 100644 --- a/search.go +++ b/search.go @@ -152,6 +152,13 @@ func (s *SearchService) Collapse(collapse *CollapseBuilder) *SearchService { return s } +// PointInTime specifies an optional PointInTime to be used in the context +// of this search. +func (s *SearchService) PointInTime(pointInTime *PointInTime) *SearchService { + s.searchSource = s.searchSource.PointInTime(pointInTime) + return s +} + // TimeoutInMillis sets the timeout in milliseconds. func (s *SearchService) TimeoutInMillis(timeoutInMillis int) *SearchService { s.searchSource = s.searchSource.TimeoutInMillis(timeoutInMillis) @@ -655,6 +662,7 @@ type SearchResult struct { Profile *SearchProfile `json:"profile,omitempty"` // profiling results, if optional Profile API was active for this search Shards *ShardsInfo `json:"_shards,omitempty"` // shard information Status int `json:"status,omitempty"` // used in MultiSearch + PitId string `json:"pit_id,omitempty"` // Point In Time ID } // SearchResultCluster holds information about a search response diff --git a/search_request.go b/search_request.go index 3a444b808..e44d48ddd 100644 --- a/search_request.go +++ b/search_request.go @@ -413,6 +413,13 @@ func (r *SearchRequest) Collapse(collapse *CollapseBuilder) *SearchRequest { return r } +// PointInTime specifies an optional PointInTime to be used in the context +// of this search. +func (s *SearchRequest) PointInTime(pointInTime *PointInTime) *SearchRequest { + s.searchSource = s.searchSource.PointInTime(pointInTime) + return s +} + // AllowPartialSearchResults indicates if this request should allow partial // results. (If method is not called, will default to the cluster level // setting). diff --git a/search_source.go b/search_source.go index ae2ae41cd..7578af5b2 100644 --- a/search_source.go +++ b/search_source.go @@ -42,6 +42,7 @@ type SearchSource struct { collapse *CollapseBuilder // collapse profile bool // profile // TODO extBuilders []SearchExtBuilder // ext + pointInTime *PointInTime // pit } // NewSearchSource initializes a new SearchSource. @@ -367,6 +368,13 @@ func (s *SearchSource) Collapse(collapse *CollapseBuilder) *SearchSource { return s } +// PointInTime specifies an optional PointInTime to be used in the context +// of this search. +func (s *SearchSource) PointInTime(pointInTime *PointInTime) *SearchSource { + s.pointInTime = pointInTime + return s +} + // Source returns the serializable JSON for the source builder. func (s *SearchSource) Source() (interface{}, error) { source := make(map[string]interface{}) @@ -597,6 +605,15 @@ func (s *SearchSource) Source() (interface{}, error) { source["inner_hits"] = m } + // Point in Time + if s.pointInTime != nil { + src, err := s.pointInTime.Source() + if err != nil { + return nil, err + } + source["pit"] = src + } + return source, nil }