Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

StartFlushDaemon: add API with more control over flushing #307

Merged
merged 1 commit into from Mar 16, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
30 changes: 21 additions & 9 deletions klog.go
Expand Up @@ -395,7 +395,7 @@ func init() {
logging.addDirHeader = false
logging.skipLogHeaders = false
logging.oneOutput = false
logging.flushD = newFlushDaemon(flushInterval, logging.lockAndFlushAll, nil)
logging.flushD = newFlushDaemon(logging.lockAndFlushAll, nil)
}

// InitFlags is for explicitly initializing the flags.
Expand Down Expand Up @@ -449,6 +449,9 @@ type loggingT struct {
file [severity.NumSeverity]flushSyncWriter
// flushD holds a flushDaemon that frequently flushes log file buffers.
flushD *flushDaemon
// flushInterval is the interval for periodic flushing. If zero,
// the global default will be used.
flushInterval time.Duration
// pcs is used in V to avoid an allocation when computing the caller's PC.
pcs [1]uintptr
// vmap is a cache of the V Level for each V() call site, identified by PC.
Expand Down Expand Up @@ -976,7 +979,11 @@ const bufferSize = 256 * 1024
// createFiles creates all the log files for severity from sev down to infoLog.
// l.mu is held.
func (l *loggingT) createFiles(sev severity.Severity) error {
l.flushD.run()
interval := l.flushInterval
if interval == 0 {
interval = flushInterval
}
l.flushD.run(interval)
now := time.Now()
// Files are created in decreasing severity order, so as soon as we find one
// has already been created, we can stop.
Expand All @@ -1000,28 +1007,26 @@ const flushInterval = 5 * time.Second
type flushDaemon struct {
mu sync.Mutex
clock clock.WithTicker
interval time.Duration
flush func()
stopC chan struct{}
stopDone chan struct{}
}

// newFlushDaemon returns a new flushDaemon. If the passed clock is nil, a
// clock.RealClock is used.
func newFlushDaemon(interval time.Duration, flush func(), tickClock clock.WithTicker) *flushDaemon {
func newFlushDaemon(flush func(), tickClock clock.WithTicker) *flushDaemon {
if tickClock == nil {
tickClock = clock.RealClock{}
}
return &flushDaemon{
interval: interval,
flush: flush,
clock: tickClock,
flush: flush,
clock: tickClock,
}
}

// run starts a goroutine that periodically calls the daemons flush function.
// Calling run on an already running daemon will have no effect.
func (f *flushDaemon) run() {
func (f *flushDaemon) run(interval time.Duration) {
f.mu.Lock()
defer f.mu.Unlock()

Expand All @@ -1032,7 +1037,7 @@ func (f *flushDaemon) run() {
f.stopC = make(chan struct{}, 1)
f.stopDone = make(chan struct{}, 1)

ticker := f.clock.NewTicker(f.interval)
ticker := f.clock.NewTicker(interval)
go func() {
defer ticker.Stop()
defer func() { f.stopDone <- struct{}{} }()
Expand Down Expand Up @@ -1079,6 +1084,13 @@ func StopFlushDaemon() {
logging.flushD.stop()
}

// StartFlushDaemon ensures that the flush daemon runs with the given delay
// between flush calls. If it is already running, it gets restarted.
func StartFlushDaemon(interval time.Duration) {
StopFlushDaemon()
logging.flushD.run(interval)
}

Copy link
Author

@pohly pohly Mar 16, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This gets used in Kubernetes like this:

kubernetes/kubernetes@1fda093

// lockAndFlushAll is like flushAll but locks l.mu first.
func (l *loggingT) lockAndFlushAll() {
l.mu.Lock()
Expand Down
19 changes: 10 additions & 9 deletions klog_test.go
Expand Up @@ -381,8 +381,8 @@ func TestSetOutputDataRace(t *testing.T) {
var wg sync.WaitGroup
var daemons []*flushDaemon
for i := 1; i <= 50; i++ {
daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
daemon.run()
daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
daemon.run(time.Second)
daemons = append(daemons, daemon)
}
for i := 1; i <= 50; i++ {
Expand All @@ -393,8 +393,8 @@ func TestSetOutputDataRace(t *testing.T) {
}()
}
for i := 1; i <= 50; i++ {
daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
daemon.run()
daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
daemon.run(time.Second)
daemons = append(daemons, daemon)
}
for i := 1; i <= 50; i++ {
Expand All @@ -405,8 +405,8 @@ func TestSetOutputDataRace(t *testing.T) {
}()
}
for i := 1; i <= 50; i++ {
daemon := newFlushDaemon(time.Second, logging.lockAndFlushAll, nil)
daemon.run()
daemon := newFlushDaemon(logging.lockAndFlushAll, nil)
daemon.run(time.Second)
daemons = append(daemons, daemon)
}
wg.Wait()
Expand Down Expand Up @@ -1869,7 +1869,8 @@ func TestFlushDaemon(t *testing.T) {
}
testClock := testingclock.NewFakeClock(time.Now())
testLog := loggingT{
flushD: newFlushDaemon(time.Second, spyFunc, testClock),
flushInterval: time.Second,
flushD: newFlushDaemon(spyFunc, testClock),
}

// Calling testLog will call createFile, which should start the daemon.
Expand Down Expand Up @@ -1910,8 +1911,8 @@ func TestFlushDaemon(t *testing.T) {

func TestStopFlushDaemon(t *testing.T) {
logging.flushD.stop()
logging.flushD = newFlushDaemon(time.Second, func() {}, nil)
logging.flushD.run()
logging.flushD = newFlushDaemon(func() {}, nil)
logging.flushD.run(time.Second)
if !logging.flushD.isRunning() {
t.Error("expected flushD to be running")
}
Expand Down