Skip to content

Commit

Permalink
[refactor] simplify appender commit
Browse files Browse the repository at this point in the history
Signed-off-by: Nicolas Takashi <nicolas.tcs@hotmail.com>
  • Loading branch information
nicolastakashi committed Apr 12, 2024
1 parent 9b7de47 commit 45e1f57
Showing 1 changed file with 49 additions and 19 deletions.
68 changes: 49 additions & 19 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,11 +869,13 @@ func (a *headAppender) Commit() (err error) {
}
enc record.Encoder
)

defer func() {
for i := range oooRecords {
a.head.putBytesBuffer(oooRecords[i][:0])
}
}()

collectOOORecords := func() {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
Expand Down Expand Up @@ -906,6 +908,7 @@ func (a *headAppender) Commit() (err error) {
wblSamples = nil
oooMmapMarkers = nil
}

for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
Expand Down Expand Up @@ -935,27 +938,18 @@ func (a *headAppender) Commit() (err error) {
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.

// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}
ok, chunkCreated, oooMmapMarkers, wblSamples = handleOutOfOrderSample(
series,
s,
a.head.chunkDiskMapper,
oooCapMax,
oooMmapMarkers,
collectOOORecords,
wblSamples,
)

if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
}
oooMmapMarkers[series.ref] = mmapRef
}
if ok {
wblSamples = append(wblSamples, s)
// wblSamples = append(wblSamples, s)
if s.T < ooomint {
ooomint = s.T
}
Expand Down Expand Up @@ -1075,6 +1069,42 @@ func (a *headAppender) Commit() (err error) {
return nil
}

func handleOutOfOrderSample(
series *memSeries,
s record.RefSample,
chunkDiskMapper *chunks.ChunkDiskMapper,
oooCapMax int64,
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef,
collectOOORecords func(),
wblSamples []record.RefSample,
) (bool, bool, map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef, []record.RefSample) {
var mmapRef chunks.ChunkDiskMapperRef

ok, chunkCreated, mmapRef := series.insert(s.T, s.V, chunkDiskMapper, oooCapMax)
if chunkCreated {
if r, ok := oooMmapMarkers[series.ref]; !ok || r != 0 {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.

// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}

if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
}
oooMmapMarkers[series.ref] = mmapRef
}

if ok {
wblSamples = append(wblSamples, s)
}

return ok, chunkCreated, oooMmapMarkers, wblSamples
}

// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
Expand Down

0 comments on commit 45e1f57

Please sign in to comment.