Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DO NOT MERGE] daemon.ContainerLogs(): fix --follow #37656

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion daemon/attach.go
Expand Up @@ -123,7 +123,7 @@ func (daemon *Daemon) containerAttach(c *container.Container, cfg *stream.Attach
return logger.ErrReadLogsNotSupported{}
}
logs := cLog.ReadLogs(logger.ReadConfig{Tail: -1})
defer logs.Close()
defer logs.ConsumerGone()

LogLoop:
for {
Expand Down
8 changes: 3 additions & 5 deletions daemon/logger/adapter.go
Expand Up @@ -94,7 +94,7 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
dec := logdriver.NewLogEntryDecoder(stream)
for {
select {
case <-watcher.WatchClose():
case <-watcher.WatchConsumerGone():
return
default:
}
Expand All @@ -106,7 +106,7 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {
}
select {
case watcher.Err <- errors.Wrap(err, "error decoding log message"):
case <-watcher.WatchClose():
case <-watcher.WatchConsumerGone():
}
return
}
Expand All @@ -127,9 +127,7 @@ func (a *pluginAdapterWithRead) ReadLogs(config ReadConfig) *LogWatcher {

select {
case watcher.Msg <- msg:
case <-watcher.WatchClose():
// make sure the message we consumed is sent
watcher.Msg <- msg
case <-watcher.WatchConsumerGone():
return
}
}
Expand Down
1 change: 0 additions & 1 deletion daemon/logger/adapter_test.go
Expand Up @@ -174,7 +174,6 @@ func TestAdapterReadLogs(t *testing.T) {
t.Fatal("timeout waiting for message channel to close")

}
lw.Close()

lw = lr.ReadLogs(ReadConfig{Follow: true})
for _, x := range testMsg {
Expand Down
13 changes: 4 additions & 9 deletions daemon/logger/journald/journald.go
Expand Up @@ -18,14 +18,9 @@ import (
const name = "journald"

type journald struct {
mu sync.Mutex
vars map[string]string // additional variables and values to send to the journal along with the log message
readers readerList
closed bool
}

type readerList struct {
readers map[*logger.LogWatcher]*logger.LogWatcher
mu sync.Mutex
vars map[string]string // additional variables and values to send to the journal along with the log message
closed bool
}

func init() {
Expand Down Expand Up @@ -84,7 +79,7 @@ func New(info logger.Info) (logger.Logger, error) {
for k, v := range extraAttrs {
vars[k] = v
}
return &journald{vars: vars, readers: readerList{readers: make(map[*logger.LogWatcher]*logger.LogWatcher)}}, nil
return &journald{vars: vars}, nil
}

// We don't actually accept any options, but we have to supply a callback for
Expand Down
9 changes: 1 addition & 8 deletions daemon/logger/journald/read.go
Expand Up @@ -164,9 +164,6 @@ import (
func (s *journald) Close() error {
s.mu.Lock()
s.closed = true
for reader := range s.readers.readers {
reader.Close()
}
s.mu.Unlock()
return nil
}
Expand Down Expand Up @@ -253,7 +250,6 @@ drain:

func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal, pfd [2]C.int, cursor *C.char, untilUnixMicro uint64) *C.char {
s.mu.Lock()
s.readers.readers[logWatcher] = logWatcher
if s.closed {
// the journald Logger is closed, presumably because the container has been
// reset. So we shouldn't follow, because we'll never be woken up. But we
Expand Down Expand Up @@ -289,17 +285,14 @@ func (s *journald) followJournal(logWatcher *logger.LogWatcher, j *C.sd_journal,

// Clean up.
C.close(pfd[0])
s.mu.Lock()
delete(s.readers.readers, logWatcher)
s.mu.Unlock()
close(logWatcher.Msg)
newCursor <- cursor
}()

// Wait until we're told to stop.
select {
case cursor = <-newCursor:
case <-logWatcher.WatchClose():
case <-logWatcher.WatchConsumerGone():
// Notify the other goroutine that its work is done.
C.close(pfd[1])
cursor = <-newCursor
Expand Down
20 changes: 7 additions & 13 deletions daemon/logger/jsonfilelog/jsonfilelog.go
Expand Up @@ -23,11 +23,10 @@ const Name = "json-file"

// JSONFileLogger is Logger implementation for default Docker logging.
type JSONFileLogger struct {
mu sync.Mutex
closed bool
writer *loggerutils.LogFile
readers map[*logger.LogWatcher]struct{} // stores the active log followers
tag string // tag values requested by the user to log
mu sync.Mutex
closed bool
writer *loggerutils.LogFile
tag string // tag values requested by the user to log
}

func init() {
Expand Down Expand Up @@ -116,9 +115,8 @@ func New(info logger.Info) (logger.Logger, error) {
}

return &JSONFileLogger{
writer: writer,
readers: make(map[*logger.LogWatcher]struct{}),
tag: tag,
writer: writer,
tag: tag,
}, nil
}

Expand Down Expand Up @@ -166,15 +164,11 @@ func ValidateLogOpt(cfg map[string]string) error {
return nil
}

// Close closes underlying file and signals all readers to stop.
// Close closes the underlying log file.
func (l *JSONFileLogger) Close() error {
l.mu.Lock()
l.closed = true
err := l.writer.Close()
for r := range l.readers {
r.Close()
delete(l.readers, r)
}
l.mu.Unlock()
return err
}
Expand Down
8 changes: 0 additions & 8 deletions daemon/logger/jsonfilelog/read.go
Expand Up @@ -27,15 +27,7 @@ func (l *JSONFileLogger) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {
func (l *JSONFileLogger) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(watcher.Msg)

l.mu.Lock()
l.readers[watcher] = struct{}{}
l.mu.Unlock()

l.writer.ReadLogs(config, watcher)

l.mu.Lock()
delete(l.readers, watcher)
l.mu.Unlock()
}

func decodeLogLine(dec *json.Decoder, l *jsonlog.JSONLog) (*logger.Message, error) {
Expand Down
3 changes: 1 addition & 2 deletions daemon/logger/jsonfilelog/read_test.go
Expand Up @@ -50,11 +50,10 @@ func BenchmarkJSONFileLoggerReadLogs(b *testing.B) {
}()

lw := jsonlogger.(*JSONFileLogger).ReadLogs(logger.ReadConfig{Follow: true})
watchClose := lw.WatchClose()
for {
select {
case <-lw.Msg:
case <-watchClose:
case <-lw.WatchConsumerGone():
return
case err := <-chError:
if err != nil {
Expand Down
6 changes: 0 additions & 6 deletions daemon/logger/local/local.go
Expand Up @@ -60,7 +60,6 @@ type driver struct {
mu sync.Mutex
closed bool
logfile *loggerutils.LogFile
readers map[*logger.LogWatcher]struct{} // stores the active log followers
}

// New creates a new local logger
Expand Down Expand Up @@ -146,7 +145,6 @@ func newDriver(logPath string, cfg *CreateConfig) (logger.Logger, error) {
}
return &driver{
logfile: lf,
readers: make(map[*logger.LogWatcher]struct{}),
}, nil
}

Expand All @@ -165,10 +163,6 @@ func (d *driver) Close() error {
d.mu.Lock()
d.closed = true
err := d.logfile.Close()
for r := range d.readers {
r.Close()
delete(d.readers, r)
}
d.mu.Unlock()
return err
}
Expand Down
9 changes: 0 additions & 9 deletions daemon/logger/local/read.go
Expand Up @@ -23,16 +23,7 @@ func (d *driver) ReadLogs(config logger.ReadConfig) *logger.LogWatcher {

func (d *driver) readLogs(watcher *logger.LogWatcher, config logger.ReadConfig) {
defer close(watcher.Msg)

d.mu.Lock()
d.readers[watcher] = struct{}{}
d.mu.Unlock()

d.logfile.ReadLogs(config, watcher)

d.mu.Lock()
delete(d.readers, watcher)
d.mu.Unlock()
}

func getTailReader(ctx context.Context, r loggerutils.SizeReaderAt, req int) (io.Reader, int, error) {
Expand Down
30 changes: 15 additions & 15 deletions daemon/logger/logger.go
Expand Up @@ -104,33 +104,33 @@ type LogWatcher struct {
// For sending log messages to a reader.
Msg chan *Message
// For sending error messages that occur while while reading logs.
Err chan error
closeOnce sync.Once
closeNotifier chan struct{}
Err chan error
consumerOnce sync.Once
consumerGone chan struct{}
}

// NewLogWatcher returns a new LogWatcher.
func NewLogWatcher() *LogWatcher {
return &LogWatcher{
Msg: make(chan *Message, logWatcherBufferSize),
Err: make(chan error, 1),
closeNotifier: make(chan struct{}),
Msg: make(chan *Message, logWatcherBufferSize),
Err: make(chan error, 1),
consumerGone: make(chan struct{}),
}
}

// Close notifies the underlying log reader to stop.
func (w *LogWatcher) Close() {
// ConsumerGone notifies that the logs consumer is gone.
func (w *LogWatcher) ConsumerGone() {
// only close if not already closed
w.closeOnce.Do(func() {
close(w.closeNotifier)
w.consumerOnce.Do(func() {
close(w.consumerGone)
})
}

// WatchClose returns a channel receiver that receives notification
// when the watcher has been closed. This should only be called from
// one goroutine.
func (w *LogWatcher) WatchClose() <-chan struct{} {
return w.closeNotifier
// WatchConsumerGone returns a channel receiver that receives notification
// when the log watcher consumer is gone. This should only be called from
// a single goroutine.
func (w *LogWatcher) WatchConsumerGone() <-chan struct{} {
return w.consumerGone
}

// Capability defines the list of capabilities that a driver can implement
Expand Down
56 changes: 15 additions & 41 deletions daemon/logger/loggerutils/logfile.go
Expand Up @@ -365,11 +365,10 @@ func (w *LogFile) ReadLogs(config logger.ReadConfig, watcher *logger.LogWatcher)
w.mu.RLock()
}

if !config.Follow || w.closed {
w.mu.RUnlock()
w.mu.RUnlock()
if !config.Follow {
return
}
w.mu.RUnlock()

notifyRotate := w.notifyRotate.Subscribe()
defer w.notifyRotate.Evict(notifyRotate)
Expand Down Expand Up @@ -488,7 +487,7 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
go func() {
select {
case <-ctx.Done():
case <-watcher.WatchClose():
case <-watcher.WatchConsumerGone():
cancel()
}
}()
Expand Down Expand Up @@ -527,10 +526,11 @@ func tailFiles(files []SizeReaderAt, watcher *logger.LogWatcher, createDecoder m
if !config.Until.IsZero() && msg.Timestamp.After(config.Until) {
return
}
// send the message unless consumer is gone
select {
case <-ctx.Done():
return
case watcher.Msg <- msg:
case <-watcher.WatchConsumerGone():
return
}
}
}
Expand All @@ -550,18 +550,6 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
fileWatcher.Close()
}()

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
select {
case <-logWatcher.WatchClose():
fileWatcher.Remove(name)
cancel()
case <-ctx.Done():
return
}
}()

var retries int
handleRotate := func() error {
f.Close()
Expand Down Expand Up @@ -594,15 +582,13 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
decodeLogLine = createDecoder(f)
return nil
case fsnotify.Rename, fsnotify.Remove:
select {
case <-notifyRotate:
case <-ctx.Done():
return handleRotate()
case fsnotify.Chmod:
_, statErr := os.Lstat(e.Name)
if os.IsNotExist(statErr) {
// container and its log file removed
return errDone
}
if err := handleRotate(); err != nil {
return err
}
return nil
}
return errRetry
case err := <-fileWatcher.Errors():
Expand All @@ -618,7 +604,7 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
return errRetry
}
return err
case <-ctx.Done():
case <-logWatcher.WatchConsumerGone():
return errDone
}
}
Expand Down Expand Up @@ -664,23 +650,11 @@ func followLogs(f *os.File, logWatcher *logger.LogWatcher, notifyRotate chan int
if !until.IsZero() && msg.Timestamp.After(until) {
return
}
// send the message, unless the consumer is gone
select {
case logWatcher.Msg <- msg:
case <-ctx.Done():
logWatcher.Msg <- msg
for {
msg, err := decodeLogLine()
if err != nil {
return
}
if !since.IsZero() && msg.Timestamp.Before(since) {
continue
}
if !until.IsZero() && msg.Timestamp.After(until) {
return
}
logWatcher.Msg <- msg
}
case <-logWatcher.WatchConsumerGone():
return
}
}
}
Expand Down