From 4304665782bc205d0e7e6c101c5ac4265c7f7a65 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Wed, 4 Jan 2017 11:26:44 +0800 Subject: [PATCH 1/2] add ut --- daemon/logger/alilogs/alilogs.go | 185 +++++++------ daemon/logger/alilogs/alilogs_test.go | 316 ++++++++++++++++++++++ daemon/logger/alilogs/logapi.go | 61 +++-- daemon/logger/alilogs/mock_client_test.go | 45 +++ 4 files changed, 501 insertions(+), 106 deletions(-) create mode 100644 daemon/logger/alilogs/alilogs_test.go create mode 100644 daemon/logger/alilogs/mock_client_test.go diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 385b70f193ab0..0d533a7dac2ca 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -57,14 +57,26 @@ const ( ) type logStream struct { + endpoint string + projectName string + logstoreName string topic string extraLogContents []*sls.LogContent client AliLogAPI + logGroup *sls.LogGroup messages chan *logger.Message lock sync.RWMutex closed bool } +type contextParams struct { + accessKeyID string + accessKeySecret string + securityToken string + topicName string + extraContents []*sls.LogContent +} + // init registers the alilogs driver func init() { if err := logger.RegisterLogDriver(name, New); err != nil { @@ -80,57 +92,27 @@ func New(ctx logger.Context) (logger.Logger, error) { endpoint := ctx.Config[endpointKey] projectName := ctx.Config[projectKey] logstoreName := ctx.Config[logstoreKey] - extraContents := []*sls.LogContent{} - accessKeyID := "" - accessKeySecret := "" - securityToken := "" - topicName := "" - - extra := ctx.ExtraAttributes(nil) - value, ok := extra[accessKeyIDEnvKey] - if ok { - accessKeyID = value - delete(extra, accessKeyIDEnvKey) - } else { - return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey) - } - - value, ok = extra[accessKeySecretEnvKey] - if ok { - accessKeySecret = value - delete(extra, accessKeySecretEnvKey) - } else { - return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey) - } - if value, ok = extra[securityTokenEnvKey]; ok { - securityToken = value - delete(extra, securityTokenEnvKey) - } - - if value, ok = extra[topicEnvKey]; ok { - topicName = value - delete(extra, topicEnvKey) - } - - // add extra contents to log record - for key, value := range extra { - logContent := &sls.LogContent{ - Key: proto.String(key), - Value: proto.String(value), - } - extraContents = append(extraContents, logContent) + contextInput, err := parseContext(&ctx) + if err != nil { + return nil, err } - - aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken) + aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, contextInput.accessKeyID, contextInput.accessKeySecret, contextInput.securityToken) if err != nil { return nil, err } containerStream := &logStream{ - topic: topicName, - extraLogContents: extraContents, + endpoint: endpoint, + projectName: projectName, + logstoreName: logstoreName, + topic: contextInput.topicName, + extraLogContents: contextInput.extraContents, client: aliLogClient, - messages: make(chan *logger.Message, maximumLogsPerPut), + logGroup: &sls.LogGroup{ + Topic: proto.String(contextInput.topicName), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), } go containerStream.collectLogs() @@ -170,7 +152,7 @@ var newTicker = func(freq time.Duration) *time.Ticker { return time.NewTicker(freq) } -// PutLogs executes as a goroutine to 
perform put logs for +// collectLogs executes as a goroutine to perform put logs for // submission to the logstore. Batching is performed on time- and size- // bases. Time-based batching occurs at a 5 second interval (defined in the // batchPublishFrequency const). Size-based batching is performed on the @@ -178,34 +160,31 @@ var newTicker = func(freq time.Duration) *time.Ticker { // the maximum number of total bytes in a batch (defined in // maximumBytesPerPut). func (ls *logStream) collectLogs() { - aliLogClient := ls.client.(*AliLogClient) - logGroup := sls.LogGroup{ - Topic: proto.String(ls.topic), - Logs: []*sls.Log{}, - } timer := newTicker(batchPublishFrequency) for { select { case <-timer.C: - ls.publishLogs(&logGroup) + ls.publishLogs() logrus.WithFields(logrus.Fields{ - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, - "published log number": len(logGroup.Logs), - "published log size": logGroup.Size(), + "time trigger": "send data", + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + "published log number": len(ls.logGroup.Logs), + "published log size": ls.logGroup.Size(), }).Debug("publish log when timer timeout") - logGroup.Reset() - logGroup.Topic = proto.String(ls.topic) + ls.logGroup.Reset() + ls.logGroup.Topic = proto.String(ls.topic) case msg, more := <-ls.messages: if !more { - ls.publishLogs(&logGroup) + ls.publishLogs() logrus.WithFields(logrus.Fields{ - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, - "published log number": len(logGroup.Logs), - "published log size": logGroup.Size(), + "no more log": "send data", + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + "published log number": len(ls.logGroup.Logs), + "published log size": ls.logGroup.Size(), }).Debug("publish log when no more logs") return } @@ -221,38 +200,38 @@ func (ls *logStream) collectLogs() { Contents: contents, } if len(unprocessedLine) > 0 { - if (len(logGroup.Logs) >= maximumLogsPerPut) || (logGroup.Size()+logRecord.Size() > maximumBytesPerPut) { + if (len(ls.logGroup.Logs) >= maximumLogsPerPut) || (ls.logGroup.Size()+logRecord.Size() > maximumBytesPerPut) { // Publish an existing batch if it's already over the maximum number of logs or if adding this // line would push it over the maximum number of total bytes. 
- ls.publishLogs(&logGroup) + ls.publishLogs() logrus.WithFields(logrus.Fields{ - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, - "published log number": len(logGroup.Logs), - "published log size": logGroup.Size(), + "get limit": "send data", + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + "published log number": len(ls.logGroup.Logs), + "published log size": ls.logGroup.Size(), }).Debug("publish logs when touch the limit") - logGroup.Reset() - logGroup.Topic = proto.String(ls.topic) + ls.logGroup.Reset() + ls.logGroup.Topic = proto.String(ls.topic) } - logGroup.Logs = append(logGroup.Logs, &logRecord) + ls.logGroup.Logs = append(ls.logGroup.Logs, &logRecord) } } } } // publishLogs calls PutLogs for a given LogGroup -func (ls *logStream) publishLogs(lg *sls.LogGroup) { - err := ls.client.PutLogs(lg) +func (ls *logStream) publishLogs() { + err := ls.client.PutLogs(ls.logGroup) if err != nil { if serviceErr, ok := err.(sls.Error); ok { - aliLogClient := ls.client.(*AliLogClient) logrus.WithFields(logrus.Fields{ "errorCode": serviceErr.Code, "errorMessage": serviceErr.Message, - "endpoint": aliLogClient.Endpoint, - "project": aliLogClient.ProjectName, - "logstore": aliLogClient.LogstoreName, + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, }).Error("PutLogs occurs sls error") } else { logrus.Error(err) @@ -280,3 +259,49 @@ func ValidateLogOpt(cfg map[string]string) error { } return nil } + +func parseContext(ctx *logger.Context) (*contextParams, error) { + input := &contextParams{ + accessKeyID: "", + accessKeySecret: "", + securityToken: "", + topicName: "", + extraContents: []*sls.LogContent{}, + } + extra := ctx.ExtraAttributes(nil) + value, ok := extra[accessKeyIDEnvKey] + if ok { + input.accessKeyID = value + delete(extra, accessKeyIDEnvKey) + } else { + return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey) + } + + value, ok = extra[accessKeySecretEnvKey] + if ok { + input.accessKeySecret = value + delete(extra, accessKeySecretEnvKey) + } else { + return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey) + } + + if value, ok = extra[securityTokenEnvKey]; ok { + input.securityToken = value + delete(extra, securityTokenEnvKey) + } + + if value, ok = extra[topicEnvKey]; ok { + input.topicName = value + delete(extra, topicEnvKey) + } + + // add extra contents to log record + for key, value := range extra { + logContent := &sls.LogContent{ + Key: proto.String(key), + Value: proto.String(value), + } + input.extraContents = append(input.extraContents, logContent) + } + return input, nil +} diff --git a/daemon/logger/alilogs/alilogs_test.go b/daemon/logger/alilogs/alilogs_test.go new file mode 100644 index 0000000000000..373b217c743df --- /dev/null +++ b/daemon/logger/alilogs/alilogs_test.go @@ -0,0 +1,316 @@ +package alilogs + +import ( + "testing" + "time" + + "sync" + + "github.com/docker/docker/daemon/logger" + sls "github.com/galaxydi/go-loghub" + "github.com/gogo/protobuf/proto" +) + +func TestCollectLogsNumberLimit(t *testing.T) { + extraContents := []*sls.LogContent{} + mockClient := NewMockClient() + mockClient.ErrType = 0 + + stream := &logStream{ + endpoint: "test-endpoint", + projectName: "test-project", + logstoreName: "test-logstore", + topic: "demo_topic", + extraLogContents: extraContents, + client: mockClient, + logGroup: &sls.LogGroup{ + Topic: proto.String("demo_topic"), + Logs: 
[]*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + go stream.collectLogs() + + var wg sync.WaitGroup + wg.Add(maximumLogsPerPut + 102) + + for i := 0; i < maximumLogsPerPut+102; i++ { + go worker(stream, &wg) + } + wg.Wait() + time.Sleep(batchPublishFrequency) + stream.Close() + if mockClient.Topic != "demo_topic" { + t.Errorf("check topic fail, expect:%s, actual:%s", stream.topic, mockClient.Topic) + } + if len(mockClient.Logs) != maximumLogsPerPut+102 { + t.Errorf("check log number fail, expect:%v, actual:%v", maximumLogsPerPut+102, len(mockClient.Logs)) + } + +} + +func worker(stream *logStream, wg *sync.WaitGroup) { + stream.Log(&logger.Message{ + Line: []byte("test log"), + Timestamp: time.Time{}, + }) + wg.Done() +} + +func TestValidateOpt(t *testing.T) { + // endpointKey, projectKey, logstoreKey, labelsKey, envKey + opt := map[string]string{} + opt[endpointKey] = "" + err := ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt[endpointKey] = "test-endpoint" + opt[projectKey] = "" + err = ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt[projectKey] = "test-project" + opt[logstoreKey] = "" + err = ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt[logstoreKey] = "test-logstore" + opt[labelsKey] = "attr1,attr2" + opt[envKey] = "e1=v1,e2=v2" + err = ValidateLogOpt(opt) + if err != nil { + t.Errorf("check log opt fail: %v", opt) + } + + opt["error-key"] = "unsupported" + err = ValidateLogOpt(opt) + if err == nil { + t.Errorf("check log opt fail: %v", opt) + } +} + +func TestCollectLogsSimple(t *testing.T) { + ec1 := &sls.LogContent{ + Key: proto.String("ex1"), + Value: proto.String("ex1 value"), + } + ec2 := &sls.LogContent{ + Key: proto.String("ex2"), + Value: proto.String("ex2 value"), + } + extraContents := []*sls.LogContent{ec1, ec2} + mockClient := NewMockClient() + mockClient.ErrType = 0 + stream := &logStream{ + endpoint: "test-endpoint", + projectName: "test-project", + logstoreName: "test-logstore", + topic: "demo_topic", + extraLogContents: extraContents, + client: mockClient, + logGroup: &sls.LogGroup{ + Topic: proto.String("demo_topic"), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + go stream.collectLogs() + stream.Log(&logger.Message{ + Line: []byte("this is test log 1"), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("this is test log 2"), + Timestamp: time.Time{}, + }) + stream.Log(&logger.Message{ + Line: []byte("this is test log 3"), + Timestamp: time.Time{}, + }) + time.Sleep(batchPublishFrequency) + stream.Close() + if len(mockClient.Logs) != 3 { + t.Errorf("should be 3 number logs, actual log numbers: %v", len(mockClient.Logs)) + } +} + +func TestPublishLogs(t *testing.T) { + ec1 := &sls.LogContent{ + Key: proto.String("ex1"), + Value: proto.String("ex1 value"), + } + ec2 := &sls.LogContent{ + Key: proto.String("ex2"), + Value: proto.String("ex2 value"), + } + extraContents := []*sls.LogContent{ec1, ec2} + mockClient := NewMockClient() + stream := &logStream{ + endpoint: "test-endpoint", + projectName: "test-project", + logstoreName: "test-logstore", + topic: "demo_topic", + extraLogContents: extraContents, + client: mockClient, + logGroup: &sls.LogGroup{ + Topic: proto.String("demo_topic"), + Logs: []*sls.Log{}, + }, + messages: make(chan *logger.Message, maximumLogsPerPut), + } + + logMsg := &sls.LogContent{ + Key: 
proto.String("message"),
+		Value: proto.String("this is a log"),
+	}
+	contents := stream.extraLogContents
+	contents = append(contents, logMsg)
+	logRecord := sls.Log{
+		Time:     proto.Uint32(uint32(time.Now().Unix())),
+		Contents: contents,
+	}
+	stream.logGroup.Logs = append(stream.logGroup.Logs, &logRecord)
+	mockClient.ErrType = 0
+	stream.publishLogs()
+
+	mockClient.ErrType = 1
+	stream.publishLogs()
+
+	mockClient.ErrType = 2
+	stream.publishLogs()
+}
+
+func TestNewContainerStream(t *testing.T) {
+	extraContents := []*sls.LogContent{}
+	containerStream := &logStream{
+		topic:            "demo_topic",
+		extraLogContents: extraContents,
+		client:           NewMockClient(),
+		messages:         make(chan *logger.Message, maximumLogsPerPut),
+	}
+	if containerStream == nil {
+		t.Errorf("failed to create containerStream")
+	}
+	if containerStream.Name() != "alilogs" {
+		t.Errorf("error logger name: %s", containerStream.Name())
+	}
+
+	containerStream.Log(&logger.Message{
+		Line:      []byte("this is one log"),
+		Timestamp: time.Time{},
+	})
+	queued := len(containerStream.messages)
+	if queued != 1 {
+		t.Errorf("stream should have one queued log, actual: %d", queued)
+	}
+	err := containerStream.Close()
+	if err != nil {
+		t.Errorf("stream should close successfully, err: %v", err)
+	}
+	if containerStream.closed != true {
+		t.Errorf("stream should be closed, close flag: %v", containerStream.closed)
+	}
+}
+
+func TestParseContext(t *testing.T) {
+	envSlice := []string{"accessKeyID=mock_id", "accessKeySecret=mock_key", "securityToken=mock_token", "topic=mock_topic"}
+	labelMap := map[string]string{}
+	labelMap["a1"] = "v1"
+	labelMap["a2"] = "v2"
+	labelMap["a3"] = "v3"
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			endpointKey: "log.cn-hangzhou.aliyuncs.com",
+			projectKey:  "test-project",
+			logstoreKey: "test-logstore",
+			"env":       "accessKeyID,accessKeySecret,securityToken,topic",
+			"labels":    "a1,a2,a3",
+		},
+		ContainerEnv:    envSlice,
+		ContainerLabels: labelMap,
+	}
+	input, err := parseContext(&ctx)
+	if err != nil {
+		t.Errorf("failed to parse context")
+	}
+	if input.accessKeyID != "mock_id" {
+		t.Errorf("parse accessKeyID fail:%s", input.accessKeyID)
+	}
+	if input.accessKeySecret != "mock_key" {
+		t.Errorf("parse accessKeySecret fail:%s", input.accessKeySecret)
+	}
+	if input.topicName != "mock_topic" {
+		t.Errorf("parse topic fail:%s", input.topicName)
+	}
+	if len(input.extraContents) != 3 {
+		t.Errorf("parse extraContents fail:%v", input.extraContents)
+	}
+}
+
+func TestParseContextError(t *testing.T) {
+	envSlice := []string{"accessKeySecret=mock_key", "securityToken=mock_token", "topic=mock_topic"}
+	labelMap := map[string]string{}
+	labelMap["a1"] = "v1"
+
+	ctx := logger.Context{
+		Config: map[string]string{
+			endpointKey: "log.cn-hangzhou.aliyuncs.com",
+			projectKey:  "test-project",
+			logstoreKey: "test-logstore",
+			"env":       "accessKeyID,accessKeySecret,securityToken,topic",
+			"labels":    "a1,a2,a3",
+		},
+		ContainerEnv:    envSlice,
+		ContainerLabels: labelMap,
+	}
+	_, err := parseContext(&ctx)
+	if err == nil {
+		t.Errorf("invalid accessKeyID")
+	}
+
+	envSlice = []string{"accessKeyID=mock_id", "securityToken=mock_token", "topic=mock_topic"}
+	ctx = logger.Context{
+		Config: map[string]string{
+			endpointKey: "log.cn-hangzhou.aliyuncs.com",
+			projectKey:  "test-project",
+			logstoreKey: "test-logstore",
+			"env":       "accessKeyID,accessKeySecret,securityToken,topic",
+			"labels":    "a1,a2,a3",
+		},
+		ContainerEnv:    envSlice,
+		ContainerLabels: labelMap,
+	}
+	_, err = parseContext(&ctx)
+	if err == nil {
+		t.Errorf("invalid accessKeySecret")
+	
} + + envSlice = []string{"accessKeyID=mock_id", "accessKeySecret=mock_key"} + ctx = logger.Context{ + Config: map[string]string{ + endpointKey: "log.cn-hangzhou.aliyuncs.com", + projectKey: "test-project", + logstoreKey: "test-logstore", + "env": "accessKeyID,accessKeySecret,securityToken,topic", + "labels": "a1,a2,a3", + }, + ContainerEnv: envSlice, + ContainerLabels: labelMap, + } + input, _ := parseContext(&ctx) + if input.securityToken != "" { + t.Errorf("token should be empty") + } + if input.topicName != "" { + t.Errorf("topic should be empty") + } +} diff --git a/daemon/logger/alilogs/logapi.go b/daemon/logger/alilogs/logapi.go index 67bd2d2b8736f..78c9a2cf4934d 100644 --- a/daemon/logger/alilogs/logapi.go +++ b/daemon/logger/alilogs/logapi.go @@ -16,10 +16,25 @@ type AliLogAPI interface { // AliLogClient implements AliLogAPI interface type AliLogClient struct { - Endpoint string - ProjectName string - LogstoreName string - logStore *sls.LogStore + logStore *sls.LogStore +} + +// NewAliLogClient ... +func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (AliLogAPI, error) { + client := AliLogClient{} + logStore, err := client.getLogStore(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken) + if err != nil { + return nil, err + } + client.logStore = logStore + + logrus.WithFields(logrus.Fields{ + "endpoint": endpoint, + "projectName": projectName, + "logstoreName": logstoreName, + }).Info("Created alilogs client") + + return &client, nil } // PutLogs implements ali PutLogs method @@ -27,13 +42,22 @@ func (client *AliLogClient) PutLogs(logGroup *sls.LogGroup) error { return client.logStore.PutLogs(logGroup) } -// NewAliLogClient ... -func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (AliLogAPI, error) { - client := AliLogClient{} - client.Endpoint = endpoint - client.ProjectName = projectName - client.LogstoreName = logstoreName +func (client *AliLogClient) getLogStore(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken string) (*sls.LogStore, error) { + logProject, err := client.getLogProject(projectName, endpoint, accessKeyID, accessKeySecret, securityToken) + if err != nil { + return nil, err + } + logStore, err := logProject.GetLogStore(logstoreName) + if err != nil { + logrus.WithFields(logrus.Fields{ + "error": err, + }).Error("Could not get ali logstore") + return nil, errors.New("Could not get ali logstore") + } + return logStore, nil +} +func (client *AliLogClient) getLogProject(projectName, endpoint, accessKeyID, accessKeySecret, securityToken string) (*sls.LogProject, error) { logProject, err := sls.NewLogProject(projectName, endpoint, accessKeyID, accessKeySecret) if err != nil { logrus.WithFields(logrus.Fields{ @@ -44,20 +68,5 @@ func NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKey if securityToken != "" { logProject.WithToken(securityToken) } - - client.logStore, err = logProject.GetLogStore(logstoreName) - if err != nil { - logrus.WithFields(logrus.Fields{ - "error": err, - }).Error("Could not get ali logstore") - return nil, errors.New("Could not get ali logstore") - } - - logrus.WithFields(logrus.Fields{ - "endpoint": endpoint, - "projectName": projectName, - "logstoreName": logstoreName, - }).Info("Created alilogs client") - - return &client, nil + return logProject, nil } diff --git a/daemon/logger/alilogs/mock_client_test.go 
b/daemon/logger/alilogs/mock_client_test.go new file mode 100644 index 0000000000000..83a76050f9bad --- /dev/null +++ b/daemon/logger/alilogs/mock_client_test.go @@ -0,0 +1,45 @@ +package alilogs + +import ( + "fmt" + + sls "github.com/galaxydi/go-loghub" +) + +// MockClient is an autogenerated mock type for the AliLogAPI type +type MockClient struct { + ErrType int + Topic string + Logs []*sls.Log +} + +func NewMockClient() *MockClient { + client := &MockClient{ + ErrType: 0, + Topic: "", + Logs: []*sls.Log{}, + } + return client +} + +// PutLogs provides a mock function with given fields: _a0 +func (client *MockClient) PutLogs(lg *sls.LogGroup) error { + switch client.ErrType { + case 0: + // no error + client.Topic = lg.GetTopic() + for _, v := range lg.GetLogs() { + client.Logs = append(client.Logs, v) + } + return nil + case 1: + slsErr := &sls.Error{ + Code: "InternalServerError", + Message: "loghub service is not avaliable", + } + return slsErr + default: + err := fmt.Errorf("unknown error") + return err + } +} From a41ccd2fb993f7f5acda3965cdd88c3b900d1712 Mon Sep 17 00:00:00 2001 From: "yinzhuo.dingyz" Date: Fri, 6 Jan 2017 15:10:22 +0800 Subject: [PATCH 2/2] fix comments --- daemon/logger/alilogs/alilogs.go | 53 +++++++++++------------ daemon/logger/alilogs/alilogs_test.go | 27 ++++++++---- daemon/logger/alilogs/mock_client_test.go | 26 ++++++----- 3 files changed, 61 insertions(+), 45 deletions(-) diff --git a/daemon/logger/alilogs/alilogs.go b/daemon/logger/alilogs/alilogs.go index 0d533a7dac2ca..eec7ddeb82be3 100644 --- a/daemon/logger/alilogs/alilogs.go +++ b/daemon/logger/alilogs/alilogs.go @@ -160,32 +160,32 @@ var newTicker = func(freq time.Duration) *time.Ticker { // the maximum number of total bytes in a batch (defined in // maximumBytesPerPut). func (ls *logStream) collectLogs() { + le := logrus.WithFields(logrus.Fields{ + "endpoint": ls.endpoint, + "project": ls.projectName, + "logstore": ls.logstoreName, + }) + timer := newTicker(batchPublishFrequency) for { select { case <-timer.C: ls.publishLogs() - logrus.WithFields(logrus.Fields{ - "time trigger": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish log when timer timeout") + le.WithFields(logrus.Fields{ + "trigger": "time", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") ls.logGroup.Reset() ls.logGroup.Topic = proto.String(ls.topic) case msg, more := <-ls.messages: if !more { ls.publishLogs() logrus.WithFields(logrus.Fields{ - "no more log": "send data", - "endpoint": ls.endpoint, - "project": ls.projectName, - "logstore": ls.logstoreName, - "published log number": len(ls.logGroup.Logs), - "published log size": ls.logGroup.Size(), - }).Debug("publish log when no more logs") + "trigger": "EOF", + "count": len(ls.logGroup.Logs), + "size": ls.logGroup.Size(), + }).Debug("") return } unprocessedLine := msg.Line @@ -205,13 +205,10 @@ func (ls *logStream) collectLogs() { // line would push it over the maximum number of total bytes. 
ls.publishLogs()
 				logrus.WithFields(logrus.Fields{
-					"get limit":            "send data",
-					"endpoint":             ls.endpoint,
-					"project":              ls.projectName,
-					"logstore":             ls.logstoreName,
-					"published log number": len(ls.logGroup.Logs),
-					"published log size":   ls.logGroup.Size(),
-				}).Debug("publish logs when touch the limit")
+					"trigger": "size",
+					"count":   len(ls.logGroup.Logs),
+					"size":    ls.logGroup.Size(),
+				}).Debug("")
 				ls.logGroup.Reset()
 				ls.logGroup.Topic = proto.String(ls.topic)
 			}
@@ -225,16 +222,18 @@ func (ls *logStream) collectLogs() {
 func (ls *logStream) publishLogs() {
 	err := ls.client.PutLogs(ls.logGroup)
 	if err != nil {
+		le := logrus.WithFields(logrus.Fields{
+			"endpoint": ls.endpoint,
+			"project":  ls.projectName,
+			"logstore": ls.logstoreName,
+		})
 		if serviceErr, ok := err.(sls.Error); ok {
-			logrus.WithFields(logrus.Fields{
+			le.WithFields(logrus.Fields{
 				"errorCode":    serviceErr.Code,
 				"errorMessage": serviceErr.Message,
-				"endpoint":     ls.endpoint,
-				"project":      ls.projectName,
-				"logstore":     ls.logstoreName,
 			}).Error("PutLogs occurs sls error")
 		} else {
-			logrus.Error(err)
+			le.Error("PutLogs returned error: ", err)
 		}
 	}
 }
diff --git a/daemon/logger/alilogs/alilogs_test.go b/daemon/logger/alilogs/alilogs_test.go
index 373b217c743df..29c0fc7aaf433 100644
--- a/daemon/logger/alilogs/alilogs_test.go
+++ b/daemon/logger/alilogs/alilogs_test.go
@@ -1,11 +1,10 @@
 package alilogs
 
 import (
+	"sync"
 	"testing"
 	"time"
 
-	"sync"
-
 	"github.com/docker/docker/daemon/logger"
 	sls "github.com/galaxydi/go-loghub"
 	"github.com/gogo/protobuf/proto"
@@ -14,7 +13,7 @@ import (
 func TestCollectLogsNumberLimit(t *testing.T) {
 	extraContents := []*sls.LogContent{}
 	mockClient := NewMockClient()
-	mockClient.ErrType = 0
+	mockClient.ErrType = NoError
 
 	stream := &logStream{
 		endpoint:     "test-endpoint",
@@ -107,7 +106,6 @@ func TestCollectLogsSimple(t *testing.T) {
 	}
 	extraContents := []*sls.LogContent{ec1, ec2}
 	mockClient := NewMockClient()
-	mockClient.ErrType = 0
 	stream := &logStream{
 		endpoint:     "test-endpoint",
 		projectName:  "test-project",
@@ -122,7 +120,15 @@ func TestCollectLogsSimple(t *testing.T) {
 		messages: make(chan *logger.Message, maximumLogsPerPut),
 	}
 
+	ticks := make(chan time.Time)
+	newTicker = func(_ time.Duration) *time.Ticker {
+		return &time.Ticker{
+			C: ticks,
+		}
+	}
+
 	go stream.collectLogs()
+
 	stream.Log(&logger.Message{
 		Line:      []byte("this is test log 1"),
 		Timestamp: time.Time{},
@@ -135,8 +141,13 @@ func TestCollectLogsSimple(t *testing.T) {
 		Line:      []byte("this is test log 3"),
 		Timestamp: time.Time{},
 	})
-	time.Sleep(batchPublishFrequency)
+
+	ticks <- time.Time{}
 	stream.Close()
+
+	// Wait a moment for the logs to be written into mockClient
+	time.Sleep(1 * time.Second)
+
 	if len(mockClient.Logs) != 3 {
 		t.Errorf("should be 3 number logs, actual log numbers: %v", len(mockClient.Logs))
 	}
@@ -178,13 +189,13 @@ func TestPublishLogs(t *testing.T) {
 		Contents: contents,
 	}
 	stream.logGroup.Logs = append(stream.logGroup.Logs, &logRecord)
-	mockClient.ErrType = 0
+	mockClient.ErrType = NoError
 	stream.publishLogs()
 
-	mockClient.ErrType = 1
+	mockClient.ErrType = InternalServerError
 	stream.publishLogs()
 
-	mockClient.ErrType = 2
+	mockClient.ErrType = UnknownError
 	stream.publishLogs()
 }
 
diff --git a/daemon/logger/alilogs/mock_client_test.go b/daemon/logger/alilogs/mock_client_test.go
index 83a76050f9bad..e80bc7036da6c 100644
--- a/daemon/logger/alilogs/mock_client_test.go
+++ b/daemon/logger/alilogs/mock_client_test.go
@@ -6,16 +6,24 @@ import (
 	sls "github.com/galaxydi/go-loghub"
 )
 
+type errorType int
+
+const (
+	NoError errorType 
= iota + InternalServerError + UnknownError +) + // MockClient is an autogenerated mock type for the AliLogAPI type type MockClient struct { - ErrType int + ErrType errorType Topic string Logs []*sls.Log } func NewMockClient() *MockClient { client := &MockClient{ - ErrType: 0, + ErrType: NoError, Topic: "", Logs: []*sls.Log{}, } @@ -25,21 +33,19 @@ func NewMockClient() *MockClient { // PutLogs provides a mock function with given fields: _a0 func (client *MockClient) PutLogs(lg *sls.LogGroup) error { switch client.ErrType { - case 0: - // no error + case NoError: client.Topic = lg.GetTopic() - for _, v := range lg.GetLogs() { - client.Logs = append(client.Logs, v) - } + client.Logs = append(client.Logs, lg.GetLogs()...) return nil - case 1: + case InternalServerError: slsErr := &sls.Error{ Code: "InternalServerError", Message: "loghub service is not avaliable", } return slsErr + case UnknownError: + return fmt.Errorf("Unknown error") default: - err := fmt.Errorf("unknown error") - return err + return fmt.Errorf("Unknown error") } }
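
Illustrative sketch (not part of the patch above): the AliLogAPI seam introduced in logapi.go is what lets these tests swap MockClient in for the real SLS client. Assuming the interface only requires PutLogs(*sls.LogGroup) error, as MockClient suggests, any small stub can stand in for it; the names stubClient, newStubStream, and TestPublishLogsWithStub below are hypothetical and used only for illustration.

package alilogs

import (
	"testing"

	"github.com/docker/docker/daemon/logger"
	sls "github.com/galaxydi/go-loghub"
	"github.com/gogo/protobuf/proto"
)

// stubClient is a hypothetical AliLogAPI stand-in that only counts batches.
type stubClient struct {
	batches int
}

// PutLogs records that a batch was published instead of shipping it to SLS.
func (s *stubClient) PutLogs(lg *sls.LogGroup) error {
	s.batches++
	return nil
}

// newStubStream wires a logStream to the stub instead of the real client.
func newStubStream(c AliLogAPI) *logStream {
	return &logStream{
		endpoint:     "test-endpoint",
		projectName:  "test-project",
		logstoreName: "test-logstore",
		topic:        "demo_topic",
		client:       c,
		logGroup: &sls.LogGroup{
			Topic: proto.String("demo_topic"),
			Logs:  []*sls.Log{},
		},
		messages: make(chan *logger.Message, maximumLogsPerPut),
	}
}

func TestPublishLogsWithStub(t *testing.T) {
	stub := &stubClient{}
	ls := newStubStream(stub)
	ls.publishLogs() // publishes the (empty) log group through the stub
	if stub.batches != 1 {
		t.Errorf("expected 1 PutLogs call, got %d", stub.batches)
	}
}

Keeping the SLS dependency behind this one-method seam is what makes the batching logic in collectLogs testable without network access.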