Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[refactor] simplifying appender commit #13924

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
106 changes: 71 additions & 35 deletions tsdb/head_append.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,11 +869,13 @@
}
enc record.Encoder
)

defer func() {
for i := range oooRecords {
a.head.putBytesBuffer(oooRecords[i][:0])
}
}()

collectOOORecords := func() {
if a.head.wbl == nil {
// WBL is not enabled. So no need to collect.
Expand Down Expand Up @@ -906,6 +908,7 @@
wblSamples = nil
oooMmapMarkers = nil
}

for i, s := range a.samples {
series = a.sampleSeries[i]
series.Lock()
Expand Down Expand Up @@ -935,41 +938,21 @@
case oooSample:
// Sample is OOO and OOO handling is enabled
// and the delta is within the OOO tolerance.
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, a.head.chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.

// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}

if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
}
oooMmapMarkers[series.ref] = mmapRef
}
if ok {
wblSamples = append(wblSamples, s)
if s.T < ooomint {
ooomint = s.T
}
if s.T > ooomaxt {
ooomaxt = s.T
}
floatOOOAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
floatsAppended--
}
chunkCreated, oooMmapMarkers, ooomint, ooomaxt, floatOOOAccepted, floatOOORejected = handleOutOfOrderSample(
ok,
chunkCreated,
series,
s,
a.head.chunkDiskMapper,
oooCapMax,
oooMmapMarkers,
collectOOORecords,
&wblSamples,
ooomint,
oooCapMax,
floatOOOAccepted,
floatsAppended,
)
default:
ok, chunkCreated = series.append(s.T, s.V, a.appendID, appendChunkOpts)
if ok {
Expand Down Expand Up @@ -1069,12 +1052,65 @@
// until we have found what samples become OOO. We can try having a metric for this failure.
// Returning the error here is not correct because we have already put the samples into the memory,
// hence the append/insert was a success.
level.Error(a.head.logger).Log("msg", "Failed to log out of order samples into the WAL", "err", err)

Check failure on line 1055 in tsdb/head_append.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4009: argument ok is overwritten before first use (staticcheck)
}

Check failure on line 1056 in tsdb/head_append.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4009: argument chunkCreated is overwritten before first use (staticcheck)
}
return nil
}

func handleOutOfOrderSample(
ok,
chunkCreated bool,
series *memSeries,
s record.RefSample,
chunkDiskMapper *chunks.ChunkDiskMapper,
oooCapMax int64,
oooMmapMarkers map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef,

Check failure on line 1068 in tsdb/head_append.go

View workflow job for this annotation

GitHub Actions / golangci-lint

SA4009(related information): assignment to ok (staticcheck)
collectOOORecords func(),
wblSamples *[]record.RefSample,
ooomint, ooomaxt int64,
floatOOOAccepted, floatsAppended int,
) (bool, map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef, int64, int64, int, int) {
var mmapRef chunks.ChunkDiskMapperRef
ok, chunkCreated, mmapRef = series.insert(s.T, s.V, chunkDiskMapper, oooCapMax)
if chunkCreated {
r, ok := oooMmapMarkers[series.ref]
if !ok || r != 0 {
// !ok means there are no markers collected for these samples yet. So we first flush the samples
// before setting this m-map marker.

// r != 0 means we have already m-mapped a chunk for this series in the same Commit().
// Hence, before we m-map again, we should add the samples and m-map markers
// seen till now to the WBL records.
collectOOORecords()
}

if oooMmapMarkers == nil {
oooMmapMarkers = make(map[chunks.HeadSeriesRef]chunks.ChunkDiskMapperRef)
}
oooMmapMarkers[series.ref] = mmapRef
}

if ok {
*wblSamples = append(*wblSamples, s)
if s.T < ooomint {
ooomint = s.T
}
if s.T > ooomaxt {
ooomaxt = s.T
}
floatOOOAccepted++
} else {
// Sample is an exact duplicate of the last sample.
// NOTE: We can only detect updates if they clash with a sample in the OOOHeadChunk,
// not with samples in already flushed OOO chunks.
// TODO(codesome): Add error reporting? It depends on addressing https://github.com/prometheus/prometheus/discussions/10305.
floatsAppended--
}

return chunkCreated, oooMmapMarkers, ooomint, ooomaxt, floatOOOAccepted, floatsAppended
}

// insert is like append, except it inserts. Used for OOO samples.
func (s *memSeries) insert(t int64, v float64, chunkDiskMapper *chunks.ChunkDiskMapper, oooCapMax int64) (inserted, chunkCreated bool, mmapRef chunks.ChunkDiskMapperRef) {
if s.ooo == nil {
Expand Down