From 2ac1397b86442219c4bf8ca577d3793fd38acdf3 Mon Sep 17 00:00:00 2001 From: Chris Marchbanks Date: Wed, 22 May 2019 15:35:28 -0600 Subject: [PATCH] Better test coverage for compressed WAL Many tests that use the WAL were only running in uncompressed mode, now appropriate tests will run in both modes. Signed-off-by: Chris Marchbanks --- checkpoint_test.go | 186 ++++++++--------- head_test.go | 498 +++++++++++++++++++++++---------------------- wal/reader_test.go | 136 +++++++------ wal/wal_test.go | 4 +- 4 files changed, 426 insertions(+), 398 deletions(-) diff --git a/checkpoint_test.go b/checkpoint_test.go index 3bd5474b..0779894b 100644 --- a/checkpoint_test.go +++ b/checkpoint_test.go @@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) { } func TestCheckpoint(t *testing.T) { - dir, err := ioutil.TempDir("", "test_checkpoint") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - var enc RecordEncoder - // Create a dummy segment to bump the initial number. - seg, err := wal.CreateSegment(dir, 100) - testutil.Ok(t, err) - testutil.Ok(t, seg.Close()) - - // Manually create checkpoint for 99 and earlier. - w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), false) - testutil.Ok(t, err) - - // Add some data we expect to be around later. - err = w.Log(enc.Series([]RefSeries{ - {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, - {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, - }, nil)) - testutil.Ok(t, err) - testutil.Ok(t, w.Close()) - - // Start a WAL and write records to it as usual. - w, err = wal.NewSize(nil, nil, dir, 64*1024, false) - testutil.Ok(t, err) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "test_checkpoint") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - var last int64 - for i := 0; ; i++ { - _, n, err := w.Segments() - testutil.Ok(t, err) - if n >= 106 { - break - } - // Write some series initially. - if i == 0 { - b := enc.Series([]RefSeries{ - {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, - {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, - {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, - {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, - }, nil) - testutil.Ok(t, w.Log(b)) - } - // Write samples until the WAL has enough segments. - // Make them have drifting timestamps within a record to see that they - // get filtered properly. - b := enc.Samples([]RefSample{ - {Ref: 0, T: last, V: float64(i)}, - {Ref: 1, T: last + 10000, V: float64(i)}, - {Ref: 2, T: last + 20000, V: float64(i)}, - {Ref: 3, T: last + 30000, V: float64(i)}, - }, nil) - testutil.Ok(t, w.Log(b)) - - last += 100 - } - testutil.Ok(t, w.Close()) + var enc RecordEncoder + // Create a dummy segment to bump the initial number. + seg, err := wal.CreateSegment(dir, 100) + testutil.Ok(t, err) + testutil.Ok(t, seg.Close()) - _, err = Checkpoint(w, 100, 106, func(x uint64) bool { - return x%2 == 0 - }, last/2) - testutil.Ok(t, err) - testutil.Ok(t, w.Truncate(107)) - testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106)) + // Manually create checkpoint for 99 and earlier. + w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress) + testutil.Ok(t, err) - // Only the new checkpoint should be left. 
- files, err := fileutil.ReadDir(dir) - testutil.Ok(t, err) - testutil.Equals(t, 1, len(files)) - testutil.Equals(t, "checkpoint.000106", files[0]) + // Add some data we expect to be around later. + err = w.Log(enc.Series([]RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")}, + }, nil)) + testutil.Ok(t, err) + testutil.Ok(t, w.Close()) - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) - testutil.Ok(t, err) - defer sr.Close() + // Start a WAL and write records to it as usual. + w, err = wal.NewSize(nil, nil, dir, 64*1024, compress) + testutil.Ok(t, err) - var dec RecordDecoder - var series []RefSeries - r := wal.NewReader(sr) + var last int64 + for i := 0; ; i++ { + _, n, err := w.Segments() + testutil.Ok(t, err) + if n >= 106 { + break + } + // Write some series initially. + if i == 0 { + b := enc.Series([]RefSeries{ + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + {Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")}, + }, nil) + testutil.Ok(t, w.Log(b)) + } + // Write samples until the WAL has enough segments. + // Make them have drifting timestamps within a record to see that they + // get filtered properly. + b := enc.Samples([]RefSample{ + {Ref: 0, T: last, V: float64(i)}, + {Ref: 1, T: last + 10000, V: float64(i)}, + {Ref: 2, T: last + 20000, V: float64(i)}, + {Ref: 3, T: last + 30000, V: float64(i)}, + }, nil) + testutil.Ok(t, w.Log(b)) + + last += 100 + } + testutil.Ok(t, w.Close()) - for r.Next() { - rec := r.Record() + _, err = Checkpoint(w, 100, 106, func(x uint64) bool { + return x%2 == 0 + }, last/2) + testutil.Ok(t, err) + testutil.Ok(t, w.Truncate(107)) + testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106)) - switch dec.Type(rec) { - case RecordSeries: - series, err = dec.Series(rec, series) + // Only the new checkpoint should be left. 
+ files, err := fileutil.ReadDir(dir) testutil.Ok(t, err) - case RecordSamples: - samples, err := dec.Samples(rec, nil) + testutil.Equals(t, 1, len(files)) + testutil.Equals(t, "checkpoint.000106", files[0]) + + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106")) testutil.Ok(t, err) - for _, s := range samples { - testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp") + defer sr.Close() + + var dec RecordDecoder + var series []RefSeries + r := wal.NewReader(sr) + + for r.Next() { + rec := r.Record() + + switch dec.Type(rec) { + case RecordSeries: + series, err = dec.Series(rec, series) + testutil.Ok(t, err) + case RecordSamples: + samples, err := dec.Samples(rec, nil) + testutil.Ok(t, err) + for _, s := range samples { + testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp") + } + } } - } + testutil.Ok(t, r.Err()) + testutil.Equals(t, []RefSeries{ + {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, + {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, + {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, + }, series) + }) } - testutil.Ok(t, r.Err()) - testutil.Equals(t, []RefSeries{ - {Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")}, - {Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")}, - {Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")}, - }, series) } func TestCheckpointNoTmpFolderAfterError(t *testing.T) { diff --git a/head_test.go b/head_test.go index c0c86363..2364d5a7 100644 --- a/head_test.go +++ b/head_test.go @@ -14,6 +14,7 @@ package tsdb import ( + "fmt" "io/ioutil" "math" "math/rand" @@ -96,67 +97,71 @@ func readTestWAL(t testing.TB, dir string) (recs []interface{}) { } func TestHead_ReadWAL(t *testing.T) { - entries := []interface{}{ - []RefSeries{ - {Ref: 10, Labels: labels.FromStrings("a", "1")}, - {Ref: 11, Labels: labels.FromStrings("a", "2")}, - {Ref: 100, Labels: labels.FromStrings("a", "3")}, - }, - []RefSample{ - {Ref: 0, T: 99, V: 1}, - {Ref: 10, T: 100, V: 2}, - {Ref: 100, T: 100, V: 3}, - }, - []RefSeries{ - {Ref: 50, Labels: labels.FromStrings("a", "4")}, - }, - []RefSample{ - {Ref: 10, T: 101, V: 5}, - {Ref: 50, T: 101, V: 6}, - }, - []Stone{ - {ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}}, - }, - } - dir, err := ioutil.TempDir("", "test_read_wal") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + entries := []interface{}{ + []RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + {Ref: 11, Labels: labels.FromStrings("a", "2")}, + {Ref: 100, Labels: labels.FromStrings("a", "3")}, + }, + []RefSample{ + {Ref: 0, T: 99, V: 1}, + {Ref: 10, T: 100, V: 2}, + {Ref: 100, T: 100, V: 3}, + }, + []RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "4")}, + }, + []RefSample{ + {Ref: 10, T: 101, V: 5}, + {Ref: 50, T: 101, V: 6}, + }, + []Stone{ + {ref: 0, intervals: []Interval{{Mint: 99, Maxt: 101}}}, + }, + } + dir, err := ioutil.TempDir("", "test_read_wal") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - defer w.Close() - populateTestWAL(t, w, entries) + w, err := wal.New(nil, nil, dir, compress) + testutil.Ok(t, err) + defer w.Close() + populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) - testutil.Ok(t, 
head.Init(math.MinInt64)) - testutil.Equals(t, uint64(100), head.lastSeriesID) + testutil.Ok(t, head.Init(math.MinInt64)) + testutil.Equals(t, uint64(100), head.lastSeriesID) - s10 := head.series.getByID(10) - s11 := head.series.getByID(11) - s50 := head.series.getByID(50) - s100 := head.series.getByID(100) + s10 := head.series.getByID(10) + s11 := head.series.getByID(11) + s50 := head.series.getByID(50) + s100 := head.series.getByID(100) - testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset) - testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). - testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) - testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) + testutil.Equals(t, labels.FromStrings("a", "1"), s10.lset) + testutil.Equals(t, (*memSeries)(nil), s11) // Series without samples should be garbage colected at head.Init(). + testutil.Equals(t, labels.FromStrings("a", "4"), s50.lset) + testutil.Equals(t, labels.FromStrings("a", "3"), s100.lset) - expandChunk := func(c chunkenc.Iterator) (x []sample) { - for c.Next() { - t, v := c.At() - x = append(x, sample{t: t, v: v}) - } - testutil.Ok(t, c.Err()) - return x - } + expandChunk := func(c chunkenc.Iterator) (x []sample) { + for c.Next() { + t, v := c.At() + x = append(x, sample{t: t, v: v}) + } + testutil.Ok(t, c.Err()) + return x + } - testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) - testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) - testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) + testutil.Equals(t, []sample{{100, 2}, {101, 5}}, expandChunk(s10.iterator(0))) + testutil.Equals(t, []sample{{101, 6}}, expandChunk(s50.iterator(0))) + testutil.Equals(t, []sample{{100, 3}}, expandChunk(s100.iterator(0))) + }) + } } func TestHead_Truncate(t *testing.T) { @@ -271,36 +276,40 @@ func TestMemSeries_truncateChunks(t *testing.T) { } func TestHeadDeleteSeriesWithoutSamples(t *testing.T) { - entries := []interface{}{ - []RefSeries{ - {Ref: 10, Labels: labels.FromStrings("a", "1")}, - }, - []RefSample{}, - []RefSeries{ - {Ref: 50, Labels: labels.FromStrings("a", "2")}, - }, - []RefSample{ - {Ref: 50, T: 80, V: 1}, - {Ref: 50, T: 90, V: 1}, - }, - } - dir, err := ioutil.TempDir("", "test_delete_series") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + entries := []interface{}{ + []RefSeries{ + {Ref: 10, Labels: labels.FromStrings("a", "1")}, + }, + []RefSample{}, + []RefSeries{ + {Ref: 50, Labels: labels.FromStrings("a", "2")}, + }, + []RefSample{ + {Ref: 50, T: 80, V: 1}, + {Ref: 50, T: 90, V: 1}, + }, + } + dir, err := ioutil.TempDir("", "test_delete_series") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - defer w.Close() - populateTestWAL(t, w, entries) + w, err := wal.New(nil, nil, dir, compress) + testutil.Ok(t, err) + defer w.Close() + populateTestWAL(t, w, entries) - head, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) - testutil.Ok(t, head.Init(math.MinInt64)) + testutil.Ok(t, head.Init(math.MinInt64)) - testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) + testutil.Ok(t, head.Delete(0, 100, labels.NewEqualMatcher("a", "1"))) + }) + } } func 
TestHeadDeleteSimple(t *testing.T) { @@ -340,129 +349,133 @@ func TestHeadDeleteSimple(t *testing.T) { }, } -Outer: - for _, c := range cases { - dir, err := ioutil.TempDir("", "test_wal_reload") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - w, err := wal.New(nil, nil, path.Join(dir, "wal"), false) - testutil.Ok(t, err) - defer w.Close() - - head, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) - defer head.Close() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + Outer: + for _, c := range cases { + dir, err := ioutil.TempDir("", "test_wal_reload") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - app := head.Appender() - for _, smpl := range smplsAll { - _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) - testutil.Ok(t, err) + w, err := wal.New(nil, nil, path.Join(dir, "wal"), compress) + testutil.Ok(t, err) + defer w.Close() - } - testutil.Ok(t, app.Commit()) + head, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) + defer head.Close() - // Delete the ranges. - for _, r := range c.dranges { - testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))) - } + app := head.Appender() + for _, smpl := range smplsAll { + _, err = app.Add(labels.Labels{lblDefault}, smpl.t, smpl.v) + testutil.Ok(t, err) - // Compare the samples for both heads - before and after the reload. - reloadedW, err := wal.New(nil, nil, w.Dir(), false) // Use a new wal to ensure deleted samples are gone even after a reload. - testutil.Ok(t, err) - defer reloadedW.Close() - reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) - testutil.Ok(t, err) - defer reloadedHead.Close() - testutil.Ok(t, reloadedHead.Init(0)) - for _, h := range []*Head{head, reloadedHead} { - indexr, err := h.Index() - testutil.Ok(t, err) - // Use an emptyTombstoneReader explicitly to get all the samples. - css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) - testutil.Ok(t, err) + } + testutil.Ok(t, app.Commit()) - // Getting the actual samples. - actSamples := make([]sample, 0) - for css.Next() { - lblsAct, chkMetas, intv := css.At() - testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) - testutil.Equals(t, 0, len(intv)) + // Delete the ranges. + for _, r := range c.dranges { + testutil.Ok(t, head.Delete(r.Mint, r.Maxt, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value))) + } - chunkr, err := h.Chunks() + // Compare the samples for both heads - before and after the reload. + reloadedW, err := wal.New(nil, nil, w.Dir(), compress) // Use a new wal to ensure deleted samples are gone even after a reload. + testutil.Ok(t, err) + defer reloadedW.Close() + reloadedHead, err := NewHead(nil, nil, reloadedW, 1000) testutil.Ok(t, err) - for _, meta := range chkMetas { - chk, err := chunkr.Chunk(meta.Ref) + defer reloadedHead.Close() + testutil.Ok(t, reloadedHead.Init(0)) + for _, h := range []*Head{head, reloadedHead} { + indexr, err := h.Index() + testutil.Ok(t, err) + // Use an emptyTombstoneReader explicitly to get all the samples. 
+ css, err := LookupChunkSeries(indexr, emptyTombstoneReader, labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) testutil.Ok(t, err) - ii := chk.Iterator() - for ii.Next() { - t, v := ii.At() - actSamples = append(actSamples, sample{t: t, v: v}) - } - } - } - testutil.Ok(t, css.Err()) - testutil.Equals(t, c.smplsExp, actSamples) - } + // Getting the actual samples. + actSamples := make([]sample, 0) + for css.Next() { + lblsAct, chkMetas, intv := css.At() + testutil.Equals(t, labels.Labels{lblDefault}, lblsAct) + testutil.Equals(t, 0, len(intv)) + + chunkr, err := h.Chunks() + testutil.Ok(t, err) + for _, meta := range chkMetas { + chk, err := chunkr.Chunk(meta.Ref) + testutil.Ok(t, err) + ii := chk.Iterator() + for ii.Next() { + t, v := ii.At() + actSamples = append(actSamples, sample{t: t, v: v}) + } + } + } - // Compare the query results for both heads - before and after the reload. - expSeriesSet := newMockSeriesSet([]Series{ - newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample { - ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) - for _, s := range c.smplsExp { - ss = append(ss, s) + testutil.Ok(t, css.Err()) + testutil.Equals(t, c.smplsExp, actSamples) } - return ss - }(), - ), - }) - for _, h := range []*Head{head, reloadedHead} { - q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) - testutil.Ok(t, err) - actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) - testutil.Ok(t, err) - lns, err := q.LabelNames() - testutil.Ok(t, err) - lvs, err := q.LabelValues(lblDefault.Name) - testutil.Ok(t, err) - // When all samples are deleted we expect that no labels should exist either. - if len(c.smplsExp) == 0 { - testutil.Equals(t, 0, len(lns)) - testutil.Equals(t, 0, len(lvs)) - testutil.Assert(t, actSeriesSet.Next() == false, "") - testutil.Ok(t, h.Close()) - continue - } else { - testutil.Equals(t, 1, len(lns)) - testutil.Equals(t, 1, len(lvs)) - testutil.Equals(t, lblDefault.Name, lns[0]) - testutil.Equals(t, lblDefault.Value, lvs[0]) - } + // Compare the query results for both heads - before and after the reload. + expSeriesSet := newMockSeriesSet([]Series{ + newSeries(map[string]string{lblDefault.Name: lblDefault.Value}, func() []tsdbutil.Sample { + ss := make([]tsdbutil.Sample, 0, len(c.smplsExp)) + for _, s := range c.smplsExp { + ss = append(ss, s) + } + return ss + }(), + ), + }) + for _, h := range []*Head{head, reloadedHead} { + q, err := NewBlockQuerier(h, h.MinTime(), h.MaxTime()) + testutil.Ok(t, err) + actSeriesSet, err := q.Select(labels.NewEqualMatcher(lblDefault.Name, lblDefault.Value)) + testutil.Ok(t, err) - for { - eok, rok := expSeriesSet.Next(), actSeriesSet.Next() - testutil.Equals(t, eok, rok) + lns, err := q.LabelNames() + testutil.Ok(t, err) + lvs, err := q.LabelValues(lblDefault.Name) + testutil.Ok(t, err) + // When all samples are deleted we expect that no labels should exist either. 
+ if len(c.smplsExp) == 0 { + testutil.Equals(t, 0, len(lns)) + testutil.Equals(t, 0, len(lvs)) + testutil.Assert(t, actSeriesSet.Next() == false, "") + testutil.Ok(t, h.Close()) + continue + } else { + testutil.Equals(t, 1, len(lns)) + testutil.Equals(t, 1, len(lvs)) + testutil.Equals(t, lblDefault.Name, lns[0]) + testutil.Equals(t, lblDefault.Value, lvs[0]) + } - if !eok { - testutil.Ok(t, h.Close()) - continue Outer - } - expSeries := expSeriesSet.At() - actSeries := actSeriesSet.At() + for { + eok, rok := expSeriesSet.Next(), actSeriesSet.Next() + testutil.Equals(t, eok, rok) - testutil.Equals(t, expSeries.Labels(), actSeries.Labels()) + if !eok { + testutil.Ok(t, h.Close()) + continue Outer + } + expSeries := expSeriesSet.At() + actSeries := actSeriesSet.At() - smplExp, errExp := expandSeriesIterator(expSeries.Iterator()) - smplRes, errRes := expandSeriesIterator(actSeries.Iterator()) + testutil.Equals(t, expSeries.Labels(), actSeries.Labels()) - testutil.Equals(t, errExp, errRes) - testutil.Equals(t, smplExp, smplRes) + smplExp, errExp := expandSeriesIterator(expSeries.Iterator()) + smplRes, errRes := expandSeriesIterator(actSeries.Iterator()) + + testutil.Equals(t, errExp, errRes) + testutil.Equals(t, smplExp, smplRes) + } + } } - } + }) } } @@ -971,30 +984,34 @@ func TestRemoveSeriesAfterRollbackAndTruncate(t *testing.T) { } func TestHead_LogRollback(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_rollback") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_rollback") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - defer w.Close() - h, err := NewHead(nil, nil, w, 1000) - testutil.Ok(t, err) + w, err := wal.New(nil, nil, dir, compress) + testutil.Ok(t, err) + defer w.Close() + h, err := NewHead(nil, nil, w, 1000) + testutil.Ok(t, err) - app := h.Appender() - _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) - testutil.Ok(t, err) + app := h.Appender() + _, err = app.Add(labels.FromStrings("a", "b"), 1, 2) + testutil.Ok(t, err) - testutil.Ok(t, app.Rollback()) - recs := readTestWAL(t, w.Dir()) + testutil.Ok(t, app.Rollback()) + recs := readTestWAL(t, w.Dir()) - testutil.Equals(t, 1, len(recs)) + testutil.Equals(t, 1, len(recs)) - series, ok := recs[0].([]RefSeries) - testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) - testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + series, ok := recs[0].([]RefSeries) + testutil.Assert(t, ok, "expected series record but got %+v", recs[0]) + testutil.Equals(t, []RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, series) + }) + } } func TestWalRepair(t *testing.T) { @@ -1007,8 +1024,10 @@ func TestWalRepair(t *testing.T) { }{ "invalid_record": { func(rec []byte) []byte { - rec[0] = byte(RecordInvalid) - return rec + res := make([]byte, len(rec)) + copy(res, rec) + res[0] = byte(RecordInvalid) + return res }, enc.Series([]RefSeries{{Ref: 1, Labels: labels.FromStrings("a", "b")}}, []byte{}), 9, @@ -1039,44 +1058,45 @@ func TestWalRepair(t *testing.T) { 5, }, } { - t.Run(name, func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_head_repair") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() + for _, compress := range []bool{false, true} { + 
t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_head_repair") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - w, err := wal.New(nil, nil, dir, false) - testutil.Ok(t, err) - defer w.Close() + w, err := wal.New(nil, nil, dir, compress) + testutil.Ok(t, err) + defer w.Close() - for i := 1; i <= test.totalRecs; i++ { - // At this point insert a corrupted record. - if i-1 == test.expRecs { - testutil.Ok(t, w.Log(test.corrFunc(test.rec))) - continue + for i := 1; i <= test.totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == test.expRecs { + testutil.Ok(t, w.Log(test.corrFunc(test.rec))) + continue + } + testutil.Ok(t, w.Log(test.rec)) } - testutil.Ok(t, w.Log(test.rec)) - } - h, err := NewHead(nil, nil, w, 1) - testutil.Ok(t, err) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) - testutil.Ok(t, h.Init(math.MinInt64)) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + h, err := NewHead(nil, nil, w, 1) + testutil.Ok(t, err) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + testutil.Ok(t, h.Init(math.MinInt64)) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) - sr, err := wal.NewSegmentsReader(dir) - testutil.Ok(t, err) - defer sr.Close() - r := wal.NewReader(sr) + sr, err := wal.NewSegmentsReader(dir) + testutil.Ok(t, err) + defer sr.Close() + r := wal.NewReader(sr) - var actRec int - for r.Next() { - actRec++ - } - testutil.Ok(t, r.Err()) - testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") - }) + var actRec int + for r.Next() { + actRec++ + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") + }) + } } - } diff --git a/wal/reader_test.go b/wal/reader_test.go index 2f2b9f80..22ac2372 100644 --- a/wal/reader_test.go +++ b/wal/reader_test.go @@ -347,83 +347,87 @@ func TestReaderFuzz(t *testing.T) { func TestReaderFuzz_Live(t *testing.T) { logger := testutil.NewLogger(t) - dir, err := ioutil.TempDir("", "wal_fuzz_live") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_fuzz_live") + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - w, err := NewSize(nil, nil, dir, 128*pageSize, false) - testutil.Ok(t, err) - defer w.Close() - - // In the background, generate a stream of random records and write them - // to the WAL. - input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. - done := make(chan struct{}) - go func() { - err := generateRandomEntries(w, input) - testutil.Ok(t, err) - time.Sleep(100 * time.Millisecond) - close(done) - }() + w, err := NewSize(nil, nil, dir, 128*pageSize, compress) + testutil.Ok(t, err) + defer w.Close() + + // In the background, generate a stream of random records and write them + // to the WAL. + input := make(chan []byte, fuzzLen/10) // buffering required as we sometimes batch WAL writes. + done := make(chan struct{}) + go func() { + err := generateRandomEntries(w, input) + testutil.Ok(t, err) + time.Sleep(100 * time.Millisecond) + close(done) + }() - // Tail the WAL and compare the results. - m, _, err := w.Segments() - testutil.Ok(t, err) + // Tail the WAL and compare the results. 
+ m, _, err := w.Segments() + testutil.Ok(t, err) - seg, err := OpenReadSegment(SegmentName(dir, m)) - testutil.Ok(t, err) - defer seg.Close() + seg, err := OpenReadSegment(SegmentName(dir, m)) + testutil.Ok(t, err) + defer seg.Close() - r := NewLiveReader(logger, seg) - segmentTicker := time.NewTicker(100 * time.Millisecond) - readTicker := time.NewTicker(10 * time.Millisecond) - - readSegment := func(r *LiveReader) bool { - for r.Next() { - rec := r.Record() - expected, ok := <-input - testutil.Assert(t, ok, "unexpected record") - testutil.Equals(t, expected, rec, "record does not match expected") - } - testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err()) - return true - } + r := NewLiveReader(logger, seg) + segmentTicker := time.NewTicker(100 * time.Millisecond) + readTicker := time.NewTicker(10 * time.Millisecond) -outer: - for { - select { - case <-segmentTicker.C: - // check if new segments exist - _, last, err := w.Segments() - testutil.Ok(t, err) - if last <= seg.i { - continue + readSegment := func(r *LiveReader) bool { + for r.Next() { + rec := r.Record() + expected, ok := <-input + testutil.Assert(t, ok, "unexpected record") + testutil.Equals(t, expected, rec, "record does not match expected") + } + testutil.Assert(t, r.Err() == io.EOF, "expected EOF, got: %v", r.Err()) + return true } - // read to end of segment. - readSegment(r) + outer: + for { + select { + case <-segmentTicker.C: + // check if new segments exist + _, last, err := w.Segments() + testutil.Ok(t, err) + if last <= seg.i { + continue + } - fi, err := os.Stat(SegmentName(dir, seg.i)) - testutil.Ok(t, err) - testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) + // read to end of segment. + readSegment(r) - seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) - testutil.Ok(t, err) - defer seg.Close() - r = NewLiveReader(logger, seg) + fi, err := os.Stat(SegmentName(dir, seg.i)) + testutil.Ok(t, err) + testutil.Assert(t, r.Offset() == fi.Size(), "expected to have read whole segment, but read %d of %d", r.Offset(), fi.Size()) - case <-readTicker.C: - readSegment(r) + seg, err = OpenReadSegment(SegmentName(dir, seg.i+1)) + testutil.Ok(t, err) + defer seg.Close() + r = NewLiveReader(logger, seg) - case <-done: - readSegment(r) - break outer - } - } + case <-readTicker.C: + readSegment(r) - testutil.Assert(t, r.Err() == io.EOF, "expected EOF") + case <-done: + readSegment(r) + break outer + } + } + + testutil.Assert(t, r.Err() == io.EOF, "expected EOF") + }) + } } func TestLiveReaderCorrupt_ShortFile(t *testing.T) { @@ -527,7 +531,7 @@ func TestReaderData(t *testing.T) { for name, fn := range readerConstructors { t.Run(name, func(t *testing.T) { - w, err := New(nil, nil, dir, false) + w, err := New(nil, nil, dir, true) testutil.Ok(t, err) sr, err := allSegments(dir) diff --git a/wal/wal_test.go b/wal/wal_test.go index 9908892b..76ba602b 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -357,7 +357,7 @@ func BenchmarkWAL_LogBatched(b *testing.B) { testutil.Ok(b, os.RemoveAll(dir)) }() - w, err := New(nil, nil, "testdir", compress) + w, err := New(nil, nil, dir, compress) testutil.Ok(b, err) defer w.Close() @@ -391,7 +391,7 @@ func BenchmarkWAL_Log(b *testing.B) { testutil.Ok(b, os.RemoveAll(dir)) }() - w, err := New(nil, nil, "testdir", compress) + w, err := New(nil, nil, dir, compress) testutil.Ok(b, err) defer w.Close()
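
Reviewer note: every converted test in this patch follows the same mechanical shape — the existing body is wrapped in a loop over both compression settings, run as a named subtest, and the loop variable is threaded through to wal.New / wal.NewSize where the code previously passed a hard-coded false. Below is a minimal sketch of that shape in isolation; the package name, test name, temp-dir prefix, and the openWAL stand-in are illustrative only and not part of the patch.

package example

import (
	"fmt"
	"io/ioutil"
	"os"
	"testing"
)

// openWAL is an illustrative stand-in for wal.New / wal.NewSize; only the
// compress flag matters for the pattern shown here.
func openWAL(dir string, compress bool) error { return nil }

func TestSomethingWAL(t *testing.T) {
	for _, compress := range []bool{false, true} {
		t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
			dir, err := ioutil.TempDir("", "test_something")
			if err != nil {
				t.Fatal(err)
			}
			defer func() {
				if err := os.RemoveAll(dir); err != nil {
					t.Error(err)
				}
			}()

			// The original test body is unchanged apart from passing the
			// compression flag through instead of a literal false.
			if err := openWAL(dir, compress); err != nil {
				t.Fatal(err)
			}
		})
	}
}

Besides the mechanical wrapping, a few smaller changes ride along: the invalid_record corruption helper in TestWalRepair now corrupts a copy of the record rather than mutating it in place, presumably so the shared test.rec slice is still intact when the second (compressed) subtest logs it; TestReaderData opens its WAL with compression enabled; and both WAL benchmarks now log to the temporary directory they create instead of the hard-coded "testdir" path.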