Skip to content

Commit

Permalink
receive: try to fix pool race
Browse files Browse the repository at this point in the history
Signed-off-by: Michael Hoffmann <mhoffm@posteo.de>
  • Loading branch information
MichaHoffmann committed Mar 26, 2024
1 parent e0e836d commit e4a02f4
Show file tree
Hide file tree
Showing 5 changed files with 13 additions and 113 deletions.
1 change: 1 addition & 0 deletions .golangci.yml
Expand Up @@ -17,6 +17,7 @@ run:
skip-dirs:
- vendor
- internal/cortex
- pkg/store/storepb/remotewritepb


# output configuration options
Expand Down
20 changes: 7 additions & 13 deletions pkg/receive/handler.go
Expand Up @@ -825,11 +825,7 @@ func (h *Handler) sendLocalWrite(
span.SetTag("endpoint", writeDestination.endpoint)
span.SetTag("replica", writeDestination.replica)

wreq := remotewritepb.WriteRequestFromVTPool()
wreq.Timeseries = trackedSeries.timeSeries
defer wreq.ReturnToVTPool()

err := h.writer.Write(tracingCtx, tenant, wreq)
err := h.writer.Write(tracingCtx, tenant, trackedSeries.timeSeries)
if err != nil {
span.SetTag("error", true)
span.SetTag("error.msg", err.Error())
Expand Down Expand Up @@ -864,15 +860,15 @@ func (h *Handler) sendRemoteWrite(
// This is called "real" because it's 1-indexed.
realReplicationIndex := int64(endpointReplica.replica + 1)

wreq := remotewritepb.StoreWriteRequestFromVTPool()
wreq.Timeseries = trackedSeries.timeSeries
wreq.Tenant = tenant
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
wreq.Replica = realReplicationIndex
wreq := &remotewritepb.StoreWriteRequest{
Timeseries: trackedSeries.timeSeries,
Tenant: tenant,
// Increment replica since on-the-wire format is 1-indexed and 0 indicates un-replicated.
Replica: realReplicationIndex,
}

// Actually make the request against the endpoint we determined should handle these time series.
cl.RemoteWriteAsync(ctx, wreq, endpointReplica, trackedSeries.seriesIDs, responses, func(err error) {
wreq.ReturnToVTPool()
if err == nil {
h.forwardRequests.WithLabelValues(labelSuccess).Inc()
if !alreadyReplicated {
Expand Down Expand Up @@ -908,8 +904,6 @@ func quorumReached(successes []int, successThreshold int) bool {

// RemoteWrite implements the gRPC remote write handler for storepb.WriteableStore.
func (h *Handler) RemoteWrite(ctx context.Context, r *remotewritepb.StoreWriteRequest) (*remotewritepb.StoreWriteResponse, error) {
defer r.ReturnToVTPool()

span, ctx := tracing.StartSpan(ctx, "receive_grpc")
defer span.Finish()

Expand Down
4 changes: 2 additions & 2 deletions pkg/receive/writer.go
Expand Up @@ -72,7 +72,7 @@ func NewWriter(logger log.Logger, multiTSDB TenantStorage, opts *WriterOptions)
}
}

func (r *Writer) Write(ctx context.Context, tenantID string, wreq *remotewritepb.WriteRequest) error {
func (r *Writer) Write(ctx context.Context, tenantID string, timeseries []*remotewritepb.TimeSeries) error {
tLogger := log.With(r.logger, "tenant", tenantID)

var (
Expand Down Expand Up @@ -112,7 +112,7 @@ func (r *Writer) Write(ctx context.Context, tenantID string, wreq *remotewritepb
tooFarInFuture: r.opts.TooFarInFutureTimeWindow,
Appender: app,
}
for _, t := range wreq.Timeseries {
for _, t := range timeseries {
// Check if time series labels are valid. If not, skip the time series
// and report the error.
lset := remotewritepb.LabelsToPromLabels(t.Labels)
Expand Down
97 changes: 0 additions & 97 deletions pkg/store/storepb/remotewritepb/rpc_vtproto.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion scripts/genproto.sh
Expand Up @@ -62,9 +62,11 @@ for dir in ${DIRS}; do
${PROTOC_BIN} \
--go_out=paths=source_relative:. \
--plugin protoc-gen-go=${PROTOC_GEN_GO_BIN} \
--go-grpc_out=paths=source_relative:. \
--plugin protoc-gen-go-grpc=${PROTOC_GEN_GO_GRPC_BIN} \
--go-vtproto_out=paths=source_relative:. \
--plugin protoc-gen-go-vtproto=${PROTOC_GEN_GO_VTPROTO_BIN} \
--go-vtproto_opt=features=grpc+marshal+unmarshal+size+pool+clone \
--go-vtproto_opt=features=marshal+unmarshal+size+pool+clone \
-I=. \
-I="${VTPROTO_PATH}" \
${dir}/*.proto
Expand Down

0 comments on commit e4a02f4

Please sign in to comment.