From 647486dc468643a7f3337c77807b89211fe7c320 Mon Sep 17 00:00:00 2001 From: Maisem Ali Date: Fri, 29 Oct 2021 12:04:57 -0700 Subject: [PATCH] logtail/filch: limit buffer file size to 50MB Signed-off-by: Maisem Ali (cherry picked from commit 05e55f4a0b8008e46965488ae5ec6f95f12ebd0c) --- logtail/filch/filch.go | 35 +++++++++++++++++++++++++++++++++-- logtail/filch/filch_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+), 2 deletions(-) diff --git a/logtail/filch/filch.go b/logtail/filch/filch.go index 67831b8fdff3a..34a38627c92bd 100644 --- a/logtail/filch/filch.go +++ b/logtail/filch/filch.go @@ -17,8 +17,11 @@ import ( var stderrFD = 2 // a variable for testing +const defaultMaxFileSize = 50 << 20 + type Options struct { ReplaceStderr bool // dup over fd 2 so everything written to stderr comes here + MaxFileSize int } // A Filch uses two alternating files as a simplistic ring buffer. @@ -30,6 +33,10 @@ type Filch struct { alt *os.File altscan *bufio.Scanner recovered int64 + + maxFileSize int64 + writeCounter int + // buf is an initial buffer for altscan. // As of August 2021, 99.96% of all log lines // are below 4096 bytes in length. @@ -38,7 +45,7 @@ type Filch struct { // so that the whole struct takes 4096 bytes // (less on 32 bit platforms). // This reduces allocation waste. - buf [4096 - 48]byte + buf [4096 - 64]byte } // TryReadline implements the logtail.Buffer interface. @@ -91,6 +98,22 @@ func (f *Filch) scan() ([]byte, error) { func (f *Filch) Write(b []byte) (int, error) { f.mu.Lock() defer f.mu.Unlock() + if f.writeCounter == 100 { + // Check the file size every 100 writes. + f.writeCounter = 0 + fi, err := f.cur.Stat() + if err != nil { + return 0, err + } + if fi.Size() >= f.maxFileSize { + // This most likely means we are not draining. + // To limit the amount of space we use, throw away the old logs. + if err := moveContents(f.alt, f.cur); err != nil { + return 0, err + } + } + } + f.writeCounter++ if len(b) == 0 || b[len(b)-1] != '\n' { bnl := make([]byte, len(b)+1) @@ -159,8 +182,13 @@ func New(filePrefix string, opts Options) (f *Filch, err error) { return nil, err } + mfs := defaultMaxFileSize + if opts.MaxFileSize > 0 { + mfs = opts.MaxFileSize + } f = &Filch{ - OrigStderr: os.Stderr, // temporary, for past logs recovery + OrigStderr: os.Stderr, // temporary, for past logs recovery + maxFileSize: int64(mfs), } // Neither, either, or both files may exist and contain logs from @@ -234,6 +262,9 @@ func moveContents(dst, src *os.File) (err error) { if _, err := src.Seek(0, io.SeekStart); err != nil { return err } + if _, err := dst.Seek(0, io.SeekStart); err != nil { + return err + } if _, err := io.Copy(dst, src); err != nil { return err } diff --git a/logtail/filch/filch_test.go b/logtail/filch/filch_test.go index 7bb6392080b36..4253c12a5e273 100644 --- a/logtail/filch/filch_test.go +++ b/logtail/filch/filch_test.go @@ -57,6 +57,39 @@ func (f *filchTest) close(t *testing.T) { } } +func TestDropOldLogs(t *testing.T) { + const line1 = "123456789" // 10 bytes (9+newline) + tests := []struct { + write, read int + }{ + {10, 10}, + {100, 100}, + {200, 200}, + {250, 150}, + {500, 200}, + } + for _, tc := range tests { + t.Run(fmt.Sprintf("w%d-r%d", tc.write, tc.read), func(t *testing.T) { + filePrefix := t.TempDir() + f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false, MaxFileSize: 1000}) + defer f.close(t) + // Make filch rotate the logs 3 times + for i := 0; i < tc.write; i++ { + f.write(t, line1) + } + // We should only be able to read the last 150 lines + for i := 0; i < tc.read; i++ { + f.read(t, line1) + if t.Failed() { + t.Logf("could only read %d lines", i) + break + } + } + f.readEOF(t) + }) + } +} + func TestQueue(t *testing.T) { filePrefix := t.TempDir() f := newFilchTest(t, filePrefix, Options{ReplaceStderr: false})