/
service.go
114 lines (97 loc) · 2.07 KB
/
service.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package slsh
import (
"context"
"fmt"
"log"
"os"
"sync"
"time"
"github.com/sirupsen/logrus"
)
// service batches Messages pushed through Push and hands them to Flush from
// the loop run by Start.
type service struct {
// BufferSize is the capacity of the message queue and of the batch buffer;
// a batch is flushed once it holds at least this many messages.
BufferSize int
// Interval is the longest a non-empty batch is kept before it is flushed.
// The Start loop wakes up at Interval/10 to check for this.
Interval time.Duration
// Flush receives each batch of buffered messages. An error it returns is
// reported on stderr by Start.
Flush func(...Message) error
// chMessage carries messages from Push to the Start loop; Stop closes it.
chMessage chan Message
// chQuit is closed by Start after the final flush; Stop waits on it.
chQuit chan struct{}
// onClose makes the shutdown sequence in Stop run only once.
onClose *sync.Once
// stopped is set by Stop; Push discards messages once it is true.
stopped bool
}
// NewService returns a service that queues up to bufferSize messages and
// passes them to flush in batches.
//
// bufferSize is used both as the capacity of the internal message channel and
// as the batch-size threshold; interval is the longest a non-empty batch is
// held back. The returned service does nothing until Start is called.
func NewService(bufferSize int, interval time.Duration, flush func(...Message) error) *service {
	svc := new(service)

	// Public configuration.
	svc.BufferSize = bufferSize
	svc.Interval = interval
	svc.Flush = flush

	// Internal plumbing between Push, Start and Stop.
	svc.chMessage = make(chan Message, bufferSize)
	svc.chQuit = make(chan struct{})
	svc.onClose = new(sync.Once)

	return svc
}
// Push queues message for the next flush.
//
// It returns nil once the message has been accepted by the queue, or
// ctx.Err() if ctx is done before there is room. After Stop has been called
// the message is discarded and nil is returned.
//
// Stop closes the queue, so a Push that passed the stopped check just before
// Stop ran would otherwise panic with "send on closed channel". That case is
// recovered here and treated like any other push to a stopped service.
//
// NOTE(review): stopped is a plain bool written by Stop, so the reads here
// are not synchronized with it; the recover below is what keeps a racing
// Push from crashing the process.
func (s *service) Push(ctx context.Context, message Message) (err error) {
	if s.stopped {
		s.trace("Discard message %v", message)
		return nil
	}

	defer func() {
		r := recover()
		if r == nil {
			return
		}
		if !s.stopped {
			// Not the shutdown race; do not hide an unrelated panic.
			panic(r)
		}
		s.trace("Discard message %v", message)
		err = nil
	}()

	select {
	case <-ctx.Done():
		return ctx.Err()
	case s.chMessage <- message:
		return nil
	}
}
// Start runs the batching loop and blocks until the message queue is closed
// (which Stop does). On exit it flushes whatever is still buffered and closes
// chQuit so that Stop can return.
//
// A non-empty batch is handed to Flush when it holds at least BufferSize
// messages or when Interval has passed since the previous flush. The loop
// wakes up on every incoming message and on a ticker running at Interval/10.
// An Interval below 10ns (including zero) would give a zero period and make
// the loop spin, so the period is clamped to one millisecond in that case;
// batches are still flushed as soon as a message arrives because the Interval
// check itself is unchanged.
func (s *service) Start() {
	s.trace("aliyun-log-service start")
	defer s.trace("aliyun-log-service stopped")

	lastFlush := time.Now()
	buffer := make([]Message, 0, s.BufferSize)

	// tryFlush sends the buffered messages to Flush if the batch is due, or
	// unconditionally (when non-empty) if force is set. The buffer is reset
	// afterwards even when Flush fails, so a failed batch is dropped after
	// being reported on stderr.
	tryFlush := func(force bool) {
		n := len(buffer)
		if n == 0 {
			return
		}
		if !force && n < s.BufferSize && time.Since(lastFlush) < s.Interval {
			return
		}

		began := time.Now()
		if err := s.Flush(buffer...); err != nil {
			_, _ = fmt.Fprintf(os.Stderr, "Fail to flush logs: %v\n", err)
		} else {
			s.trace("[%v] Flush %d logs",
				time.Since(began).Truncate(time.Millisecond), n)
		}

		lastFlush = time.Now()
		// Keep the backing array for the next batch.
		buffer = buffer[:0]
	}

	period := s.Interval / 10
	if period <= 0 {
		// time.NewTicker panics on a non-positive period, and a zero wait
		// would turn this loop into a busy spin.
		period = time.Millisecond
	}
	// One ticker for the whole loop instead of a new timer per iteration.
	ticker := time.NewTicker(period)
	defer ticker.Stop()

Loop:
	for {
		select {
		case <-ticker.C:
			// Periodic wake-up so an idle, partly filled batch still ages out.
		case message, ok := <-s.chMessage:
			if !ok {
				// Queue closed: leave the loop and drain below.
				break Loop
			}
			buffer = append(buffer, message)
		}
		tryFlush(false)
	}

	tryFlush(true)
	close(s.chQuit)
}
// Stop shuts the service down and waits for the Start loop to finish.
//
// The first call marks the service as stopped and closes the message queue;
// Start reacts to that by flushing what is buffered and closing chQuit. Every
// call, not only the first, then waits until chQuit is closed (returning nil)
// or ctx is done (returning ctx.Err()). A caller whose earlier Stop timed out
// can therefore call Stop again to keep waiting, and concurrent callers each
// honour their own ctx.
//
// In this file chQuit is only closed by Start, so if Start is not running
// Stop returns only when ctx is done.
func (s *service) Stop(ctx context.Context) error {
	// Only the one-time shutdown goes through the Once; closing chMessage
	// twice would panic.
	s.onClose.Do(func() {
		s.stopped = true
		close(s.chMessage)
	})

	select {
	case <-ctx.Done():
		return ctx.Err()
	case <-s.chQuit:
		return nil
	}
}
// trace writes a printf-style diagnostic line through the standard library
// logger, but only while logrus has trace level enabled; otherwise it is a
// no-op.
func (s *service) trace(message string, args ...interface{}) {
	if !logrus.IsLevelEnabled(logrus.TraceLevel) {
		return
	}
	log.Printf(message, args...)
}