-
Notifications
You must be signed in to change notification settings - Fork 8
/
log_accumulator.go
110 lines (100 loc) · 3.98 KB
/
log_accumulator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package tencentcloud_cls_sdk_go
import (
"errors"
"sync"
asyncAtomic "sync/atomic"
"go.uber.org/atomic"
)
type Accumulator struct {
lock sync.RWMutex
logTopicData map[string]*ProducerBatch
producerConfig *AsyncProducerClientConfig
worker *Worker
shutDownFlag *atomic.Bool
threadPool *SendThreadPool
producer *AsyncProducerClient
batchID *atomic.Int64
producerHash string
}
// NewAccumulator ...
func NewAccumulator(config *AsyncProducerClientConfig, worker *Worker, threadPool *SendThreadPool, producer *AsyncProducerClient) *Accumulator {
return &Accumulator{
logTopicData: make(map[string]*ProducerBatch),
producerConfig: config,
worker: worker,
shutDownFlag: atomic.NewBool(false),
threadPool: threadPool,
producer: producer,
batchID: atomic.NewInt64(0),
producerHash: producer.producerHash,
}
}
func (accumulator *Accumulator) addOrSendProducerBatch(topicId string, producerBatch *ProducerBatch, log interface{}, callback CallBack) {
totalDataCount := producerBatch.getLogGroupCount() + 1
if producerBatch.totalDataSize > accumulator.producerConfig.MaxBatchSize &&
producerBatch.totalDataSize < 5242880 &&
totalDataCount <= accumulator.producerConfig.MaxBatchCount {
producerBatch.addLogToLogGroup(log)
if callback != nil {
producerBatch.addProducerBatchCallBack(callback)
}
accumulator.innerSendToServer(topicId, producerBatch)
} else if producerBatch.totalDataSize <= accumulator.producerConfig.MaxBatchSize &&
totalDataCount <= accumulator.producerConfig.MaxBatchCount {
producerBatch.addLogToLogGroup(log)
if callback != nil {
producerBatch.addProducerBatchCallBack(callback)
}
} else {
accumulator.innerSendToServer(topicId, producerBatch)
accumulator.createNewProducerBatch(log, callback, topicId)
}
}
func (accumulator *Accumulator) createNewProducerBatch(logType interface{}, callback CallBack, topicId string) {
if item, ok := logType.(*Log); ok {
newProducerBatch := NewProducerBatch(topicId, accumulator.producerConfig, callback, item, generatePackageId(accumulator.producerHash, accumulator.batchID))
accumulator.logTopicData[topicId] = newProducerBatch
} else if logList, ok := logType.([]*Log); ok {
newProducerBatch := NewProducerBatch(topicId, accumulator.producerConfig, callback, logList, generatePackageId(accumulator.producerHash, accumulator.batchID))
accumulator.logTopicData[topicId] = newProducerBatch
}
}
func (accumulator *Accumulator) innerSendToServer(topicId string, producerBatch *ProducerBatch) {
accumulator.threadPool.addTask(producerBatch)
delete(accumulator.logTopicData, topicId)
}
func (accumulator *Accumulator) addLogToProducerBatch(topicId string, logData interface{}, callback CallBack) error {
if accumulator.shutDownFlag.Load() {
return errors.New("producer has shutdown and cannot write to new logs")
}
defer accumulator.lock.Unlock()
accumulator.lock.Lock()
if mlog, ok := logData.(*Log); ok {
if producerBatch, ok := accumulator.logTopicData[topicId]; ok == true {
logSize, err := GetLogSizeCalculate(mlog)
if err != nil {
return err
}
asyncAtomic.AddInt64(&producerBatch.totalDataSize, int64(logSize))
asyncAtomic.AddInt64(&accumulator.producer.producerLogGroupSize, int64(logSize))
accumulator.addOrSendProducerBatch(topicId, producerBatch, mlog, callback)
} else {
accumulator.createNewProducerBatch(mlog, callback, topicId)
}
} else if logList, ok := logData.([]*Log); ok {
if producerBatch, ok := accumulator.logTopicData[topicId]; ok == true {
logListSize, err := GetLogListSize(logList)
if err != nil {
return err
}
asyncAtomic.AddInt64(&producerBatch.totalDataSize, int64(logListSize))
asyncAtomic.AddInt64(&accumulator.producer.producerLogGroupSize, int64(logListSize))
accumulator.addOrSendProducerBatch(topicId, producerBatch, logList, callback)
} else {
accumulator.createNewProducerBatch(logList, callback, topicId)
}
} else {
return errors.New("invalid logType")
}
return nil
}