diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9d057a3b..10f0044d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,9 @@
-## Master / unreleased
+## master / unreleased
+
+ - [FEATURE] Added `DBReadOnly` to allow opening a database in read only mode.
+   - `DBReadOnly.Blocks()` exposes a slice of `BlockReader`s.
+   - `BlockReader` interface - removed MinTime/MaxTime methods and now exposes the full block meta via `Meta()`.
+
 
 ## 0.9.1
 
diff --git a/block.go b/block.go
index 6a8237f1..d0fe2b2f 100644
--- a/block.go
+++ b/block.go
@@ -138,11 +138,8 @@ type BlockReader interface {
 	// Tombstones returns a TombstoneReader over the block's deleted data.
 	Tombstones() (TombstoneReader, error)
 
-	// MinTime returns the min time of the block.
-	MinTime() int64
-
-	// MaxTime returns the max time of the block.
-	MaxTime() int64
+	// Meta provides meta information about the block reader.
+	Meta() BlockMeta
 }
 
 // Appendable defines an entity to which data can be appended.
diff --git a/block_test.go b/block_test.go
index c56d08c4..3f39a899 100644
--- a/block_test.go
+++ b/block_test.go
@@ -175,8 +175,7 @@ func TestBlockSize(t *testing.T) {
 			testutil.Ok(t, blockInit.Close())
 		}()
 		expSizeInit = blockInit.Size()
-		actSizeInit, err := testutil.DirSize(blockInit.Dir())
-		testutil.Ok(t, err)
+		actSizeInit := testutil.DirSize(t, blockInit.Dir())
 		testutil.Equals(t, expSizeInit, actSizeInit)
 	}
 
@@ -185,7 +184,6 @@ func TestBlockSize(t *testing.T) {
 	testutil.Ok(t, blockInit.Delete(1, 10, labels.NewMustRegexpMatcher("", ".*")))
 	expAfterDelete := blockInit.Size()
 	testutil.Assert(t, expAfterDelete > expSizeInit, "after a delete the block size should be bigger as the tombstone file should grow %v > %v", expAfterDelete, expSizeInit)
-	actAfterDelete, err := testutil.DirSize(blockDirInit)
-	testutil.Ok(t, err)
+	actAfterDelete := testutil.DirSize(t, blockDirInit)
 	testutil.Equals(t, expAfterDelete, actAfterDelete, "after a delete reported block size doesn't match actual disk size")
 
@@ -199,8 +198,7 @@ func TestBlockSize(t *testing.T) {
 			testutil.Ok(t, blockAfterCompact.Close())
 		}()
 		expAfterCompact := blockAfterCompact.Size()
-		actAfterCompact, err := testutil.DirSize(blockAfterCompact.Dir())
-		testutil.Ok(t, err)
+		actAfterCompact := testutil.DirSize(t, blockAfterCompact.Dir())
 		testutil.Assert(t, actAfterDelete > actAfterCompact, "after a delete and compaction the block size should be smaller %v,%v", actAfterDelete, actAfterCompact)
 		testutil.Equals(t, expAfterCompact, actAfterCompact, "after a delete and compaction reported block size doesn't match actual disk size")
 }
diff --git a/cmd/tsdb/main.go b/cmd/tsdb/main.go
index 74dfed4c..e3dc530a 100644
--- a/cmd/tsdb/main.go
+++ b/cmd/tsdb/main.go
@@ -34,11 +34,19 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/tsdb"
 	"github.com/prometheus/tsdb/chunks"
+	tsdb_errors "github.com/prometheus/tsdb/errors"
 	"github.com/prometheus/tsdb/labels"
 	"gopkg.in/alecthomas/kingpin.v2"
 )
 
 func main() {
+	if err := execute(); err != nil {
+		fmt.Fprintln(os.Stderr, err)
+		os.Exit(1)
+	}
+}
+
+func execute() (err error) {
 	var (
 		defaultDBPath = filepath.Join("benchout", "storage")
 
 		dumpMaxTime = dumpCmd.Flag("max-time", "maximum timestamp to dump").Default(strconv.FormatInt(math.MaxInt64, 10)).Int64()
 	)
 
-	safeDBOptions := *tsdb.DefaultOptions
-	safeDBOptions.RetentionDuration = 0
+	logger := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
+	var merr tsdb_errors.MultiError
 
 	switch kingpin.MustParse(cli.Parse(os.Args[1:])) {
 	case benchWriteCmd.FullCommand():
@@ -70,21 +78,39 @@ func main() {
 			outPath:     *benchWriteOutPath,
 			numMetrics:  *benchWriteNumMetrics,
 			samplesFile: *benchSamplesFile,
+			logger:      logger,
 		}
-		wb.run()
+		return wb.run()
 	case listCmd.FullCommand():
-		db, err := tsdb.Open(*listPath, nil, nil, &safeDBOptions)
+		db, err := tsdb.OpenDBReadOnly(*listPath, nil)
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
-		printBlocks(db.Blocks(), listCmdHumanReadable)
+		defer func() {
+			merr.Add(err)
+			merr.Add(db.Close())
+			err = merr.Err()
+		}()
+		blocks, err := db.Blocks()
+		if err != nil {
+			return err
+		}
+		printBlocks(blocks, listCmdHumanReadable)
 	case analyzeCmd.FullCommand():
-		db, err := tsdb.Open(*analyzePath, nil, nil, &safeDBOptions)
+		db, err := tsdb.OpenDBReadOnly(*analyzePath, nil)
+		if err != nil {
+			return err
+		}
+		defer func() {
+			merr.Add(err)
+			merr.Add(db.Close())
+			err = merr.Err()
+		}()
+		blocks, err := db.Blocks()
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
-		blocks := db.Blocks()
-		var block *tsdb.Block
+		var block tsdb.BlockReader
 		if *analyzeBlockID != "" {
 			for _, b := range blocks {
 				if b.Meta().ULID.String() == *analyzeBlockID {
@@ -96,16 +122,22 @@ func main() {
 			block = blocks[len(blocks)-1]
 		}
 		if block == nil {
-			exitWithError(fmt.Errorf("block not found"))
+			return fmt.Errorf("block not found")
 		}
-		analyzeBlock(block, *analyzeLimit)
+		return analyzeBlock(block, *analyzeLimit)
 	case dumpCmd.FullCommand():
-		db, err := tsdb.Open(*dumpPath, nil, nil, &safeDBOptions)
+		db, err := tsdb.OpenDBReadOnly(*dumpPath, nil)
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
-		dumpSamples(db, *dumpMinTime, *dumpMaxTime)
+		defer func() {
+			merr.Add(err)
+			merr.Add(db.Close())
+			err = merr.Err()
+		}()
+		return dumpSamples(db, *dumpMinTime, *dumpMaxTime)
 	}
+	return nil
 }
 
 type writeBenchmark struct {
@@ -120,74 +152,87 @@ type writeBenchmark struct {
 	memprof   *os.File
 	blockprof *os.File
 	mtxprof   *os.File
+	logger    log.Logger
 }
 
-func (b *writeBenchmark) run() {
+func (b *writeBenchmark) run() error {
 	if b.outPath == "" {
 		dir, err := ioutil.TempDir("", "tsdb_bench")
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
 		b.outPath = dir
 		b.cleanup = true
 	}
 	if err := os.RemoveAll(b.outPath); err != nil {
-		exitWithError(err)
+		return err
 	}
 	if err := os.MkdirAll(b.outPath, 0777); err != nil {
-		exitWithError(err)
+		return err
 	}
 
 	dir := filepath.Join(b.outPath, "storage")
 
-	l := log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
-	l = log.With(l, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
+	l := log.With(b.logger, "ts", log.DefaultTimestampUTC, "caller", log.DefaultCaller)
 
 	st, err := tsdb.Open(dir, l, nil, &tsdb.Options{
 		RetentionDuration: 15 * 24 * 60 * 60 * 1000, // 15 days in milliseconds
 		BlockRanges:       tsdb.ExponentialBlockRanges(2*60*60*1000, 5, 3),
 	})
 	if err != nil {
-		exitWithError(err)
+		return err
 	}
 	b.storage = st
 
 	var labels []labels.Labels
 
-	measureTime("readData", func() {
+	_, err = measureTime("readData", func() error {
 		f, err := os.Open(b.samplesFile)
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
 		defer f.Close()
 
 		labels, err = readPrometheusLabels(f, b.numMetrics)
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
+		return nil
 	})
+	if err != nil {
+		return err
+	}
 
 	var total uint64
 
-	dur := measureTime("ingestScrapes", func() {
+	dur, err := measureTime("ingestScrapes", func() error {
 		b.startProfiling()
 		total, err = b.ingestScrapes(labels, 3000)
 		if err != nil {
-			exitWithError(err)
+			return err
 		}
+		return nil
 	})
+	if err != nil {
+		return err
+	}
 
 	fmt.Println(" > total samples:", total)
samples/sec:", float64(total)/dur.Seconds()) - measureTime("stopStorage", func() { + _, err = measureTime("stopStorage", func() error { if err := b.storage.Close(); err != nil { - exitWithError(err) + return err } if err := b.stopProfiling(); err != nil { - exitWithError(err) + return err } + return nil }) + if err != nil { + return err + } + return nil } const timeDelta = 30000 @@ -281,37 +326,38 @@ func (b *writeBenchmark) ingestScrapesShard(lbls []labels.Labels, scrapeCount in return total, nil } -func (b *writeBenchmark) startProfiling() { +func (b *writeBenchmark) startProfiling() error { var err error // Start CPU profiling. b.cpuprof, err = os.Create(filepath.Join(b.outPath, "cpu.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create cpu profile: %v", err)) + return fmt.Errorf("bench: could not create cpu profile: %v", err) } if err := pprof.StartCPUProfile(b.cpuprof); err != nil { - exitWithError(fmt.Errorf("bench: could not start CPU profile: %v", err)) + return fmt.Errorf("bench: could not start CPU profile: %v", err) } // Start memory profiling. b.memprof, err = os.Create(filepath.Join(b.outPath, "mem.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create memory profile: %v", err)) + return fmt.Errorf("bench: could not create memory profile: %v", err) } runtime.MemProfileRate = 64 * 1024 // Start fatal profiling. b.blockprof, err = os.Create(filepath.Join(b.outPath, "block.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create block profile: %v", err)) + return fmt.Errorf("bench: could not create block profile: %v", err) } runtime.SetBlockProfileRate(20) b.mtxprof, err = os.Create(filepath.Join(b.outPath, "mutex.prof")) if err != nil { - exitWithError(fmt.Errorf("bench: could not create mutex profile: %v", err)) + return fmt.Errorf("bench: could not create mutex profile: %v", err) } runtime.SetMutexProfileFraction(20) + return nil } func (b *writeBenchmark) stopProfiling() error { @@ -346,12 +392,15 @@ func (b *writeBenchmark) stopProfiling() error { return nil } -func measureTime(stage string, f func()) time.Duration { +func measureTime(stage string, f func() error) (time.Duration, error) { fmt.Printf(">> start stage=%s\n", stage) start := time.Now() - f() + err := f() + if err != nil { + return 0, err + } fmt.Printf(">> completed stage=%s duration=%s\n", stage, time.Since(start)) - return time.Since(start) + return time.Since(start), nil } func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { @@ -385,12 +434,7 @@ func readPrometheusLabels(r io.Reader, n int) ([]labels.Labels, error) { return mets, nil } -func exitWithError(err error) { - fmt.Fprintln(os.Stderr, err) - os.Exit(1) -} - -func printBlocks(blocks []*tsdb.Block, humanReadable *bool) { +func printBlocks(blocks []tsdb.BlockReader, humanReadable *bool) { tw := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0) defer tw.Flush() @@ -417,21 +461,21 @@ func getFormatedTime(timestamp int64, humanReadable *bool) string { return strconv.FormatInt(timestamp, 10) } -func analyzeBlock(b *tsdb.Block, limit int) { - fmt.Printf("Block path: %s\n", b.Dir()) +func analyzeBlock(b tsdb.BlockReader, limit int) error { meta := b.Meta() + fmt.Printf("Block ID: %s\n", meta.ULID) // Presume 1ms resolution that Prometheus uses. 
fmt.Printf("Duration: %s\n", (time.Duration(meta.MaxTime-meta.MinTime) * 1e6).String()) fmt.Printf("Series: %d\n", meta.Stats.NumSeries) ir, err := b.Index() if err != nil { - exitWithError(err) + return err } defer ir.Close() allLabelNames, err := ir.LabelNames() if err != nil { - exitWithError(err) + return err } fmt.Printf("Label names: %d\n", len(allLabelNames)) @@ -458,13 +502,13 @@ func analyzeBlock(b *tsdb.Block, limit int) { entries := 0 p, err := ir.Postings("", "") // The special all key. if err != nil { - exitWithError(err) + return err } lbls := labels.Labels{} chks := []chunks.Meta{} for p.Next() { if err = ir.Series(p.At(), &lbls, &chks); err != nil { - exitWithError(err) + return err } // Amount of the block time range not covered by this series. uncovered := uint64(meta.MaxTime-meta.MinTime) - uint64(chks[len(chks)-1].MaxTime-chks[0].MinTime) @@ -477,7 +521,7 @@ func analyzeBlock(b *tsdb.Block, limit int) { } } if p.Err() != nil { - exitWithError(p.Err()) + return p.Err() } fmt.Printf("Postings (unique label pairs): %d\n", len(labelpairsUncovered)) fmt.Printf("Postings entries (total label pairs): %d\n", entries) @@ -510,14 +554,14 @@ func analyzeBlock(b *tsdb.Block, limit int) { for _, n := range allLabelNames { values, err := ir.LabelValues(n) if err != nil { - exitWithError(err) + return err } var cumulativeLength uint64 for i := 0; i < values.Len(); i++ { value, _ := values.At(i) if err != nil { - exitWithError(err) + return err } for _, str := range value { cumulativeLength += uint64(len(str)) @@ -534,7 +578,7 @@ func analyzeBlock(b *tsdb.Block, limit int) { for _, n := range allLabelNames { lv, err := ir.LabelValues(n) if err != nil { - exitWithError(err) + return err } postingInfos = append(postingInfos, postingInfo{n, uint64(lv.Len())}) } @@ -544,41 +588,49 @@ func analyzeBlock(b *tsdb.Block, limit int) { postingInfos = postingInfos[:0] lv, err := ir.LabelValues("__name__") if err != nil { - exitWithError(err) + return err } for i := 0; i < lv.Len(); i++ { names, err := lv.At(i) if err != nil { - exitWithError(err) + return err } for _, n := range names { postings, err := ir.Postings("__name__", n) if err != nil { - exitWithError(err) + return err } count := 0 for postings.Next() { count++ } if postings.Err() != nil { - exitWithError(postings.Err()) + return postings.Err() } postingInfos = append(postingInfos, postingInfo{n, uint64(count)}) } } fmt.Printf("\nHighest cardinality metric names:\n") printInfo(postingInfos) + return nil } -func dumpSamples(db *tsdb.DB, mint, maxt int64) { +func dumpSamples(db *tsdb.DBReadOnly, mint, maxt int64) (err error) { + q, err := db.Querier(mint, maxt) if err != nil { - exitWithError(err) + return err } + defer func() { + var merr tsdb_errors.MultiError + merr.Add(err) + merr.Add(q.Close()) + err = merr.Err() + }() ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*")) if err != nil { - exitWithError(err) + return err } for ss.Next() { @@ -590,11 +642,12 @@ func dumpSamples(db *tsdb.DB, mint, maxt int64) { fmt.Printf("%s %g %d\n", labels, val, ts) } if it.Err() != nil { - exitWithError(ss.Err()) + return ss.Err() } } if ss.Err() != nil { - exitWithError(ss.Err()) + return ss.Err() } + return nil } diff --git a/compact.go b/compact.go index e19b7ed7..a41b5252 100644 --- a/compact.go +++ b/compact.go @@ -662,7 +662,7 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta, }() c.metrics.populatingBlocks.Set(1) - globalMaxt := blocks[0].MaxTime() + globalMaxt := blocks[0].Meta().MaxTime for i, b := range 
 	for i, b := range blocks {
 		select {
 		case <-c.ctx.Done():
@@ -671,13 +671,13 @@ func (c *LeveledCompactor) populateBlock(blocks []BlockReader, meta *BlockMeta,
 		}
 
 		if !overlapping {
-			if i > 0 && b.MinTime() < globalMaxt {
+			if i > 0 && b.Meta().MinTime < globalMaxt {
 				c.metrics.overlappingBlocks.Inc()
 				overlapping = true
 				level.Warn(c.logger).Log("msg", "found overlapping blocks during compaction", "ulid", meta.ULID)
 			}
-			if b.MaxTime() > globalMaxt {
-				globalMaxt = b.MaxTime()
+			if b.Meta().MaxTime > globalMaxt {
+				globalMaxt = b.Meta().MaxTime
 			}
 		}
 
diff --git a/compact_test.go b/compact_test.go
index cbf331f9..18990ed5 100644
--- a/compact_test.go
+++ b/compact_test.go
@@ -458,8 +458,7 @@ type erringBReader struct{}
 func (erringBReader) Index() (IndexReader, error)          { return nil, errors.New("index") }
 func (erringBReader) Chunks() (ChunkReader, error)         { return nil, errors.New("chunks") }
 func (erringBReader) Tombstones() (TombstoneReader, error) { return nil, errors.New("tombstones") }
-func (erringBReader) MinTime() int64                       { return 0 }
-func (erringBReader) MaxTime() int64                       { return 0 }
+func (erringBReader) Meta() BlockMeta                      { return BlockMeta{} }
 
 type nopChunkWriter struct{}
diff --git a/db.go b/db.go
index e07f7d3e..aa9ec178 100644
--- a/db.go
+++ b/db.go
@@ -250,6 +250,178 @@ func newDBMetrics(db *DB, r prometheus.Registerer) *dbMetrics {
 	return m
 }
 
+// ErrClosed is returned when the db is closed.
+var ErrClosed = errors.New("db already closed")
+
+// DBReadOnly provides APIs for read only operations on a database.
+// The current implementation doesn't support concurrency so
+// all API calls should happen in the same goroutine.
+type DBReadOnly struct {
+	logger  log.Logger
+	dir     string
+	closers []io.Closer
+	closed  chan struct{}
+}
+
+// OpenDBReadOnly opens DB in the given directory for read only operations.
+func OpenDBReadOnly(dir string, l log.Logger) (*DBReadOnly, error) {
+	if _, err := os.Stat(dir); err != nil {
+		return nil, errors.Wrap(err, "opening the db dir")
+	}
+
+	if l == nil {
+		l = log.NewNopLogger()
+	}
+
+	return &DBReadOnly{
+		logger: l,
+		dir:    dir,
+		closed: make(chan struct{}),
+	}, nil
+}
+
+// Querier loads the wal and returns a new querier over the data partition for the given time range.
+// The current implementation doesn't support multiple Queriers.
+func (db *DBReadOnly) Querier(mint, maxt int64) (Querier, error) {
+	select {
+	case <-db.closed:
+		return nil, ErrClosed
+	default:
+	}
+	blocksReaders, err := db.Blocks()
+	if err != nil {
+		return nil, err
+	}
+	blocks := make([]*Block, len(blocksReaders))
+	for i, b := range blocksReaders {
+		b, ok := b.(*Block)
+		if !ok {
+			return nil, errors.New("unable to convert a read only block to a normal block")
+		}
+		blocks[i] = b
+	}
+
+	head, err := NewHead(nil, db.logger, nil, 1)
+	if err != nil {
+		return nil, err
+	}
+	maxBlockTime := int64(math.MinInt64)
+	if len(blocks) > 0 {
+		maxBlockTime = blocks[len(blocks)-1].Meta().MaxTime
+	}
+
+	// Also add the WAL if the current blocks don't cover the requested time range.
+	if maxBlockTime <= maxt {
+		w, err := wal.Open(db.logger, nil, filepath.Join(db.dir, "wal"))
+		if err != nil {
+			return nil, err
+		}
+		head, err = NewHead(nil, db.logger, w, 1)
+		if err != nil {
+			return nil, err
+		}
+		// Set the min valid time for the ingested wal samples
+		// to be no lower than the maxt of the last block.
+		if err := head.Init(maxBlockTime); err != nil {
+			return nil, errors.Wrap(err, "read WAL")
+		}
+		// Set the wal to nil to disable all wal operations.
+		// This is mainly to avoid blocking when closing the head.
+		head.wal = nil
+
+		db.closers = append(db.closers, head)
+	}
+
+	// TODO: Refactor so that it is possible to obtain a Querier without initializing a writable DB instance.
+	// Option 1: refactor DB to have the Querier implementation using the DBReadOnly.Querier implementation not the opposite.
+	// Option 2: refactor Querier to use another independent func which
+	// can then be used by read only and writable db instances without any code duplication.
+	dbWritable := &DB{
+		dir:    db.dir,
+		logger: db.logger,
+		blocks: blocks,
+		head:   head,
+	}
+
+	return dbWritable.Querier(mint, maxt)
+}
+
+// Blocks returns a slice of block readers for persisted blocks.
+func (db *DBReadOnly) Blocks() ([]BlockReader, error) {
+	select {
+	case <-db.closed:
+		return nil, ErrClosed
+	default:
+	}
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, nil, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
+	for _, block := range loadable {
+		for _, b := range block.Meta().Compaction.Parents {
+			delete(corrupted, b.ULID)
+		}
+	}
+	if len(corrupted) > 0 {
+		for _, b := range loadable {
+			if err := b.Close(); err != nil {
+				level.Warn(db.logger).Log("msg", "closing a block", "err", err)
+			}
+		}
+		return nil, errors.Errorf("unexpected corrupted block: %v", corrupted)
+	}
+
+	if len(loadable) == 0 {
+		return nil, errors.New("no blocks found")
+	}
+
+	sort.Slice(loadable, func(i, j int) bool {
+		return loadable[i].Meta().MinTime < loadable[j].Meta().MinTime
+	})
+
+	blockMetas := make([]BlockMeta, 0, len(loadable))
+	for _, b := range loadable {
+		blockMetas = append(blockMetas, b.Meta())
+	}
+	if overlaps := OverlappingBlocks(blockMetas); len(overlaps) > 0 {
+		level.Warn(db.logger).Log("msg", "overlapping blocks found during opening", "detail", overlaps.String())
+	}
+
+	// Close all previously open readers and add the new ones to the cache.
+	for _, closer := range db.closers {
+		closer.Close()
+	}
+
+	blockClosers := make([]io.Closer, len(loadable))
+	blockReaders := make([]BlockReader, len(loadable))
+	for i, b := range loadable {
+		blockClosers[i] = b
+		blockReaders[i] = b
+	}
+	db.closers = blockClosers
+
+	return blockReaders, nil
+}
+
+// Close all block readers.
+func (db *DBReadOnly) Close() error {
+	select {
+	case <-db.closed:
+		return ErrClosed
+	default:
+	}
+	close(db.closed)
+
+	var merr tsdb_errors.MultiError
+
+	for _, b := range db.closers {
+		merr.Add(b.Close())
+	}
+	return merr.Err()
+}
+
 // Open returns a new DB in the given directory.
 func Open(dir string, l log.Logger, r prometheus.Registerer, opts *Options) (db *DB, err error) {
 	if err := os.MkdirAll(dir, 0777); err != nil {
@@ -514,8 +686,10 @@ func (db *DB) compact() (err error) {
 	return nil
 }
 
-func (db *DB) getBlock(id ulid.ULID) (*Block, bool) {
-	for _, b := range db.blocks {
+// getBlock iterates a given block range to find a block by a given id.
+// If found it returns the block itself and a boolean to indicate that it was found.
+func getBlock(allBlocks []*Block, id ulid.ULID) (*Block, bool) {
+	for _, b := range allBlocks {
 		if b.Meta().ULID == id {
 			return b, true
 		}
@@ -533,14 +707,14 @@ func (db *DB) reload() (err error) {
 		db.metrics.reloads.Inc()
 	}()
 
-	loadable, corrupted, err := db.openBlocks()
+	loadable, corrupted, err := openBlocks(db.logger, db.dir, db.blocks, db.chunkPool)
 	if err != nil {
 		return err
 	}
 
 	deletable := db.deletableBlocks(loadable)
 
-	// Corrupted blocks that have been replaced by parents can be safely ignored and deleted.
+	// Corrupted blocks that have been superseded by a loadable block can be safely ignored.
 	// This makes it resilient against the process crashing towards the end of a compaction.
 	// Creation of a new block and deletion of its parents cannot happen atomically.
 	// By creating blocks with their parents, we can pick up the deletion where it left off during a crash.
@@ -553,7 +727,7 @@ func (db *DB) reload() (err error) {
 	if len(corrupted) > 0 {
 		// Close all new blocks to release the lock for windows.
 		for _, block := range loadable {
-			if _, loaded := db.getBlock(block.Meta().ULID); !loaded {
+			if _, open := getBlock(db.blocks, block.Meta().ULID); !open {
 				block.Close()
 			}
 		}
@@ -621,24 +795,24 @@ func (db *DB) reload() (err error) {
 	return errors.Wrap(db.head.Truncate(maxt), "head truncate failed")
 }
 
-func (db *DB) openBlocks() (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
-	dirs, err := blockDirs(db.dir)
+func openBlocks(l log.Logger, dir string, loaded []*Block, chunkPool chunkenc.Pool) (blocks []*Block, corrupted map[ulid.ULID]error, err error) {
+	bDirs, err := blockDirs(dir)
 	if err != nil {
 		return nil, nil, errors.Wrap(err, "find blocks")
 	}
 
 	corrupted = make(map[ulid.ULID]error)
-	for _, dir := range dirs {
-		meta, _, err := readMetaFile(dir)
+	for _, bDir := range bDirs {
+		meta, _, err := readMetaFile(bDir)
 		if err != nil {
-			level.Error(db.logger).Log("msg", "not a block dir", "dir", dir)
+			level.Error(l).Log("msg", "not a block dir", "dir", bDir)
 			continue
 		}
 
 		// See if we already have the block in memory or open it otherwise.
-		block, ok := db.getBlock(meta.ULID)
-		if !ok {
-			block, err = OpenBlock(db.logger, dir, db.chunkPool)
+		block, open := getBlock(loaded, meta.ULID)
+		if !open {
+			block, err = OpenBlock(l, bDir, chunkPool)
 			if err != nil {
 				corrupted[meta.ULID] = err
 				continue
diff --git a/db_test.go b/db_test.go
index 715450ee..b06e05c7 100644
--- a/db_test.go
+++ b/db_test.go
@@ -1113,8 +1113,7 @@ func TestSizeRetention(t *testing.T) {
 	testutil.Ok(t, db.reload())                                       // Reload the db to register the new db size.
 	testutil.Equals(t, len(blocks), len(db.Blocks()))                 // Ensure all blocks are registered.
 	expSize := int64(prom_testutil.ToFloat64(db.metrics.blocksBytes)) // Use the the actual internal metrics.
-	actSize, err := testutil.DirSize(db.Dir())
-	testutil.Ok(t, err)
+	actSize := testutil.DirSize(t, db.Dir())
 	testutil.Equals(t, expSize, actSize, "registered size doesn't match actual disk size")
 
 	// Decrease the max bytes limit so that a delete is triggered.
@@ -1128,8 +1127,7 @@ func TestSizeRetention(t *testing.T) {
 	actBlocks := db.Blocks()
 	expSize = int64(prom_testutil.ToFloat64(db.metrics.blocksBytes))
 	actRetentCount := int(prom_testutil.ToFloat64(db.metrics.sizeRetentionCount))
-	actSize, err = testutil.DirSize(db.Dir())
-	testutil.Ok(t, err)
+	actSize = testutil.DirSize(t, db.Dir())
 
 	testutil.Equals(t, 1, actRetentCount, "metric retention count mismatch")
 	testutil.Equals(t, actSize, expSize, "metric db size doesn't match actual disk size")
@@ -2232,3 +2230,108 @@ func TestBlockRanges(t *testing.T) {
 		t.Fatalf("new block overlaps old:%v,new:%v", db.Blocks()[2].Meta(), db.Blocks()[3].Meta())
 	}
 }
+
+// TestDBReadOnly ensures that opening a DB in readonly mode doesn't modify any files on the disk.
+// It also checks that the API calls return equivalent results to a normal db.Open() mode.
+func TestDBReadOnly(t *testing.T) {
+	var (
+		dbDir          string
+		logger         = log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr))
+		expBlocks      []*Block
+		expSeries      map[string][]tsdbutil.Sample
+		expSeriesCount int
+		expDBHash      []byte
+		matchAll       = labels.NewEqualMatcher("", "")
+		err            error
+	)
+
+	// Bootstrap the db.
+	{
+		dbDir, err = ioutil.TempDir("", "test")
+		testutil.Ok(t, err)
+
+		defer func() {
+			testutil.Ok(t, os.RemoveAll(dbDir))
+		}()
+
+		dbBlocks := []*BlockMeta{
+			{MinTime: 10, MaxTime: 11},
+			{MinTime: 11, MaxTime: 12},
+			{MinTime: 12, MaxTime: 13},
+		}
+
+		for _, m := range dbBlocks {
+			createBlock(t, dbDir, genSeries(1, 1, m.MinTime, m.MaxTime))
+		}
+		expSeriesCount++
+	}
+
+	// Open a normal db to use for a comparison.
+	{
+		dbWritable, err := Open(dbDir, logger, nil, nil)
+		testutil.Ok(t, err)
+		dbWritable.DisableCompactions()
+
+		dbSizeBeforeAppend := testutil.DirSize(t, dbWritable.Dir())
+		app := dbWritable.Appender()
+		_, err = app.Add(labels.FromStrings("foo", "bar"), dbWritable.Head().MaxTime()+1, 0)
+		testutil.Ok(t, err)
+		testutil.Ok(t, app.Commit())
+		expSeriesCount++
+
+		expBlocks = dbWritable.Blocks()
+		expDbSize := testutil.DirSize(t, dbWritable.Dir())
+		testutil.Assert(t, expDbSize > dbSizeBeforeAppend, "db size didn't increase after an append")
+
+		q, err := dbWritable.Querier(math.MinInt64, math.MaxInt64)
+		testutil.Ok(t, err)
+		expSeries = query(t, q, matchAll)
+
+		testutil.Ok(t, dbWritable.Close()) // Close here to allow getting the dir hash for windows.
+		expDBHash = testutil.DirHash(t, dbWritable.Dir())
+	}
+
+	// Open a read only db and ensure that the API returns the same result as the normal DB.
+	{
+		dbReadOnly, err := OpenDBReadOnly(dbDir, logger)
+		testutil.Ok(t, err)
+		defer func() {
+			testutil.Ok(t, dbReadOnly.Close())
+		}()
+		blocks, err := dbReadOnly.Blocks()
+		testutil.Ok(t, err)
+		testutil.Equals(t, len(expBlocks), len(blocks))
+
+		for i, expBlock := range expBlocks {
+			testutil.Equals(t, expBlock.Meta(), blocks[i].Meta(), "block meta mismatch")
+		}
+
+		q, err := dbReadOnly.Querier(math.MinInt64, math.MaxInt64)
+		testutil.Ok(t, err)
+		readOnlySeries := query(t, q, matchAll)
+		readOnlyDBHash := testutil.DirHash(t, dbDir)
+
+		testutil.Equals(t, expSeriesCount, len(readOnlySeries), "total series mismatch")
+		testutil.Equals(t, expSeries, readOnlySeries, "series mismatch")
+		testutil.Equals(t, expDBHash, readOnlyDBHash, "after all read operations the db hash should remain the same")
+	}
+}
+
+// TestDBReadOnlyClosing ensures that after closing the db
+// all API methods return an ErrClosed.
+func TestDBReadOnlyClosing(t *testing.T) {
+	dbDir, err := ioutil.TempDir("", "test")
+	testutil.Ok(t, err)
+
+	defer func() {
+		testutil.Ok(t, os.RemoveAll(dbDir))
+	}()
+	db, err := OpenDBReadOnly(dbDir, log.NewLogfmtLogger(log.NewSyncWriter(os.Stderr)))
+	testutil.Ok(t, err)
+	testutil.Ok(t, db.Close())
+	testutil.Equals(t, db.Close(), ErrClosed)
+	_, err = db.Blocks()
+	testutil.Equals(t, err, ErrClosed)
+	_, err = db.Querier(0, 1)
+	testutil.Equals(t, err, ErrClosed)
+}
diff --git a/head.go b/head.go
index 5e2eae85..5e5475cb 100644
--- a/head.go
+++ b/head.go
@@ -25,6 +25,7 @@ import (
 
 	"github.com/go-kit/kit/log"
 	"github.com/go-kit/kit/log/level"
+	"github.com/oklog/ulid"
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/tsdb/chunkenc"
@@ -64,6 +65,7 @@ type Head struct {
 	logger     log.Logger
 	appendPool sync.Pool
 	bytesPool  sync.Pool
+	numSeries  uint64
 
 	minTime, maxTime int64 // Current min and max of the samples included in the head.
 	minValidTime     int64 // Mint allowed to be added to the head. It shouldn't be lower than the maxt of the last persisted block.
@@ -84,7 +86,7 @@ type Head struct {
 
 type headMetrics struct {
 	activeAppenders prometheus.Gauge
-	series          prometheus.Gauge
+	series          prometheus.GaugeFunc
 	seriesCreated   prometheus.Counter
 	seriesRemoved   prometheus.Counter
 	seriesNotFound  prometheus.Counter
@@ -112,9 +114,11 @@ func newHeadMetrics(h *Head, r prometheus.Registerer) *headMetrics {
 		Name: "prometheus_tsdb_head_active_appenders",
 		Help: "Number of currently active appender transactions",
 	})
-	m.series = prometheus.NewGauge(prometheus.GaugeOpts{
+	m.series = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
 		Name: "prometheus_tsdb_head_series",
 		Help: "Total number of series in the head block.",
+	}, func() float64 {
+		return float64(h.NumSeries())
 	})
 	m.seriesCreated = prometheus.NewCounter(prometheus.CounterOpts{
 		Name: "prometheus_tsdb_head_series_created_total",
@@ -698,6 +702,21 @@ func (h *rangeHead) MaxTime() int64 {
 	return h.maxt
 }
 
+func (h *rangeHead) NumSeries() uint64 {
+	return h.head.NumSeries()
+}
+
+func (h *rangeHead) Meta() BlockMeta {
+	return BlockMeta{
+		MinTime: h.MinTime(),
+		MaxTime: h.MaxTime(),
+		ULID:    h.head.Meta().ULID,
+		Stats: BlockStats{
+			NumSeries: h.NumSeries(),
+		},
+	}
+}
+
 // initAppender is a helper to initialize the time bounds of the head
 // upon the first sample it receives.
 type initAppender struct {
@@ -1022,9 +1041,11 @@ func (h *Head) gc() {
 
 	seriesRemoved := len(deleted)
 	h.metrics.seriesRemoved.Add(float64(seriesRemoved))
-	h.metrics.series.Sub(float64(seriesRemoved))
 	h.metrics.chunksRemoved.Add(float64(chunksRemoved))
 	h.metrics.chunks.Sub(float64(chunksRemoved))
+	// Using AddUint64 to subtract the removed series count.
+	// See: https://golang.org/pkg/sync/atomic/#AddUint64.
+	atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1))
 
 	// Remove deleted series IDs from the postings lists.
 	h.postings.Delete(deleted)
@@ -1101,6 +1122,26 @@ func (h *Head) chunksRange(mint, maxt int64) *headChunkReader {
 	return &headChunkReader{head: h, mint: mint, maxt: maxt}
 }
 
+// NumSeries returns the number of active series in the head.
+func (h *Head) NumSeries() uint64 {
+	return atomic.LoadUint64(&h.numSeries)
+}
+
+// Meta returns meta information about the head.
+// The head is dynamic so it will return dynamic results.
+func (h *Head) Meta() BlockMeta {
+	var id [16]byte
+	copy(id[:], "______head______")
+	return BlockMeta{
+		MinTime: h.MinTime(),
+		MaxTime: h.MaxTime(),
+		ULID:    ulid.ULID(id),
+		Stats: BlockStats{
+			NumSeries: h.NumSeries(),
+		},
+	}
+}
+
 // MinTime returns the lowest time bound on visible data in the head.
 func (h *Head) MinTime() int64 {
 	return atomic.LoadInt64(&h.minTime)
@@ -1347,8 +1388,8 @@ func (h *Head) getOrCreateWithID(id, hash uint64, lset labels.Labels) (*memSerie
 		return s, false
 	}
 
-	h.metrics.series.Inc()
 	h.metrics.seriesCreated.Inc()
+	atomic.AddUint64(&h.numSeries, 1)
 
 	h.postings.Add(id, lset)
diff --git a/mocks_test.go b/mocks_test.go
index 243d5cf1..5442c680 100644
--- a/mocks_test.go
+++ b/mocks_test.go
@@ -73,5 +73,4 @@ type mockBReader struct {
 func (r *mockBReader) Index() (IndexReader, error)          { return r.ir, nil }
 func (r *mockBReader) Chunks() (ChunkReader, error)         { return r.cr, nil }
 func (r *mockBReader) Tombstones() (TombstoneReader, error) { return newMemTombstones(), nil }
-func (r *mockBReader) MinTime() int64                       { return r.mint }
-func (r *mockBReader) MaxTime() int64                       { return r.maxt }
+func (r *mockBReader) Meta() BlockMeta                      { return BlockMeta{MinTime: r.mint, MaxTime: r.maxt} }
diff --git a/testutil/directory.go b/testutil/directory.go
index e74b342b..5f1c3155 100644
--- a/testutil/directory.go
+++ b/testutil/directory.go
@@ -14,9 +14,13 @@
 package testutil
 
 import (
+	"crypto/sha256"
+	"io"
 	"io/ioutil"
 	"os"
 	"path/filepath"
+	"strconv"
+	"testing"
 )
 
 const (
@@ -130,16 +134,49 @@ func NewTemporaryDirectory(name string, t T) (handler TemporaryDirectory) {
 }
 
 // DirSize returns the size in bytes of all files in a directory.
-func DirSize(path string) (int64, error) {
+func DirSize(t *testing.T, path string) int64 {
 	var size int64
 	err := filepath.Walk(path, func(_ string, info os.FileInfo, err error) error {
-		if err != nil {
-			return err
-		}
+		Ok(t, err)
 		if !info.IsDir() {
 			size += info.Size()
 		}
 		return nil
 	})
-	return size, err
+	Ok(t, err)
+	return size
+}
+
+// DirHash returns a hash of all file attributes and their content within a directory.
+func DirHash(t *testing.T, path string) []byte {
+	hash := sha256.New()
+	err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error {
+		Ok(t, err)
+
+		if info.IsDir() {
+			return nil
+		}
+		f, err := os.Open(path)
+		Ok(t, err)
+		defer f.Close()
+
+		_, err = io.Copy(hash, f)
+		Ok(t, err)
+
+		_, err = io.WriteString(hash, strconv.Itoa(int(info.Size())))
+		Ok(t, err)
+
+		_, err = io.WriteString(hash, info.Name())
+		Ok(t, err)
+
+		modTime, err := info.ModTime().GobEncode()
+		Ok(t, err)
+
+		_, err = io.WriteString(hash, string(modTime))
+		Ok(t, err)
+		return nil
+	})
+	Ok(t, err)
+
+	return hash.Sum(nil)
+}
diff --git a/wal/wal.go b/wal/wal.go
index 39daba97..878aae6b 100644
--- a/wal/wal.go
+++ b/wal/wal.go
@@ -203,6 +203,48 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
 		stopc:       make(chan chan struct{}),
 		compress:    compress,
 	}
+	registerMetrics(reg, w)
+
+	_, j, err := w.Segments()
+	// Index of the Segment we want to open and write to.
+	writeSegmentIndex := 0
+	if err != nil {
+		return nil, errors.Wrap(err, "get segment range")
+	}
+	// If some segments already exist, create one with a higher index than the last segment.
+	if j != -1 {
+		writeSegmentIndex = j + 1
+	}
+
+	segment, err := CreateSegment(w.dir, writeSegmentIndex)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := w.setSegment(segment); err != nil {
+		return nil, err
+	}
+
+	go w.run()
+
+	return w, nil
+}
+
+// Open an existing WAL.
+func Open(logger log.Logger, reg prometheus.Registerer, dir string) (*WAL, error) {
+	if logger == nil {
+		logger = log.NewNopLogger()
+	}
+	w := &WAL{
+		dir:    dir,
+		logger: logger,
+	}
+
+	registerMetrics(reg, w)
+	return w, nil
+}
+
+func registerMetrics(reg prometheus.Registerer, w *WAL) {
 	w.fsyncDuration = prometheus.NewSummary(prometheus.SummaryOpts{
 		Name: "prometheus_tsdb_wal_fsync_duration_seconds",
 		Help: "Duration of WAL fsync.",
@@ -231,30 +273,6 @@ func NewSize(logger log.Logger, reg prometheus.Registerer, dir string, segmentSi
 	if reg != nil {
 		reg.MustRegister(w.fsyncDuration, w.pageFlushes, w.pageCompletions, w.truncateFail, w.truncateTotal, w.currentSegment)
 	}
-
-	_, j, err := w.Segments()
-	// Index of the Segment we want to open and write to.
-	writeSegmentIndex := 0
-	if err != nil {
-		return nil, errors.Wrap(err, "get segment range")
-	}
-	// If some segments already exist create one with a higher index than the last segment.
-	if j != -1 {
-		writeSegmentIndex = j + 1
-	}
-
-	segment, err := CreateSegment(w.dir, writeSegmentIndex)
-	if err != nil {
-		return nil, err
-	}
-
-	if err := w.setSegment(segment); err != nil {
-		return nil, err
-	}
-
-	go w.run()
-
-	return w, nil
 }
 
 // CompressionEnabled returns if compression is enabled on this WAL.
@@ -302,7 +320,6 @@ func (w *WAL) Repair(origErr error) error {
 	if cerr.Segment < 0 {
 		return errors.New("corruption error does not specify position")
 	}
-
 	level.Warn(w.logger).Log("msg", "starting corruption repair",
 		"segment", cerr.Segment, "offset", cerr.Offset)
 
@@ -487,7 +504,6 @@ func (w *WAL) flushPage(clear bool) error {
 
 // First Byte of header format:
 // [ 4 bits unallocated] [1 bit snappy compression flag] [ 3 bit record type ]
-
 const (
 	snappyMask  = 1 << 3
 	recTypeMask = snappyMask - 1
diff --git a/wal/wal_test.go b/wal/wal_test.go
index d2a6ccc2..12fe1a2d 100644
--- a/wal/wal_test.go
+++ b/wal/wal_test.go
@@ -408,10 +408,8 @@ func TestCompression(t *testing.T) {
 		testutil.Ok(t, os.RemoveAll(dirUnCompressed))
 	}()
 
-	uncompressedSize, err := testutil.DirSize(dirUnCompressed)
-	testutil.Ok(t, err)
-	compressedSize, err := testutil.DirSize(dirCompressed)
-	testutil.Ok(t, err)
+	uncompressedSize := testutil.DirSize(t, dirUnCompressed)
+	compressedSize := testutil.DirSize(t, dirCompressed)
 
 	testutil.Assert(t, float64(uncompressedSize)*0.75 > float64(compressedSize), "Compressing zeroes should save at least 25%% space - uncompressedSize: %d, compressedSize: %d", uncompressedSize, compressedSize)
 }
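
Reviewer note: below is a minimal usage sketch of the new read-only API introduced in this diff, assuming an existing TSDB data directory at "./data" (a hypothetical path) and using panics in place of real error handling for brevity. It only exercises calls added or changed here: OpenDBReadOnly, DBReadOnly.Blocks, DBReadOnly.Querier, BlockReader.Meta, and the Close methods.

package main

import (
	"fmt"
	"math"

	"github.com/prometheus/tsdb"
	"github.com/prometheus/tsdb/labels"
)

func main() {
	// A nil logger falls back to log.NewNopLogger() inside OpenDBReadOnly.
	db, err := tsdb.OpenDBReadOnly("./data", nil)
	if err != nil {
		panic(err)
	}
	defer db.Close()

	// Blocks() returns BlockReaders, so only Meta() is available,
	// per the interface change in block.go.
	blocks, err := db.Blocks()
	if err != nil {
		panic(err)
	}
	for _, b := range blocks {
		meta := b.Meta()
		fmt.Println(meta.ULID, meta.MinTime, meta.MaxTime, meta.Stats.NumSeries)
	}

	// A querier over the full time range; per DBReadOnly.Querier this
	// also reads the WAL when the blocks don't cover maxt.
	q, err := db.Querier(math.MinInt64, math.MaxInt64)
	if err != nil {
		panic(err)
	}
	defer q.Close()

	// Match all series, same matcher the dump command uses above.
	ss, err := q.Select(labels.NewMustRegexpMatcher("", ".*"))
	if err != nil {
		panic(err)
	}
	for ss.Next() {
		fmt.Println(ss.At().Labels())
	}
	if ss.Err() != nil {
		panic(ss.Err())
	}
}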
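
One non-obvious detail worth calling out: head.go now tracks the series count in an atomic uint64, and gc() decrements it with atomic.AddUint64(&h.numSeries, ^uint64(seriesRemoved-1)) because sync/atomic has no SubUint64. A small self-contained demonstration of that two's-complement identity (the values are illustrative):

package main

import (
	"fmt"
	"sync/atomic"
)

func main() {
	var numSeries uint64 = 10
	seriesRemoved := 3

	// In two's complement, ^uint64(n-1) == -n, so adding it
	// decrements the counter by n without a Sub helper.
	atomic.AddUint64(&numSeries, ^uint64(seriesRemoved-1))

	fmt.Println(numSeries) // 7
}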