From 6684fc77d7b4430b0b152e4293b12ea70eb31800 Mon Sep 17 00:00:00 2001
From: Patrick Ohly
Date: Wed, 16 Mar 2022 17:10:17 +0100
Subject: [PATCH] StartFlushDaemon: add API with more control over flushing

Previously, periodic flushing was started automatically when writing
to buffered files for the first time. The same periodic flushing also
makes sense when writing to a Logger that buffers internally, but in
that case it has to be started explicitly.

The new call gives the caller control over the flush interval;
Kubernetes has a parameter for that.
---
 klog.go      | 30 +++++++++++++++++++++---------
 klog_test.go | 19 ++++++++++---------
 2 files changed, 31 insertions(+), 18 deletions(-)

diff --git a/klog.go b/klog.go
index 15710be3..bb6f64be 100644
--- a/klog.go
+++ b/klog.go
@@ -395,7 +395,7 @@ func init() {
 	logging.addDirHeader = false
 	logging.skipLogHeaders = false
 	logging.oneOutput = false
-	logging.flushD = newFlushDaemon(flushInterval, logging.lockAndFlushAll, nil)
+	logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)
 }
 
 // InitFlags is for explicitly initializing the flags.
@@ -449,6 +449,9 @@ type loggingT struct {
 	file [severity.NumSeverity]flushSyncWriter
 	// flushD holds a flushDaemon that frequently flushes log file buffers.
 	flushD *flushDaemon
+	// flushInterval is the interval for periodic flushing. If zero,
+	// the global default will be used.
+	flushInterval time.Duration
 	// pcs is used in V to avoid an allocation when computing the caller's PC.
 	pcs [1]uintptr
 	// vmap is a cache of the V Level for each V() call site, identified by PC.
@@ -976,7 +979,11 @@ const bufferSize = 256 * 1024
 // createFiles creates all the log files for severity from sev down to infoLog.
 // l.mu is held.
 func (l *loggingT) createFiles(sev severity.Severity) error {
-	l.flushD.run()
+	interval := l.flushInterval
+	if interval == 0 {
+		interval = flushInterval
+	}
+	l.flushD.run(interval)
 	now := time.Now()
 	// Files are created in decreasing severity order, so as soon as we find one
 	// has already been created, we can stop.
@@ -1000,7 +1007,6 @@ const flushInterval = 5 * time.Second
 type flushDaemon struct {
 	mu       sync.Mutex
 	clock    clock.WithTicker
-	interval time.Duration
 	flush    func()
 	stopC    chan struct{}
 	stopDone chan struct{}
@@ -1008,20 +1014,19 @@ type flushDaemon struct {
 }
 
 // newFlushDaemon returns a new flushDaemon. If the passed clock is nil, a
 // clock.RealClock is used.
-func newFlushDaemon(interval time.Duration, flush func(), tickClock clock.WithTicker) *flushDaemon {
+func newFlushDaemon(flush func(), tickClock clock.WithTicker) *flushDaemon {
 	if tickClock == nil {
 		tickClock = clock.RealClock{}
 	}
 	return &flushDaemon{
-		interval: interval,
-		flush:    flush,
-		clock:    tickClock,
+		flush: flush,
+		clock: tickClock,
 	}
 }
 
 // run starts a goroutine that periodically calls the daemons flush function.
 // Calling run on an already running daemon will have no effect.
-func (f *flushDaemon) run() {
+func (f *flushDaemon) run(interval time.Duration) {
 	f.mu.Lock()
 	defer f.mu.Unlock()
 
@@ -1032,7 +1037,7 @@ func (f *flushDaemon) run() {
 	f.stopC = make(chan struct{}, 1)
 	f.stopDone = make(chan struct{}, 1)
 
-	ticker := f.clock.NewTicker(f.interval)
+	ticker := f.clock.NewTicker(interval)
 	go func() {
 		defer ticker.Stop()
 		defer func() { f.stopDone <- struct{}{} }()
@@ -1079,6 +1084,13 @@ func StopFlushDaemon() {
 	logging.flushD.stop()
 }
 
+// StartFlushDaemon ensures that the flush daemon runs with the given delay
+// between flush calls. If it is already running, it is restarted.
+func StartFlushDaemon(interval time.Duration) {
+	StopFlushDaemon()
+	logging.flushD.run(interval)
+}
+
 // lockAndFlushAll is like flushAll but locks l.mu first.
 func (l *loggingT) lockAndFlushAll() {
 	l.mu.Lock()
diff --git a/klog_test.go b/klog_test.go
index ea9936f1..0a165748 100644
--- a/klog_test.go
+++ b/klog_test.go
@@ -381,8 +381,8 @@ func TestSetOutputDataRace(t *testing.T) {
 	var wg sync.WaitGroup
 	var daemons []*flushDaemon
 	for i := 1; i <= 50; i++ {
-		daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
-		daemon.run()
+		daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
+		daemon.run(time.Second)
 		daemons = append(daemons, daemon)
 	}
 	for i := 1; i <= 50; i++ {
@@ -393,8 +393,8 @@ func TestSetOutputDataRace(t *testing.T) {
 		}()
 	}
 	for i := 1; i <= 50; i++ {
-		daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
-		daemon.run()
+		daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
+		daemon.run(time.Second)
 		daemons = append(daemons, daemon)
 	}
 	for i := 1; i <= 50; i++ {
@@ -405,8 +405,8 @@ func TestSetOutputDataRace(t *testing.T) {
 		}()
 	}
 	for i := 1; i <= 50; i++ {
-		daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
-		daemon.run()
+		daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
+		daemon.run(time.Second)
 		daemons = append(daemons, daemon)
 	}
 	wg.Wait()
@@ -1869,7 +1869,8 @@ func TestFlushDaemon(t *testing.T) {
 	}
 	testClock := testingclock.NewFakeClock(time.Now())
 	testLog := loggingT{
-		flushD: newFlushDaemon(time.Second, spyFunc, testClock),
+		flushInterval: time.Second,
+		flushD:        newFlushDaemon(spyFunc, testClock),
 	}
 
 	// Calling testLog will call createFile, which should start the daemon.
@@ -1910,8 +1911,8 @@ func TestFlushDaemon(t *testing.T) {
 
 func TestStopFlushDaemon(t *testing.T) {
 	logging.flushD.stop()
-	logging.flushD = newFlushDaemon(time.Second, func() {}, nil)
-	logging.flushD.run()
+	logging.flushD = newFlushDaemon(func() {}, nil)
+	logging.flushD.run(time.Second)
 	if !logging.flushD.isRunning() {
 		t.Error("expected flushD to be running")
 	}
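
Usage sketch: the program below shows how a caller might adopt the new
API once this patch is applied. It assumes klog v2 with StartFlushDaemon
available; the flag name echoes the Kubernetes parameter mentioned in
the commit message, but the name and default here are illustrative
assumptions, not part of this patch.

    package main

    import (
        "flag"
        "time"

        "k8s.io/klog/v2"
    )

    func main() {
        // Hypothetical flag mirroring the Kubernetes parameter alluded
        // to above; the flag name and default are assumptions.
        flushFrequency := flag.Duration("log-flush-frequency", 5*time.Second,
            "maximum time between log flushes")
        klog.InitFlags(nil)
        flag.Parse()

        // Restart the flush daemon with the configured interval. Doing
        // this explicitly matters when output goes to a Logger that
        // buffers internally, where flushing is not started automatically.
        klog.StartFlushDaemon(*flushFrequency)
        defer klog.StopFlushDaemon()

        klog.Info("logging with an explicitly configured flush interval")
        klog.Flush()
    }

Because StartFlushDaemon stops any running daemon before starting a new
one, it is safe to call after logging has already begun; the interval
simply changes from that point on.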