Skip to content

Commit

Permalink
[PRW 2.0] Moved rw2 proto to the full path (both package name and pla…
Browse files Browse the repository at this point in the history
…cement)

This is what buf recommends https://buf.build/docs/reference/protobuf-files-and-packages#packages

Signed-off-by: bwplotka <bwplotka@gmail.com>
  • Loading branch information
bwplotka committed May 2, 2024
1 parent 7b88101 commit a622bbe
Show file tree
Hide file tree
Showing 10 changed files with 200 additions and 202 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func (h Histogram) IsFloatHistogram() bool {
return ok
}

func (m *WriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {
func (m *Request) OptimizedMarshal(dst []byte) ([]byte, error) {
siz := m.Size()
if cap(dst) < siz {
dst = make([]byte, siz)
Expand All @@ -41,7 +41,7 @@ func (m *WriteRequest) OptimizedMarshal(dst []byte) ([]byte, error) {

// OptimizedMarshalToSizedBuffer is mostly a copy of the generated MarshalToSizedBuffer,
// but calls OptimizedMarshalToSizedBuffer on the timeseries.
func (m *WriteRequest) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
func (m *Request) OptimizedMarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,15 @@ func TestOptimizedMarshal(t *testing.T) {

tests := []struct {
name string
m *WriteRequest
m *Request
}{
// {
// name: "empty",
// m: &WriteRequest{},
// },
{
name: "simple",
m: &WriteRequest{
m: &Request{
Timeseries: []TimeSeries{
{
LabelsRefs: []uint32{
Expand Down Expand Up @@ -90,7 +90,7 @@ func TestOptimizedMarshal(t *testing.T) {
require.Equal(t, expected, got)

// round trip
m := &WriteRequest{}
m := &Request{}
require.NoError(t, m.Unmarshal(got))
require.Equal(t, tt.m, m)
})
Expand Down
288 changes: 143 additions & 145 deletions prompb/write/v2/types.pb.go → prompb/io/prometheus/write/v2/types.pb.go

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2017 Prometheus Team
// Copyright 2024 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand All @@ -11,58 +11,55 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// NOTE: This file is also available on https://buf.build/prometheus/prometheus/docs/main:io.prometheus.write.v2

syntax = "proto3";
package write.v2;
package io.prometheus.write.v2;

option go_package = "writev2";

import "gogoproto/gogo.proto";

// WriteRequest represents a Remote Write 2.0 request to write given time series
// to remote destination. Note that Remote Write 2.0 requires a content
// negotiation for version and compressions, explained in
// https://docs.google.com/document/d/1jx1fqpRnM0pAndeo3AgY7g6BLxN3Ah8R0Mm8RvNsHoU/edit
// TODO(bwplotka): Change URL to Prometheus docs once ready.
message WriteRequest {
// symbols contains de-duplicated array of string elements used for various
// items in WriteRequest like labels and some metadata items. To decode
// Request represents a request to write the given timeseries to a remote destination.
// This was introduced in the Remote Write 2.0 specification:
// https://prometheus.io/docs/concepts/remote_write_spec_2_0/
// The canonical Content-Type request header value for this message is
// "application/x-protobuf;proto=io.prometheus.write.v2.Request"
//
// NOTE: gogoproto options might change in future for this file, they
// are not part of the spec proto (they only modify the generated Go code, not
// the serialized message). See: https://github.com/prometheus/prometheus/issues/11908
message Request {
// symbols contains a de-duplicated array of string elements used for various
// items in a Request message, like labels and metadata items. To decode
// each of those items, referenced, by "ref(s)" suffix, you need to lookup the
// actual string by index from symbols array. The order of strings is up to
// the client, server should not assume any particular encoding.
repeated string symbols = 1;
// timeseries represents array of distinct series with 0 or more samples.
// timeseries represents an array of distinct series with 0 or more samples.
repeated TimeSeries timeseries = 2 [(gogoproto.nullable) = false];
}

// TimeSeries represents a single series.
message TimeSeries {
// labels_refs is a list of label name-value pair references, encoded
// as indices to the WriteRequest.symbols array. This list's len is always
// a multiple of 2, and the underlying labels should be sorted.
// as indices to the Request.symbols array. This list's length is always
// a multiple of two, and the underlying labels should be sorted.
//
// Note that there might be multiple TimeSeries objects in the same
// WriteRequests with the same labels e.g. for different exemplars, metadata
// Requests with the same labels e.g. for different exemplars, metadata
// or created timestamp.
repeated uint32 labels_refs = 1;

// samples contain zero or more samples for a given timeseries. For typical
// clients, in healthy cases, there will be only one sample, for ~real
// time metric streaming. Samples can, in theory, co-exist with histogram samples
// (histograms field), although it should be extremely rare in practice (e.g.
// only when classic histogram series and native histogram share exactly the
// same metric name).
// Timeseries messages can either specify samples or (native) histogram samples
// (histogram field), but not both. For typical clients (~real-time metric
// streaming), in healthy cases, there will be only one sample or histogram.
//
// Samples are sorted by timestamp (older first).
// Samples and histograms are sorted by timestamp (older first).
repeated Sample samples = 2 [(gogoproto.nullable) = false];
// histograms contain zero or more histogram samples for a given timeseries.
// For typical clients, in healthy cases, there will be only one sample, for ~real
// time metric streaming. histograms can co-exist with samples (see samples
// for details).
//
// histograms are sorted by timestamp (older first).
repeated Histogram histograms = 3 [(gogoproto.nullable) = false];

// exemplars represents optional set of exemplars attached to this series' samples.
// exemplars represents an optional set of exemplars attached to this series' samples.
repeated Exemplar exemplars = 4 [(gogoproto.nullable) = false];

// metadata represents the metadata associated with the given series' samples.
Expand All @@ -71,15 +68,15 @@ message TimeSeries {
// created_timestamp represents an optional created timestamp associated with
// this series' samples in ms format, typically for counter or histogram type
// metrics. Note that some servers might require this and in return fail to
// ingest such series within the WriteRequest.
// ingest such samples within the Request.
//
// For Go, see github.com/prometheus/prometheus/model/timestamp/timestamp.go
// for conversion from/to time.Time to Prometheus timestamp.
//
// NOTE: Optional key word is omitted due to
// Note that the "optional" keyword is omitted due to
// https://cloud.google.com/apis/design/design_patterns.md#optional_primitive_fields
// Zero value means value not set. If you need to use exactly zero value for
// timestamp, use 1 millisecond before or after.
// the timestamp, use 1 millisecond before or after.
int64 created_timestamp = 6;
}

Expand All @@ -88,7 +85,7 @@ message TimeSeries {
// the metric changes.
message Exemplar {
// labels_refs is a list of label name-value pair references, encoded
// as indices to the WriteRequest.symbols array. This list's len is always
// as indices to the Request.symbols array. This list's len is always
// a multiple of 2, and the underlying labels should be sorted.
repeated uint32 labels_refs = 1;
// value represents an exact example value. This can be useful when the exemplar
Expand All @@ -97,11 +94,11 @@ message Exemplar {
// timestamp represents an optional timestamp of the example in ms.
// For Go, see github.com/prometheus/prometheus/model/timestamp/timestamp.go
// for conversion from/to time.Time to Prometheus timestamp.
//
// NOTE: Optional key word is omitted due to
//
// Note that the "optional" keyword is omitted due to
// https://cloud.google.com/apis/design/design_patterns.md#optional_primitive_fields
// Zero value means value not set. If you need to use exactly zero value for
// timestamp, use 1 millisecond before or after.
// the timestamp, use 1 millisecond before or after.
int64 timestamp = 3;
}

Expand All @@ -128,10 +125,10 @@ message Metadata {
METRIC_TYPE_STATESET = 7;
}
MetricType type = 1;
// help_ref is a reference to the WriteRequest.symbols array representing help
// help_ref is a reference to the Request.symbols array representing help
// text for the metric.
uint32 help_ref = 3;
// unit_ref is a reference to the WriteRequest.symbols array representing unit
// unit_ref is a reference to the Request.symbols array representing unit
// for the metric.
uint32 unit_ref = 4;
}
Expand Down
2 changes: 1 addition & 1 deletion scripts/genproto.sh
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ for dir in ${DIRS}; do
./*.proto
protoc --gogofast_out=plugins=grpc:. -I=. \
-I="${GOGOPROTO_PATH}" \
./write/v2/*.proto
./io/prometheus/write/v2/*.proto
protoc --gogofast_out=Mgoogle/protobuf/timestamp.proto=github.com/gogo/protobuf/types,paths=source_relative:. -I=. \
-I="${GOGOPROTO_PATH}" \
./io/prometheus/client/*.proto
Expand Down
8 changes: 4 additions & 4 deletions storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func DecodeOTLPWriteRequest(r *http.Request) (pmetricotlp.ExportRequest, error)
return otlpReq, nil
}

func DecodeMinimizedWriteRequestStr(r io.Reader) (*writev2.WriteRequest, error) {
func DecodeMinimizedWriteRequestStr(r io.Reader) (*writev2.Request, error) {
compressed, err := io.ReadAll(r)
if err != nil {
return nil, err
Expand All @@ -1070,15 +1070,15 @@ func DecodeMinimizedWriteRequestStr(r io.Reader) (*writev2.WriteRequest, error)
return nil, err
}

var req writev2.WriteRequest
var req writev2.Request
if err := proto.Unmarshal(reqBuf, &req); err != nil {
return nil, err
}

return &req, nil
}

func MinimizedWriteRequestToWriteRequest(redReq *writev2.WriteRequest) (*prompb.WriteRequest, error) {
func MinimizedWriteRequestToWriteRequest(redReq *writev2.Request) (*prompb.WriteRequest, error) {
req := &prompb.WriteRequest{
Timeseries: make([]prompb.TimeSeries, len(redReq.Timeseries)),
// TODO handle metadata?
Expand Down
6 changes: 3 additions & 3 deletions storage/remote/codec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/prometheus/prometheus/tsdb/chunks"
Expand Down Expand Up @@ -77,7 +77,7 @@ var writeRequestFixture = &prompb.WriteRequest{
}

// writeRequestMinimizedFixture represents the same request as writeRequestFixture, but using the minimized representation.
var writeRequestMinimizedFixture = func() *writev2.WriteRequest {
var writeRequestMinimizedFixture = func() *writev2.Request {
st := newRwSymbolTable()
var labels []uint32
for _, s := range []string{
Expand All @@ -97,7 +97,7 @@ var writeRequestMinimizedFixture = func() *writev2.WriteRequest {
st.RefStr(s)
}

return &writev2.WriteRequest{
return &writev2.Request{
Timeseries: []writev2.TimeSeries{
{
LabelsRefs: labels,
Expand Down
5 changes: 3 additions & 2 deletions storage/remote/queue_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,15 @@ import (
semconv "go.opentelemetry.io/otel/semconv/v1.21.0"
"go.uber.org/atomic"

writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
Expand Down Expand Up @@ -2233,7 +2234,7 @@ func buildV2WriteRequest(logger log.Logger, samples []writev2.TimeSeries, labels
level.Debug(logger).Log("msg", "dropped data due to their age", "droppedSamples", droppedSamples, "droppedExemplars", droppedExemplars, "droppedHistograms", droppedHistograms)
}

req := &writev2.WriteRequest{
req := &writev2.Request{
Symbols: labels,
Timeseries: timeSeries,
}
Expand Down
5 changes: 3 additions & 2 deletions storage/remote/queue_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,14 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"

"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/scrape"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/prometheus/prometheus/tsdb/record"
Expand Down Expand Up @@ -1129,7 +1130,7 @@ func (c *TestWriteClient) Store(_ context.Context, req []byte, attemptNos int, r
reqProto = &prompb.WriteRequest{}
err = proto.Unmarshal(reqBuf, reqProto)
case Version2:
var reqMin writev2.WriteRequest
var reqMin writev2.Request
err = proto.Unmarshal(reqBuf, &reqMin)
if err == nil {
reqProto, err = MinimizedWriteRequestToWriteRequest(&reqMin)
Expand Down
7 changes: 4 additions & 3 deletions storage/remote/write_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"

writev2 "github.com/prometheus/prometheus/prompb/io/prometheus/write/v2"

"github.com/gogo/protobuf/proto"
"github.com/golang/snappy"

Expand All @@ -33,7 +35,6 @@ import (
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
writev2 "github.com/prometheus/prometheus/prompb/write/v2"
"github.com/prometheus/prometheus/storage"
otlptranslator "github.com/prometheus/prometheus/storage/remote/otlptranslator/prometheusremotewrite"
)
Expand Down Expand Up @@ -204,7 +205,7 @@ func (h *writeHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
err = h.write(r.Context(), &req)
case RemoteWriteVersion20HeaderValue:
// 2.0 request.
var reqMinStr writev2.WriteRequest
var reqMinStr writev2.Request
if err := proto.Unmarshal(decompressed, &reqMinStr); err != nil {
level.Error(h.logger).Log("msg", "Error decoding remote write request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)
Expand Down Expand Up @@ -452,7 +453,7 @@ func (h *otlpWriteHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}

func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.WriteRequest) (err error) {
func (h *writeHandler) writeMinStr(ctx context.Context, req *writev2.Request) (err error) {
outOfOrderExemplarErrs := 0

app := h.appendable.Appender(ctx)
Expand Down

0 comments on commit a622bbe

Please sign in to comment.