Skip to content

Commit

Permalink
Implement histogram iterators for batch (#5944)
Browse files Browse the repository at this point in the history
* Implement histogram iterators for batch

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* update

Signed-off-by: Ben Ye <benye@amazon.com>

* lint

Signed-off-by: Ben Ye <benye@amazon.com>

* revert to not use unsafe

Signed-off-by: Ben Ye <benye@amazon.com>

---------

Signed-off-by: Ben Ye <benye@amazon.com>
  • Loading branch information
yeya24 committed May 16, 2024
1 parent 23b4148 commit 1b7a5e0
Show file tree
Hide file tree
Showing 14 changed files with 510 additions and 201 deletions.
42 changes: 30 additions & 12 deletions pkg/chunk/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chunk
import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)
Expand Down Expand Up @@ -45,31 +46,41 @@ type prometheusChunkIterator struct {
it chunkenc.Iterator
}

func (p *prometheusChunkIterator) Scan() bool {
return p.it.Next() != chunkenc.ValNone
func (p *prometheusChunkIterator) Scan() chunkenc.ValueType {
return p.it.Next()
}

func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) bool {
func (p *prometheusChunkIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType {
// FindAtOrAfter must return OLDEST value at given time. That means we need to start with a fresh iterator,
// otherwise we cannot guarantee OLDEST.
p.it = p.c.Iterator(p.it)
return p.it.Seek(int64(time)) != chunkenc.ValNone
return p.it.Seek(int64(time))
}

// Batch copies up to size samples of the given value type out of the
// underlying chunk iterator, starting with the sample the iterator is
// currently positioned on, and records valType on the returned batch.
func (p *prometheusChunkIterator) Batch(size int) Batch {
func (p *prometheusChunkIterator) Batch(size int, valType chunkenc.ValueType) Batch {
var batch Batch
j := 0
for j < size {
t, v := p.it.At()
batch.Timestamps[j] = t
batch.Values[j] = v
switch valType {
case chunkenc.ValNone:
// NOTE(review): this break only leaves the switch, not the enclosing for
// loop, so a ValNone call still counts and advances through samples
// without storing anything — confirm whether exiting the loop was intended.
break
case chunkenc.ValFloat:
t, v := p.it.At()
batch.Timestamps[j] = t
batch.Values[j] = v
case chunkenc.ValHistogram:
batch.Timestamps[j], batch.Histograms[j] = p.it.AtHistogram(nil)
case chunkenc.ValFloatHistogram:
batch.Timestamps[j], batch.FloatHistograms[j] = p.it.AtFloatHistogram(nil)
}
j++
// Only advance when another slot remains, so the iterator is left on the
// last sample placed in the batch once the batch is full.
if j < size && p.it.Next() == chunkenc.ValNone {
break
}
}
batch.Index = 0
batch.Length = j
batch.ValType = valType
return batch
}

Expand All @@ -79,7 +90,14 @@ func (p *prometheusChunkIterator) Err() error {

type errorIterator string

func (e errorIterator) Scan() bool { return false }
func (e errorIterator) FindAtOrAfter(time model.Time) bool { return false }
func (e errorIterator) Batch(size int) Batch { panic("no values") }
func (e errorIterator) Err() error { return errors.New(string(e)) }
func (e errorIterator) Scan() chunkenc.ValueType { return chunkenc.ValNone }
func (e errorIterator) FindAtOrAfter(time model.Time) chunkenc.ValueType { return chunkenc.ValNone }
func (e errorIterator) Value() model.SamplePair { panic("no values") }
func (e errorIterator) AtHistogram(_ *histogram.Histogram) (int64, *histogram.Histogram) {
panic("no values")
}
func (e errorIterator) AtFloatHistogram(_ *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
panic("no values")
}
func (e errorIterator) Batch(size int, valType chunkenc.ValueType) Batch { panic("no values") }
func (e errorIterator) Err() error { return errors.New(string(e)) }
39 changes: 34 additions & 5 deletions pkg/chunk/encoding/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Encoding defines which encoding we are using, delta, doubledelta, or varbit
// Encoding defines which encoding we are using.
type Encoding byte

// String implements flag.Value.
Expand All @@ -26,28 +26,57 @@ func (e Encoding) PromChunkEncoding() chunkenc.Encoding {
return chunkenc.EncNone
}

// ChunkValueType reports the sample value type registered for this encoding
// in the encodings table, or chunkenc.ValNone when the encoding is not
// registered there.
func (e Encoding) ChunkValueType() chunkenc.ValueType {
	info, ok := encodings[e]
	if !ok {
		return chunkenc.ValNone
	}
	return info.ValueType
}

// Supported chunk encodings. The numeric values are assigned explicitly
// rather than via iota, and each constant is a key of the encodings table.
const (
// PrometheusXorChunk is a wrapper around Prometheus XOR-encoded chunk.
// 4 is the magic value for backwards-compatibility with previous iota-based constants.
PrometheusXorChunk Encoding = 4
// PrometheusHistogramChunk is a wrapper around Prometheus histogram chunk.
// 5 is the magic value for backwards-compatibility with previous iota-based constants.
PrometheusHistogramChunk Encoding = 5
// PrometheusFloatHistogramChunk is a wrapper around Prometheus float histogram chunk.
// 6 is the magic value for backwards-compatibility with previous iota-based constants.
PrometheusFloatHistogramChunk Encoding = 6
)

type encoding struct {
Name string
Encoding chunkenc.Encoding
Name string
Encoding chunkenc.Encoding
ValueType chunkenc.ValueType
}

var encodings = map[Encoding]encoding{
PrometheusXorChunk: {
Name: "PrometheusXorChunk",
Encoding: chunkenc.EncXOR,
Name: "PrometheusXorChunk",
Encoding: chunkenc.EncXOR,
ValueType: chunkenc.ValFloat,
},
PrometheusHistogramChunk: {
Name: "PrometheusHistogramChunk",
Encoding: chunkenc.EncHistogram,
ValueType: chunkenc.ValHistogram,
},
PrometheusFloatHistogramChunk: {
Name: "PrometheusFloatHistogramChunk",
Encoding: chunkenc.EncFloatHistogram,
ValueType: chunkenc.ValFloatHistogram,
},
}

// FromPromChunkEncoding maps a Prometheus chunk encoding to the matching
// Encoding constant. An encoding with no counterpart yields Encoding(0)
// together with an error naming the unknown encoding.
func FromPromChunkEncoding(enc chunkenc.Encoding) (Encoding, error) {
	var converted Encoding
	switch enc {
	case chunkenc.EncXOR:
		converted = PrometheusXorChunk
	case chunkenc.EncHistogram:
		converted = PrometheusHistogramChunk
	case chunkenc.EncFloatHistogram:
		converted = PrometheusFloatHistogramChunk
	default:
		return Encoding(0), errors.Errorf("unknown Prometheus chunk encoding: %v", enc)
	}
	return converted, nil
}
33 changes: 19 additions & 14 deletions pkg/chunk/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package chunk

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"
)

// Iterator enables efficient access to the content of a chunk. It is
Expand All @@ -11,16 +13,16 @@ type Iterator interface {
// Scans the next value in the chunk. Directly after the iterator has
// been created, the next value is the first value in the
// chunk. Otherwise, it is the value following the last value scanned or
// found (by one of the Find... methods). Returns false if either the
// end of the chunk is reached or an error has occurred.
Scan() bool
// Finds the oldest value at or after the provided time. Returns false
// if either the chunk contains no value at or after the provided time,
// or an error has occurred.
FindAtOrAfter(model.Time) bool
// found (by one of the Find... methods). Returns chunkenc.ValNone if either
// the end of the chunk is reached or an error has occurred.
Scan() chunkenc.ValueType
// Finds the oldest value at or after the provided time and returns the value type.
// Returns chunkenc.ValNone if either the chunk contains no value at or after
// the provided time, or an error has occurred.
FindAtOrAfter(model.Time) chunkenc.ValueType
// Returns a batch of the provided size; NB not idempotent! Should only be called
// once per Scan.
Batch(size int) Batch
Batch(size int, valType chunkenc.ValueType) Batch
// Returns the last error encountered. In general, an error signals data
// corruption in the chunk and requires quarantining.
Err() error
Expand All @@ -30,11 +32,14 @@ type Iterator interface {
// 1 to 128.
const BatchSize = 12

// Batch is a sorted set of (timestamp, value) pairs. They are intended to be
// small, and passed by value.
// Batch is a sorted set of (timestamp, value) pairs. They are intended to be small,
// and passed by value. Value can vary depending on the chunk value type.
type Batch struct {
Timestamps [BatchSize]int64
Values [BatchSize]float64
Index int
Length int
Timestamps [BatchSize]int64
Values [BatchSize]float64
Histograms [BatchSize]*histogram.Histogram
FloatHistograms [BatchSize]*histogram.FloatHistogram
Index int
Length int
ValType chunkenc.ValueType
}
62 changes: 43 additions & 19 deletions pkg/querier/batch/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ package batch

import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/tsdb/chunkenc"

"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/querier/iterators"
)

// GenericChunk is a generic chunk used by the batch iterator, in order to make the batch
Expand All @@ -31,11 +31,11 @@ func (c GenericChunk) Iterator(reuse chunk.Iterator) chunk.Iterator {

// iterator iterates over batches.
type iterator interface {
// Seek to the batch at (or after) time t.
Seek(t int64, size int) bool
// Seek to the batch at (or after) time t and returns chunk value type.
Seek(t int64, size int) chunkenc.ValueType

// Next moves to the next batch.
Next(size int) bool
// Next moves to the next batch and returns chunk value type.
Next(size int) chunkenc.ValueType

// AtTime returns the start time of the next batch. Must only be called after
// Seek or Next have returned a value type other than chunkenc.ValNone.
Expand All @@ -44,7 +44,7 @@ type iterator interface {
// MaxCurrentChunkTime returns the max time on the current chunk.
MaxCurrentChunkTime() int64

// Batch returns the current batch. Must only be called after Seek or Next
// Batch returns the current batch. Must only be called after Seek or Next
// have returned a value type other than chunkenc.ValNone.
Batch() chunk.Batch

Expand Down Expand Up @@ -78,62 +78,71 @@ type iteratorAdapter struct {
}

func newIteratorAdapter(underlying iterator) chunkenc.Iterator {
return iterators.NewCompatibleChunksIterator(&iteratorAdapter{
return &iteratorAdapter{
batchSize: 1,
underlying: underlying,
})
}
}

// Seek implements chunkenc.Iterator.
func (a *iteratorAdapter) Seek(t int64) bool {
func (a *iteratorAdapter) Seek(t int64) chunkenc.ValueType {

// Optimisation: fulfill the seek using current batch if possible.
if a.curr.Length > 0 && a.curr.Index < a.curr.Length {
if t <= a.curr.Timestamps[a.curr.Index] {
//In this case, the interface's requirement is met, so state of this
//iterator does not need any change.
return true
return a.curr.ValType
} else if t <= a.curr.Timestamps[a.curr.Length-1] {
//In this case, some timestamp between current sample and end of batch can fulfill
//the seek. Let's find it.
for a.curr.Index < a.curr.Length && t > a.curr.Timestamps[a.curr.Index] {
a.curr.Index++
}
return true
return a.curr.ValType
} else if t <= a.underlying.MaxCurrentChunkTime() {
// In this case, some timestamp inside the current underlying chunk can fulfill the seek.
// In this case we will call next until we find the sample as it will be faster than calling
// `a.underlying.Seek` directly as this would cause the iterator to start from the beginning of the chunk.
// See: https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/querier/batch/chunk.go#L26-L45
// https://github.com/cortexproject/cortex/blob/f69452975877c67ac307709e5f60b8d20477764c/pkg/chunk/encoding/prometheus_chunk.go#L90-L95
for a.Next() {
for {
valType := a.Next()
if valType == chunkenc.ValNone {
break
}
if t <= a.curr.Timestamps[a.curr.Index] {
return true
return valType
}
}
}
}

a.curr.Length = -1
a.batchSize = 1
if a.underlying.Seek(t, a.batchSize) {
if valType := a.underlying.Seek(t, a.batchSize); valType != chunkenc.ValNone {
a.curr = a.underlying.Batch()
return a.curr.Index < a.curr.Length
if a.curr.Index < a.curr.Length {
return a.curr.ValType
}
}
return false
return chunkenc.ValNone
}

// Next implements chunkenc.Iterator.
func (a *iteratorAdapter) Next() bool {
func (a *iteratorAdapter) Next() chunkenc.ValueType {
a.curr.Index++
for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) {
for a.curr.Index >= a.curr.Length && a.underlying.Next(a.batchSize) != chunkenc.ValNone {
a.curr = a.underlying.Batch()
a.batchSize = a.batchSize * 2
if a.batchSize > chunk.BatchSize {
a.batchSize = chunk.BatchSize
}
}
return a.curr.Index < a.curr.Length
if a.curr.Index < a.curr.Length {
return a.curr.ValType
}
return chunkenc.ValNone
}

// At implements chunkenc.Iterator.
Expand All @@ -145,3 +154,18 @@ func (a *iteratorAdapter) At() (int64, float64) {
func (a *iteratorAdapter) Err() error {
// Always nil: this method reports no iteration error of its own.
// NOTE(review): nothing from the underlying batch iterator is surfaced
// here — confirm that dropping its error state is intended.
return nil
}

// AtHistogram implements chunkenc.Iterator. It returns the timestamp and the
// integer histogram stored at the cursor of the current batch; the h
// argument is not used.
func (a *iteratorAdapter) AtHistogram(h *histogram.Histogram) (int64, *histogram.Histogram) {
	idx := a.curr.Index
	ts := a.curr.Timestamps[idx]
	return ts, a.curr.Histograms[idx]
}

// AtFloatHistogram implements chunkenc.Iterator. It returns the timestamp and
// the float histogram stored at the cursor of the current batch; the h
// argument is not used.
func (a *iteratorAdapter) AtFloatHistogram(h *histogram.FloatHistogram) (int64, *histogram.FloatHistogram) {
	idx := a.curr.Index
	ts := a.curr.Timestamps[idx]
	return ts, a.curr.FloatHistograms[idx]
}

// AtT implements chunkenc.Iterator. It returns the timestamp stored at the
// cursor of the current batch.
func (a *iteratorAdapter) AtT() int64 {
	idx := a.curr.Index
	return a.curr.Timestamps[idx]
}

0 comments on commit 1b7a5e0

Please sign in to comment.