From 0997a151d4558b58eb73ed362ef87ad43e2a790b Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 24 Apr 2019 17:22:57 +0300 Subject: [PATCH 01/28] Added db read only open mode and use it for the tsdb cli. Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 1 + cmd/tsdb/main.go | 11 +++--- db.go | 87 ++++++++++++++++++++++++++++++++++++++++++++++-- head.go | 54 +++++++++++++++++------------- head_test.go | 66 ++++++++++++++++++++++++------------ 5 files changed, 165 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e643c442..0b9bd745 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,5 @@ ## master / unreleased + - [FEATURE] New `OpenReadOnly` API to allow read only mode for a database. ## 0.7.0 - [CHANGE] tsdb now requires golang 1.12 or higher. diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 8c049451..f119fa15 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -62,9 +62,6 @@ func main() { dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() ) - safeDBOptions := *tsdb.DefaultOptions - safeDBOptions.RetentionDuration = 0 - switch kingpin.MustParse(cli.Parse(os.Args[1:])) { case benchWriteCmd.FullCommand(): wb := &writeBenchmark{ @@ -74,13 +71,14 @@ func main() { } wb.run() case listCmd.FullCommand(): - db, err := tsdb.Open(*listPath, nil, nil, &safeDBOptions) + db, err := tsdb.OpenReadOnly(*listPath, nil, nil) if err != nil { exitWithError(err) } printBlocks(db.Blocks(), listCmdHumanReadable) case analyzeCmd.FullCommand(): - db, err := tsdb.Open(*analyzePath, nil, nil, &safeDBOptions) + db, err := tsdb.OpenReadOnly(*analyzePath, nil, nil) + if err != nil { exitWithError(err) } @@ -101,7 +99,8 @@ func main() { } analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): - db, err := tsdb.Open(*dumpPath, nil, nil, &safeDBOptions) + db, err := tsdb.OpenReadOnly(*dumpPath, nil, nil) + if err != nil { exitWithError(err) } diff --git a/db.go b/db.go index 52b21c2f..d4ae2d19 100644 --- a/db.go +++ b/db.go @@ -153,6 +153,7 @@ type dbMetrics struct { tombCleanTimer prometheus.Histogram blocksBytes prometheus.Gauge sizeRetentionCount prometheus.Counter + walCorruptionsTotal prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -222,6 +223,10 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_size_retentions_total", Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", }) + m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_corruptions_total", + Help: "Total number of WAL corruptions.", + }) if r != nil { r.MustRegister( @@ -240,6 +245,75 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } +// OpenReadOnly returns a new DB in the given directory only for read operatiuons. +func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DB, err error) { + if l == nil { + l = log.NewNopLogger() + } + + if _, err := os.Stat(dir); err != nil { + return nil, err + } + + db = &DB{ + dir: dir, + logger: l, + chunkPool: chunkenc.NewPool(), + } + + db.head, err = NewHead(r, l, nil, 1) + if err != nil { + return nil, err + } + + loadable, corrupted, err := db.openBlocks() + if err != nil { + return nil, err + } + + // Corrupted blocks that have been replaced by parents can be safely ignored. + for _, block := range loadable { + for _, b := range block.Meta().Compaction.Parents { + delete(corrupted, b.ULID) + } + } + if len(corrupted) > 0 { + return nil, fmt.Errorf("unexpected corrupted block:%v", corrupted) + } + + if len(loadable) == 0 { + return nil, errors.New("no blocks found") + } + + sort.Slice(loadable, func(i, j int) bool { + return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime + }) + + db.blocks = loadable + + blockMetas := make([]BlockMeta, 0, len(loadable)) + for _, b := range loadable { + blockMetas = append(blockMetas, b.Meta()) + } + if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { + level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) + } + + // Set the min valid time for the ingested wal samples + // to be no lower than the maxt of the last block. + blocks := db.Blocks() + minValidTime := int64(math.MinInt64) + if len(blocks) > 0 { + minValidTime = blocks[len(blocks)-1].Meta().MaxTime + } + + if err := db.head.Init(minValidTime); err != nil { + return nil, errors.Wrap(err, "read WAL") + } + + return db, nil +} + // Open returns a new DB in the given directory. func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { if err := os.MkdirAll(dir, 0777); err != nil { @@ -322,8 +396,17 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db minValidTime = blocks[len(blocks)-1].Meta().MaxTime } - if err := db.head.Init(minValidTime); err != nil { - return nil, errors.Wrap(err, "read WAL") + if initErr := db.head.Init(minValidTime); initErr != nil { + err := errors.Cause(initErr) // So that we can pick up errors even when wrapped. + if _, ok := err.(*wal.CorruptionErr); ok { + level.Warn(db.logger).Log("msg", "encountered WAL corruption error, attempting repair", "err", err) + db.metrics.walCorruptionsTotal.Inc() + if err := wlog.Repair(err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") + } + } else { + return nil, errors.Wrap(initErr, "read WAL") + } } go db.run() diff --git a/head.go b/head.go index cfb7cb89..c2834f29 100644 --- a/head.go +++ b/head.go @@ -95,7 +95,6 @@ type headMetrics struct { maxTime prometheus.GaugeFunc samplesAppended prometheus.Counter walTruncateDuration prometheus.Summary - walCorruptionsTotal prometheus.Counter headTruncateFail prometheus.Counter headTruncateTotal prometheus.Counter checkpointDeleteFail prometheus.Counter @@ -159,10 +158,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_wal_truncate_duration_seconds", Help: "Duration of WAL truncation.", }) - m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_corruptions_total", - Help: "Total number of WAL corruptions.", - }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", @@ -206,7 +201,6 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.maxTime, m.gcDuration, m.walTruncateDuration, - m.walCorruptionsTotal, m.samplesAppended, m.headTruncateFail, m.headTruncateTotal, @@ -312,7 +306,7 @@ func (h *Head) updateMinMaxTime(mint, maxt int64) { } } -func (h *Head) loadWAL(r *wal.Reader) error { +func (h *Head) loadWAL(r *wal.Reader) (err error) { // Track number of samples that referenced a series we don't know about // for error reporting. var unknownRefs uint64 @@ -328,6 +322,18 @@ func (h *Head) loadWAL(r *wal.Reader) error { ) wg.Add(n) + defer func() { + // For CorruptionErr ensure to terminate all workers before exiting. + if _, ok := err.(*wal.CorruptionErr); ok { + for i := 0; i < n; i++ { + close(inputs[i]) + for range outputs[i] { + } + } + wg.Wait() + } + }() + for i := 0; i < n; i++ { outputs[i] = make(chan []RefSample, 300) inputs[i] = make(chan []RefSample, 300) @@ -345,9 +351,12 @@ func (h *Head) loadWAL(r *wal.Reader) error { samples []RefSample tstones []Stone allStones = newMemTombstones() - err error ) - defer allStones.Close() + defer func() { + if err := allStones.Close(); err != nil { + level.Warn(h.logger).Log("msg", "closing memTombstones during wal read", "err", err) + } + }() for r.Next() { series, samples, tstones = series[:0], samples[:0], tstones[:0] rec := r.Record() @@ -432,9 +441,6 @@ func (h *Head) loadWAL(r *wal.Reader) error { } } } - if r.Err() != nil { - return errors.Wrap(r.Err(), "read records") - } // Signal termination to each worker and wait for it to close its output channel. for i := 0; i < n; i++ { @@ -444,6 +450,10 @@ func (h *Head) loadWAL(r *wal.Reader) error { } wg.Wait() + if r.Err() != nil { + return errors.Wrap(r.Err(), "read records") + } + if err := allStones.Iter(func(ref uint64, dranges Intervals) error { return h.chunkRewrite(ref, dranges) }); err != nil { @@ -488,23 +498,19 @@ func (h *Head) Init(minValidTime int64) error { startFrom++ } - // Backfill segments from the last checkpoint onwards + // Backfill segments from the last checkpoint onwards. sr, err := wal.NewSegmentsRangeReader(wal.SegmentRange{Dir: h.wal.Dir(), First: startFrom, Last: -1}) if err != nil { return errors.Wrap(err, "open WAL segments") } - + defer func() { + if err := sr.Close(); err != nil { + level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) + } + }() err = h.loadWAL(wal.NewReader(sr)) - sr.Close() // Close the reader so that if there was an error the repair can remove the corrupted file under Windows. - if err == nil { - return nil - } - level.Warn(h.logger).Log("msg", "encountered WAL error, attempting repair", "err", err) - h.metrics.walCorruptionsTotal.Inc() - if err := h.wal.Repair(err); err != nil { - return errors.Wrap(err, "repair corrupted WAL") - } - return nil + + return err } // Truncate removes old data before mint from the head. diff --git a/head_test.go b/head_test.go index 4fbdec03..eaf32e6d 100644 --- a/head_test.go +++ b/head_test.go @@ -23,6 +23,7 @@ import ( "sort" "testing" + "github.com/pkg/errors" prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" @@ -1037,42 +1038,63 @@ func TestWalRepair(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_head_repair") + dir, err := ioutil.TempDir("", "wal_repair") testutil.Ok(t, err) defer func() { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.New(nil, nil, dir) + w, err := wal.New(nil, nil, filepath.Join(dir, "wal")) testutil.Ok(t, err) defer w.Close() - for i := 1; i <= test.totalRecs; i++ { - // At this point insert a corrupted record. - if i-1 == test.expRecs { - testutil.Ok(t, w.Log(test.corrFunc(test.rec))) - continue + // Fill the wal and corrupt it. + { + for i := 1; i <= test.totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == test.expRecs { + testutil.Ok(t, w.Log(test.corrFunc(test.rec))) + continue + } + testutil.Ok(t, w.Log(test.rec)) } - testutil.Ok(t, w.Log(test.rec)) + + h, err := NewHead(nil, nil, w, 1) + testutil.Ok(t, err) + + initErr := h.Init(math.MinInt64) + + err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. + _, corrErr := err.(*wal.CorruptionErr) + testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") } - h, err := NewHead(nil, nil, w, 1) - testutil.Ok(t, err) - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) - testutil.Ok(t, h.Init(math.MinInt64)) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + // Open the db to trigger a repair. + // Also test the wal corruption metric is working as expected. + { + db, err := Open(dir, nil, nil, DefaultOptions) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() - sr, err := wal.NewSegmentsReader(dir) - testutil.Ok(t, err) - defer sr.Close() - r := wal.NewReader(sr) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.walCorruptionsTotal)) + } - var actRec int - for r.Next() { - actRec++ + // Read the wal content after the repair. + { + sr, err := wal.NewSegmentsReader(w.Dir()) + testutil.Ok(t, err) + defer sr.Close() + r := wal.NewReader(sr) + + var actRec int + for r.Next() { + actRec++ + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") } - testutil.Ok(t, r.Err()) - testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") }) } From cd0875332f774180a18db1d18809ea3619e60915 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 24 Apr 2019 17:56:06 +0300 Subject: [PATCH 02/28] add the DBReadOnly struct Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 +- cmd/tsdb/main.go | 2 +- db.go | 49 +++++++++++++++++++++++++++++++++--------------- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0b9bd745..2c2d64f7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## master / unreleased - - [FEATURE] New `OpenReadOnly` API to allow read only mode for a database. + - [FEATURE] New `OpenReadOnly` API to allow opening a database in read only mode. ## 0.7.0 - [CHANGE] tsdb now requires golang 1.12 or higher. diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index f119fa15..90a7c471 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -547,7 +547,7 @@ func analyzeBlock(b *tsdb.Block, limit int) { printInfo(postingInfos) } -func dumpSamples(db *tsdb.DB, mint, maxt int64) { +func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { q, err := db.Querier(mint, maxt) if err != nil { exitWithError(err) diff --git a/db.go b/db.go index d4ae2d19..ca765b9e 100644 --- a/db.go +++ b/db.go @@ -245,8 +245,13 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } -// OpenReadOnly returns a new DB in the given directory only for read operatiuons. -func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DB, err error) { +// DBReadOnly provides APIs for read only operations on a database. +type DBReadOnly struct { + db *DB +} + +// OpenReadOnly returns a new DB in the given directory only for read operations. +func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DBReadOnly, err error) { if l == nil { l = log.NewNopLogger() } @@ -255,18 +260,21 @@ func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DB, er return nil, err } - db = &DB{ - dir: dir, - logger: l, - chunkPool: chunkenc.NewPool(), - } - - db.head, err = NewHead(r, l, nil, 1) + head, err := NewHead(r, l, nil, 1) if err != nil { return nil, err } - loadable, corrupted, err := db.openBlocks() + db = &DBReadOnly{ + &DB{ + dir: dir, + logger: l, + head: head, + }, + } + db.db.metrics = newDBMetrics(db.db, r) + + loadable, corrupted, err := db.db.openBlocks() if err != nil { return nil, err } @@ -289,29 +297,40 @@ func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DB, er return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime }) - db.blocks = loadable + db.db.blocks = loadable blockMetas := make([]BlockMeta, 0, len(loadable)) for _, b := range loadable { blockMetas = append(blockMetas, b.Meta()) } if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) + level.Warn(db.db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) } + return db, nil +} + +// Querier loads the wal and returns a new querier over the data partition for the given time range. +// A goroutine must not handle more than one open Querier. +func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { // Set the min valid time for the ingested wal samples // to be no lower than the maxt of the last block. - blocks := db.Blocks() + blocks := db.db.Blocks() minValidTime := int64(math.MinInt64) if len(blocks) > 0 { minValidTime = blocks[len(blocks)-1].Meta().MaxTime } - if err := db.head.Init(minValidTime); err != nil { + if err := db.db.head.Init(minValidTime); err != nil { return nil, errors.Wrap(err, "read WAL") } - return db, nil + return db.db.Querier(mint, maxt) +} + +// Blocks returns the databases persisted blocks. +func (db *DBReadOnly) Blocks() []*Block { + return db.db.Blocks() } // Open returns a new DB in the given directory. From 8128c53bfa8c16978506ac65064924898253934c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Wed, 24 Apr 2019 19:23:09 +0300 Subject: [PATCH 03/28] fix wal closing Signed-off-by: Krasi Georgiev --- head_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/head_test.go b/head_test.go index eaf32e6d..d51eb97a 100644 --- a/head_test.go +++ b/head_test.go @@ -1044,12 +1044,11 @@ func TestWalRepair(t *testing.T) { testutil.Ok(t, os.RemoveAll(dir)) }() - w, err := wal.New(nil, nil, filepath.Join(dir, "wal")) - testutil.Ok(t, err) - defer w.Close() - // Fill the wal and corrupt it. { + w, err := wal.New(nil, nil, filepath.Join(dir, "wal")) + testutil.Ok(t, err) + for i := 1; i <= test.totalRecs; i++ { // At this point insert a corrupted record. if i-1 == test.expRecs { @@ -1067,6 +1066,7 @@ func TestWalRepair(t *testing.T) { err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. _, corrErr := err.(*wal.CorruptionErr) testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") + testutil.Ok(t, w.Close()) } // Open the db to trigger a repair. @@ -1083,7 +1083,7 @@ func TestWalRepair(t *testing.T) { // Read the wal content after the repair. { - sr, err := wal.NewSegmentsReader(w.Dir()) + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) testutil.Ok(t, err) defer sr.Close() r := wal.NewReader(sr) From 83d36838f5267c0bfa6f2a7278c54e767f020612 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 25 Apr 2019 10:38:33 +0300 Subject: [PATCH 04/28] refactored to update blocks on domand Signed-off-by: Krasi Georgiev --- cmd/tsdb/main.go | 11 ++++-- db.go | 93 +++++++++++++++++++++++++++--------------------- 2 files changed, 62 insertions(+), 42 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 90a7c471..247d7db8 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -75,14 +75,21 @@ func main() { if err != nil { exitWithError(err) } - printBlocks(db.Blocks(), listCmdHumanReadable) + blocks, err := db.Blocks() + if err != nil { + exitWithError(err) + } + printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): db, err := tsdb.OpenReadOnly(*analyzePath, nil, nil) if err != nil { exitWithError(err) } - blocks := db.Blocks() + blocks, err := db.Blocks() + if err != nil { + exitWithError(err) + } var block *tsdb.Block if *analyzeBlockID != "" { for _, b := range blocks { diff --git a/db.go b/db.go index ca765b9e..fa8eae0f 100644 --- a/db.go +++ b/db.go @@ -247,34 +247,73 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { // DBReadOnly provides APIs for read only operations on a database. type DBReadOnly struct { - db *DB + logger log.Logger + dir string + registerer prometheus.Registerer } // OpenReadOnly returns a new DB in the given directory only for read operations. -func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DBReadOnly, err error) { +func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnly, error) { + if _, err := os.Stat(dir); err != nil { + return nil, err + } + if l == nil { l = log.NewNopLogger() } - if _, err := os.Stat(dir); err != nil { + db := &DBReadOnly{ + logger: l, + dir: dir, + registerer: r, + } + + return db, nil +} + +// Querier loads the wal and returns a new querier over the data partition for the given time range. +// A goroutine must not handle more than one open Querier. +func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { + head, err := NewHead(dbRead.registerer, dbRead.logger, nil, 1) + if err != nil { return nil, err } - head, err := NewHead(r, l, nil, 1) + blocks, err := dbRead.Blocks() if err != nil { return nil, err } - db = &DBReadOnly{ - &DB{ - dir: dir, - logger: l, - head: head, - }, + db := &DB{ + dir: dbRead.dir, + logger: dbRead.logger, + head: head, + blocks: blocks, + } + + // Set the min valid time for the ingested wal samples + // to be no lower than the maxt of the last block. + minValidTime := int64(math.MinInt64) + if len(blocks) > 0 { + minValidTime = blocks[len(blocks)-1].Meta().MaxTime + } + + if err := db.head.Init(minValidTime); err != nil { + return nil, errors.Wrap(err, "read WAL") + } + + return db.Querier(mint, maxt) +} + +// Blocks returns the databases persisted blocks. +func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { + db := &DB{ + dir: dbRead.dir, + logger: dbRead.logger, } - db.db.metrics = newDBMetrics(db.db, r) + db.metrics = newDBMetrics(db, dbRead.registerer) - loadable, corrupted, err := db.db.openBlocks() + loadable, corrupted, err := db.openBlocks() if err != nil { return nil, err } @@ -297,40 +336,14 @@ func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (db *DBRead return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime }) - db.db.blocks = loadable - blockMetas := make([]BlockMeta, 0, len(loadable)) for _, b := range loadable { blockMetas = append(blockMetas, b.Meta()) } if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(db.db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) + level.Warn(dbRead.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) } - return db, nil -} - -// Querier loads the wal and returns a new querier over the data partition for the given time range. -// A goroutine must not handle more than one open Querier. -func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { - - // Set the min valid time for the ingested wal samples - // to be no lower than the maxt of the last block. - blocks := db.db.Blocks() - minValidTime := int64(math.MinInt64) - if len(blocks) > 0 { - minValidTime = blocks[len(blocks)-1].Meta().MaxTime - } - - if err := db.db.head.Init(minValidTime); err != nil { - return nil, errors.Wrap(err, "read WAL") - } - - return db.db.Querier(mint, maxt) -} - -// Blocks returns the databases persisted blocks. -func (db *DBReadOnly) Blocks() []*Block { - return db.db.Blocks() + return loadable, nil } // Open returns a new DB in the given directory. From a70454199ad0a2cfb211ecb3717d30f4d08787ba Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 25 Apr 2019 10:55:29 +0300 Subject: [PATCH 05/28] rename to DBView Signed-off-by: Krasi Georgiev --- cmd/tsdb/main.go | 8 ++++---- db.go | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 247d7db8..fdaade45 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -71,7 +71,7 @@ func main() { } wb.run() case listCmd.FullCommand(): - db, err := tsdb.OpenReadOnly(*listPath, nil, nil) + db, err := tsdb.NewDBView(*listPath, nil, nil) if err != nil { exitWithError(err) } @@ -81,7 +81,7 @@ func main() { } printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): - db, err := tsdb.OpenReadOnly(*analyzePath, nil, nil) + db, err := tsdb.NewDBView(*analyzePath, nil, nil) if err != nil { exitWithError(err) @@ -106,7 +106,7 @@ func main() { } analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): - db, err := tsdb.OpenReadOnly(*dumpPath, nil, nil) + db, err := tsdb.NewDBView(*dumpPath, nil, nil) if err != nil { exitWithError(err) @@ -554,7 +554,7 @@ func analyzeBlock(b *tsdb.Block, limit int) { printInfo(postingInfos) } -func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { +func dumpSamples(db *tsdb.DBView, mint, maxt int64) { q, err := db.Querier(mint, maxt) if err != nil { exitWithError(err) diff --git a/db.go b/db.go index fa8eae0f..fefa5012 100644 --- a/db.go +++ b/db.go @@ -245,15 +245,15 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } -// DBReadOnly provides APIs for read only operations on a database. -type DBReadOnly struct { +// DBView provides APIs for read only operations on a database. +type DBView struct { logger log.Logger dir string registerer prometheus.Registerer } -// OpenReadOnly returns a new DB in the given directory only for read operations. -func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnly, error) { +// NewDBView returns a new DB in the given directory only for read operations. +func NewDBView(dir string, l log.Logger, r prometheus.Registerer) (*DBView, error) { if _, err := os.Stat(dir); err != nil { return nil, err } @@ -262,7 +262,7 @@ func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnl l = log.NewNopLogger() } - db := &DBReadOnly{ + db := &DBView{ logger: l, dir: dir, registerer: r, @@ -273,7 +273,7 @@ func OpenReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnl // Querier loads the wal and returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. -func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { +func (dbRead *DBView) Querier(mint, maxt int64) (Querier, error) { head, err := NewHead(dbRead.registerer, dbRead.logger, nil, 1) if err != nil { return nil, err @@ -306,7 +306,7 @@ func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { } // Blocks returns the databases persisted blocks. -func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { +func (dbRead *DBView) Blocks() ([]*Block, error) { db := &DB{ dir: dbRead.dir, logger: dbRead.logger, From a16cd172b46ca01306dbff6c50981a991ccd9f19 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 25 Apr 2019 11:28:04 +0300 Subject: [PATCH 06/28] revert renaming Signed-off-by: Krasi Georgiev --- cmd/tsdb/main.go | 8 ++++---- db.go | 14 +++++++------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index fdaade45..584855e3 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -71,7 +71,7 @@ func main() { } wb.run() case listCmd.FullCommand(): - db, err := tsdb.NewDBView(*listPath, nil, nil) + db, err := tsdb.NewDBReadOnly(*listPath, nil, nil) if err != nil { exitWithError(err) } @@ -81,7 +81,7 @@ func main() { } printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): - db, err := tsdb.NewDBView(*analyzePath, nil, nil) + db, err := tsdb.NewDBReadOnly(*analyzePath, nil, nil) if err != nil { exitWithError(err) @@ -106,7 +106,7 @@ func main() { } analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): - db, err := tsdb.NewDBView(*dumpPath, nil, nil) + db, err := tsdb.NewDBReadOnly(*dumpPath, nil, nil) if err != nil { exitWithError(err) @@ -554,7 +554,7 @@ func analyzeBlock(b *tsdb.Block, limit int) { printInfo(postingInfos) } -func dumpSamples(db *tsdb.DBView, mint, maxt int64) { +func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { q, err := db.Querier(mint, maxt) if err != nil { exitWithError(err) diff --git a/db.go b/db.go index fefa5012..b24aba10 100644 --- a/db.go +++ b/db.go @@ -245,15 +245,15 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } -// DBView provides APIs for read only operations on a database. -type DBView struct { +// DBReadOnly provides APIs for read only operations on a database. +type DBReadOnly struct { logger log.Logger dir string registerer prometheus.Registerer } -// NewDBView returns a new DB in the given directory only for read operations. -func NewDBView(dir string, l log.Logger, r prometheus.Registerer) (*DBView, error) { +// NewDBReadOnly returns a new DB in the given directory for read only operations. +func NewDBReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { return nil, err } @@ -262,7 +262,7 @@ func NewDBView(dir string, l log.Logger, r prometheus.Registerer) (*DBView, erro l = log.NewNopLogger() } - db := &DBView{ + db := &DBReadOnly{ logger: l, dir: dir, registerer: r, @@ -273,7 +273,7 @@ func NewDBView(dir string, l log.Logger, r prometheus.Registerer) (*DBView, erro // Querier loads the wal and returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. -func (dbRead *DBView) Querier(mint, maxt int64) (Querier, error) { +func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { head, err := NewHead(dbRead.registerer, dbRead.logger, nil, 1) if err != nil { return nil, err @@ -306,7 +306,7 @@ func (dbRead *DBView) Querier(mint, maxt int64) (Querier, error) { } // Blocks returns the databases persisted blocks. -func (dbRead *DBView) Blocks() ([]*Block, error) { +func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { db := &DB{ dir: dbRead.dir, logger: dbRead.logger, From ce25efc8bc925ad0caaf0e23f892e939e9d56c85 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 25 Apr 2019 11:34:20 +0300 Subject: [PATCH 07/28] remove metrics Signed-off-by: Krasi Georgiev --- CHANGELOG.md | 2 +- cmd/tsdb/main.go | 6 +++--- db.go | 15 ++++++--------- 3 files changed, 10 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2c2d64f7..dafb23b1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,5 @@ ## master / unreleased - - [FEATURE] New `OpenReadOnly` API to allow opening a database in read only mode. + - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. ## 0.7.0 - [CHANGE] tsdb now requires golang 1.12 or higher. diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 584855e3..94a5fa9c 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -71,7 +71,7 @@ func main() { } wb.run() case listCmd.FullCommand(): - db, err := tsdb.NewDBReadOnly(*listPath, nil, nil) + db, err := tsdb.NewDBReadOnly(*listPath, nil) if err != nil { exitWithError(err) } @@ -81,7 +81,7 @@ func main() { } printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): - db, err := tsdb.NewDBReadOnly(*analyzePath, nil, nil) + db, err := tsdb.NewDBReadOnly(*analyzePath, nil) if err != nil { exitWithError(err) @@ -106,7 +106,7 @@ func main() { } analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): - db, err := tsdb.NewDBReadOnly(*dumpPath, nil, nil) + db, err := tsdb.NewDBReadOnly(*dumpPath, nil) if err != nil { exitWithError(err) diff --git a/db.go b/db.go index b24aba10..2c50a2ca 100644 --- a/db.go +++ b/db.go @@ -247,13 +247,12 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { // DBReadOnly provides APIs for read only operations on a database. type DBReadOnly struct { - logger log.Logger - dir string - registerer prometheus.Registerer + logger log.Logger + dir string } // NewDBReadOnly returns a new DB in the given directory for read only operations. -func NewDBReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOnly, error) { +func NewDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { return nil, err } @@ -263,9 +262,8 @@ func NewDBReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOn } db := &DBReadOnly{ - logger: l, - dir: dir, - registerer: r, + logger: l, + dir: dir, } return db, nil @@ -274,7 +272,7 @@ func NewDBReadOnly(dir string, l log.Logger, r prometheus.Registerer) (*DBReadOn // Querier loads the wal and returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { - head, err := NewHead(dbRead.registerer, dbRead.logger, nil, 1) + head, err := NewHead(nil, dbRead.logger, nil, 1) if err != nil { return nil, err } @@ -311,7 +309,6 @@ func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { dir: dbRead.dir, logger: dbRead.logger, } - db.metrics = newDBMetrics(db, dbRead.registerer) loadable, corrupted, err := db.openBlocks() if err != nil { From 825d6c0d7d48fb4c832c5bc1a2415ef648515052 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 6 May 2019 12:12:24 +0300 Subject: [PATCH 08/28] detach db from openBlocks() Signed-off-by: Krasi Georgiev --- db.go | 35 +++++++++++++++-------------------- 1 file changed, 15 insertions(+), 20 deletions(-) diff --git a/db.go b/db.go index 2c50a2ca..7930186b 100644 --- a/db.go +++ b/db.go @@ -305,12 +305,7 @@ func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { // Blocks returns the databases persisted blocks. func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { - db := &DB{ - dir: dbRead.dir, - logger: dbRead.logger, - } - - loadable, corrupted, err := db.openBlocks() + loadable, corrupted, err := openBlocks(dbRead.logger, dbRead.dir, nil, nil) if err != nil { return nil, err } @@ -607,13 +602,13 @@ func (db *DB) compact() (err error) { return nil } -func (db *DB) getBlock(id ulid.ULID) (*Block, bool) { - for _, b := range db.blocks { +func getBlock(loaded []*Block, id ulid.ULID) *Block { + for _, b := range loaded { if b.Meta().ULID == id { - return b, true + return b } } - return nil, false + return nil } // reload blocks and trigger head truncation if new blocks appeared. @@ -626,7 +621,7 @@ func (db *DB) reload() (err error) { db.metrics.reloads.Inc() }() - loadable, corrupted, err := db.openBlocks() + loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool) if err != nil { return err } @@ -646,7 +641,7 @@ func (db *DB) reload() (err error) { if len(corrupted) > 0 { // Close all new blocks to release the lock for windows. for _, block := range loadable { - if _, loaded := db.getBlock(block.Meta().ULID); !loaded { + if b := getBlock(db.blocks, block.Meta().ULID); b == nil { block.Close() } } @@ -714,24 +709,24 @@ func (db *DB) reload() (err error) { return errors.Wrap(db.head.Truncate(maxt), "head truncate failed") } -func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) { - dirs, err := blockDirs(db.dir) +func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { + blockDirs, err := blockDirs(dir) if err != nil { return nil, nil, errors.Wrap(err, "find blocks") } corrupted = make(map[ulid.ULID]error) - for _, dir := range dirs { - meta, err := readMetaFile(dir) + for _, blockDir := range blockDirs { + meta, err := readMetaFile(blockDir) if err != nil { - level.Error(db.logger).Log("msg", "not a block dir", "dir", dir) + level.Error(l).Log("msg", "not a block dir", "dir", blockDir) continue } // See if we already have the block in memory or open it otherwise. - block, ok := db.getBlock(meta.ULID) - if !ok { - block, err = OpenBlock(db.logger, dir, db.chunkPool) + block := getBlock(loaded, meta.ULID) + if block == nil { + block, err = OpenBlock(l, blockDir, chunkPool) if err != nil { corrupted[meta.ULID] = err continue From c2d809bd3491eee80f8b38d98fd846d3a8e2b601 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 30 May 2019 12:47:51 +0300 Subject: [PATCH 09/28] refactor and add tests Signed-off-by: Krasi Georgiev --- cmd/tsdb/main.go | 6 +-- db.go | 38 ++++++++++++------- db_test.go | 87 +++++++++++++++++++++++++++++++++++++++++++ testutil/directory.go | 16 ++++++++ wal/wal.go | 70 ++++++++++++++++++++++------------ 5 files changed, 176 insertions(+), 41 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 39503cca..d99f3af3 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -71,7 +71,7 @@ func main() { } wb.run() case listCmd.FullCommand(): - db, err := tsdb.NewDBReadOnly(*listPath, nil) + db, err := tsdb.OpenDBReadOnly(*listPath, nil) if err != nil { exitWithError(err) } @@ -81,7 +81,7 @@ func main() { } printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): - db, err := tsdb.NewDBReadOnly(*analyzePath, nil) + db, err := tsdb.OpenDBReadOnly(*analyzePath, nil) if err != nil { exitWithError(err) @@ -106,7 +106,7 @@ func main() { } analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): - db, err := tsdb.NewDBReadOnly(*dumpPath, nil) + db, err := tsdb.OpenDBReadOnly(*dumpPath, nil) if err != nil { exitWithError(err) diff --git a/db.go b/db.go index 7930186b..b69639b8 100644 --- a/db.go +++ b/db.go @@ -251,8 +251,8 @@ type DBReadOnly struct { dir string } -// NewDBReadOnly returns a new DB in the given directory for read only operations. -func NewDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { +// OpenDBReadOnly opens DB in the given directory for read only operations. +func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { return nil, err } @@ -271,20 +271,24 @@ func NewDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { // Querier loads the wal and returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. -func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { - head, err := NewHead(nil, dbRead.logger, nil, 1) +func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { + w, err := wal.NewReadOnly(db.logger, nil, filepath.Join(db.dir, "wal")) + if err != nil { + return nil, err + } + head, err := NewHead(nil, db.logger, w, 1) if err != nil { return nil, err } - blocks, err := dbRead.Blocks() + blocks, err := db.Blocks() if err != nil { return nil, err } - db := &DB{ - dir: dbRead.dir, - logger: dbRead.logger, + dbWritable := &DB{ + dir: db.dir, + logger: db.logger, head: head, blocks: blocks, } @@ -296,16 +300,17 @@ func (dbRead *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { minValidTime = blocks[len(blocks)-1].Meta().MaxTime } - if err := db.head.Init(minValidTime); err != nil { + if err := dbWritable.head.Init(minValidTime); err != nil { return nil, errors.Wrap(err, "read WAL") } - return db.Querier(mint, maxt) + return dbWritable.Querier(mint, maxt) } -// Blocks returns the databases persisted blocks. -func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { - loadable, corrupted, err := openBlocks(dbRead.logger, dbRead.dir, nil, nil) +// Blocks returns all persisted blocks. +// It is up to the caller to close the blocks. +func (db *DBReadOnly) Blocks() ([]*Block, error) { + loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) if err != nil { return nil, err } @@ -317,6 +322,11 @@ func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { } } if len(corrupted) > 0 { + for _, b := range loadable { + if err := b.Close(); err != nil { + level.Warn(db.logger).Log("msg", "closing a block", err) + } + } return nil, fmt.Errorf("unexpected corrupted block:%v", corrupted) } @@ -333,7 +343,7 @@ func (dbRead *DBReadOnly) Blocks() ([]*Block, error) { blockMetas = append(blockMetas, b.Meta()) } if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { - level.Warn(dbRead.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) + level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) } return loadable, nil } diff --git a/db_test.go b/db_test.go index 6014a1d0..868fa5fa 100644 --- a/db_test.go +++ b/db_test.go @@ -2191,3 +2191,90 @@ func TestBlockRanges(t *testing.T) { t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta()) } } + +// TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk. +// It also checks that the API calls return equivalent results as a normal db.Open() mode. +func TestDBReadOnly(t *testing.T) { + var ( + dbDir string + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + expBlocks []*Block + expSeries map[string][]tsdbutil.Sample + expDbSize int64 + matchAll = labels.NewEqualMatcher("", "") + err error + ) + + // Boostrap the db. + { + dbDir, err = ioutil.TempDir("", "test") + testutil.Ok(t, err) + + defer func() { + testutil.Ok(t, os.RemoveAll(dbDir)) + }() + + dbBlocks := []*BlockMeta{ + {MinTime: 10, MaxTime: 11}, + {MinTime: 11, MaxTime: 12}, + {MinTime: 12, MaxTime: 13}, + } + + for _, m := range dbBlocks { + createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime)) + } + } + + // Open a normal db to use for a comparison. + { + dbWritable, err := Open(dbDir, logger, nil, nil) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, dbWritable.Close()) + }() + dbWritable.DisableCompactions() + + dbSizeBeforeAppend, err := testutil.DirSize(dbWritable.Dir()) + testutil.Ok(t, err) + app := dbWritable.Appender() + _, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0) + testutil.Ok(t, err) + testutil.Ok(t, app.Commit()) + + expBlocks = dbWritable.Blocks() + expDbSize, err = testutil.DirSize(dbWritable.Dir()) + testutil.Ok(t, err) + testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append") + + q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64) + testutil.Ok(t, err) + expSeries = query(t, q, matchAll) + } + + // Open a read only db and ensure that the API returns the same result as the normal DB. + { + dbReadOnly, err := OpenDBReadOnly(dbDir, logger) + testutil.Ok(t, err) + blocks, err := dbReadOnly.Blocks() + testutil.Ok(t, err) + testutil.Equals(t, len(expBlocks), len(blocks)) + + for i, expBlock := range expBlocks { + testutil.Equals(t, expBlock.Size(), blocks[i].Size(), "block size mismatch") + testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch") + testutil.Equals(t, expBlock.Dir(), blocks[i].Dir(), "block dir mismatch") + testutil.Equals(t, expBlock.MinTime(), blocks[i].MinTime(), "block MinTime mismatch") + testutil.Equals(t, expBlock.MaxTime(), blocks[i].MaxTime(), "block MaxTime mismatch") + } + + q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64) + testutil.Ok(t, err) + readOnlySeries := query(t, q, matchAll) + readOnlyDBSize, err := testutil.DirSize(dbDir) + testutil.Ok(t, err) + + testutil.Assert(t, len(readOnlySeries) > 0, "querier should return some series") + testutil.Equals(t, expSeries, readOnlySeries, "series mismatch") + testutil.Equals(t, expDbSize, readOnlyDBSize, "after all read operations the db size should remain the same") + } +} diff --git a/testutil/directory.go b/testutil/directory.go index d3c9c926..fd9d4dae 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -16,6 +16,7 @@ package testutil import ( "io/ioutil" "os" + "path/filepath" ) const ( @@ -127,3 +128,18 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) { return } + +// DirSize returns a directory size in bytes. +func DirSize(path string) (int64, error) { + var size int64 + err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + size += info.Size() + } + return err + }) + return size, err +} diff --git a/wal/wal.go b/wal/wal.go index cb2e11ff..433ad5ff 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -199,6 +199,52 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi actorc: make(chan func(), 100), stopc: make(chan chan struct{}), } + registerMetrics(reg, w) + + _, j, err := w.Segments() + // Index of the Segment we want to open and write to. + writeSegmentIndex := 0 + if err != nil { + return nil, errors.Wrap(err, "get segment range") + } + // If some segments already exist create one with a higher index than the last segment. + if j != -1 { + writeSegmentIndex = j + 1 + } + + segment, err := CreateSegment(w.dir, writeSegmentIndex) + if err != nil { + return nil, err + } + + if err := w.setSegment(segment); err != nil { + return nil, err + } + + go w.run() + + return w, nil +} + +// NewReadOnly returns a WAL for read only operations. +func NewReadOnly(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { + if logger == nil { + logger = log.NewNopLogger() + } + w := &WAL{ + dir: dir, + logger: logger, + segmentSize: DefaultSegmentSize, + page: &page{}, + actorc: make(chan func(), 100), + stopc: make(chan chan struct{}), + } + + registerMetrics(reg, w) + return w, nil +} + +func registerMetrics(reg prometheus.Registerer, w *WAL) { w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{ Name: "prometheus_tsdb_wal_fsync_duration_seconds", Help: "Duration of WAL fsync.", @@ -226,30 +272,6 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi if reg != nil { reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment) } - - _, j, err := w.Segments() - // Index of the Segment we want to open and write to. - writeSegmentIndex := 0 - if err != nil { - return nil, errors.Wrap(err, "get segment range") - } - // If some segments already exist create one with a higher index than the last segment. - if j != -1 { - writeSegmentIndex = j + 1 - } - - segment, err := CreateSegment(w.dir, writeSegmentIndex) - if err != nil { - return nil, err - } - - if err := w.setSegment(segment); err != nil { - return nil, err - } - - go w.run() - - return w, nil } // Dir returns the directory of the WAL. From 1b68f0d8f2d9a463a656414525e8b1a31cc45dca Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Thu, 30 May 2019 14:55:02 +0300 Subject: [PATCH 10/28] added db.Close to close all open blocks for windows Signed-off-by: Krasi Georgiev --- db.go | 24 +++++++++++++++++++++++- db_test.go | 3 +++ 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/db.go b/db.go index b69639b8..733cdd89 100644 --- a/db.go +++ b/db.go @@ -249,6 +249,8 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { type DBReadOnly struct { logger log.Logger dir string + blocks []*Block // Keep all open blocks in the cache to close them at db.Close. + closed bool } // OpenDBReadOnly opens DB in the given directory for read only operations. @@ -308,7 +310,6 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { } // Blocks returns all persisted blocks. -// It is up to the caller to close the blocks. func (db *DBReadOnly) Blocks() ([]*Block, error) { loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) if err != nil { @@ -345,9 +346,30 @@ func (db *DBReadOnly) Blocks() ([]*Block, error) { if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 { level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) } + + // Close all previously open blocks and add the new ones to the catche. + for _, b := range db.blocks { + b.Close() + } + db.blocks = loadable + return loadable, nil } +// Close all db blocks to release the locks. +func (db *DBReadOnly) Close() error { + if db.closed { + return errors.New("db already closed") + } + var merr tsdb_errors.MultiError + + for _, b := range db.blocks { + merr.Add(b.Close()) + } + db.closed = true + return merr.Err() +} + // Open returns a new DB in the given directory. func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) { if err := os.MkdirAll(dir, 0777); err != nil { diff --git a/db_test.go b/db_test.go index 868fa5fa..ef45f790 100644 --- a/db_test.go +++ b/db_test.go @@ -2255,6 +2255,9 @@ func TestDBReadOnly(t *testing.T) { { dbReadOnly, err := OpenDBReadOnly(dbDir, logger) testutil.Ok(t, err) + defer func() { + testutil.Ok(t, dbReadOnly.Close()) + }() blocks, err := dbReadOnly.Blocks() testutil.Ok(t, err) testutil.Equals(t, len(expBlocks), len(blocks)) From 4778d73f97deba234467e4196c89a92df865c1cb Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Fri, 31 May 2019 12:37:59 +0300 Subject: [PATCH 11/28] add a read only interface for a block. Signed-off-by: Krasi Georgiev --- block.go | 18 ++++++++++++++++++ cmd/tsdb/main.go | 8 +++----- db.go | 30 ++++++++++++++++++++++-------- db_test.go | 1 + 4 files changed, 44 insertions(+), 13 deletions(-) diff --git a/block.go b/block.go index 1b6e79d9..4c442475 100644 --- a/block.go +++ b/block.go @@ -157,6 +157,24 @@ type SizeReader interface { Size() int64 } +// BlockReadOnly is a block with a read only API. +type BlockReadOnly interface { + Close() error + String() string + Dir() string + Meta() BlockMeta + MinTime() int64 + MaxTime() int64 + Size() int64 + Index() (IndexReader, error) + Chunks() (ChunkReader, error) + Tombstones() (TombstoneReader, error) + GetSymbolTableSize() uint64 + Snapshot(dir string) error + OverlapsClosedInterval(mint, maxt int64) bool + LabelNames() ([]string, error) +} + // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index d99f3af3..f495a6f6 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -82,7 +82,6 @@ func main() { printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*analyzePath, nil) - if err != nil { exitWithError(err) } @@ -90,7 +89,7 @@ func main() { if err != nil { exitWithError(err) } - var block *tsdb.Block + var block tsdb.BlockReadOnly if *analyzeBlockID != "" { for _, b := range blocks { if b.Meta().ULID.String() == *analyzeBlockID { @@ -107,7 +106,6 @@ func main() { analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*dumpPath, nil) - if err != nil { exitWithError(err) } @@ -400,7 +398,7 @@ func exitWithError(err error) { os.Exit(1) } -func printBlocks(blocks []*tsdb.Block, humanReadable *bool) { +func printBlocks(blocks []tsdb.BlockReadOnly, humanReadable *bool) { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) defer tw.Flush() @@ -427,7 +425,7 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string { return strconv.FormatInt(timestamp, 10) } -func analyzeBlock(b *tsdb.Block, limit int) { +func analyzeBlock(b tsdb.BlockReadOnly, limit int) { fmt.Printf("Block path: %s\n", b.Dir()) meta := b.Meta() // Presume 1ms resolution that Prometheus uses. diff --git a/db.go b/db.go index 733cdd89..34b50725 100644 --- a/db.go +++ b/db.go @@ -283,11 +283,20 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { return nil, err } - blocks, err := db.Blocks() + blocksReadOnly, err := db.Blocks() if err != nil { return nil, err } + blocks := make([]*Block, len(blocksReadOnly)) + for i, b := range blocksReadOnly { + b, ok := b.(*Block) + if !ok { + return nil, errors.New("unable to convert a read only block to a normal block") + } + blocks[i] = b + } + dbWritable := &DB{ dir: db.dir, logger: db.logger, @@ -310,7 +319,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { } // Blocks returns all persisted blocks. -func (db *DBReadOnly) Blocks() ([]*Block, error) { +func (db *DBReadOnly) Blocks() ([]BlockReadOnly, error) { loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) if err != nil { return nil, err @@ -353,7 +362,12 @@ func (db *DBReadOnly) Blocks() ([]*Block, error) { } db.blocks = loadable - return loadable, nil + blocks := make([]BlockReadOnly, len(loadable)) + for i, b := range loadable { + blocks[i] = b + } + + return blocks, nil } // Close all db blocks to release the locks. @@ -742,23 +756,23 @@ func (db *DB) reload() (err error) { } func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) { - blockDirs, err := blockDirs(dir) + bDirs, err := blockDirs(dir) if err != nil { return nil, nil, errors.Wrap(err, "find blocks") } corrupted = make(map[ulid.ULID]error) - for _, blockDir := range blockDirs { - meta, err := readMetaFile(blockDir) + for _, bDir := range bDirs { + meta, err := readMetaFile(bDir) if err != nil { - level.Error(l).Log("msg", "not a block dir", "dir", blockDir) + level.Error(l).Log("msg", "not a block dir", "dir", bDir) continue } // See if we already have the block in memory or open it otherwise. block := getBlock(loaded, meta.ULID) if block == nil { - block, err = OpenBlock(l, blockDir, chunkPool) + block, err = OpenBlock(l, bDir, chunkPool) if err != nil { corrupted[meta.ULID] = err continue diff --git a/db_test.go b/db_test.go index ef45f790..1a07478a 100644 --- a/db_test.go +++ b/db_test.go @@ -2263,6 +2263,7 @@ func TestDBReadOnly(t *testing.T) { testutil.Equals(t, len(expBlocks), len(blocks)) for i, expBlock := range expBlocks { + testutil.Equals(t, expBlock.String(), blocks[i].String(), "block string mismatch") testutil.Equals(t, expBlock.Size(), blocks[i].Size(), "block size mismatch") testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch") testutil.Equals(t, expBlock.Dir(), blocks[i].Dir(), "block dir mismatch") From 32d5aaef4d839bb070c4fd68e6d63b6ed1287655 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev Date: Mon, 3 Jun 2019 10:02:07 +0300 Subject: [PATCH 12/28] nits Signed-off-by: Krasi Georgiev --- db.go | 26 ++++++++++++++------------ 1 file changed, 14 insertions(+), 12 deletions(-) diff --git a/db.go b/db.go index 34b50725..081238fb 100644 --- a/db.go +++ b/db.go @@ -249,7 +249,7 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { type DBReadOnly struct { logger log.Logger dir string - blocks []*Block // Keep all open blocks in the cache to close them at db.Close. + blocks []BlockReadOnly // Keep all open blocks in the cache to close them at db.Close. closed bool } @@ -325,7 +325,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReadOnly, error) { return nil, err } - // Corrupted blocks that have been replaced by parents can be safely ignored. + // Corrupted blocks that have been superseded by a loadable block can be safely ignored. for _, block := range loadable { for _, b := range block.Meta().Compaction.Parents { delete(corrupted, b.ULID) @@ -360,12 +360,12 @@ func (db *DBReadOnly) Blocks() ([]BlockReadOnly, error) { for _, b := range db.blocks { b.Close() } - db.blocks = loadable blocks := make([]BlockReadOnly, len(loadable)) for i, b := range loadable { blocks[i] = b } + db.blocks = blocks return blocks, nil } @@ -467,15 +467,17 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } if initErr := db.head.Init(minValidTime); initErr != nil { - err := errors.Cause(initErr) // So that we can pick up errors even when wrapped. - if _, ok := err.(*wal.CorruptionErr); ok { - level.Warn(db.logger).Log("msg", "encountered WAL corruption error, attempting repair", "err", err) - db.metrics.walCorruptionsTotal.Inc() - if err := wlog.Repair(err); err != nil { - return nil, errors.Wrap(err, "repair corrupted WAL") + // So that we can pick up errors even when wrapped. + if err := errors.Cause(initErr); err != nil { + if _, ok := err.(*wal.CorruptionErr); ok { + level.Warn(db.logger).Log("msg", "encountered WAL corruption error, attempting repair", "err", err) + db.metrics.walCorruptionsTotal.Inc() + if err := wlog.Repair(err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") + } + } else { + return nil, errors.Wrap(initErr, "read WAL") } - } else { - return nil, errors.Wrap(initErr, "read WAL") } } @@ -674,7 +676,7 @@ func (db *DB) reload() (err error) { deletable := db.deletableBlocks(loadable) - // Corrupted blocks that have been replaced by parents can be safely ignored and deleted. + // Corrupted blocks that have been superseded by a loadable block can be safely ignored. // This makes it resilient against the process crashing towards the end of a compaction. // Creation of a new block and deletion of its parents cannot happen atomically. // By creating blocks with their parents, we can pick up the deletion where it left off during a crash. From add6f1ec39aa36af662fd8e430c92953b6febfaa Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Tue, 11 Jun 2019 18:35:58 +0300 Subject: [PATCH 13/28] nits Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- CHANGELOG.md | 1 + db.go | 19 ++++++++----------- db_test.go | 19 +++++++++++-------- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0e35541b..b2be756a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,6 @@ ## master / unreleased - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. + - As part if this also added the `BlockReadOnly` interface to implement this new feature. ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/db.go b/db.go index 8121f7f2..e9d26383 100644 --- a/db.go +++ b/db.go @@ -256,6 +256,7 @@ type DBReadOnly struct { logger log.Logger dir string blocks []BlockReadOnly // Keep all open blocks in the cache to close them at db.Close. + mtx sync.Mutex closed bool } @@ -362,6 +363,8 @@ func (db *DBReadOnly) Blocks() ([]BlockReadOnly, error) { level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) } + db.mtx.Lock() + defer db.mtx.Unlock() // Close all previously open blocks and add the new ones to the catche. for _, b := range db.blocks { b.Close() @@ -383,6 +386,8 @@ func (db *DBReadOnly) Close() error { } var merr tsdb_errors.MultiError + db.mtx.Lock() + defer db.mtx.Unlock() for _, b := range db.blocks { merr.Add(b.Close()) } @@ -473,17 +478,9 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db } if initErr := db.head.Init(minValidTime); initErr != nil { - // So that we can pick up errors even when wrapped. - if err := errors.Cause(initErr); err != nil { - if _, ok := err.(*wal.CorruptionErr); ok { - level.Warn(db.logger).Log("msg", "encountered WAL corruption error, attempting repair", "err", err) - db.metrics.walCorruptionsTotal.Inc() - if err := wlog.Repair(err); err != nil { - return nil, errors.Wrap(err, "repair corrupted WAL") - } - } else { - return nil, errors.Wrap(initErr, "read WAL") - } + level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err) + if err := wlog.Repair(err); err != nil { + return nil, errors.Wrap(err, "repair corrupted WAL") } } diff --git a/db_test.go b/db_test.go index 1a07478a..a0658059 100644 --- a/db_test.go +++ b/db_test.go @@ -2196,13 +2196,14 @@ func TestBlockRanges(t *testing.T) { // It also checks that the API calls return equivalent results as a normal db.Open() mode. func TestDBReadOnly(t *testing.T) { var ( - dbDir string - logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - expBlocks []*Block - expSeries map[string][]tsdbutil.Sample - expDbSize int64 - matchAll = labels.NewEqualMatcher("", "") - err error + dbDir string + logger = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + expBlocks []*Block + expSeries map[string][]tsdbutil.Sample + expSeriesCount int + expDbSize int64 + matchAll = labels.NewEqualMatcher("", "") + err error ) // Boostrap the db. @@ -2223,6 +2224,7 @@ func TestDBReadOnly(t *testing.T) { for _, m := range dbBlocks { createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime)) } + expSeriesCount++ } // Open a normal db to use for a comparison. @@ -2240,6 +2242,7 @@ func TestDBReadOnly(t *testing.T) { _, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0) testutil.Ok(t, err) testutil.Ok(t, app.Commit()) + expSeriesCount++ expBlocks = dbWritable.Blocks() expDbSize, err = testutil.DirSize(dbWritable.Dir()) @@ -2277,7 +2280,7 @@ func TestDBReadOnly(t *testing.T) { readOnlyDBSize, err := testutil.DirSize(dbDir) testutil.Ok(t, err) - testutil.Assert(t, len(readOnlySeries) > 0, "querier should return some series") + testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch") testutil.Equals(t, expSeries, readOnlySeries, "series mismatch") testutil.Equals(t, expDbSize, readOnlyDBSize, "after all read operations the db size should remain the same") } From 6092063bdb70cc5990e05239a7fb956ac5cc1e8e Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 12 Jun 2019 12:18:01 +0300 Subject: [PATCH 14/28] simplified Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- db.go | 7 +------ head_test.go | 4 ---- wal/wal.go | 19 ++++++++++++------- wal/wal_test.go | 5 +++++ 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/db.go b/db.go index e9d26383..08f2ed35 100644 --- a/db.go +++ b/db.go @@ -154,7 +154,6 @@ type dbMetrics struct { tombCleanTimer prometheus.Histogram blocksBytes prometheus.Gauge sizeRetentionCount prometheus.Counter - walCorruptionsTotal prometheus.Counter } func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { @@ -228,10 +227,6 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { Name: "prometheus_tsdb_size_retentions_total", Help: "The number of times that blocks were deleted because the maximum number of bytes was exceeded.", }) - m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_corruptions_total", - Help: "Total number of WAL corruptions.", - }) if r != nil { r.MustRegister( @@ -479,7 +474,7 @@ func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db if initErr := db.head.Init(minValidTime); initErr != nil { level.Warn(db.logger).Log("msg", "encountered WAL read error, attempting repair", "err", err) - if err := wlog.Repair(err); err != nil { + if err := wlog.Repair(initErr); err != nil { return nil, errors.Wrap(err, "repair corrupted WAL") } } diff --git a/head_test.go b/head_test.go index b7b27e8d..f57f3e37 100644 --- a/head_test.go +++ b/head_test.go @@ -24,7 +24,6 @@ import ( "testing" "github.com/pkg/errors" - prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" @@ -1121,15 +1120,12 @@ func TestWalRepair(t *testing.T) { } // Open the db to trigger a repair. - // Also test the wal corruption metric is working as expected. { db, err := Open(dir, nil, nil, DefaultOptions) testutil.Ok(t, err) defer func() { testutil.Ok(t, db.Close()) }() - - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.metrics.walCorruptionsTotal)) } // Read the wal content after the repair. diff --git a/wal/wal.go b/wal/wal.go index f28420e6..b81fceb8 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -166,12 +166,13 @@ type WAL struct { actorc chan func() closed bool // To allow calling Close() more than once without blocking. - fsyncDuration prometheus.Summary - pageFlushes prometheus.Counter - pageCompletions prometheus.Counter - truncateFail prometheus.Counter - truncateTotal prometheus.Counter - currentSegment prometheus.Gauge + fsyncDuration prometheus.Summary + pageFlushes prometheus.Counter + pageCompletions prometheus.Counter + truncateFail prometheus.Counter + truncateTotal prometheus.Counter + currentSegment prometheus.Gauge + walCorruptionsTotal prometheus.Counter } // New returns a new WAL over the given directory. @@ -269,6 +270,10 @@ func registerMetrics(reg prometheus.Registerer, w *WAL) { Name: "prometheus_tsdb_wal_segment_current", Help: "WAL segment index that TSDB is currently writing to.", }) + w.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_corruptions_total", + Help: "Total number of WAL corruptions.", + }) if reg != nil { reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment) } @@ -314,7 +319,7 @@ func (w *WAL) Repair(origErr error) error { if cerr.Segment < 0 { return errors.New("corruption error does not specify position") } - + w.walCorruptionsTotal.Inc() level.Warn(w.logger).Log("msg", "starting corruption repair", "segment", cerr.Segment, "offset", cerr.Offset) diff --git a/wal/wal_test.go b/wal/wal_test.go index 577ae5fd..a2abea7e 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -24,6 +24,8 @@ import ( "testing" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" + "github.com/prometheus/tsdb/testutil" ) @@ -291,8 +293,11 @@ func TestCorruptAndCarryOn(t *testing.T) { w, err := NewSize(logger, nil, dir, segmentSize) testutil.Ok(t, err) + // Also test the wal corruption metric is working as expected. + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(w.walCorruptionsTotal)) err = w.Repair(corruptionErr) testutil.Ok(t, err) + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(w.walCorruptionsTotal)) // Ensure that we have a completely clean slate after reapiring. testutil.Equals(t, w.segment.Index(), 1) // We corrupted segment 0. From fbea5e8e11e72af239b757b2586d1a7c20d083ab Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 13 Jun 2019 15:03:20 +0300 Subject: [PATCH 15/28] refactored to use the Blockreader API. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- CHANGELOG.md | 2 +- block.go | 25 ++------------ cmd/tsdb/main.go | 32 +++++++++++++++--- compact.go | 8 ++--- compact_test.go | 3 +- db.go | 86 ++++++++++++++++++++++++++---------------------- db_test.go | 5 --- head.go | 25 ++++++++++++-- mocks_test.go | 3 +- 9 files changed, 106 insertions(+), 83 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b2be756a..7647cca2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,6 @@ ## master / unreleased - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. - - As part if this also added the `BlockReadOnly` interface to implement this new feature. + - `BlockReader` interface is refactored to return the full block meta instead of just MinTime/MaxTime. ## 0.8.0 - [BUGFIX] Calling `Close` more than once on a querier returns an error instead of a panic. diff --git a/block.go b/block.go index 4c442475..0eaee82d 100644 --- a/block.go +++ b/block.go @@ -138,11 +138,8 @@ type BlockReader interface { // Tombstones returns a TombstoneReader over the block's deleted data. Tombstones() (TombstoneReader, error) - // MinTime returns the min time of the block. - MinTime() int64 - - // MaxTime returns the max time of the block. - MaxTime() int64 + // Meta provides meta information about the block reader. + Meta() BlockMeta } // Appendable defines an entity to which data can be appended. @@ -157,24 +154,6 @@ type SizeReader interface { Size() int64 } -// BlockReadOnly is a block with a read only API. -type BlockReadOnly interface { - Close() error - String() string - Dir() string - Meta() BlockMeta - MinTime() int64 - MaxTime() int64 - Size() int64 - Index() (IndexReader, error) - Chunks() (ChunkReader, error) - Tombstones() (TombstoneReader, error) - GetSymbolTableSize() uint64 - Snapshot(dir string) error - OverlapsClosedInterval(mint, maxt int64) bool - LabelNames() ([]string, error) -} - // BlockMeta provides meta information about a block. type BlockMeta struct { // Unique identifier for the block and its contents. Changes on compaction. diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 2f62f85d..788d2e7a 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -74,6 +74,12 @@ func main() { if err != nil { exitWithError(err) } + defer func() { + err := db.Close() + if err != nil { + exitWithError(err) + } + }() blocks, err := db.Blocks() if err != nil { exitWithError(err) @@ -84,11 +90,17 @@ func main() { if err != nil { exitWithError(err) } + defer func() { + err := db.Close() + if err != nil { + exitWithError(err) + } + }() blocks, err := db.Blocks() if err != nil { exitWithError(err) } - var block tsdb.BlockReadOnly + var block tsdb.BlockReader if *analyzeBlockID != "" { for _, b := range blocks { if b.Meta().ULID.String() == *analyzeBlockID { @@ -108,6 +120,12 @@ func main() { if err != nil { exitWithError(err) } + defer func() { + err := db.Close() + if err != nil { + exitWithError(err) + } + }() dumpSamples(db, *dumpMinTime, *dumpMaxTime) } } @@ -394,7 +412,7 @@ func exitWithError(err error) { os.Exit(1) } -func printBlocks(blocks []tsdb.BlockReadOnly, humanReadable *bool) { +func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) defer tw.Flush() @@ -421,9 +439,9 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string { return strconv.FormatInt(timestamp, 10) } -func analyzeBlock(b tsdb.BlockReadOnly, limit int) { - fmt.Printf("Block path: %s\n", b.Dir()) +func analyzeBlock(b tsdb.BlockReader, limit int) { meta := b.Meta() + fmt.Printf("Block ID: %s\n", meta.ULID) // Presume 1ms resolution that Prometheus uses. fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) fmt.Printf("Series: %d\n", meta.Stats.NumSeries) @@ -555,6 +573,12 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { if err != nil { exitWithError(err) } + defer func() { + err := q.Close() + if err != nil { + exitWithError(err) + } + }() ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*")) if err != nil { diff --git a/compact.go b/compact.go index 4a56f585..49dc6b53 100644 --- a/compact.go +++ b/compact.go @@ -660,7 +660,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, }() c.metrics.populatingBlocks.Set(1) - globalMaxt := blocks[0].MaxTime() + globalMaxt := blocks[0].Meta().MaxTime for i, b := range blocks { select { case <-c.ctx.Done(): @@ -669,13 +669,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, } if !overlapping { - if i > 0 && b.MinTime() < globalMaxt { + if i > 0 && b.Meta().MinTime < globalMaxt { c.metrics.overlappingBlocks.Inc() overlapping = true level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID) } - if b.MaxTime() > globalMaxt { - globalMaxt = b.MaxTime() + if b.Meta().MaxTime > globalMaxt { + globalMaxt = b.Meta().MaxTime } } diff --git a/compact_test.go b/compact_test.go index 545dedbf..6b2f9994 100644 --- a/compact_test.go +++ b/compact_test.go @@ -458,8 +458,7 @@ type erringBReader struct{} func (erringBReader) Index() (IndexReader, error) { return nil, errors.New("index") } func (erringBReader) Chunks() (ChunkReader, error) { return nil, errors.New("chunks") } func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") } -func (erringBReader) MinTime() int64 { return 0 } -func (erringBReader) MaxTime() int64 { return 0 } +func (erringBReader) Meta() BlockMeta { return BlockMeta{} } type nopChunkWriter struct{} diff --git a/db.go b/db.go index 08f2ed35..06f922f0 100644 --- a/db.go +++ b/db.go @@ -248,11 +248,11 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { // DBReadOnly provides APIs for read only operations on a database. type DBReadOnly struct { - logger log.Logger - dir string - blocks []BlockReadOnly // Keep all open blocks in the cache to close them at db.Close. - mtx sync.Mutex - closed bool + logger log.Logger + dir string + closers []io.Closer + mtx sync.Mutex + closed bool } // OpenDBReadOnly opens DB in the given directory for read only operations. @@ -276,22 +276,12 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { // Querier loads the wal and returns a new querier over the data partition for the given time range. // A goroutine must not handle more than one open Querier. func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { - w, err := wal.NewReadOnly(db.logger, nil, filepath.Join(db.dir, "wal")) + blocksReaders, err := db.Blocks() if err != nil { return nil, err } - head, err := NewHead(nil, db.logger, w, 1) - if err != nil { - return nil, err - } - - blocksReadOnly, err := db.Blocks() - if err != nil { - return nil, err - } - - blocks := make([]*Block, len(blocksReadOnly)) - for i, b := range blocksReadOnly { + blocks := make([]*Block, len(blocksReaders)) + for i, b := range blocksReaders { b, ok := b.(*Block) if !ok { return nil, errors.New("unable to convert a read only block to a normal block") @@ -299,29 +289,45 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { blocks[i] = b } - dbWritable := &DB{ - dir: db.dir, - logger: db.logger, - head: head, - blocks: blocks, + var head *Head + maxBlockTime := int64(math.MinInt64) + if len(blocks) > 0 { + maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime } - // Set the min valid time for the ingested wal samples - // to be no lower than the maxt of the last block. - minValidTime := int64(math.MinInt64) - if len(blocks) > 0 { - minValidTime = blocks[len(blocks)-1].Meta().MaxTime + // Also add the WAL if the current blocks don't cover the requestes time range. + if maxBlockTime <= maxt { + w, err := wal.NewReadOnly(db.logger, nil, filepath.Join(db.dir, "wal")) + if err != nil { + return nil, err + } + head, err = NewHead(nil, db.logger, w, 1) + if err != nil { + return nil, err + } + // Set the min valid time for the ingested wal samples + // to be no lower than the maxt of the last block. + if err := head.Init(maxBlockTime); err != nil { + return nil, errors.Wrap(err, "read WAL") + } + + db.mtx.Lock() + db.closers = append(db.closers, head) + db.mtx.Unlock() } - if err := dbWritable.head.Init(minValidTime); err != nil { - return nil, errors.Wrap(err, "read WAL") + dbWritable := &DB{ + dir: db.dir, + logger: db.logger, + blocks: blocks, + head: head, } return dbWritable.Querier(mint, maxt) } // Blocks returns all persisted blocks. -func (db *DBReadOnly) Blocks() ([]BlockReadOnly, error) { +func (db *DBReadOnly) Blocks() ([]BlockReader, error) { loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) if err != nil { return nil, err @@ -360,18 +366,20 @@ func (db *DBReadOnly) Blocks() ([]BlockReadOnly, error) { db.mtx.Lock() defer db.mtx.Unlock() - // Close all previously open blocks and add the new ones to the catche. - for _, b := range db.blocks { - b.Close() + // Close all previously open readers and add the new ones to the cache. + for _, closer := range db.closers { + closer.Close() } - blocks := make([]BlockReadOnly, len(loadable)) + blockClosers := make([]io.Closer, len(loadable)) + blockReaders := make([]BlockReader, len(loadable)) for i, b := range loadable { - blocks[i] = b + blockClosers[i] = b + blockReaders[i] = b } - db.blocks = blocks + db.closers = blockClosers - return blocks, nil + return blockReaders, nil } // Close all db blocks to release the locks. @@ -383,7 +391,7 @@ func (db *DBReadOnly) Close() error { db.mtx.Lock() defer db.mtx.Unlock() - for _, b := range db.blocks { + for _, b := range db.closers { merr.Add(b.Close()) } db.closed = true diff --git a/db_test.go b/db_test.go index a0658059..a0aab01c 100644 --- a/db_test.go +++ b/db_test.go @@ -2266,12 +2266,7 @@ func TestDBReadOnly(t *testing.T) { testutil.Equals(t, len(expBlocks), len(blocks)) for i, expBlock := range expBlocks { - testutil.Equals(t, expBlock.String(), blocks[i].String(), "block string mismatch") - testutil.Equals(t, expBlock.Size(), blocks[i].Size(), "block size mismatch") testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch") - testutil.Equals(t, expBlock.Dir(), blocks[i].Dir(), "block dir mismatch") - testutil.Equals(t, expBlock.MinTime(), blocks[i].MinTime(), "block MinTime mismatch") - testutil.Equals(t, expBlock.MaxTime(), blocks[i].MaxTime(), "block MaxTime mismatch") } q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64) diff --git a/head.go b/head.go index 01d7596e..2e576225 100644 --- a/head.go +++ b/head.go @@ -25,6 +25,7 @@ import ( "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" + "github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/tsdb/chunkenc" @@ -533,10 +534,9 @@ func (h *Head) Init(minValidTime int64) error { if err := sr.Close(); err != nil { level.Warn(h.logger).Log("msg", "error while closing the wal segments reader", "err", err) } - if err == nil { - continue + if err != nil { + return err } - return err } return nil @@ -687,6 +687,14 @@ func (h *rangeHead) MaxTime() int64 { return h.maxt } +func (h *rangeHead) Meta() BlockMeta { + return BlockMeta{ + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + ULID: h.head.Meta().ULID, + } +} + // initAppender is a helper to initialize the time bounds of the head // upon the first sample it receives. type initAppender struct { @@ -1090,6 +1098,17 @@ func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { return &headChunkReader{head: h, mint: mint, maxt: maxt} } +// Meta returns meta information about the head. +func (h *Head) Meta() BlockMeta { + var id [16]byte + copy(id[:], "______head______") + return BlockMeta{ + MinTime: h.MinTime(), + MaxTime: h.MaxTime(), + ULID: ulid.ULID(id), + } +} + // MinTime returns the lowest time bound on visible data in the head. func (h *Head) MinTime() int64 { return atomic.LoadInt64(&h.minTime) diff --git a/mocks_test.go b/mocks_test.go index 243d5cf1..5442c680 100644 --- a/mocks_test.go +++ b/mocks_test.go @@ -73,5 +73,4 @@ type mockBReader struct { func (r *mockBReader) Index() (IndexReader, error) { return r.ir, nil } func (r *mockBReader) Chunks() (ChunkReader, error) { return r.cr, nil } func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil } -func (r *mockBReader) MinTime() int64 { return r.mint } -func (r *mockBReader) MaxTime() int64 { return r.maxt } +func (r *mockBReader) Meta() BlockMeta { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} } From 3b76b2f25a176424c0ea0d4f04c70e0050ce6924 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 14 Jun 2019 13:20:33 +0300 Subject: [PATCH 16/28] non blocking head closing and use dir hash to ensure read only. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- db.go | 5 ++++- db_test.go | 10 ++++++---- testutil/directory.go | 41 +++++++++++++++++++++++++++++++++++++++++ wal/wal.go | 12 ++++-------- 4 files changed, 55 insertions(+), 13 deletions(-) diff --git a/db.go b/db.go index 06f922f0..13cdc21d 100644 --- a/db.go +++ b/db.go @@ -297,7 +297,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { // Also add the WAL if the current blocks don't cover the requestes time range. if maxBlockTime <= maxt { - w, err := wal.NewReadOnly(db.logger, nil, filepath.Join(db.dir, "wal")) + w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal")) if err != nil { return nil, err } @@ -310,6 +310,9 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { if err := head.Init(maxBlockTime); err != nil { return nil, errors.Wrap(err, "read WAL") } + // Set the wal to nil to disable all wal operations. + // This is mainly to avoid blocking when closing the head. + head.wal = nil db.mtx.Lock() db.closers = append(db.closers, head) diff --git a/db_test.go b/db_test.go index a0aab01c..38265989 100644 --- a/db_test.go +++ b/db_test.go @@ -2201,7 +2201,7 @@ func TestDBReadOnly(t *testing.T) { expBlocks []*Block expSeries map[string][]tsdbutil.Sample expSeriesCount int - expDbSize int64 + expDBHash []byte matchAll = labels.NewEqualMatcher("", "") err error ) @@ -2245,7 +2245,9 @@ func TestDBReadOnly(t *testing.T) { expSeriesCount++ expBlocks = dbWritable.Blocks() - expDbSize, err = testutil.DirSize(dbWritable.Dir()) + expDBHash, err = testutil.DirHash(dbWritable.Dir()) + testutil.Ok(t, err) + expDbSize, err := testutil.DirSize(dbWritable.Dir()) testutil.Ok(t, err) testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append") @@ -2272,11 +2274,11 @@ func TestDBReadOnly(t *testing.T) { q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) readOnlySeries := query(t, q, matchAll) - readOnlyDBSize, err := testutil.DirSize(dbDir) + readOnlyDBHash, err := testutil.DirHash(dbDir) testutil.Ok(t, err) testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch") testutil.Equals(t, expSeries, readOnlySeries, "series mismatch") - testutil.Equals(t, expDbSize, readOnlyDBSize, "after all read operations the db size should remain the same") + testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same") } } diff --git a/testutil/directory.go b/testutil/directory.go index fd9d4dae..7066cf46 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -14,9 +14,12 @@ package testutil import ( + "crypto/md5" + "io" "io/ioutil" "os" "path/filepath" + "strconv" ) const ( @@ -143,3 +146,41 @@ func DirSize(path string) (int64, error) { }) return size, err } + +// DirHash returns a hash of all files attribites and their content within a directory. +func DirHash(path string) ([]byte, error) { + hash := md5.New() + err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { + if err != nil { + return err + } + if !info.IsDir() { + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + + if _, err := io.Copy(hash, f); err != nil { + return err + } + + if _, err := io.WriteString(hash, strconv.Itoa(int(info.Size()))); err != nil { + return err + } + if _, err := io.WriteString(hash, info.Name()); err != nil { + return err + } + modTime, err := info.ModTime().GobEncode() + if err != nil { + return err + } + if _, err := io.WriteString(hash, string(modTime)); err != nil { + return err + } + + } + return err + }) + return hash.Sum(nil), err +} diff --git a/wal/wal.go b/wal/wal.go index b81fceb8..10ee17fb 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -227,18 +227,14 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi return w, nil } -// NewReadOnly returns a WAL for read only operations. -func NewReadOnly(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { +// Open an existing WAL. +func Open(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) { if logger == nil { logger = log.NewNopLogger() } w := &WAL{ - dir: dir, - logger: logger, - segmentSize: DefaultSegmentSize, - page: &page{}, - actorc: make(chan func(), 100), - stopc: make(chan chan struct{}), + dir: dir, + logger: logger, } registerMetrics(reg, w) From 764d307d90167a0adc0c00091836a29962770388 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Tue, 25 Jun 2019 13:40:27 +0300 Subject: [PATCH 17/28] fix wal corruption metrics and head test Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- head.go | 6 +++ head_test.go | 98 ++++++++++++++++++++++--------------------- testutil/directory.go | 1 + wal/wal.go | 18 +++----- 4 files changed, 64 insertions(+), 59 deletions(-) diff --git a/head.go b/head.go index 104645ca..138e1598 100644 --- a/head.go +++ b/head.go @@ -97,6 +97,7 @@ type headMetrics struct { maxTime prometheus.GaugeFunc samplesAppended prometheus.Counter walTruncateDuration prometheus.Summary + walCorruptionsTotal prometheus.Counter headTruncateFail prometheus.Counter headTruncateTotal prometheus.Counter checkpointDeleteFail prometheus.Counter @@ -162,6 +163,10 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Help: "Duration of WAL truncation.", Objectives: map[float64]float64{}, }) + m.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Name: "prometheus_tsdb_wal_corruptions_total", + Help: "Total number of WAL corruptions.", + }) m.samplesAppended = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_samples_appended_total", Help: "Total number of appended samples.", @@ -205,6 +210,7 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { m.maxTime, m.gcDuration, m.walTruncateDuration, + m.walCorruptionsTotal, m.samplesAppended, m.headTruncateFail, m.headTruncateTotal, diff --git a/head_test.go b/head_test.go index 0f38ad59..55067eae 100644 --- a/head_test.go +++ b/head_test.go @@ -25,6 +25,7 @@ import ( "testing" "github.com/pkg/errors" + prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/tsdb/chunkenc" "github.com/prometheus/tsdb/chunks" "github.com/prometheus/tsdb/index" @@ -1107,62 +1108,65 @@ func TestWalRepair_DecodingError(t *testing.T) { 5, }, } { - t.Run(name, func(t *testing.T) { - dir, err := ioutil.TempDir("", "wal_repair") - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, os.RemoveAll(dir)) - }() - - // Fill the wal and corrupt it. - { - w, err := wal.New(nil, nil, filepath.Join(dir, "wal")) + for _, compress := range []bool{false, true} { + t.Run(fmt.Sprintf("%s,compress=%t", name, compress), func(t *testing.T) { + dir, err := ioutil.TempDir("", "wal_repair") testutil.Ok(t, err) + defer func() { + testutil.Ok(t, os.RemoveAll(dir)) + }() - for i := 1; i <= test.totalRecs; i++ { - // At this point insert a corrupted record. - if i-1 == test.expRecs { - testutil.Ok(t, w.Log(test.corrFunc(test.rec))) - continue - } - testutil.Ok(t, w.Log(test.rec)) - } + // Fill the wal and corrupt it. + { + w, err := wal.New(nil, nil, filepath.Join(dir, "wal"), compress) + testutil.Ok(t, err) - h, err := NewHead(nil, nil, w, 1) - testutil.Ok(t, err) + for i := 1; i <= test.totalRecs; i++ { + // At this point insert a corrupted record. + if i-1 == test.expRecs { + testutil.Ok(t, w.Log(test.corrFunc(test.rec))) + continue + } + testutil.Ok(t, w.Log(test.rec)) + } - initErr := h.Init(math.MinInt64) + h, err := NewHead(nil, nil, w, 1) + testutil.Ok(t, err) + testutil.Equals(t, 0.0, prom_testutil.ToFloat64(h.metrics.walCorruptionsTotal)) + initErr := h.Init(math.MinInt64) - err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. - _, corrErr := err.(*wal.CorruptionErr) - testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") - testutil.Ok(t, w.Close()) - } + err = errors.Cause(initErr) // So that we can pick up errors even if wrapped. + _, corrErr := err.(*wal.CorruptionErr) + testutil.Assert(t, corrErr, "reading the wal didn't return corruption error") + testutil.Ok(t, w.Close()) + } - // Open the db to trigger a repair. - { - db, err := Open(dir, nil, nil, DefaultOptions) - testutil.Ok(t, err) - defer func() { - testutil.Ok(t, db.Close()) - }() - } + // Open the db to trigger a repair. + { + db, err := Open(dir, nil, nil, DefaultOptions) + testutil.Ok(t, err) + defer func() { + testutil.Ok(t, db.Close()) + }() + testutil.Equals(t, 1.0, prom_testutil.ToFloat64(db.head.metrics.walCorruptionsTotal)) + } - // Read the wal content after the repair. - { - sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) - testutil.Ok(t, err) - defer sr.Close() - r := wal.NewReader(sr) + // Read the wal content after the repair. + { + sr, err := wal.NewSegmentsReader(filepath.Join(dir, "wal")) + testutil.Ok(t, err) + defer sr.Close() + r := wal.NewReader(sr) - var actRec int - for r.Next() { - actRec++ + var actRec int + for r.Next() { + actRec++ + } + testutil.Ok(t, r.Err()) + testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") } - testutil.Ok(t, r.Err()) - testutil.Equals(t, test.expRecs, actRec, "Wrong number of intact records") - } - }) + }) + } } } diff --git a/testutil/directory.go b/testutil/directory.go index 781dc1a0..7066cf46 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -19,6 +19,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strconv" ) const ( diff --git a/wal/wal.go b/wal/wal.go index 919ecb0d..ff0d4c35 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -169,13 +169,12 @@ type WAL struct { compress bool snappyBuf []byte - fsyncDuration prometheus.Summary - pageFlushes prometheus.Counter - pageCompletions prometheus.Counter - truncateFail prometheus.Counter - truncateTotal prometheus.Counter - currentSegment prometheus.Gauge - walCorruptionsTotal prometheus.Counter + fsyncDuration prometheus.Summary + pageFlushes prometheus.Counter + pageCompletions prometheus.Counter + truncateFail prometheus.Counter + truncateTotal prometheus.Counter + currentSegment prometheus.Gauge } // New returns a new WAL over the given directory. @@ -271,10 +270,6 @@ func registerMetrics(reg prometheus.Registerer, w *WAL) { Name: "prometheus_tsdb_wal_segment_current", Help: "WAL segment index that TSDB is currently writing to.", }) - w.walCorruptionsTotal = prometheus.NewCounter(prometheus.CounterOpts{ - Name: "prometheus_tsdb_wal_corruptions_total", - Help: "Total number of WAL corruptions.", - }) if reg != nil { reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment) } @@ -325,7 +320,6 @@ func (w *WAL) Repair(origErr error) error { if cerr.Segment < 0 { return errors.New("corruption error does not specify position") } - w.walCorruptionsTotal.Inc() level.Warn(w.logger).Log("msg", "starting corruption repair", "segment", cerr.Segment, "offset", cerr.Offset) From bd79d07c526de5305b543e90e037756e1984a6f8 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Tue, 25 Jun 2019 13:48:52 +0300 Subject: [PATCH 18/28] nits Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- CHANGELOG.md | 4 +++- db.go | 2 +- db_test.go | 9 ++++----- testutil/directory.go | 4 ++-- wal/wal.go | 1 - wal/wal_test.go | 5 ----- 6 files changed, 10 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 21371e00..731c42ca 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,8 @@ ## master / unreleased - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. - - `BlockReader` interface is refactored to return the full block meta instead of just MinTime/MaxTime. + - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s and + for this the interface is refactored to return the full block meta instead of + just MinTime/MaxTime. Required to allow reading the ULID of a block. - [FEATURE] Provide option to compress WAL records using Snappy. [#609](https://github.com/prometheus/tsdb/pull/609) - [BUGFIX] Re-calculate block size when calling `block.Delete`. - [CHANGE] The meta file `BlockStats` no longer holds size information. This is now dynamically calculated and kept in memory. It also includes the meta file size which was not included before. diff --git a/db.go b/db.go index a9037470..8e90d67c 100644 --- a/db.go +++ b/db.go @@ -333,7 +333,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { return dbWritable.Querier(mint, maxt) } -// Blocks returns all persisted blocks. +// Blocks returns a slice of block readers for persisted blocks. func (db *DBReadOnly) Blocks() ([]BlockReader, error) { loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) if err != nil { diff --git a/db_test.go b/db_test.go index 2be58888..0732c5dd 100644 --- a/db_test.go +++ b/db_test.go @@ -2216,9 +2216,6 @@ func TestDBReadOnly(t *testing.T) { { dbWritable, err := Open(dbDir, logger, nil, nil) testutil.Ok(t, err) - defer func() { - testutil.Ok(t, dbWritable.Close()) - }() dbWritable.DisableCompactions() dbSizeBeforeAppend, err := testutil.DirSize(dbWritable.Dir()) @@ -2230,8 +2227,6 @@ func TestDBReadOnly(t *testing.T) { expSeriesCount++ expBlocks = dbWritable.Blocks() - expDBHash, err = testutil.DirHash(dbWritable.Dir()) - testutil.Ok(t, err) expDbSize, err := testutil.DirSize(dbWritable.Dir()) testutil.Ok(t, err) testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append") @@ -2239,6 +2234,10 @@ func TestDBReadOnly(t *testing.T) { q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) expSeries = query(t, q, matchAll) + + testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows. + expDBHash, err = testutil.DirHash(dbWritable.Dir()) + testutil.Ok(t, err) } // Open a read only db and ensure that the API returns the same result as the normal DB. diff --git a/testutil/directory.go b/testutil/directory.go index 7066cf46..f0c66700 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -132,7 +132,7 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) { return } -// DirSize returns a directory size in bytes. +// DirSize returns the size in bytes of all files in a directory. func DirSize(path string) (int64, error) { var size int64 err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { @@ -142,7 +142,7 @@ func DirSize(path string) (int64, error) { if !info.IsDir() { size += info.Size() } - return err + return nil }) return size, err } diff --git a/wal/wal.go b/wal/wal.go index ff0d4c35..878aae6b 100644 --- a/wal/wal.go +++ b/wal/wal.go @@ -504,7 +504,6 @@ func (w *WAL) flushPage(clear bool) error { // First Byte of header format: // [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ] - const ( snappyMask = 1 << 3 recTypeMask = snappyMask - 1 diff --git a/wal/wal_test.go b/wal/wal_test.go index 9811a3f0..d2a6ccc2 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -24,8 +24,6 @@ import ( "testing" client_testutil "github.com/prometheus/client_golang/prometheus/testutil" - prom_testutil "github.com/prometheus/client_golang/prometheus/testutil" - "github.com/prometheus/tsdb/testutil" ) @@ -299,11 +297,8 @@ func TestCorruptAndCarryOn(t *testing.T) { w, err := NewSize(logger, nil, dir, segmentSize, false) testutil.Ok(t, err) - // Also test the wal corruption metric is working as expected. - testutil.Equals(t, 0.0, prom_testutil.ToFloat64(w.walCorruptionsTotal)) err = w.Repair(corruptionErr) testutil.Ok(t, err) - testutil.Equals(t, 1.0, prom_testutil.ToFloat64(w.walCorruptionsTotal)) // Ensure that we have a completely clean slate after reapiring. testutil.Equals(t, w.segment.Index(), 1) // We corrupted segment 0. From 106d0e9bd13248593c60b5256b7d4084af725d7b Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Tue, 25 Jun 2019 16:23:17 +0300 Subject: [PATCH 19/28] nit Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- testutil/directory.go | 52 ++++++++++++++++++++++--------------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/testutil/directory.go b/testutil/directory.go index f0c66700..171ca988 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -154,33 +154,35 @@ func DirHash(path string) ([]byte, error) { if err != nil { return err } - if !info.IsDir() { - f, err := os.Open(path) - if err != nil { - return err - } - defer f.Close() - - if _, err := io.Copy(hash, f); err != nil { - return err - } - - if _, err := io.WriteString(hash, strconv.Itoa(int(info.Size()))); err != nil { - return err - } - if _, err := io.WriteString(hash, info.Name()); err != nil { - return err - } - modTime, err := info.ModTime().GobEncode() - if err != nil { - return err - } - if _, err := io.WriteString(hash, string(modTime)); err != nil { - return err - } + if info.IsDir() { + return nil + } + f, err := os.Open(path) + if err != nil { + return err + } + defer f.Close() + if _, err := io.Copy(hash, f); err != nil { + return err } - return err + + if _, err := io.WriteString(hash, strconv.Itoa(int(info.Size()))); err != nil { + return err + } + if _, err := io.WriteString(hash, info.Name()); err != nil { + return err + } + modTime, err := info.ModTime().GobEncode() + if err != nil { + return err + } + if _, err := io.WriteString(hash, string(modTime)); err != nil { + return err + } + + return nil }) + return hash.Sum(nil), err } From 2b4ddbc01ef24481386cfc997afe335b93ead8c9 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Tue, 2 Jul 2019 14:59:38 +0300 Subject: [PATCH 20/28] nits Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- cmd/tsdb/main.go | 12 ++++-------- db.go | 7 +++++-- testutil/directory.go | 4 ++-- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index c4d9efed..4570dca3 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -75,8 +75,7 @@ func main() { exitWithError(err) } defer func() { - err := db.Close() - if err != nil { + if err := db.Close(); err != nil { exitWithError(err) } }() @@ -91,8 +90,7 @@ func main() { exitWithError(err) } defer func() { - err := db.Close() - if err != nil { + if err := db.Close(); err != nil { exitWithError(err) } }() @@ -121,8 +119,7 @@ func main() { exitWithError(err) } defer func() { - err := db.Close() - if err != nil { + if err := db.Close(); err != nil { exitWithError(err) } }() @@ -598,8 +595,7 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { exitWithError(err) } defer func() { - err := q.Close() - if err != nil { + if err := q.Close(); err != nil { exitWithError(err) } }() diff --git a/db.go b/db.go index 8e90d67c..7ad6f615 100644 --- a/db.go +++ b/db.go @@ -293,7 +293,10 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { blocks[i] = b } - var head *Head + head, err := NewHead(nil, db.logger, nil, 1) + if err != nil { + return nil, err + } maxBlockTime := int64(math.MinInt64) if len(blocks) > 0 { maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime @@ -389,7 +392,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { return blockReaders, nil } -// Close all db blocks to release the locks. +// Close all block readers. func (db *DBReadOnly) Close() error { if db.closed { return errors.New("db already closed") diff --git a/testutil/directory.go b/testutil/directory.go index 171ca988..5c50d9e1 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -14,7 +14,7 @@ package testutil import ( - "crypto/md5" + "crypto/sha256" "io" "io/ioutil" "os" @@ -149,7 +149,7 @@ func DirSize(path string) (int64, error) { // DirHash returns a hash of all files attribites and their content within a directory. func DirHash(path string) ([]byte, error) { - hash := md5.New() + hash := sha256.New() err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { if err != nil { return err From fb75682deae935944f4f249db7a141d45f98574d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Tue, 9 Jul 2019 16:15:00 +0300 Subject: [PATCH 21/28] refactor error handling for DirHash and DirSize Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- block_test.go | 8 +++---- db_test.go | 18 ++++++---------- testutil/directory.go | 50 +++++++++++++++++++------------------------ wal/wal_test.go | 6 ++---- 4 files changed, 33 insertions(+), 49 deletions(-) diff --git a/block_test.go b/block_test.go index c56d08c4..3f39a899 100644 --- a/block_test.go +++ b/block_test.go @@ -175,8 +175,7 @@ func TestBlockSize(t *testing.T) { testutil.Ok(t, blockInit.Close()) }() expSizeInit = blockInit.Size() - actSizeInit, err := testutil.DirSize(blockInit.Dir()) - testutil.Ok(t, err) + actSizeInit := testutil.DirSize(t, blockInit.Dir()) testutil.Equals(t, expSizeInit, actSizeInit) } @@ -185,7 +184,7 @@ func TestBlockSize(t *testing.T) { testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*"))) expAfterDelete := blockInit.Size() testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit) - actAfterDelete, err := testutil.DirSize(blockDirInit) + actAfterDelete := testutil.DirSize(t, blockDirInit) testutil.Ok(t, err) testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size") @@ -199,8 +198,7 @@ func TestBlockSize(t *testing.T) { testutil.Ok(t, blockAfterCompact.Close()) }() expAfterCompact := blockAfterCompact.Size() - actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir()) - testutil.Ok(t, err) + actAfterCompact := testutil.DirSize(t, blockAfterCompact.Dir()) testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact) testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size") } diff --git a/db_test.go b/db_test.go index 14502cce..44fb97d2 100644 --- a/db_test.go +++ b/db_test.go @@ -1113,8 +1113,7 @@ func TestSizeRetention(t *testing.T) { testutil.Ok(t, db.reload()) // Reload the db to register the new db size. testutil.Equals(t, len(blocks), len(db.Blocks())) // Ensure all blocks are registered. expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics. - actSize, err := testutil.DirSize(db.Dir()) - testutil.Ok(t, err) + actSize := testutil.DirSize(t, db.Dir()) testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size") // Decrease the max bytes limit so that a delete is triggered. @@ -1128,8 +1127,7 @@ func TestSizeRetention(t *testing.T) { actBlocks := db.Blocks() expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount)) - actSize, err = testutil.DirSize(db.Dir()) - testutil.Ok(t, err) + actSize = testutil.DirSize(t, db.Dir()) testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch") testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size") @@ -2274,8 +2272,7 @@ func TestDBReadOnly(t *testing.T) { testutil.Ok(t, err) dbWritable.DisableCompactions() - dbSizeBeforeAppend, err := testutil.DirSize(dbWritable.Dir()) - testutil.Ok(t, err) + dbSizeBeforeAppend := testutil.DirSize(t, dbWritable.Dir()) app := dbWritable.Appender() _, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0) testutil.Ok(t, err) @@ -2283,8 +2280,7 @@ func TestDBReadOnly(t *testing.T) { expSeriesCount++ expBlocks = dbWritable.Blocks() - expDbSize, err := testutil.DirSize(dbWritable.Dir()) - testutil.Ok(t, err) + expDbSize := testutil.DirSize(t, dbWritable.Dir()) testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append") q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64) @@ -2292,8 +2288,7 @@ func TestDBReadOnly(t *testing.T) { expSeries = query(t, q, matchAll) testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows. - expDBHash, err = testutil.DirHash(dbWritable.Dir()) - testutil.Ok(t, err) + expDBHash = testutil.DirHash(t, dbWritable.Dir()) } // Open a read only db and ensure that the API returns the same result as the normal DB. @@ -2314,8 +2309,7 @@ func TestDBReadOnly(t *testing.T) { q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64) testutil.Ok(t, err) readOnlySeries := query(t, q, matchAll) - readOnlyDBHash, err := testutil.DirHash(dbDir) - testutil.Ok(t, err) + readOnlyDBHash := testutil.DirHash(t, dbDir) testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch") testutil.Equals(t, expSeries, readOnlySeries, "series mismatch") diff --git a/testutil/directory.go b/testutil/directory.go index 5c50d9e1..5f1c3155 100644 --- a/testutil/directory.go +++ b/testutil/directory.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "strconv" + "testing" ) const ( @@ -133,56 +134,49 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) { } // DirSize returns the size in bytes of all files in a directory. -func DirSize(path string) (int64, error) { +func DirSize(t *testing.T, path string) int64 { var size int64 err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error { - if err != nil { - return err - } + Ok(t, err) if !info.IsDir() { size += info.Size() } return nil }) - return size, err + Ok(t, err) + return size } // DirHash returns a hash of all files attribites and their content within a directory. -func DirHash(path string) ([]byte, error) { +func DirHash(t *testing.T, path string) []byte { hash := sha256.New() err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { - if err != nil { - return err - } + Ok(t, err) + if info.IsDir() { return nil } f, err := os.Open(path) - if err != nil { - return err - } + Ok(t, err) defer f.Close() - if _, err := io.Copy(hash, f); err != nil { - return err - } + _, err = io.Copy(hash, f) + Ok(t, err) + + _, err = io.WriteString(hash, strconv.Itoa(int(info.Size()))) + Ok(t, err) + + _, err = io.WriteString(hash, info.Name()) + Ok(t, err) - if _, err := io.WriteString(hash, strconv.Itoa(int(info.Size()))); err != nil { - return err - } - if _, err := io.WriteString(hash, info.Name()); err != nil { - return err - } modTime, err := info.ModTime().GobEncode() - if err != nil { - return err - } - if _, err := io.WriteString(hash, string(modTime)); err != nil { - return err - } + Ok(t, err) + _, err = io.WriteString(hash, string(modTime)) + Ok(t, err) return nil }) + Ok(t, err) - return hash.Sum(nil), err + return hash.Sum(nil) } diff --git a/wal/wal_test.go b/wal/wal_test.go index d2a6ccc2..12fe1a2d 100644 --- a/wal/wal_test.go +++ b/wal/wal_test.go @@ -408,10 +408,8 @@ func TestCompression(t *testing.T) { testutil.Ok(t, os.RemoveAll(dirUnCompressed)) }() - uncompressedSize, err := testutil.DirSize(dirUnCompressed) - testutil.Ok(t, err) - compressedSize, err := testutil.DirSize(dirCompressed) - testutil.Ok(t, err) + uncompressedSize := testutil.DirSize(t, dirUnCompressed) + compressedSize := testutil.DirSize(t, dirCompressed) testutil.Assert(t, float64(uncompressedSize)*0.75 > float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize) } From 1cf409f42d1362df268ca1cd853d0b273e8fa70d Mon Sep 17 00:00:00 2001 From: Ganesh Vernekar Date: Tue, 16 Jul 2019 15:09:59 +0530 Subject: [PATCH 22/28] NumSeries in Meta() of Head Signed-off-by: Ganesh Vernekar --- head.go | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/head.go b/head.go index 878c508b..3c2089de 100644 --- a/head.go +++ b/head.go @@ -65,6 +65,7 @@ type Head struct { logger log.Logger appendPool sync.Pool bytesPool sync.Pool + numSeries uint64 minTime, maxTime int64 // Current min and max of the samples included in the head. minValidTime int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block. @@ -85,7 +86,7 @@ type Head struct { type headMetrics struct { activeAppenders prometheus.Gauge - series prometheus.Gauge + series prometheus.GaugeFunc seriesCreated prometheus.Counter seriesRemoved prometheus.Counter seriesNotFound prometheus.Counter @@ -113,9 +114,11 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics { Name: "prometheus_tsdb_head_active_appenders", Help: "Number of currently active appender transactions", }) - m.series = prometheus.NewGauge(prometheus.GaugeOpts{ + m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{ Name: "prometheus_tsdb_head_series", Help: "Total number of series in the head block.", + }, func() float64 { + return float64(h.NumSeries()) }) m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{ Name: "prometheus_tsdb_head_series_created_total", @@ -699,11 +702,18 @@ func (h *rangeHead) MaxTime() int64 { return h.maxt } +func (h *rangeHead) NumSeries() uint64 { + return h.head.NumSeries() +} + func (h *rangeHead) Meta() BlockMeta { return BlockMeta{ MinTime: h.MinTime(), MaxTime: h.MaxTime(), ULID: h.head.Meta().ULID, + Stats: BlockStats{ + NumSeries: h.NumSeries(), + }, } } @@ -1031,9 +1041,10 @@ func (h *Head) gc() { seriesRemoved := len(deleted) h.metrics.seriesRemoved.Add(float64(seriesRemoved)) - h.metrics.series.Sub(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) + // Ref: https://golang.org/pkg/sync/atomic/#AddUint64 + atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1)) // Remove deleted series IDs from the postings lists. h.postings.Delete(deleted) @@ -1110,6 +1121,11 @@ func (h *Head) chunksRange(mint, maxt int64) *headChunkReader { return &headChunkReader{head: h, mint: mint, maxt: maxt} } +// NumSeries returns the number of active series in the head. +func (h *Head) NumSeries() uint64 { + return atomic.LoadUint64(&h.numSeries) +} + // Meta returns meta information about the head. func (h *Head) Meta() BlockMeta { var id [16]byte @@ -1118,6 +1134,9 @@ func (h *Head) Meta() BlockMeta { MinTime: h.MinTime(), MaxTime: h.MaxTime(), ULID: ulid.ULID(id), + Stats: BlockStats{ + NumSeries: h.NumSeries(), + }, } } @@ -1367,8 +1386,8 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie return s, false } - h.metrics.series.Inc() h.metrics.seriesCreated.Inc() + atomic.AddUint64(&h.numSeries, 1) h.postings.Add(id, lset) From af10877062ad142790e73877413ed6b8bd080f29 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Wed, 17 Jul 2019 18:26:49 +0300 Subject: [PATCH 23/28] nits Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- cmd/tsdb/main.go | 153 ++++++++++++++++++++++++++--------------------- db.go | 10 ++-- 2 files changed, 89 insertions(+), 74 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index 4570dca3..ca02343f 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -39,6 +39,13 @@ import ( ) func main() { + if err := execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +func execute() (err error) { var ( defaultDBPath = filepath.Join("benchout", "storage") @@ -61,42 +68,41 @@ func main() { dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64() ) + logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + switch kingpin.MustParse(cli.Parse(os.Args[1:])) { case benchWriteCmd.FullCommand(): wb := &writeBenchmark{ outPath: *benchWriteOutPath, numMetrics: *benchWriteNumMetrics, samplesFile: *benchSamplesFile, + logger: logger, } - wb.run() + return wb.run() case listCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*listPath, nil) if err != nil { - exitWithError(err) + return err } defer func() { - if err := db.Close(); err != nil { - exitWithError(err) - } + err = db.Close() }() blocks, err := db.Blocks() if err != nil { - exitWithError(err) + return err } printBlocks(blocks, listCmdHumanReadable) case analyzeCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*analyzePath, nil) if err != nil { - exitWithError(err) + return err } defer func() { - if err := db.Close(); err != nil { - exitWithError(err) - } + err = db.Close() }() blocks, err := db.Blocks() if err != nil { - exitWithError(err) + return err } var block tsdb.BlockReader if *analyzeBlockID != "" { @@ -110,21 +116,20 @@ func main() { block = blocks[len(blocks)-1] } if block == nil { - exitWithError(fmt.Errorf("block not found")) + return fmt.Errorf("block not found") } - analyzeBlock(block, *analyzeLimit) + return analyzeBlock(block, *analyzeLimit) case dumpCmd.FullCommand(): db, err := tsdb.OpenDBReadOnly(*dumpPath, nil) if err != nil { - exitWithError(err) + return err } defer func() { - if err := db.Close(); err != nil { - exitWithError(err) - } + err = db.Close() }() - dumpSamples(db, *dumpMinTime, *dumpMaxTime) + return dumpSamples(db, *dumpMinTime, *dumpMaxTime) } + return nil } type writeBenchmark struct { @@ -139,74 +144,87 @@ type writeBenchmark struct { memprof *os.File blockprof *os.File mtxprof *os.File + logger log.Logger } -func (b *writeBenchmark) run() { +func (b *writeBenchmark) run() error { if b.outPath == "" { dir, err := ioutil.TempDir("", "tsdb_bench") if err != nil { - exitWithError(err) + return err } b.outPath = dir b.cleanup = true } if err := os.RemoveAll(b.outPath); err != nil { - exitWithError(err) + return err } if err := os.MkdirAll(b.outPath, 0777); err != nil { - exitWithError(err) + return err } dir := filepath.Join(b.outPath, "storage") - l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) - l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) + l := log.With(b.logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller) st, err := tsdb.Open(dir, l, nil, &tsdb.Options{ RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds BlockRanges: tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3), }) if err != nil { - exitWithError(err) + return err } b.storage = st var labels []labels.Labels - measureTime("readData", func() { + _, err = measureTime("readData", func() error { f, err := os.Open(b.samplesFile) if err != nil { - exitWithError(err) + return err } defer f.Close() labels, err = readPrometheusLabels(f, b.numMetrics) if err != nil { - exitWithError(err) + return err } + return nil }) + if err != nil { + return err + } var total uint64 - dur := measureTime("ingestScrapes", func() { + dur, err := measureTime("ingestScrapes", func() error { b.startProfiling() total, err = b.ingestScrapes(labels, 3000) if err != nil { - exitWithError(err) + return err } + return nil }) + if err != nil { + return err + } fmt.Println(" > total samples:", total) fmt.Println(" > samples/sec:", float64(total)/dur.Seconds()) - measureTime("stopStorage", func() { + _, err = measureTime("stopStorage", func() error { if err := b.storage.Close(); err != nil { - exitWithError(err) + return err } if err := b.stopProfiling(); err != nil { - exitWithError(err) + return err } + return nil }) + if err != nil { + return err + } + return nil } const timeDelta = 30000 @@ -300,37 +318,38 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in return total, nil } -func (b *writeBenchmark) startProfiling() { +func (b *writeBenchmark) startProfiling() error { var err error // Start CPU profiling. b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err)) + return fmt.Errorf("bench: could not create cpu profile: %v", err) } if err := pprof.StartCPUProfile(b.cpuprof); err != nil { - exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err)) + return fmt.Errorf("bench: could not start CPU profile: %v", err) } // Start memory profiling. b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err)) + return fmt.Errorf("bench: could not create memory profile: %v", err) } runtime.MemProfileRate = 64 * 1024 // Start fatal profiling. b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create block profile: %v", err)) + return fmt.Errorf("bench: could not create block profile: %v", err) } runtime.SetBlockProfileRate(20) b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err)) + return fmt.Errorf("bench: could not create mutex profile: %v", err) } runtime.SetMutexProfileFraction(20) + return nil } func (b *writeBenchmark) stopProfiling() error { @@ -365,12 +384,15 @@ func (b *writeBenchmark) stopProfiling() error { return nil } -func measureTime(stage string, f func()) time.Duration { +func measureTime(stage string, f func() error) (time.Duration, error) { fmt.Printf(">> start stage=%s\n", stage) start := time.Now() - f() + err := f() + if err != nil { + return 0, err + } fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) - return time.Since(start) + return time.Since(start), nil } func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { @@ -404,11 +426,6 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { return mets, nil } -func exitWithError(err error) { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) -} - func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) defer tw.Flush() @@ -436,7 +453,7 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string { return strconv.FormatInt(timestamp, 10) } -func analyzeBlock(b tsdb.BlockReader, limit int) { +func analyzeBlock(b tsdb.BlockReader, limit int) error { meta := b.Meta() fmt.Printf("Block ID: %s\n", meta.ULID) // Presume 1ms resolution that Prometheus uses. @@ -444,13 +461,13 @@ func analyzeBlock(b tsdb.BlockReader, limit int) { fmt.Printf("Series: %d\n", meta.Stats.NumSeries) ir, err := b.Index() if err != nil { - exitWithError(err) + return err } defer ir.Close() allLabelNames, err := ir.LabelNames() if err != nil { - exitWithError(err) + return err } fmt.Printf("Label names: %d\n", len(allLabelNames)) @@ -477,13 +494,13 @@ func analyzeBlock(b tsdb.BlockReader, limit int) { entries := 0 p, err := ir.Postings("", "") // The special all key. if err != nil { - exitWithError(err) + return err } lbls := labels.Labels{} chks := []chunks.Meta{} for p.Next() { if err = ir.Series(p.At(), &lbls, &chks); err != nil { - exitWithError(err) + return err } // Amount of the block time range not covered by this series. uncovered := uint64(meta.MaxTime-meta.MinTime) - uint64(chks[len(chks)-1].MaxTime-chks[0].MinTime) @@ -496,7 +513,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) { } } if p.Err() != nil { - exitWithError(p.Err()) + return p.Err() } fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered)) fmt.Printf("Postings entries (total label pairs): %d\n", entries) @@ -529,14 +546,14 @@ func analyzeBlock(b tsdb.BlockReader, limit int) { for _, n := range allLabelNames { values, err := ir.LabelValues(n) if err != nil { - exitWithError(err) + return err } var cumulativeLength uint64 for i := 0; i < values.Len(); i++ { value, _ := values.At(i) if err != nil { - exitWithError(err) + return err } for _, str := range value { cumulativeLength += uint64(len(str)) @@ -553,7 +570,7 @@ func analyzeBlock(b tsdb.BlockReader, limit int) { for _, n := range allLabelNames { lv, err := ir.LabelValues(n) if err != nil { - exitWithError(err) + return err } postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())}) } @@ -563,46 +580,45 @@ func analyzeBlock(b tsdb.BlockReader, limit int) { postingInfos = postingInfos[:0] lv, err := ir.LabelValues("__name__") if err != nil { - exitWithError(err) + return err } for i := 0; i < lv.Len(); i++ { names, err := lv.At(i) if err != nil { - exitWithError(err) + return err } for _, n := range names { postings, err := ir.Postings("__name__", n) if err != nil { - exitWithError(err) + return err } count := 0 for postings.Next() { count++ } if postings.Err() != nil { - exitWithError(postings.Err()) + return postings.Err() } postingInfos = append(postingInfos, postingInfo{n, uint64(count)}) } } fmt.Printf("\nHighest cardinality metric names:\n") printInfo(postingInfos) + return nil } -func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { +func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) { q, err := db.Querier(mint, maxt) if err != nil { - exitWithError(err) + return err } defer func() { - if err := q.Close(); err != nil { - exitWithError(err) - } + err = q.Close() }() ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*")) if err != nil { - exitWithError(err) + return err } for ss.Next() { @@ -614,11 +630,12 @@ func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) { fmt.Printf("%s %g %d\n", labels, val, ts) } if it.Err() != nil { - exitWithError(ss.Err()) + return ss.Err() } } if ss.Err() != nil { - exitWithError(ss.Err()) + return ss.Err() } + return nil } diff --git a/db.go b/db.go index e9aa5cdd..bf25d827 100644 --- a/db.go +++ b/db.go @@ -262,19 +262,17 @@ type DBReadOnly struct { // OpenDBReadOnly opens DB in the given directory for read only operations. func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { if _, err := os.Stat(dir); err != nil { - return nil, err + return nil, errors.Wrap(err, "openning the db dir") } if l == nil { l = log.NewNopLogger() } - db := &DBReadOnly{ + return &DBReadOnly{ logger: l, dir: dir, - } - - return db, nil + }, nil } // Querier loads the wal and returns a new querier over the data partition for the given time range. @@ -355,7 +353,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { level.Warn(db.logger).Log("msg", "closing a block", err) } } - return nil, fmt.Errorf("unexpected corrupted block:%v", corrupted) + return nil, errors.Errorf("unexpected corrupted block:%v", corrupted) } if len(loadable) == 0 { From df4a37499ca6eacd3bbb25f95d9a2bd1a7d81e3c Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 18 Jul 2019 11:21:29 +0300 Subject: [PATCH 24/28] use channel for the db closing and remove mutex Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- db.go | 28 +++++++++++++++++----------- head.go | 3 ++- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/db.go b/db.go index bf25d827..788f7ea5 100644 --- a/db.go +++ b/db.go @@ -255,8 +255,7 @@ type DBReadOnly struct { logger log.Logger dir string closers []io.Closer - mtx sync.Mutex - closed bool + closed chan struct{} } // OpenDBReadOnly opens DB in the given directory for read only operations. @@ -272,12 +271,17 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { return &DBReadOnly{ logger: l, dir: dir, + closed: make(chan struct{}), }, nil } // Querier loads the wal and returns a new querier over the data partition for the given time range. -// A goroutine must not handle more than one open Querier. func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { + select { + case <-db.closed: + return nil, errors.New("db already closed") + default: + } blocksReaders, err := db.Blocks() if err != nil { return nil, err @@ -319,9 +323,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { // This is mainly to avoid blocking when closing the head. head.wal = nil - db.mtx.Lock() db.closers = append(db.closers, head) - db.mtx.Unlock() } dbWritable := &DB{ @@ -336,6 +338,11 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { // Blocks returns a slice of block readers for persisted blocks. func (db *DBReadOnly) Blocks() ([]BlockReader, error) { + select { + case <-db.closed: + return nil, errors.New("db already closed") + default: + } loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) if err != nil { return nil, err @@ -372,8 +379,6 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String()) } - db.mtx.Lock() - defer db.mtx.Unlock() // Close all previously open readers and add the new ones to the cache. for _, closer := range db.closers { closer.Close() @@ -392,17 +397,18 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { // Close all block readers. func (db *DBReadOnly) Close() error { - if db.closed { + select { + case <-db.closed: return errors.New("db already closed") + default: } + close(db.closed) + var merr tsdb_errors.MultiError - db.mtx.Lock() - defer db.mtx.Unlock() for _, b := range db.closers { merr.Add(b.Close()) } - db.closed = true return merr.Err() } diff --git a/head.go b/head.go index 3c2089de..413e8d42 100644 --- a/head.go +++ b/head.go @@ -1043,7 +1043,8 @@ func (h *Head) gc() { h.metrics.seriesRemoved.Add(float64(seriesRemoved)) h.metrics.chunksRemoved.Add(float64(chunksRemoved)) h.metrics.chunks.Sub(float64(chunksRemoved)) - // Ref: https://golang.org/pkg/sync/atomic/#AddUint64 + // Using AddUint64 to substract series removed. + // See: https://golang.org/pkg/sync/atomic/#AddUint64. atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1)) // Remove deleted series IDs from the postings lists. From b3f777488f9d6a98c242a06522dcd396dd55f0d5 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Thu, 18 Jul 2019 11:41:27 +0300 Subject: [PATCH 25/28] add ErrClosed and add a test for it. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- db.go | 9 ++++++--- db_test.go | 19 +++++++++++++++++++ 2 files changed, 25 insertions(+), 3 deletions(-) diff --git a/db.go b/db.go index 788f7ea5..3869abb3 100644 --- a/db.go +++ b/db.go @@ -250,6 +250,9 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { return m } +// ErrClosed is returned when the db is closed. +var ErrClosed = errors.New("db already closed") + // DBReadOnly provides APIs for read only operations on a database. type DBReadOnly struct { logger log.Logger @@ -279,7 +282,7 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { select { case <-db.closed: - return nil, errors.New("db already closed") + return nil, ErrClosed default: } blocksReaders, err := db.Blocks() @@ -340,7 +343,7 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { func (db *DBReadOnly) Blocks() ([]BlockReader, error) { select { case <-db.closed: - return nil, errors.New("db already closed") + return nil, ErrClosed default: } loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil) @@ -399,7 +402,7 @@ func (db *DBReadOnly) Blocks() ([]BlockReader, error) { func (db *DBReadOnly) Close() error { select { case <-db.closed: - return errors.New("db already closed") + return ErrClosed default: } close(db.closed) diff --git a/db_test.go b/db_test.go index 44fb97d2..b06e05c7 100644 --- a/db_test.go +++ b/db_test.go @@ -2316,3 +2316,22 @@ func TestDBReadOnly(t *testing.T) { testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same") } } + +// TestDBReadOnlyClosing ensures that after closing the db +// all api methods return an ErrClosed. +func TestDBReadOnlyClosing(t *testing.T) { + dbDir, err := ioutil.TempDir("", "test") + testutil.Ok(t, err) + + defer func() { + testutil.Ok(t, os.RemoveAll(dbDir)) + }() + db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))) + testutil.Ok(t, err) + testutil.Ok(t, db.Close()) + testutil.Equals(t, db.Close(), ErrClosed) + _, err = db.Blocks() + testutil.Equals(t, err, ErrClosed) + _, err = db.Querier(0, 1) + testutil.Equals(t, err, ErrClosed) +} From 1a97569153f701cc8e8ff79b0343c496f866b78d Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 19 Jul 2019 13:22:12 +0300 Subject: [PATCH 26/28] handle mutli errors. Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- cmd/tsdb/main.go | 20 ++++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go index ca02343f..e3dc530a 100644 --- a/cmd/tsdb/main.go +++ b/cmd/tsdb/main.go @@ -34,6 +34,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/tsdb" "github.com/prometheus/tsdb/chunks" + tsdb_errors "github.com/prometheus/tsdb/errors" "github.com/prometheus/tsdb/labels" "gopkg.in/alecthomas/kingpin.v2" ) @@ -69,6 +70,7 @@ func execute() (err error) { ) logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)) + var merr tsdb_errors.MultiError switch kingpin.MustParse(cli.Parse(os.Args[1:])) { case benchWriteCmd.FullCommand(): @@ -85,7 +87,9 @@ func execute() (err error) { return err } defer func() { - err = db.Close() + merr.Add(err) + merr.Add(db.Close()) + err = merr.Err() }() blocks, err := db.Blocks() if err != nil { @@ -98,7 +102,9 @@ func execute() (err error) { return err } defer func() { - err = db.Close() + merr.Add(err) + merr.Add(db.Close()) + err = merr.Err() }() blocks, err := db.Blocks() if err != nil { @@ -125,7 +131,9 @@ func execute() (err error) { return err } defer func() { - err = db.Close() + merr.Add(err) + merr.Add(db.Close()) + err = merr.Err() }() return dumpSamples(db, *dumpMinTime, *dumpMaxTime) } @@ -608,12 +616,16 @@ func analyzeBlock(b tsdb.BlockReader, limit int) error { } func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) { + q, err := db.Querier(mint, maxt) if err != nil { return err } defer func() { - err = q.Close() + var merr tsdb_errors.MultiError + merr.Add(err) + merr.Add(q.Close()) + err = merr.Err() }() ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*")) From 03b33b12324ab77c7ca5d0c7e5e411f9483c6283 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Fri, 19 Jul 2019 14:50:07 +0300 Subject: [PATCH 27/28] nits Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- CHANGELOG.md | 6 +++--- db.go | 23 ++++++++++++++++------- 2 files changed, 19 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 88e6c0b6..10f0044d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,9 +1,9 @@ ## master / unreleased - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode. - - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s and - for this the interface is refactored to return the full block meta instead of - just MinTime/MaxTime. Required to allow reading the ULID of a block. + - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s. + - `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`. + ## 0.9.1 diff --git a/db.go b/db.go index 3869abb3..aa9ec178 100644 --- a/db.go +++ b/db.go @@ -254,6 +254,8 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics { var ErrClosed = errors.New("db already closed") // DBReadOnly provides APIs for read only operations on a database. +// Current implementation doesn't support concurency so +// all API calls should happen in the same go routine. type DBReadOnly struct { logger log.Logger dir string @@ -279,6 +281,7 @@ func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) { } // Querier loads the wal and returns a new querier over the data partition for the given time range. +// Current implementation doesn't support multiple Queriers. func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { select { case <-db.closed: @@ -329,6 +332,10 @@ func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) { db.closers = append(db.closers, head) } + // TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance. + // Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite. + // Option 2: refactor Querier to use another independent func which + // can than be used by a read only and writable db instances without any code duplication. dbWritable := &DB{ dir: db.dir, logger: db.logger, @@ -679,13 +686,15 @@ func (db *DB) compact() (err error) { return nil } -func getBlock(loaded []*Block, id ulid.ULID) *Block { - for _, b := range loaded { +// getBlock iterates a given block range to find a block by a given id. +// If found it returns the block itself and a boolean to indicate that it was found. +func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) { + for _, b := range allBlocks { if b.Meta().ULID == id { - return b + return b, true } } - return nil + return nil, false } // reload blocks and trigger head truncation if new blocks appeared. @@ -718,7 +727,7 @@ func (db *DB) reload() (err error) { if len(corrupted) > 0 { // Close all new blocks to release the lock for windows. for _, block := range loadable { - if b := getBlock(db.blocks, block.Meta().ULID); b == nil { + if _, open := getBlock(db.blocks, block.Meta().ULID); !open { block.Close() } } @@ -801,8 +810,8 @@ func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Po } // See if we already have the block in memory or open it otherwise. - block := getBlock(loaded, meta.ULID) - if block == nil { + block, open := getBlock(loaded, meta.ULID) + if !open { block, err = OpenBlock(l, bDir, chunkPool) if err != nil { corrupted[meta.ULID] = err From beeaefa6b7be7cc5e55c546fa4305c091a5cc0a8 Mon Sep 17 00:00:00 2001 From: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> Date: Mon, 22 Jul 2019 12:09:18 +0300 Subject: [PATCH 28/28] head meta comment Signed-off-by: Krasi Georgiev <8903888+krasi-georgiev@users.noreply.github.com> --- head.go | 1 + 1 file changed, 1 insertion(+) diff --git a/head.go b/head.go index 413e8d42..5e5475cb 100644 --- a/head.go +++ b/head.go @@ -1128,6 +1128,7 @@ func (h *Head) NumSeries() uint64 { } // Meta returns meta information about the head. +// The head is dynamic so will return dynamic results. func (h *Head) Meta() BlockMeta { var id [16]byte copy(id[:], "______head______")