Skip to content

Commit

Permalink
api: fix race between Respond() and query/queryRange
Browse files Browse the repository at this point in the history
Fix a data race between Respond() and query/queryRange functions by
returning an extra optional function from instrumented functions that
releases the resources i.e. calls Close().

Cannot reproduce the following race:

```
==================
WARNING: DATA RACE
Write at 0x00c00566fa00 by goroutine 562:
  github.com/prometheus/prometheus/promql.(*evaluator).eval()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:1450 +0x8044
  github.com/prometheus/prometheus/promql.(*evaluator).rangeEval()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:1060 +0x2684
  github.com/prometheus/prometheus/promql.(*evaluator).eval()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:1281 +0x42a4
  github.com/prometheus/prometheus/promql.(*evaluator).rangeEval()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:1060 +0x2684
  github.com/prometheus/prometheus/promql.(*evaluator).eval()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:1281 +0x42a4
  github.com/prometheus/prometheus/promql.(*evaluator).Eval()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:989 +0xf5
  github.com/prometheus/prometheus/promql.(*Engine).execEvalStmt()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:645 +0xa77
  github.com/prometheus/prometheus/promql.(*Engine).exec()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:595 +0x71e
  github.com/prometheus/prometheus/promql.(*query).Exec()
      /home/giedrius/go/pkg/mod/github.com/vinted/prometheus@v1.8.2-0.20220808145920-5c879a061105/promql/engine.go:197 +0x250
  github.com/thanos-io/thanos/pkg/api/query.(*QueryAPI).query()
      /home/giedrius/dev/thanos/pkg/api/query/v1.go:387 +0xbf2
  github.com/thanos-io/thanos/pkg/api/query.(*QueryAPI).query-fm()

  ...
  Previous read at 0x00c00566fa00 by goroutine 570:
  github.com/prometheus/prometheus/promql.(*Point).MarshalJSON()
      <autogenerated>:1 +0x4e
  encoding/json.addrMarshalerEncoder()
      /usr/lib/go-1.19/src/encoding/json/encode.go:495 +0x1af
  encoding/json.condAddrEncoder.encode()
      /usr/lib/go-1.19/src/encoding/json/encode.go:959 +0x94
  encoding/json.condAddrEncoder.encode-fm()
      <autogenerated>:1 +0xa4
  encoding/json.arrayEncoder.encode()
      /usr/lib/go-1.19/src/encoding/json/encode.go:915 +0x10e
  encoding/json.arrayEncoder.encode-fm()
      <autogenerated>:1 +0x90
  encoding/json.sliceEncoder.encode()

```

Should fix thanos-io#5501.

Signed-off-by: Giedrius Statkevičius <giedrius.statkevicius@vinted.com>
  • Loading branch information
GiedriusS committed Aug 10, 2022
1 parent 6d1b98d commit 2cedf6a
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 128 deletions.
23 changes: 13 additions & 10 deletions pkg/api/api.go
Expand Up @@ -121,7 +121,7 @@ func SetCORS(w http.ResponseWriter) {
}
}

type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError)
type ApiFunc func(r *http.Request) (interface{}, []error, *ApiError, func())

type BaseAPI struct {
logger log.Logger
Expand Down Expand Up @@ -156,20 +156,20 @@ func (api *BaseAPI) Register(r *route.Router, tracer opentracing.Tracer, logger
r.Get("/status/buildinfo", instr("status_build", api.serveBuildInfo))
}

func (api *BaseAPI) options(r *http.Request) (interface{}, []error, *ApiError) {
return nil, nil, nil
func (api *BaseAPI) options(r *http.Request) (interface{}, []error, *ApiError, func()) {
return nil, nil, nil, func() {}
}

func (api *BaseAPI) flags(r *http.Request) (interface{}, []error, *ApiError) {
return api.flagsMap, nil, nil
func (api *BaseAPI) flags(r *http.Request) (interface{}, []error, *ApiError, func()) {
return api.flagsMap, nil, nil, func() {}
}

func (api *BaseAPI) serveRuntimeInfo(r *http.Request) (interface{}, []error, *ApiError) {
return api.runtimeInfo(), nil, nil
func (api *BaseAPI) serveRuntimeInfo(r *http.Request) (interface{}, []error, *ApiError, func()) {
return api.runtimeInfo(), nil, nil, func() {}
}

func (api *BaseAPI) serveBuildInfo(r *http.Request) (interface{}, []error, *ApiError) {
return api.buildInfo, nil, nil
func (api *BaseAPI) serveBuildInfo(r *http.Request) (interface{}, []error, *ApiError, func()) {
return api.buildInfo, nil, nil, func() {}
}

func GetRuntimeInfoFunc(logger log.Logger) RuntimeInfoFn {
Expand Down Expand Up @@ -208,12 +208,15 @@ func GetInstr(
if !disableCORS {
SetCORS(w)
}
if data, warnings, err := f(r); err != nil {
if data, warnings, err, releaseResources := f(r); err != nil {
RespondError(w, err, data)
releaseResources()
} else if data != nil {
Respond(w, data, warnings)
releaseResources()
} else {
w.WriteHeader(http.StatusNoContent)
releaseResources()
}
})

Expand Down
22 changes: 11 additions & 11 deletions pkg/api/blocks/v1.go
Expand Up @@ -86,48 +86,48 @@ func (bapi *BlocksAPI) Register(r *route.Router, tracer opentracing.Tracer, logg
r.Post("/blocks/mark", instr("blocks_mark", bapi.markBlock))
}

func (bapi *BlocksAPI) markBlock(r *http.Request) (interface{}, []error, *api.ApiError) {
func (bapi *BlocksAPI) markBlock(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
idParam := r.FormValue("id")
actionParam := r.FormValue("action")
detailParam := r.FormValue("detail")

if idParam == "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("ID cannot be empty")}
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("ID cannot be empty")}, func() {}
}

if actionParam == "" {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("Action cannot be empty")}
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.New("Action cannot be empty")}, func() {}
}

id, err := ulid.Parse(idParam)
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("ULID %q is not valid: %v", idParam, err)}
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("ULID %q is not valid: %v", idParam, err)}, func() {}
}

actionType := parse(actionParam)
switch actionType {
case Deletion:
err := block.MarkForDeletion(r.Context(), bapi.logger, bapi.bkt, id, detailParam, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}
case NoCompaction:
err := block.MarkForNoCompact(r.Context(), bapi.logger, bapi.bkt, id, metadata.ManualNoCompactReason, detailParam, promauto.With(nil).NewCounter(prometheus.CounterOpts{}))
if err != nil {
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: err}, func() {}
}
default:
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("not supported marker %v", actionParam)}
return nil, nil, &api.ApiError{Typ: api.ErrorBadData, Err: errors.Errorf("not supported marker %v", actionParam)}, func() {}
}
return nil, nil, nil
return nil, nil, nil, func() {}
}

func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiError) {
func (bapi *BlocksAPI) blocks(r *http.Request) (interface{}, []error, *api.ApiError, func()) {
viewParam := r.URL.Query().Get("view")
if viewParam == "loaded" {
return bapi.loadedBlocksInfo, nil, nil
return bapi.loadedBlocksInfo, nil, nil, func() {}
}
return bapi.globalBlocksInfo, nil, nil
return bapi.globalBlocksInfo, nil, nil, func() {}
}

func (b *BlocksInfo) set(blocks []metadata.Meta, err error) {
Expand Down
3 changes: 2 additions & 1 deletion pkg/api/blocks/v1_test.go
Expand Up @@ -71,7 +71,8 @@ func testEndpoint(t *testing.T, test endpointTestCase, name string, responseComp
req.Header.Set("Content-Type", "application/x-www-form-urlencoded")
}

resp, _, apiErr := test.endpoint(req.WithContext(ctx))
resp, _, apiErr, releaseResources := test.endpoint(req.WithContext(ctx))
defer releaseResources()
if apiErr != nil {
if test.errType == baseAPI.ErrorNone {
t.Fatalf("Unexpected error: %s", apiErr)
Expand Down

0 comments on commit 2cedf6a

Please sign in to comment.