diff --git a/daemon/attach.go b/daemon/attach.go index fb14691d242af..0e12441ad1e2e 100644 --- a/daemon/attach.go +++ b/daemon/attach.go @@ -123,7 +123,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach return logger.ErrReadLogsNotSupported{} } logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1}) - defer logs.Close() + defer logs.ConsumerGone() LogLoop: for { diff --git a/daemon/logger/adapter.go b/daemon/logger/adapter.go index 95aff9bf3babc..677daf4b7658a 100644 --- a/daemon/logger/adapter.go +++ b/daemon/logger/adapter.go @@ -94,7 +94,9 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { dec := logdriver.NewLogEntryDecoder(stream) for { select { - case <-watcher.WatchClose(): + case <-watcher.WatchProducerGone(): + return + case <-watcher.WatchConsumerGone(): return default: } @@ -106,7 +108,8 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { } select { case watcher.Err <- errors.Wrap(err, "error decoding log message"): - case <-watcher.WatchClose(): + case <-watcher.WatchProducerGone(): + case <-watcher.WatchConsumerGone(): } return } @@ -127,9 +130,9 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher { select { case watcher.Msg <- msg: - case <-watcher.WatchClose(): - // make sure the message we consumed is sent - watcher.Msg <- msg + case <-watcher.WatchConsumerGone(): + return + case <-watcher.WatchProducerGone(): return } } diff --git a/daemon/logger/adapter_test.go b/daemon/logger/adapter_test.go index f47e711c892ea..d14a48e477627 100644 --- a/daemon/logger/adapter_test.go +++ b/daemon/logger/adapter_test.go @@ -174,7 +174,7 @@ func TestAdapterReadLogs(t *testing.T) { t.Fatal("timeout waiting for message channel to close") } - lw.Close() + lw.ProducerGone() lw = lr.ReadLogs(ReadConfig{Follow: true}) for _, x := range testMsg { diff --git a/daemon/logger/journald/read.go b/daemon/logger/journald/read.go index d4bcc62d9aefe..4f24ab62581ac 100644 --- a/daemon/logger/journald/read.go +++ b/daemon/logger/journald/read.go @@ -165,7 +165,7 @@ func (s *journald) Close() error { s.mu.Lock() s.closed = true for reader := range s.readers.readers { - reader.Close() + reader.ProducerGone() } s.mu.Unlock() return nil @@ -299,7 +299,11 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, // Wait until we're told to stop. select { case cursor = <-newCursor: - case <-logWatcher.WatchClose(): + case <-logWatcher.WatchProducerGone(): + // Notify the other goroutine that its work is done. + C.close(pfd[1]) + cursor = <-newCursor + case <-logWatcher.WatchConsumerGone(): // Notify the other goroutine that its work is done. C.close(pfd[1]) cursor = <-newCursor diff --git a/daemon/logger/jsonfilelog/jsonfilelog.go b/daemon/logger/jsonfilelog/jsonfilelog.go index b806a5ad1752a..28a6e4be90715 100644 --- a/daemon/logger/jsonfilelog/jsonfilelog.go +++ b/daemon/logger/jsonfilelog/jsonfilelog.go @@ -166,13 +166,14 @@ func ValidateLogOpt(cfg map[string]string) error { return nil } -// Close closes underlying file and signals all readers to stop. +// Close closes underlying file and signals all the readers +// that the logs producer is gone. func (l *JSONFileLogger) Close() error { l.mu.Lock() l.closed = true err := l.writer.Close() for r := range l.readers { - r.Close() + r.ProducerGone() delete(l.readers, r) } l.mu.Unlock() diff --git a/daemon/logger/jsonfilelog/read_test.go b/daemon/logger/jsonfilelog/read_test.go index cad8003e5efd3..936fe244cc1ef 100644 --- a/daemon/logger/jsonfilelog/read_test.go +++ b/daemon/logger/jsonfilelog/read_test.go @@ -49,11 +49,12 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) { }() lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true}) - watchClose := lw.WatchClose() for { select { case <-lw.Msg: - case <-watchClose: + case <-lw.WatchProducerGone(): + return + case <-lw.WatchConsumerGone(): return case err := <-chError: if err != nil { diff --git a/daemon/logger/logger.go b/daemon/logger/logger.go index 912e855c7f967..256fb05bb29b9 100644 --- a/daemon/logger/logger.go +++ b/daemon/logger/logger.go @@ -104,33 +104,52 @@ type LogWatcher struct { // For sending log messages to a reader. Msg chan *Message // For sending error messages that occur while while reading logs. - Err chan error - closeOnce sync.Once - closeNotifier chan struct{} + Err chan error + producerOnce sync.Once + producerGone chan struct{} + consumerOnce sync.Once + consumerGone chan struct{} } // NewLogWatcher returns a new LogWatcher. func NewLogWatcher() *LogWatcher { return &LogWatcher{ - Msg: make(chan *Message, logWatcherBufferSize), - Err: make(chan error, 1), - closeNotifier: make(chan struct{}), + Msg: make(chan *Message, logWatcherBufferSize), + Err: make(chan error, 1), + producerGone: make(chan struct{}), + consumerGone: make(chan struct{}), } } -// Close notifies the underlying log reader to stop. -func (w *LogWatcher) Close() { +// ProducerGone notifies the underlying log reader that +// the logs producer (a container) is gone. +func (w *LogWatcher) ProducerGone() { // only close if not already closed - w.closeOnce.Do(func() { - close(w.closeNotifier) + w.producerOnce.Do(func() { + close(w.producerGone) }) } -// WatchClose returns a channel receiver that receives notification -// when the watcher has been closed. This should only be called from -// one goroutine. -func (w *LogWatcher) WatchClose() <-chan struct{} { - return w.closeNotifier +// WatchProducerGone returns a channel receiver that receives notification +// once the logs producer (a container) is gone. This should only be called +// from a single goroutine. +func (w *LogWatcher) WatchProducerGone() <-chan struct{} { + return w.producerGone +} + +// ConsumerGone notifies that the logs consumer is gone. +func (w *LogWatcher) ConsumerGone() { + // only close if not already closed + w.consumerOnce.Do(func() { + close(w.consumerGone) + }) +} + +// WatchConsumerGone returns a channel receiver that receives notification +// when the log watcher consumer is gone. This should only be called from +// a single goroutine. +func (w *LogWatcher) WatchConsumerGone() <-chan struct{} { + return w.consumerGone } // Capability defines the list of capabilities that a driver can implement diff --git a/daemon/logger/loggerutils/logfile.go b/daemon/logger/loggerutils/logfile.go index fe338e53e81b2..659faa90bac13 100644 --- a/daemon/logger/loggerutils/logfile.go +++ b/daemon/logger/loggerutils/logfile.go @@ -482,10 +482,11 @@ func tailFile(f io.ReadSeeker, watcher *logger.LogWatcher, createDecoder makeDec if !config.Until.IsZero() && msg.Timestamp.After(config.Until) { return } + // send the message unless consumer is gone select { - case <-watcher.WatchClose(): - return case watcher.Msg <- msg: + case <-watcher.WatchConsumerGone(): + return } } } @@ -505,11 +506,13 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int fileWatcher.Close() }() + // wait till the logs producer is gone, cancel the context + // to signal there'll be no more log rotation. ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { select { - case <-logWatcher.WatchClose(): + case <-logWatcher.WatchProducerGone(): fileWatcher.Remove(name) cancel() case <-ctx.Done(): @@ -573,7 +576,9 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int return errRetry } return err - case <-ctx.Done(): + case <-ctx.Done(): // container producing logs is gone + return errDone + case <-logWatcher.WatchConsumerGone(): return errDone } } @@ -619,23 +624,11 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int if !until.IsZero() && msg.Timestamp.After(until) { return } + // send the message, unless the consumer is gone select { case logWatcher.Msg <- msg: - case <-ctx.Done(): - logWatcher.Msg <- msg - for { - msg, err := decodeLogLine() - if err != nil { - return - } - if !since.IsZero() && msg.Timestamp.Before(since) { - continue - } - if !until.IsZero() && msg.Timestamp.After(until) { - return - } - logWatcher.Msg <- msg - } + case <-logWatcher.WatchConsumerGone(): + return } } } diff --git a/daemon/logs.go b/daemon/logs.go index 5e5c34992eec6..668a75c778306 100644 --- a/daemon/logs.go +++ b/daemon/logs.go @@ -110,8 +110,8 @@ func (daemon *Daemon) ContainerLogs(ctx context.Context, containerName string, c } }() } - // set up some defers - defer logs.Close() + // signal that the log reader is gone + defer logs.ConsumerGone() // close the messages channel. closing is the only way to signal above // that we're doing with logs (other than context cancel i guess).