Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update query API to support timeouts #1014

Merged
merged 6 commits into from Apr 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
41 changes: 37 additions & 4 deletions api/prometheus/v1/api.go
Expand Up @@ -238,9 +238,9 @@ type API interface {
// LabelValues performs a query for the values of the given label, time range and matchers.
LabelValues(ctx context.Context, label string, matches []string, startTime time.Time, endTime time.Time) (model.LabelValues, Warnings, error)
// Query performs a query for the given time.
Query(ctx context.Context, query string, ts time.Time) (model.Value, Warnings, error)
Query(ctx context.Context, query string, ts time.Time, opts ...Option) (model.Value, Warnings, error)
// QueryRange performs a query for the given range.
QueryRange(ctx context.Context, query string, r Range) (model.Value, Warnings, error)
QueryRange(ctx context.Context, query string, r Range, opts ...Option) (model.Value, Warnings, error)
// QueryExemplars performs a query for exemplars by the given query and time range.
QueryExemplars(ctx context.Context, query string, startTime time.Time, endTime time.Time) ([]ExemplarQueryResult, error)
// Buildinfo returns various build information properties about the Prometheus server
Expand Down Expand Up @@ -818,10 +818,33 @@ func (h *httpAPI) LabelValues(ctx context.Context, label string, matches []strin
return labelValues, w, json.Unmarshal(body, &labelValues)
}

func (h *httpAPI) Query(ctx context.Context, query string, ts time.Time) (model.Value, Warnings, error) {
type apiOptions struct {
timeout time.Duration
}

type Option func(c *apiOptions)

func WithTimeout(timeout time.Duration) Option {
return func(o *apiOptions) {
o.timeout = timeout
}
}

func (h *httpAPI) Query(ctx context.Context, query string, ts time.Time, opts ...Option) (model.Value, Warnings, error) {

u := h.client.URL(epQuery, nil)
q := u.Query()

opt := &apiOptions{}
for _, o := range opts {
o(opt)
}

d := opt.timeout
if d > 0 {
q.Set("timeout", d.String())
}

q.Set("query", query)
if !ts.IsZero() {
q.Set("time", formatTime(ts))
Expand All @@ -836,7 +859,7 @@ func (h *httpAPI) Query(ctx context.Context, query string, ts time.Time) (model.
return model.Value(qres.v), warnings, json.Unmarshal(body, &qres)
}

func (h *httpAPI) QueryRange(ctx context.Context, query string, r Range) (model.Value, Warnings, error) {
func (h *httpAPI) QueryRange(ctx context.Context, query string, r Range, opts ...Option) (model.Value, Warnings, error) {
u := h.client.URL(epQueryRange, nil)
q := u.Query()

Expand All @@ -845,6 +868,16 @@ func (h *httpAPI) QueryRange(ctx context.Context, query string, r Range) (model.
q.Set("end", formatTime(r.End))
q.Set("step", strconv.FormatFloat(r.Step.Seconds(), 'f', -1, 64))

opt := &apiOptions{}
for _, o := range opts {
o(opt)
}

d := opt.timeout
if d > 0 {
q.Set("timeout", d.String())
}

_, body, warnings, err := h.client.DoGetFallback(ctx, u, q)
if err != nil {
return nil, warnings, err
Expand Down
26 changes: 14 additions & 12 deletions api/prometheus/v1/api_test.go
Expand Up @@ -170,15 +170,15 @@ func TestAPIs(t *testing.T) {
}
}

doQuery := func(q string, ts time.Time) func() (interface{}, Warnings, error) {
doQuery := func(q string, ts time.Time, opts ...Option) func() (interface{}, Warnings, error) {
return func() (interface{}, Warnings, error) {
return promAPI.Query(context.Background(), q, ts)
return promAPI.Query(context.Background(), q, ts, opts...)
}
}

doQueryRange := func(q string, rng Range) func() (interface{}, Warnings, error) {
doQueryRange := func(q string, rng Range, opts ...Option) func() (interface{}, Warnings, error) {
return func() (interface{}, Warnings, error) {
return promAPI.QueryRange(context.Background(), q, rng)
return promAPI.QueryRange(context.Background(), q, rng, opts...)
}
}

Expand Down Expand Up @@ -246,7 +246,7 @@ func TestAPIs(t *testing.T) {

queryTests := []apiTest{
{
do: doQuery("2", testTime),
do: doQuery("2", testTime, WithTimeout(5*time.Second)),
inRes: &queryResult{
Type: model.ValScalar,
Result: &model.Scalar{
Expand All @@ -258,8 +258,9 @@ func TestAPIs(t *testing.T) {
reqMethod: "POST",
reqPath: "/api/v1/query",
reqParam: url.Values{
"query": []string{"2"},
"time": []string{testTime.Format(time.RFC3339Nano)},
"query": []string{"2"},
"time": []string{testTime.Format(time.RFC3339Nano)},
"timeout": []string{(5 * time.Second).String()},
},
res: &model.Scalar{
Value: 2,
Expand Down Expand Up @@ -365,16 +366,17 @@ func TestAPIs(t *testing.T) {
Start: testTime.Add(-time.Minute),
End: testTime,
Step: time.Minute,
}),
}, WithTimeout(5*time.Second)),
inErr: fmt.Errorf("some error"),

reqMethod: "POST",
reqPath: "/api/v1/query_range",
reqParam: url.Values{
"query": []string{"2"},
"start": []string{testTime.Add(-time.Minute).Format(time.RFC3339Nano)},
"end": []string{testTime.Format(time.RFC3339Nano)},
"step": []string{time.Minute.String()},
"query": []string{"2"},
"start": []string{testTime.Add(-time.Minute).Format(time.RFC3339Nano)},
"end": []string{testTime.Format(time.RFC3339Nano)},
"step": []string{time.Minute.String()},
"timeout": []string{(5 * time.Second).String()},
},
err: fmt.Errorf("some error"),
},
Expand Down
4 changes: 2 additions & 2 deletions api/prometheus/v1/example_test.go
Expand Up @@ -39,7 +39,7 @@ func ExampleAPI_query() {
v1api := v1.NewAPI(client)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
result, warnings, err := v1api.Query(ctx, "up", time.Now())
result, warnings, err := v1api.Query(ctx, "up", time.Now(), v1.WithTimeout(5*time.Second))
if err != nil {
fmt.Printf("Error querying Prometheus: %v\n", err)
os.Exit(1)
Expand Down Expand Up @@ -67,7 +67,7 @@ func ExampleAPI_queryRange() {
End: time.Now(),
Step: time.Minute,
}
result, warnings, err := v1api.QueryRange(ctx, "rate(prometheus_tsdb_head_samples_appended_total[5m])", r)
result, warnings, err := v1api.QueryRange(ctx, "rate(prometheus_tsdb_head_samples_appended_total[5m])", r, v1.WithTimeout(5*time.Second))
if err != nil {
fmt.Printf("Error querying Prometheus: %v\n", err)
os.Exit(1)
Expand Down