Merge pull request #307 from pohly/start-flush-daemon
StartFlushDaemon: add API with more control over flushing
k8s-ci-robot committed Mar 16, 2022
2 parents (f5927b0 + 6684fc7); commit 263155b
Showing 2 changed files with 31 additions and 18 deletions.
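
For orientation, here is a minimal sketch of how a caller could use the new entry point once this change is vendored; the interval value and the surrounding main function are illustrative only, not part of this commit:

package main

import (
	"time"

	"k8s.io/klog/v2"
)

func main() {
	klog.InitFlags(nil)

	// Ask klog to flush buffered log lines every 200ms instead of relying
	// on the package default interval (the flushInterval constant).
	klog.StartFlushDaemon(200 * time.Millisecond)
	defer klog.Flush()

	klog.Info("running with a custom flush interval")
}
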
klog.go (30 changes: 21 additions & 9 deletions)
@@ -395,7 +395,7 @@ func init() {
logging.addDirHeader = false
logging.skipLogHeaders = false
logging.oneOutput = false
-logging.flushD = newFlushDaemon(flushInterval, logging.lockAndFlushAll, nil)
+logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)
}

// InitFlags is for explicitly initializing the flags.
@@ -449,6 +449,9 @@ type loggingT struct {
file [severity.NumSeverity]flushSyncWriter
// flushD holds a flushDaemon that frequently flushes log file buffers.
flushD *flushDaemon
+// flushInterval is the interval for periodic flushing. If zero,
+// the global default will be used.
+flushInterval time.Duration
// pcs is used in V to avoid an allocation when computing the caller's PC.
pcs [1]uintptr
// vmap is a cache of the V Level for each V() call site, identified by PC.
@@ -976,7 +979,11 @@ const bufferSize = 256 * 1024
// createFiles creates all the log files for severity from sev down to infoLog.
// l.mu is held.
func (l *loggingT) createFiles(sev severity.Severity) error {
-l.flushD.run()
+interval := l.flushInterval
+if interval == 0 {
+interval = flushInterval
+}
+l.flushD.run(interval)
now := time.Now()
// Files are created in decreasing severity order, so as soon as we find one
// has already been created, we can stop.
@@ -1000,28 +1007,26 @@ const flushInterval = 5 * time.Second
type flushDaemon struct {
mu sync.Mutex
clock clock.WithTicker
-interval time.Duration
flush func()
stopC chan struct{}
stopDone chan struct{}
}

// newFlushDaemon returns a new flushDaemon. If the passed clock is nil, a
// clock.RealClock is used.
-func newFlushDaemon(interval time.Duration, flush func(), tickClock clock.WithTicker) *flushDaemon {
+func newFlushDaemon(flush func(), tickClock clock.WithTicker) *flushDaemon {
if tickClock == nil {
tickClock = clock.RealClock{}
}
return &flushDaemon{
-interval: interval,
-flush: flush,
-clock: tickClock,
+flush: flush,
+clock: tickClock,
}
}

// run starts a goroutine that periodically calls the daemons flush function.
// Calling run on an already running daemon will have no effect.
-func (f *flushDaemon) run() {
+func (f *flushDaemon) run(interval time.Duration) {
f.mu.Lock()
defer f.mu.Unlock()

@@ -1032,7 +1037,7 @@ func (f *flushDaemon) run() {
f.stopC = make(chan struct{}, 1)
f.stopDone = make(chan struct{}, 1)

-ticker := f.clock.NewTicker(f.interval)
+ticker := f.clock.NewTicker(interval)
go func() {
defer ticker.Stop()
defer func() { f.stopDone <- struct{}{} }()
@@ -1079,6 +1084,13 @@ func StopFlushDaemon() {
logging.flushD.stop()
}

+// StartFlushDaemon ensures that the flush daemon runs with the given delay
+// between flush calls. If it is already running, it gets restarted.
+func StartFlushDaemon(interval time.Duration) {
+StopFlushDaemon()
+logging.flushD.run(interval)
+}

// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
l.mu.Lock()
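
Because StartFlushDaemon stops any running daemon before calling run, changing the interval at runtime is simply another call. A hedged sketch (the two intervals below are arbitrary examples):

// Flush aggressively while chasing a problem, then return to the
// package default of 5s; each call restarts the daemon.
klog.StartFlushDaemon(100 * time.Millisecond)
// ... debugging ...
klog.StartFlushDaemon(5 * time.Second)
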
klog_test.go (19 changes: 10 additions & 9 deletions)
@@ -381,8 +381,8 @@ func TestSetOutputDataRace(t *testing.T) {
var wg sync.WaitGroup
var daemons []*flushDaemon
for i := 1; i <= 50; i++ {
-daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
-daemon.run()
+daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
+daemon.run(time.Second)
daemons = append(daemons, daemon)
}
for i := 1; i <= 50; i++ {
@@ -393,8 +393,8 @@ func TestSetOutputDataRace(t *testing.T) {
}()
}
for i := 1; i <= 50; i++ {
-daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
-daemon.run()
+daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
+daemon.run(time.Second)
daemons = append(daemons, daemon)
}
for i := 1; i <= 50; i++ {
@@ -405,8 +405,8 @@ func TestSetOutputDataRace(t *testing.T) {
}()
}
for i := 1; i <= 50; i++ {
-daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
-daemon.run()
+daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
+daemon.run(time.Second)
daemons = append(daemons, daemon)
}
wg.Wait()
@@ -1869,7 +1869,8 @@ func TestFlushDaemon(t *testing.T) {
}
testClock := testingclock.NewFakeClock(time.Now())
testLog := loggingT{
-flushD: newFlushDaemon(time.Second, spyFunc, testClock),
+flushInterval: time.Second,
+flushD: newFlushDaemon(spyFunc, testClock),
}

// Calling testLog will call createFile, which should start the daemon.
@@ -1910,8 +1911,8 @@ func TestFlushDaemon(t *testing.T) {

func TestStopFlushDaemon(t *testing.T) {
logging.flushD.stop()
-logging.flushD = newFlushDaemon(time.Second, func() {}, nil)
-logging.flushD.run()
+logging.flushD = newFlushDaemon(func() {}, nil)
+logging.flushD.run(time.Second)
if !logging.flushD.isRunning() {
t.Error("expected flushD to be running")
}
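
As an aside, the fake-clock pattern used in TestFlushDaemon above generalizes to the new run(interval) signature. A hedged in-package sketch (it relies on the unexported newFlushDaemon, so it would have to live alongside klog_test.go; the test name and spy closure are made up, and the polling for the asynchronous flush is elided):

func TestFlushDaemonCustomInterval(t *testing.T) {
	var mu sync.Mutex
	flushes := 0
	spy := func() {
		mu.Lock()
		defer mu.Unlock()
		flushes++
	}

	// Same fake clock as in TestFlushDaemon; it satisfies clock.WithTicker.
	fakeClock := testingclock.NewFakeClock(time.Now())
	daemon := newFlushDaemon(spy, fakeClock)
	daemon.run(time.Second) // the caller now chooses the interval

	// Stepping the fake clock past one interval makes the daemon's ticker
	// fire; the flush happens on the daemon goroutine, so a real test
	// would poll the flushes counter (under mu) with a deadline here.
	fakeClock.Step(2 * time.Second)

	daemon.stop()
}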
