Skip to content
This repository has been archived by the owner on Aug 13, 2019. It is now read-only.

Commit

Permalink
Better test coverage for compressed WAL
Browse files Browse the repository at this point in the history
Many tests that use the WAL were only running in uncompressed mode, now
appropriate tests will run in both modes.

Signed-off-by: Chris Marchbanks <csmarchbanks@gmail.com>
  • Loading branch information
csmarchbanks committed May 22, 2019
1 parent f3af604 commit 2ac1397
Show file tree
Hide file tree
Showing 4 changed files with 426 additions and 398 deletions.
186 changes: 95 additions & 91 deletions checkpoint_test.go
Expand Up @@ -86,108 +86,112 @@ func TestDeleteCheckpoints(t *testing.T) {
}

func TestCheckpoint(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

var enc RecordEncoder
// Create a dummy segment to bump the initial number.
seg, err := wal.CreateSegment(dir, 100)
testutil.Ok(t, err)
testutil.Ok(t, seg.Close())

// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), false)
testutil.Ok(t, err)

// Add some data we expect to be around later.
err = w.Log(enc.Series([]RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil))
testutil.Ok(t, err)
testutil.Ok(t, w.Close())

// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024, false)
testutil.Ok(t, err)
for _, compress := range []bool{false, true} {
t.Run(fmt.Sprintf("compress=%t", compress), func(t *testing.T) {
dir, err := ioutil.TempDir("", "test_checkpoint")
testutil.Ok(t, err)
defer func() {
testutil.Ok(t, os.RemoveAll(dir))
}()

var last int64
for i := 0; ; i++ {
_, n, err := w.Segments()
testutil.Ok(t, err)
if n >= 106 {
break
}
// Write some series initially.
if i == 0 {
b := enc.Series([]RefSeries{
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
testutil.Ok(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
// get filtered properly.
b := enc.Samples([]RefSample{
{Ref: 0, T: last, V: float64(i)},
{Ref: 1, T: last + 10000, V: float64(i)},
{Ref: 2, T: last + 20000, V: float64(i)},
{Ref: 3, T: last + 30000, V: float64(i)},
}, nil)
testutil.Ok(t, w.Log(b))

last += 100
}
testutil.Ok(t, w.Close())
var enc RecordEncoder
// Create a dummy segment to bump the initial number.
seg, err := wal.CreateSegment(dir, 100)
testutil.Ok(t, err)
testutil.Ok(t, seg.Close())

_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2)
testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))
// Manually create checkpoint for 99 and earlier.
w, err := wal.New(nil, nil, filepath.Join(dir, "checkpoint.0099"), compress)
testutil.Ok(t, err)

// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])
// Add some data we expect to be around later.
err = w.Log(enc.Series([]RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 1, Labels: labels.FromStrings("a", "b", "c", "1")},
}, nil))
testutil.Ok(t, err)
testutil.Ok(t, w.Close())

sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
testutil.Ok(t, err)
defer sr.Close()
// Start a WAL and write records to it as usual.
w, err = wal.NewSize(nil, nil, dir, 64*1024, compress)
testutil.Ok(t, err)

var dec RecordDecoder
var series []RefSeries
r := wal.NewReader(sr)
var last int64
for i := 0; ; i++ {
_, n, err := w.Segments()
testutil.Ok(t, err)
if n >= 106 {
break
}
// Write some series initially.
if i == 0 {
b := enc.Series([]RefSeries{
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 3, Labels: labels.FromStrings("a", "b", "c", "3")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
{Ref: 5, Labels: labels.FromStrings("a", "b", "c", "5")},
}, nil)
testutil.Ok(t, w.Log(b))
}
// Write samples until the WAL has enough segments.
// Make them have drifting timestamps within a record to see that they
// get filtered properly.
b := enc.Samples([]RefSample{
{Ref: 0, T: last, V: float64(i)},
{Ref: 1, T: last + 10000, V: float64(i)},
{Ref: 2, T: last + 20000, V: float64(i)},
{Ref: 3, T: last + 30000, V: float64(i)},
}, nil)
testutil.Ok(t, w.Log(b))

last += 100
}
testutil.Ok(t, w.Close())

for r.Next() {
rec := r.Record()
_, err = Checkpoint(w, 100, 106, func(x uint64) bool {
return x%2 == 0
}, last/2)
testutil.Ok(t, err)
testutil.Ok(t, w.Truncate(107))
testutil.Ok(t, DeleteCheckpoints(w.Dir(), 106))

switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
// Only the new checkpoint should be left.
files, err := fileutil.ReadDir(dir)
testutil.Ok(t, err)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Equals(t, 1, len(files))
testutil.Equals(t, "checkpoint.000106", files[0])

sr, err := wal.NewSegmentsReader(filepath.Join(dir, "checkpoint.000106"))
testutil.Ok(t, err)
for _, s := range samples {
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
defer sr.Close()

var dec RecordDecoder
var series []RefSeries
r := wal.NewReader(sr)

for r.Next() {
rec := r.Record()

switch dec.Type(rec) {
case RecordSeries:
series, err = dec.Series(rec, series)
testutil.Ok(t, err)
case RecordSamples:
samples, err := dec.Samples(rec, nil)
testutil.Ok(t, err)
for _, s := range samples {
testutil.Assert(t, s.T >= last/2, "sample with wrong timestamp")
}
}
}
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
})
}
testutil.Ok(t, r.Err())
testutil.Equals(t, []RefSeries{
{Ref: 0, Labels: labels.FromStrings("a", "b", "c", "0")},
{Ref: 2, Labels: labels.FromStrings("a", "b", "c", "2")},
{Ref: 4, Labels: labels.FromStrings("a", "b", "c", "4")},
}, series)
}

func TestCheckpointNoTmpFolderAfterError(t *testing.T) {
Expand Down

0 comments on commit 2ac1397

Please sign in to comment.