Skip to content

Commit

Permalink
Add TriggerLevelWriter. (#602)
Browse files Browse the repository at this point in the history
See: #583
  • Loading branch information
mitar committed Nov 28, 2023
1 parent 83e03c7 commit 93fb5cb
Show file tree
Hide file tree
Showing 3 changed files with 191 additions and 0 deletions.
4 changes: 4 additions & 0 deletions globals.go
Expand Up @@ -132,6 +132,10 @@ var (
FatalLevel: "FTL",
PanicLevel: "PNC",
}

// TriggerLevelWriterBufferReuseLimit is a limit in bytes that a buffer is dropped
// from the TriggerLevelWriter buffer pool if the buffer grows above the limit.
TriggerLevelWriterBufferReuseLimit = 64 * 1024
)

var (
Expand Down
132 changes: 132 additions & 0 deletions writer.go
Expand Up @@ -180,3 +180,135 @@ func (w *FilteredLevelWriter) WriteLevel(level Level, p []byte) (int, error) {
}
return len(p), nil
}

var triggerWriterPool = &sync.Pool{
New: func() interface{} {
return bytes.NewBuffer(make([]byte, 0, 1024))
},
}

// TriggerLevelWriter buffers log lines at the ConditionalLevel or below
// until a trigger level (or higher) line is emitted. Log lines with level
// higher than ConditionalLevel are always written out to the destination
// writer. If trigger never happens, buffered log lines are never written out.
//
// It can be used to configure "log level per request".
type TriggerLevelWriter struct {
// Destination writer. If LevelWriter is provided (usually), its WriteLevel is used
// instead of Write.
io.Writer

// ConditionalLevel is the level (and below) at which lines are buffered until
// a trigger level (or higher) line is emitted. Usually this is set to DebugLevel.
ConditionalLevel Level

// TriggerLevel is the lowest level that triggers the sending of the conditional
// level lines. Usually this is set to ErrorLevel.
TriggerLevel Level

buf *bytes.Buffer
triggered bool
mu sync.Mutex
}

func (w *TriggerLevelWriter) WriteLevel(l Level, p []byte) (n int, err error) {
w.mu.Lock()
defer w.mu.Unlock()

// At first trigger level or above log line, we flush the buffer and change the
// trigger state to triggered.
if !w.triggered && l >= w.TriggerLevel {
err := w.trigger()
if err != nil {
return 0, err
}
}

// Unless triggered, we buffer everything at and below ConditionalLevel.
if !w.triggered && l <= w.ConditionalLevel {
if w.buf == nil {
w.buf = triggerWriterPool.Get().(*bytes.Buffer)
}

// We prefix each log line with a byte with the level.
// Hopefully we will never have a level value which equals a newline
// (which could interfere with reconstruction of log lines in the trigger method).
w.buf.WriteByte(byte(l))
w.buf.Write(p)
return len(p), nil
}

// Anything above ConditionalLevel is always passed through.
// Once triggered, everything is passed through.
if lw, ok := w.Writer.(LevelWriter); ok {
return lw.WriteLevel(l, p)
}
return w.Write(p)
}

// trigger expects lock to be held.
func (w *TriggerLevelWriter) trigger() error {
if w.triggered {
return nil
}
w.triggered = true

if w.buf == nil {
return nil
}

p := w.buf.Bytes()
for len(p) > 0 {
// We do not use bufio.Scanner here because we already have full buffer
// in the memory and we do not want extra copying from the buffer to
// scanner's token slice, nor we want to hit scanner's token size limit,
// and we also want to preserve newlines.
i := bytes.IndexByte(p, '\n')
line := p[0 : i+1]
p = p[i+1:]
// We prefixed each log line with a byte with the level.
level := Level(line[0])
line = line[1:]
var err error
if lw, ok := w.Writer.(LevelWriter); ok {
_, err = lw.WriteLevel(level, line)
} else {
_, err = w.Write(line)
}
if err != nil {
return err
}
}

return nil
}

// Trigger forces flushing the buffer and change the trigger state to
// triggered, if the writer has not already been triggered before.
func (w *TriggerLevelWriter) Trigger() error {
w.mu.Lock()
defer w.mu.Unlock()

return w.trigger()
}

// Close closes the writer and returns the buffer to the pool.
func (w *TriggerLevelWriter) Close() error {
w.mu.Lock()
defer w.mu.Unlock()

if w.buf == nil {
return nil
}

// We return the buffer only if it has not grown above the limit.
// This prevents accumulation of large buffers in the pool just
// because occasionally a large buffer might be needed.
if w.buf.Cap() <= TriggerLevelWriterBufferReuseLimit {
w.buf.Reset()
triggerWriterPool.Put(w.buf)
}
w.buf = nil

return nil
}
55 changes: 55 additions & 0 deletions writer_test.go
Expand Up @@ -195,3 +195,58 @@ func TestFilteredLevelWriter(t *testing.T) {
t.Errorf("Expected %q, got %q.", want, p)
}
}

type testWrite struct {
Level
Line []byte
}

func TestTriggerLevelWriter(t *testing.T) {
tests := []struct {
write []testWrite
want []byte
all []byte
}{{
[]testWrite{
{DebugLevel, []byte("no\n")},
{InfoLevel, []byte("yes\n")},
},
[]byte("yes\n"),
[]byte("yes\nno\n"),
}, {
[]testWrite{
{DebugLevel, []byte("yes1\n")},
{InfoLevel, []byte("yes2\n")},
{ErrorLevel, []byte("yes3\n")},
{DebugLevel, []byte("yes4\n")},
},
[]byte("yes2\nyes1\nyes3\nyes4\n"),
[]byte("yes2\nyes1\nyes3\nyes4\n"),
}}

for k, tt := range tests {
t.Run(fmt.Sprintf("case=%d", k), func(t *testing.T) {
buf := bytes.Buffer{}
writer := TriggerLevelWriter{Writer: LevelWriterAdapter{&buf}, ConditionalLevel: DebugLevel, TriggerLevel: ErrorLevel}
t.Cleanup(func() { writer.Close() })
for _, w := range tt.write {
_, err := writer.WriteLevel(w.Level, w.Line)
if err != nil {
t.Error(err)
}
}
p := buf.Bytes()
if want := tt.want; !bytes.Equal([]byte(want), p) {
t.Errorf("Expected %q, got %q.", want, p)
}
err := writer.Trigger()
if err != nil {
t.Error(err)
}
p = buf.Bytes()
if want := tt.all; !bytes.Equal([]byte(want), p) {
t.Errorf("Expected %q, got %q.", want, p)
}
})
}
}

0 comments on commit 93fb5cb

Please sign in to comment.