Skip to content

Commit

Permalink
Merge pull request moby#2 from galaxydi/add-ut
Browse files Browse the repository at this point in the history
add ut
  • Loading branch information
wanghq committed Jan 6, 2017
2 parents 438c28d + a41ccd2 commit 30a3c09
Show file tree
Hide file tree
Showing 4 changed files with 523 additions and 112 deletions.
196 changes: 110 additions & 86 deletions daemon/logger/alilogs/alilogs.go
Expand Up @@ -57,14 +57,26 @@ const (
)

type logStream struct {
endpoint string
projectName string
logstoreName string
topic string
extraLogContents []*sls.LogContent
client AliLogAPI
logGroup *sls.LogGroup
messages chan *logger.Message
lock sync.RWMutex
closed bool
}

type contextParams struct {
accessKeyID string
accessKeySecret string
securityToken string
topicName string
extraContents []*sls.LogContent
}

// init registers the alilogs driver
func init() {
if err := logger.RegisterLogDriver(name, New); err != nil {
Expand All @@ -80,57 +92,27 @@ func New(ctx logger.Context) (logger.Logger, error) {
endpoint := ctx.Config[endpointKey]
projectName := ctx.Config[projectKey]
logstoreName := ctx.Config[logstoreKey]
extraContents := []*sls.LogContent{}
accessKeyID := ""
accessKeySecret := ""
securityToken := ""
topicName := ""

extra := ctx.ExtraAttributes(nil)
value, ok := extra[accessKeyIDEnvKey]
if ok {
accessKeyID = value
delete(extra, accessKeyIDEnvKey)
} else {
return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey)
}

value, ok = extra[accessKeySecretEnvKey]
if ok {
accessKeySecret = value
delete(extra, accessKeySecretEnvKey)
} else {
return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey)
}

if value, ok = extra[securityTokenEnvKey]; ok {
securityToken = value
delete(extra, securityTokenEnvKey)
}

if value, ok = extra[topicEnvKey]; ok {
topicName = value
delete(extra, topicEnvKey)
}

// add extra contents to log record
for key, value := range extra {
logContent := &sls.LogContent{
Key: proto.String(key),
Value: proto.String(value),
}
extraContents = append(extraContents, logContent)
contextInput, err := parseContext(&ctx)
if err != nil {
return nil, err
}

aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, accessKeyID, accessKeySecret, securityToken)
aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, contextInput.accessKeyID, contextInput.accessKeySecret, contextInput.securityToken)
if err != nil {
return nil, err
}
containerStream := &logStream{
topic: topicName,
extraLogContents: extraContents,
endpoint: endpoint,
projectName: projectName,
logstoreName: logstoreName,
topic: contextInput.topicName,
extraLogContents: contextInput.extraContents,
client: aliLogClient,
messages: make(chan *logger.Message, maximumLogsPerPut),
logGroup: &sls.LogGroup{
Topic: proto.String(contextInput.topicName),
Logs: []*sls.Log{},
},
messages: make(chan *logger.Message, maximumLogsPerPut),
}

go containerStream.collectLogs()
Expand Down Expand Up @@ -170,43 +152,40 @@ var newTicker = func(freq time.Duration) *time.Ticker {
return time.NewTicker(freq)
}

// PutLogs executes as a goroutine to perform put logs for
// collectLogs executes as a goroutine to perform put logs for
// submission to the logstore. Batching is performed on time- and size-
// bases. Time-based batching occurs at a 5 second interval (defined in the
// batchPublishFrequency const). Size-based batching is performed on the
// maximum number of logs per batch (defined in maximumLogsPerPut) and
// the maximum number of total bytes in a batch (defined in
// maximumBytesPerPut).
func (ls *logStream) collectLogs() {
aliLogClient := ls.client.(*AliLogClient)
logGroup := sls.LogGroup{
Topic: proto.String(ls.topic),
Logs: []*sls.Log{},
}
le := logrus.WithFields(logrus.Fields{
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
})

timer := newTicker(batchPublishFrequency)
for {
select {
case <-timer.C:
ls.publishLogs(&logGroup)
logrus.WithFields(logrus.Fields{
"endpoint": aliLogClient.Endpoint,
"project": aliLogClient.ProjectName,
"logstore": aliLogClient.LogstoreName,
"published log number": len(logGroup.Logs),
"published log size": logGroup.Size(),
}).Debug("publish log when timer timeout")
logGroup.Reset()
logGroup.Topic = proto.String(ls.topic)
ls.publishLogs()
le.WithFields(logrus.Fields{
"trigger": "time",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
ls.logGroup.Reset()
ls.logGroup.Topic = proto.String(ls.topic)
case msg, more := <-ls.messages:
if !more {
ls.publishLogs(&logGroup)
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"endpoint": aliLogClient.Endpoint,
"project": aliLogClient.ProjectName,
"logstore": aliLogClient.LogstoreName,
"published log number": len(logGroup.Logs),
"published log size": logGroup.Size(),
}).Debug("publish log when no more logs")
"trigger": "EOF",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
return
}
unprocessedLine := msg.Line
Expand All @@ -221,41 +200,40 @@ func (ls *logStream) collectLogs() {
Contents: contents,
}
if len(unprocessedLine) > 0 {
if (len(logGroup.Logs) >= maximumLogsPerPut) || (logGroup.Size()+logRecord.Size() > maximumBytesPerPut) {
if (len(ls.logGroup.Logs) >= maximumLogsPerPut) || (ls.logGroup.Size()+logRecord.Size() > maximumBytesPerPut) {
// Publish an existing batch if it's already over the maximum number of logs or if adding this
// line would push it over the maximum number of total bytes.
ls.publishLogs(&logGroup)
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"endpoint": aliLogClient.Endpoint,
"project": aliLogClient.ProjectName,
"logstore": aliLogClient.LogstoreName,
"published log number": len(logGroup.Logs),
"published log size": logGroup.Size(),
}).Debug("publish logs when touch the limit")
logGroup.Reset()
logGroup.Topic = proto.String(ls.topic)
"trigger": "size",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
ls.logGroup.Reset()
ls.logGroup.Topic = proto.String(ls.topic)
}
logGroup.Logs = append(logGroup.Logs, &logRecord)
ls.logGroup.Logs = append(ls.logGroup.Logs, &logRecord)
}
}
}
}

// publishLogs calls PutLogs for a given LogGroup
func (ls *logStream) publishLogs(lg *sls.LogGroup) {
err := ls.client.PutLogs(lg)
func (ls *logStream) publishLogs() {
err := ls.client.PutLogs(ls.logGroup)
if err != nil {
le := logrus.WithFields(logrus.Fields{
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
})
if serviceErr, ok := err.(sls.Error); ok {
aliLogClient := ls.client.(*AliLogClient)
logrus.WithFields(logrus.Fields{
le.WithFields(logrus.Fields{
"errorCode": serviceErr.Code,
"errorMessage": serviceErr.Message,
"endpoint": aliLogClient.Endpoint,
"project": aliLogClient.ProjectName,
"logstore": aliLogClient.LogstoreName,
}).Error("PutLogs occurs sls error")
} else {
logrus.Error(err)
le.Error("PutLogs occurs err:", err)
}
}
}
Expand All @@ -280,3 +258,49 @@ func ValidateLogOpt(cfg map[string]string) error {
}
return nil
}

func parseContext(ctx *logger.Context) (*contextParams, error) {
input := &contextParams{
accessKeyID: "",
accessKeySecret: "",
securityToken: "",
topicName: "",
extraContents: []*sls.LogContent{},
}
extra := ctx.ExtraAttributes(nil)
value, ok := extra[accessKeyIDEnvKey]
if ok {
input.accessKeyID = value
delete(extra, accessKeyIDEnvKey)
} else {
return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey)
}

value, ok = extra[accessKeySecretEnvKey]
if ok {
input.accessKeySecret = value
delete(extra, accessKeySecretEnvKey)
} else {
return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey)
}

if value, ok = extra[securityTokenEnvKey]; ok {
input.securityToken = value
delete(extra, securityTokenEnvKey)
}

if value, ok = extra[topicEnvKey]; ok {
input.topicName = value
delete(extra, topicEnvKey)
}

// add extra contents to log record
for key, value := range extra {
logContent := &sls.LogContent{
Key: proto.String(key),
Value: proto.String(value),
}
input.extraContents = append(input.extraContents, logContent)
}
return input, nil
}

0 comments on commit 30a3c09

Please sign in to comment.