Skip to content

Commit

Permalink
[refactor] simplify appender commit
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Takashi <nicolas.tcs@hotmail.com>
  • Loading branch information
nicolastakashi committed Apr 19, 2024
1 parent 9b7de47 commit 0bdbbb7
Showing 1 changed file with 71 additions and 35 deletions.
106 changes: 71 additions & 35 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,11 +869,13 @@ func (a *headAppender) Commit() (err error) {
}
enc record.Encoder
)

defer func() {
for i := range oooRecords {
a.head.putBytesBuffer(oooRecords[i][:0])
}
}()

collectOOORecords := func() {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
Expand Down Expand Up @@ -906,6 +908,7 @@ func (a *headAppender) Commit() (err error) {
wblSamples = nil
oooMmapMarkers = nil
}

for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
Expand Down Expand Up @@ -935,41 +938,21 @@ func (a *headAppender) Commit() (err error) {
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.

// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}

if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
}
oooMmapMarkers[series.ref] = mmapRef
}
if ok {
wblSamples = append(wblSamples, s)
if s.T < ooomint {
ooomint = s.T
}
if s.T > ooomaxt {
ooomaxt = s.T
}
floatOOOAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
floatsAppended--
}
chunkCreated, oooMmapMarkers, ooomint, ooomaxt, floatOOOAccepted, floatOOORejected = handleOutOfOrderSample(
ok,
chunkCreated,
series,
s,
a.head.chunkDiskMapper,
oooCapMax,
oooMmapMarkers,
collectOOORecords,
&wblSamples,
ooomint,
oooCapMax,
floatOOOAccepted,
floatsAppended,
)
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts)
if ok {
Expand Down Expand Up @@ -1075,6 +1058,59 @@ func (a *headAppender) Commit() (err error) {
return nil
}

func handleOutOfOrderSample(
ok,
chunkCreated bool,
series *memSeries,
s record.RefSample,
chunkDiskMapper *chunks.ChunkDiskMapper,
oooCapMax int64,
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef,

Check failure on line 1068 in tsdb/head_append.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4009(related information): assignment to ok (staticcheck)
collectOOORecords func(),
wblSamples *[]record.RefSample,
ooomint, ooomaxt int64,
floatOOOAccepted, floatsAppended int,
) (bool, map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef, int64, int64, int, int) {
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.

// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}

if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
}
oooMmapMarkers[series.ref] = mmapRef
}

if ok {
*wblSamples = append(*wblSamples, s)
if s.T < ooomint {
ooomint = s.T
}
if s.T > ooomaxt {
ooomaxt = s.T
}
floatOOOAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
floatsAppended--
}

return chunkCreated, oooMmapMarkers, ooomint, ooomaxt, floatOOOAccepted, floatsAppended
}

// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
Expand Down

0 comments on commit 0bdbbb7

Please sign in to comment.