diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 0d533a7dac2ca..eec7ddeb82be3 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -160,32 +160,32 @@ var newTicker = func(freq time.Duration) *time.Ticker { // the maximum number of total bytes in a batch (defined in // maximumBytesPerPut). func (ls *logStream) collectLogs() { + le := logrus.WithFields(logrus.Fields{ + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + }) + timer := newTicker(batchPublishFrequency) for { select { case <-timer.C: ls.publishLogs() - logrus.WithFields(logrus.Fields{ - "time trigger": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish log when timer timeout") + le.WithFields(logrus.Fields{ + "trigger": "time", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") ls.logGroup.Reset() ls.logGroup.Topic = proto.String(ls.topic) case msg, more := <-ls.messages: if !more { ls.publishLogs() logrus.WithFields(logrus.Fields{ - "no more log": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish log when no more logs") + "trigger": "EOF", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") return } unprocessedLine := msg.Line @@ -205,13 +205,10 @@ func (ls *logStream) collectLogs() { // line would push it over the maximum number of total bytes. ls.publishLogs() logrus.WithFields(logrus.Fields{ - "get limit": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish logs when touch the limit") + "trigger": "size", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") ls.logGroup.Reset() ls.logGroup.Topic = proto.String(ls.topic) } @@ -225,16 +222,18 @@ func (ls *logStream) collectLogs() { func (ls *logStream) publishLogs() { err := ls.client.PutLogs(ls.logGroup) if err != nil { + le := logrus.WithFields(logrus.Fields{ + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + }) if serviceErr, ok := err.(sls.Error); ok { - logrus.WithFields(logrus.Fields{ + le.WithFields(logrus.Fields{ "errorCode": serviceErr.Code, "errorMessage": serviceErr.Message, - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, }).Error("PutLogs occurs sls error") } else { - logrus.Error(err) + le.Error("PutLogs occurs err:", err) } } } diff --git a/daemon/logger/alilogs/alilogs_test.go b/daemon/logger/alilogs/alilogs_test.go index 373b217c743df..29c0fc7aaf433 100644 --- a/daemon/logger/alilogs/alilogs_test.go +++ b/daemon/logger/alilogs/alilogs_test.go @@ -1,11 +1,10 @@ package alilogs import ( + "sync" "testing" "time" - "sync" - "github.com/docker/docker/daemon/logger" sls "github.com/galaxydi/go-loghub" "github.com/gogo/protobuf/proto" @@ -14,7 +13,7 @@ import ( func TestCollectLogsNumberLimit(t *testing.T) { extraContents := []*sls.LogContent{} mockClient := NewMockClient() - mockClient.ErrType = 0 + mockClient.ErrType = NoError stream := &logStream{ endpoint: "test-endpoint", @@ -107,7 +106,6 @@ func TestCollectLogsSimple(t *testing.T) { } extraContents := []*sls.LogContent{ec1, ec2} mockClient := NewMockClient() - mockClient.ErrType = 0 stream := &logStream{ endpoint: "test-endpoint", projectName: "test-project", @@ -122,7 +120,15 @@ func TestCollectLogsSimple(t *testing.T) { messages: make(chan *logger.Message, maximumLogsPerPut), } + ticks := make(chan time.Time) + newTicker = func(_ time.Duration) *time.Ticker { + return &time.Ticker{ + C: ticks, + } + } + go stream.collectLogs() + stream.Log(&logger.Message{ Line: []byte("this is test log 1"), Timestamp: time.Time{}, @@ -135,8 +141,13 @@ func TestCollectLogsSimple(t *testing.T) { Line: []byte("this is test log 3"), Timestamp: time.Time{}, }) - time.Sleep(batchPublishFrequency) + + ticks <- time.Time{} stream.Close() + + // Wait a moment for the logs were writted into mockClient + time.Sleep(1 * time.Second) + if len(mockClient.Logs) != 3 { t.Errorf("should be 3 number logs, actual log numbers: %v", len(mockClient.Logs)) } @@ -178,13 +189,13 @@ func TestPublishLogs(t *testing.T) { Contents: contents, } stream.logGroup.Logs = append(stream.logGroup.Logs, &logRecord) - mockClient.ErrType = 0 + mockClient.ErrType = NoError stream.publishLogs() - mockClient.ErrType = 1 + mockClient.ErrType = InternalServerError stream.publishLogs() - mockClient.ErrType = 2 + mockClient.ErrType = UnknownError stream.publishLogs() } diff --git a/daemon/logger/alilogs/mock_client_test.go b/daemon/logger/alilogs/mock_client_test.go index 83a76050f9bad..e80bc7036da6c 100644 --- a/daemon/logger/alilogs/mock_client_test.go +++ b/daemon/logger/alilogs/mock_client_test.go @@ -6,16 +6,24 @@ import ( sls "github.com/galaxydi/go-loghub" ) +type errorType int + +const ( + NoError errorType = iota + InternalServerError + UnknownError +) + // MockClient is an autogenerated mock type for the AliLogAPI type type MockClient struct { - ErrType int + ErrType errorType Topic string Logs []*sls.Log } func NewMockClient() *MockClient { client := &MockClient{ - ErrType: 0, + ErrType: NoError, Topic: "", Logs: []*sls.Log{}, } @@ -25,21 +33,19 @@ func NewMockClient() *MockClient { // PutLogs provides a mock function with given fields: _a0 func (client *MockClient) PutLogs(lg *sls.LogGroup) error { switch client.ErrType { - case 0: - // no error + case NoError: client.Topic = lg.GetTopic() - for _, v := range lg.GetLogs() { - client.Logs = append(client.Logs, v) - } + client.Logs = append(client.Logs, lg.GetLogs()...) return nil - case 1: + case InternalServerError: slsErr := &sls.Error{ Code: "InternalServerError", Message: "loghub service is not avaliable", } return slsErr + case UnknownError: + return fmt.Errorf("Unknown error") default: - err := fmt.Errorf("unknown error") - return err + return fmt.Errorf("Unknown error") } }