This repository has been archived by the owner on Feb 16, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
/
kafkalock.go
309 lines (249 loc) · 7.36 KB
/
kafkalock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
package kafkalock
import (
"context"
"errors"
"fmt"
"sync"
"time"
"github.com/Shopify/sarama"
"github.com/jonboulle/clockwork"
)
// Package-level factories and clock. Declared as variables rather than called
// directly — presumably so tests can substitute fakes (clockwork is a fake-able
// clock library) — TODO confirm against the test files.
var (
	kafkaClientFactory        = sarama.NewClient
	kafkaConsumerGroupFactory = sarama.NewConsumerGroupFromClient
	clock                     = clockwork.NewRealClock()
)
// consumerState describes the most recent event observed by the consumer
// goroutine; it drives the lock/unlock state machine inside LockContext().
type consumerState uint8

const (
	// consumerStateProcessingPartition: a partition claim is being consumed.
	consumerStateProcessingPartition consumerState = iota
	// consumerStatePartitionNotAssigned: a session started with no claims.
	consumerStatePartitionNotAssigned
	// consumerStateConsumerError: Consume() returned an unexpected error.
	consumerStateConsumerError
)
// KafkaLocker is the main interface of this package: use it to control your locks.
// A new instance of KafkaLocker can be created using NewKafkaLocker().
type KafkaLocker interface {
	// Context returns the context that represents the state of the KafkaLocker.
	// The context of an unlocked KafkaLocker is canceled.
	Context() context.Context
	// Lock is a shortcut for LockContext(context.Background()).
	Lock() (context.Context, error)
	// LockContext blocks until the lock is acquired or an error is returned.
	// An error may be returned if:
	//   - the Kafka client or consumer group can't be created
	//   - the given context is canceled
	//   - the FlapGuard detects too many calls to LockContext()
	// The returned context is canceled as soon as a call to Unlock() is made
	// or if the lock is lost (consumer error, ...).
	LockContext(context.Context) (context.Context, error)
	// Unlock releases the currently held lock. If the lock is not held at the
	// moment then this call is a no-op.
	Unlock()
	// Close unlocks the KafkaLocker and frees all its resources. The instance
	// is not usable after this. Use Unlock() if you want to free the lock and
	// then reuse the instance later.
	Close() error
}
// LoggerProvider provides access to the *Logger used by a KafkaLocker.
// The default implementation of KafkaLocker implements this interface.
// You can use it like this:
//
//	locker, _ := NewKafkaLocker(config)
//	locker.(LoggerProvider).GetLogger().EnableKafkaLogging()
type LoggerProvider interface {
	// GetLogger returns the *Logger in use.
	GetLogger() *Logger
}
// kafkaLock is the default KafkaLocker implementation. All exported methods
// serialize through mu.
type kafkaLock struct {
	config *Config
	// delay is the debounce interval applied before the lock is considered
	// acquired or lost (derived from the consumer group session timeout in
	// NewKafkaLocker).
	delay time.Duration
	mu    sync.Mutex
	// client is created lazily on first LockContext() and reused across lock
	// cycles until Close().
	client sarama.Client
	// consumerGroup is non-nil only while a lock cycle is active; doUnlock()
	// uses it as the "currently locked" indicator.
	consumerGroup sarama.ConsumerGroup
	// lockAcquiredCh is closed once the lock has been acquired.
	lockAcquiredCh chan struct{}
	// cancel cancels the lock context created by LockContext().
	cancel context.CancelFunc
	// consumerStateCh carries events from the consumer goroutine to the
	// state-machine goroutine; closed when the consumer goroutine exits.
	consumerStateCh chan consumerState
	// wg tracks the two background goroutines started per lock cycle.
	wg sync.WaitGroup
	// ctx is the lock context; it is canceled whenever the lock is not held.
	ctx context.Context
}
// Lock acquires the lock with a background (never-canceled) context.
// It is equivalent to LockContext(context.Background()).
func (kl *kafkaLock) Lock() (context.Context, error) {
	background := context.Background()
	return kl.LockContext(background)
}
// LockContext blocks until the lock is acquired, the given ctx is canceled,
// or an unrecoverable setup error occurs. The returned context remains valid
// while the lock is held and is canceled when the lock is released via
// Unlock()/Close() or lost (partition revoked, consumer error).
func (kl *kafkaLock) LockContext(ctx context.Context) (context.Context, error) {
	kl.mu.Lock()
	defer kl.mu.Unlock()

	// Already locked: hand out the existing (still live) lock context.
	if kl.ctx.Err() == nil {
		return kl.ctx, nil
	}

	// Guard against rapid lock/unlock flapping.
	err := kl.config.FlapGuard.Check(kl.delay)
	if err != nil {
		return nil, err
	}

	// The Kafka client is created lazily and reused across lock cycles.
	if kl.client == nil {
		kl.client, err = kafkaClientFactory(kl.config.BootstrapServers, kl.config.KafkaConfig)
		if err != nil {
			return nil, fmt.Errorf("failed to create Kafka client: %w", err)
		}
	}

	// Let goroutines from a previous lock cycle finish before starting anew.
	kl.wg.Wait()
	kl.consumerGroup, err = kafkaConsumerGroupFactory(kl.config.Topic, kl.client)
	if err != nil {
		return nil, fmt.Errorf("failed to create Kafka consumer group: %w", err)
	}

	// consumerCtx represents the lifetime of the lock; kl.cancel cancels it
	// on Unlock()/Close() or when the lock is lost.
	var consumerCtx context.Context
	consumerCtx, kl.cancel = context.WithCancel(context.Background())
	kl.consumerStateCh = make(chan consumerState)
	kl.lockAcquiredCh = make(chan struct{})
	kl.wg.Add(2)

	// Consumer goroutine: keeps (re)joining the consumer group and reports
	// unexpected errors to the state machine below.
	go func() {
		defer kl.wg.Done()
		defer close(kl.consumerStateCh)
		for {
			err := kl.consumerGroup.Consume(consumerCtx, []string{kl.config.Topic}, kl)
			if err != nil {
				if errors.Is(err, sarama.ErrClosedConsumerGroup) {
					// Normal shutdown path (doUnlock closed the group).
					return
				}
				kl.config.Logger.Warn("unexpected consumer error: ", err.Error())
				kl.consumerStateCh <- consumerStateConsumerError
			}
			if consumerCtx.Err() != nil {
				// BUGFIX: log the consumer context's error; the original
				// logged ctx.Err() (the caller's context), which is not the
				// context that was canceled here.
				kl.config.Logger.Debug("consumer context canceled: ", consumerCtx.Err())
				return
			}
		}
	}()

	// State-machine goroutine: debounces assignment/revocation/error events
	// by kl.delay before declaring the lock acquired or lost.
	go func() {
		defer kl.wg.Done()
		var lastState consumerState
		var delay <-chan time.Time
		for {
			select {
			case state, ok := <-kl.consumerStateCh:
				if !ok {
					return
				}
				switch state {
				case consumerStateProcessingPartition:
					select {
					case <-kl.lockAcquiredCh:
						// Already locked; a (re)assignment just confirms it.
						delay = nil
					default:
						kl.config.Logger.Debug("partition assigned: delay started")
						delay = clock.After(kl.delay)
					}
				case consumerStatePartitionNotAssigned:
					select {
					case <-kl.lockAcquiredCh:
						kl.config.Logger.Debug("partition revoked: delay started")
						delay = clock.After(kl.delay)
					default:
						delay = nil
					}
				case consumerStateConsumerError:
					if lastState == state {
						// Repeated errors: keep the running delay untouched.
						continue
					}
					select {
					case <-kl.lockAcquiredCh:
						kl.config.Logger.Debug("consumer error: delay started")
						delay = clock.After(kl.delay)
					default:
						delay = nil
					}
				}
				lastState = state
			case <-delay:
				kl.config.Logger.Debug("delay passed")
				delay = nil
				switch lastState {
				case consumerStateProcessingPartition:
					kl.config.Logger.Debug("changing state to: locked")
					// Mark this member as the lock holder in its rebalance
					// metadata.
					kl.config.KafkaConfig.Consumer.Group.Member.UserData = []byte{1}
					close(kl.lockAcquiredCh)
				case consumerStatePartitionNotAssigned:
					fallthrough
				case consumerStateConsumerError:
					kl.config.Logger.Debug("changing state to: unlocked")
					// Unlock() takes kl.mu, so it must not run on this
					// goroutine (doUnlock waits for it via kl.wg).
					go kl.Unlock()
				}
			}
		}
	}()

	kl.config.Logger.Debug("waiting for the lock")
	select {
	case <-kl.lockAcquiredCh:
		kl.config.Logger.Debug("lock acquired")
		kl.ctx = consumerCtx
		return consumerCtx, nil
	case <-ctx.Done():
		kl.cancel()
		kl.config.Logger.Debugw("stopped waiting", "reason", ctx.Err())
		kl.doUnlock()
		return nil, ctx.Err()
	}
}
// Context returns the context that represents the state of the KafkaLocker;
// the context of an unlocked KafkaLocker is canceled.
//
// NOTE(review): kl.ctx is read here without holding kl.mu, while LockContext()
// writes it under the mutex — a data race if Context() runs concurrently with
// LockContext(). Taking kl.mu here would block for the whole lock acquisition
// (LockContext holds it while waiting), so a fix likely needs atomic storage
// for kl.ctx — TODO confirm intended concurrency contract.
func (kl *kafkaLock) Context() context.Context {
	return kl.ctx
}
// Unlock releases the currently held lock under kl.mu; it is a no-op when the
// lock is not held (see doUnlock).
func (kl *kafkaLock) Unlock() {
	kl.mu.Lock()
	defer kl.mu.Unlock()
	kl.doUnlock()
}
// doUnlock releases the lock. Callers must hold kl.mu. It is a no-op when the
// lock is not currently held (consumerGroup == nil).
func (kl *kafkaLock) doUnlock() {
	kl.config.Logger.Debug("unlocking")
	if kl.consumerGroup == nil {
		kl.config.Logger.Debug("already unlocked")
		return
	}
	// Closing the consumer group makes Consume() return
	// ErrClosedConsumerGroup, which terminates the consumer goroutine.
	// The close error was previously ignored; log it so shutdown problems
	// are visible.
	if err := kl.consumerGroup.Close(); err != nil {
		kl.config.Logger.Warn("error closing consumer group: ", err.Error())
	}
	// Wait for both background goroutines to exit before tearing down state.
	kl.wg.Wait()
	kl.consumerGroup = nil
	// Clear the "lock holder" marker from the rebalance metadata.
	kl.config.KafkaConfig.Consumer.Group.Member.UserData = nil
	kl.cancel()
	kl.config.Logger.Debug("lock released")
}
// Close unlocks the KafkaLocker and frees all of its resources, including the
// underlying Kafka client. The instance is not usable afterwards.
// It returns the error (if any) from closing the Kafka client; the original
// implementation silently dropped it and always returned nil.
func (kl *kafkaLock) Close() error {
	kl.mu.Lock()
	defer kl.mu.Unlock()
	kl.config.Logger.Debug("closing")
	kl.doUnlock()
	var err error
	if kl.client != nil {
		err = kl.client.Close()
		kl.client = nil // prevent accidental reuse after Close
	}
	kl.config.FlapGuard.Close()
	return err
}
// Setup implements sarama.ConsumerGroupHandler. It runs at the start of each
// consumer group session; a session that claimed no partitions means another
// member currently holds the lock, which is reported to the state machine.
func (kl *kafkaLock) Setup(sess sarama.ConsumerGroupSession) error {
	kl.config.Logger.Debugw("setup", "memberID", sess.MemberID())
	if claims := sess.Claims(); len(claims) == 0 {
		kl.consumerStateCh <- consumerStatePartitionNotAssigned
	}
	return nil
}
// Cleanup implements sarama.ConsumerGroupHandler; this locker has no per-session
// state to tear down.
func (kl *kafkaLock) Cleanup(sess sarama.ConsumerGroupSession) error {
	return nil
}
// ConsumeClaim implements sarama.ConsumerGroupHandler. Being handed a claim
// means this member owns the partition; that is reported to the state machine
// BEFORE blocking, so the lock-acquired delay can start.
func (kl *kafkaLock) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	kl.consumerStateCh <- consumerStateProcessingPartition
	kl.config.Logger.Debug("started processing")
	// Block until the claim's message channel yields or is closed (session
	// end / rebalance). Presumably nothing produces to the lock topic, so in
	// practice this waits for channel close — TODO confirm.
	<-claim.Messages()
	return nil
}
// GetLogger returns the *Logger used by this KafkaLocker (implements
// LoggerProvider; the Get prefix is dictated by that interface).
func (kl *kafkaLock) GetLogger() *Logger {
	return kl.config.Logger
}
// Compile-time checks that kafkaLock satisfies every advertised interface.
var _ KafkaLocker = (*kafkaLock)(nil)
var _ sarama.ConsumerGroupHandler = (*kafkaLock)(nil)
var _ LoggerProvider = (*kafkaLock)(nil)
// NewKafkaLocker creates a new instance of KafkaLocker. Use NewConfig() to obtain the Config struct.
func NewKafkaLocker(config *Config) (KafkaLocker, error) {
	cfg, err := processConfig(config)
	if err != nil {
		return nil, err
	}
	if err = validateTopic(cfg); err != nil {
		return nil, err
	}

	// A freshly created locker starts out unlocked, which is represented by
	// an already-canceled context.
	unlockedCtx, cancel := context.WithCancel(context.Background())
	cancel()

	// Debounce interval: 20% longer than the consumer group session timeout.
	lockDelay := time.Duration(1.2 * float64(cfg.KafkaConfig.Consumer.Group.Session.Timeout))

	return &kafkaLock{
		config: cfg,
		delay:  lockDelay,
		ctx:    unlockedCtx,
	}, nil
}