/
alilogs.go
306 lines (278 loc) · 8.38 KB
/
alilogs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
// Package alilogs provides the logdriver for forwarding container logs to Ali Log Service
package alilogs
import (
"fmt"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/daemon/logger"
"github.com/galaxydi/go-loghub"
"github.com/golang/protobuf/proto"
)
/*
Ali logging driver usage
docker run -d --name test-logger \
--log-driver alilogs \
--log-opt alilogs-endpoint=cn-hangzhou.log.aliyuncs.com \
--log-opt alilogs-project=test_project \
--log-opt alilogs-logstore=test-logstore \
// You can add these extra attributes to log message
--log-opt labels=attr1,attr2,attr3 \
--label attr1=attr1Value \
--label attr2=attr2Value \
--label attr3=attr3Value \
// You assign these environment variables for alilogs logging driver to work
// "securityToken" and "topic" are optinal
--log-opt env=accessKeyID,accessKeySecret,securityToken,topic \
--env "accessKeyID=xxx" \
--env "accessKeySecret=xxx" \
--env "securityToken=xxx" \
--env "topic=demo_topic" \
log-producer
*/
const (
name = "alilogs"
endpointKey = "alilogs-endpoint"
projectKey = "alilogs-project"
logstoreKey = "alilogs-logstore"
envKey = "env"
labelsKey = "labels"
accessKeyIDEnvKey = "accessKeyID"
accessKeySecretEnvKey = "accessKeySecret"
securityTokenEnvKey = "securityToken"
topicEnvKey = "topic"
// PutLogs limit in Loghub, 3MB or 4096 records per put
batchPublishFrequency = 5 * time.Second
maximumBytesPerPut = 3145728
maximumLogsPerPut = 4096
)
type logStream struct {
endpoint string
projectName string
logstoreName string
topic string
extraLogContents []*sls.LogContent
client AliLogAPI
logGroup *sls.LogGroup
messages chan *logger.Message
lock sync.RWMutex
closed bool
}
type contextParams struct {
accessKeyID string
accessKeySecret string
securityToken string
topicName string
extraContents []*sls.LogContent
}
// init registers the alilogs driver
func init() {
if err := logger.RegisterLogDriver(name, New); err != nil {
logrus.Fatal(err)
}
if err := logger.RegisterLogOptValidator(name, ValidateLogOpt); err != nil {
logrus.Fatal(err)
}
}
// New creates an alilogs logger using the configuration passed in on the context
func New(ctx logger.Context) (logger.Logger, error) {
endpoint := ctx.Config[endpointKey]
projectName := ctx.Config[projectKey]
logstoreName := ctx.Config[logstoreKey]
contextInput, err := parseContext(&ctx)
if err != nil {
return nil, err
}
aliLogClient, err := NewAliLogClient(endpoint, projectName, logstoreName, contextInput.accessKeyID, contextInput.accessKeySecret, contextInput.securityToken)
if err != nil {
return nil, err
}
containerStream := &logStream{
endpoint: endpoint,
projectName: projectName,
logstoreName: logstoreName,
topic: contextInput.topicName,
extraLogContents: contextInput.extraContents,
client: aliLogClient,
logGroup: &sls.LogGroup{
Topic: proto.String(contextInput.topicName),
Logs: []*sls.Log{},
},
messages: make(chan *logger.Message, maximumLogsPerPut),
}
go containerStream.collectLogs()
return containerStream, nil
}
// Name returns the name of ali logging driver
func (ls *logStream) Name() string {
return name
}
// Log submits messages for logging by an instance of the alilogs logging driver
func (ls *logStream) Log(msg *logger.Message) error {
ls.lock.RLock()
defer ls.lock.RUnlock()
if !ls.closed {
// buffer up the data, making sure to copy the Line data
ls.messages <- msg
}
return nil
}
// Close closes the instance of the alilogs logging driver
func (ls *logStream) Close() error {
ls.lock.Lock()
defer ls.lock.Unlock()
if !ls.closed {
close(ls.messages)
}
ls.closed = true
return nil
}
// newTicker is used for time-based batching. newTicker is a variable such
// that the implementation can be swapped out for unit tests.
var newTicker = func(freq time.Duration) *time.Ticker {
return time.NewTicker(freq)
}
// collectLogs executes as a goroutine to perform put logs for
// submission to the logstore. Batching is performed on time- and size-
// bases. Time-based batching occurs at a 5 second interval (defined in the
// batchPublishFrequency const). Size-based batching is performed on the
// maximum number of logs per batch (defined in maximumLogsPerPut) and
// the maximum number of total bytes in a batch (defined in
// maximumBytesPerPut).
func (ls *logStream) collectLogs() {
le := logrus.WithFields(logrus.Fields{
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
})
timer := newTicker(batchPublishFrequency)
for {
select {
case <-timer.C:
ls.publishLogs()
le.WithFields(logrus.Fields{
"trigger": "time",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
ls.logGroup.Reset()
ls.logGroup.Topic = proto.String(ls.topic)
case msg, more := <-ls.messages:
if !more {
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"trigger": "EOF",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
return
}
unprocessedLine := msg.Line
logMsg := &sls.LogContent{
Key: proto.String("message"),
Value: proto.String(string(unprocessedLine)),
}
contents := ls.extraLogContents
contents = append(contents, logMsg)
logRecord := sls.Log{
Time: proto.Uint32(uint32(time.Now().Unix())),
Contents: contents,
}
if len(unprocessedLine) > 0 {
if (len(ls.logGroup.Logs) >= maximumLogsPerPut) || (ls.logGroup.Size()+logRecord.Size() > maximumBytesPerPut) {
// Publish an existing batch if it's already over the maximum number of logs or if adding this
// line would push it over the maximum number of total bytes.
ls.publishLogs()
logrus.WithFields(logrus.Fields{
"trigger": "size",
"count": len(ls.logGroup.Logs),
"size": ls.logGroup.Size(),
}).Debug("")
ls.logGroup.Reset()
ls.logGroup.Topic = proto.String(ls.topic)
}
ls.logGroup.Logs = append(ls.logGroup.Logs, &logRecord)
}
}
}
}
// publishLogs calls PutLogs for a given LogGroup
func (ls *logStream) publishLogs() {
err := ls.client.PutLogs(ls.logGroup)
if err != nil {
le := logrus.WithFields(logrus.Fields{
"endpoint": ls.endpoint,
"project": ls.projectName,
"logstore": ls.logstoreName,
})
if serviceErr, ok := err.(sls.Error); ok {
le.WithFields(logrus.Fields{
"errorCode": serviceErr.Code,
"errorMessage": serviceErr.Message,
}).Error("PutLogs occurs sls error")
} else {
le.Error("PutLogs occurs err:", err)
}
}
}
// ValidateLogOpt looks for alilogs-specific log options
func ValidateLogOpt(cfg map[string]string) error {
for key := range cfg {
switch key {
case endpointKey, projectKey, logstoreKey, labelsKey, envKey:
default:
return fmt.Errorf("unknown log opt '%s' for %s log driver", key, name)
}
}
if cfg[endpointKey] == "" {
return fmt.Errorf("must specify a value for log opt '%s'", endpointKey)
}
if cfg[projectKey] == "" {
return fmt.Errorf("must specify a value for log opt '%s'", projectKey)
}
if cfg[logstoreKey] == "" {
return fmt.Errorf("must specify a value for log opt '%s'", logstoreKey)
}
return nil
}
func parseContext(ctx *logger.Context) (*contextParams, error) {
input := &contextParams{
accessKeyID: "",
accessKeySecret: "",
securityToken: "",
topicName: "",
extraContents: []*sls.LogContent{},
}
extra := ctx.ExtraAttributes(nil)
value, ok := extra[accessKeyIDEnvKey]
if ok {
input.accessKeyID = value
delete(extra, accessKeyIDEnvKey)
} else {
return nil, fmt.Errorf("must specify a value for env '%s'", accessKeyIDEnvKey)
}
value, ok = extra[accessKeySecretEnvKey]
if ok {
input.accessKeySecret = value
delete(extra, accessKeySecretEnvKey)
} else {
return nil, fmt.Errorf("must specify a value for env '%s'", accessKeySecretEnvKey)
}
if value, ok = extra[securityTokenEnvKey]; ok {
input.securityToken = value
delete(extra, securityTokenEnvKey)
}
if value, ok = extra[topicEnvKey]; ok {
input.topicName = value
delete(extra, topicEnvKey)
}
// add extra contents to log record
for key, value := range extra {
logContent := &sls.LogContent{
Key: proto.String(key),
Value: proto.String(value),
}
input.extraContents = append(input.extraContents, logContent)
}
return input, nil
}