Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
yinzhuo.dingyz committed Jan 6, 2017
1 parent 4304665 commit a41ccd2
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 45 deletions.
53 changes: 26 additions & 27 deletions daemon/logger/alilogs/alilogs.go
Expand Up @@ -160,32 +160,32 @@ var newTicker = func(freq time.Duration) *time.Ticker {
// the maximum number of total bytes in a batch (defined in
// maximumBytesPerPut).
func (ls *logStream) collectLogs() {
le := logrus.WithFields(logrus.Fields{
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
})

timer := newTicker(batchPublishFrequency)
for {
select {
case <-timer.C:
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"time trigger": "send data",
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
"published log number": len(ls.logGroup.Logs),
"published log size": ls.logGroup.Size(),
}).Debug("publish log when timer timeout")
le.WithFields(logrus.Fields{
"trigger": "time",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
ls.logGroup.Reset()
ls.logGroup.Topic = proto.String(ls.topic)
case msg, more := <-ls.messages:
if !more {
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"no more log": "send data",
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
"published log number": len(ls.logGroup.Logs),
"published log size": ls.logGroup.Size(),
}).Debug("publish log when no more logs")
"trigger": "EOF",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
return
}
unprocessedLine := msg.Line
Expand All @@ -205,13 +205,10 @@ func (ls *logStream) collectLogs() {
// line would push it over the maximum number of total bytes.
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"get limit": "send data",
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
"published log number": len(ls.logGroup.Logs),
"published log size": ls.logGroup.Size(),
}).Debug("publish logs when touch the limit")
"trigger": "size",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
ls.logGroup.Reset()
ls.logGroup.Topic = proto.String(ls.topic)
}
Expand All @@ -225,16 +222,18 @@ func (ls *logStream) collectLogs() {
func (ls *logStream) publishLogs() {
err := ls.client.PutLogs(ls.logGroup)
if err != nil {
le := logrus.WithFields(logrus.Fields{
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
})
if serviceErr, ok := err.(sls.Error); ok {
logrus.WithFields(logrus.Fields{
le.WithFields(logrus.Fields{
"errorCode": serviceErr.Code,
"errorMessage": serviceErr.Message,
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
}).Error("PutLogs occurs sls error")
} else {
logrus.Error(err)
le.Error("PutLogs occurs err:", err)
}
}
}
Expand Down
27 changes: 19 additions & 8 deletions daemon/logger/alilogs/alilogs_test.go
@@ -1,11 +1,10 @@
package alilogs

import (
"sync"
"testing"
"time"

"sync"

"github.com/docker/docker/daemon/logger"
sls "github.com/galaxydi/go-loghub"
"github.com/gogo/protobuf/proto"
Expand All @@ -14,7 +13,7 @@ import (
func TestCollectLogsNumberLimit(t *testing.T) {
extraContents := []*sls.LogContent{}
mockClient := NewMockClient()
mockClient.ErrType = 0
mockClient.ErrType = NoError

stream := &logStream{
endpoint: "test-endpoint",
Expand Down Expand Up @@ -107,7 +106,6 @@ func TestCollectLogsSimple(t *testing.T) {
}
extraContents := []*sls.LogContent{ec1, ec2}
mockClient := NewMockClient()
mockClient.ErrType = 0
stream := &logStream{
endpoint: "test-endpoint",
projectName: "test-project",
Expand All @@ -122,7 +120,15 @@ func TestCollectLogsSimple(t *testing.T) {
messages: make(chan *logger.Message, maximumLogsPerPut),
}

ticks := make(chan time.Time)
newTicker = func(_ time.Duration) *time.Ticker {
return &time.Ticker{
C: ticks,
}
}

go stream.collectLogs()

stream.Log(&logger.Message{
Line: []byte("this is test log 1"),
Timestamp: time.Time{},
Expand All @@ -135,8 +141,13 @@ func TestCollectLogsSimple(t *testing.T) {
Line: []byte("this is test log 3"),
Timestamp: time.Time{},
})
time.Sleep(batchPublishFrequency)

ticks <- time.Time{}
stream.Close()

// Wait a moment for the logs were writted into mockClient
time.Sleep(1 * time.Second)

if len(mockClient.Logs) != 3 {
t.Errorf("should be 3 number logs, actual log numbers: %v", len(mockClient.Logs))
}
Expand Down Expand Up @@ -178,13 +189,13 @@ func TestPublishLogs(t *testing.T) {
Contents: contents,
}
stream.logGroup.Logs = append(stream.logGroup.Logs, &logRecord)
mockClient.ErrType = 0
mockClient.ErrType = NoError
stream.publishLogs()

mockClient.ErrType = 1
mockClient.ErrType = InternalServerError
stream.publishLogs()

mockClient.ErrType = 2
mockClient.ErrType = UnknownError
stream.publishLogs()
}

Expand Down
26 changes: 16 additions & 10 deletions daemon/logger/alilogs/mock_client_test.go
Expand Up @@ -6,16 +6,24 @@ import (
sls "github.com/galaxydi/go-loghub"
)

type errorType int

const (
NoError errorType = iota
InternalServerError
UnknownError
)

// MockClient is an autogenerated mock type for the AliLogAPI type
type MockClient struct {
ErrType int
ErrType errorType
Topic string
Logs []*sls.Log
}

func NewMockClient() *MockClient {
client := &MockClient{
ErrType: 0,
ErrType: NoError,
Topic: "",
Logs: []*sls.Log{},
}
Expand All @@ -25,21 +33,19 @@ func NewMockClient() *MockClient {
// PutLogs provides a mock function with given fields: _a0
func (client *MockClient) PutLogs(lg *sls.LogGroup) error {
switch client.ErrType {
case 0:
// no error
case NoError:
client.Topic = lg.GetTopic()
for _, v := range lg.GetLogs() {
client.Logs = append(client.Logs, v)
}
client.Logs = append(client.Logs, lg.GetLogs()...)
return nil
case 1:
case InternalServerError:
slsErr := &sls.Error{
Code: "InternalServerError",
Message: "loghub service is not avaliable",
}
return slsErr
case UnknownError:
return fmt.Errorf("Unknown error")
default:
err := fmt.Errorf("unknown error")
return err
return fmt.Errorf("Unknown error")
}
}

0 comments on commit a41ccd2

Please sign in to comment.