Skip to content

Commit

Permalink
Merge pull request #54 from algobot76/allow-auto-topic-creation
Browse files Browse the repository at this point in the history
feat(kq): allow auto topic creation
  • Loading branch information
kevwan committed Oct 6, 2023
2 parents 961c620 + 67b8ce0 commit c223788
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions kq/pusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,19 @@ import (
)

type (
PushOption func(options *chunkOptions)
PushOption func(options *pushOptions)

Pusher struct {
producer *kafka.Writer
topic string
executor *executors.ChunkExecutor
}

chunkOptions struct {
pushOptions struct {
// kafka.Writer options
allowAutoTopicCreation bool

// executors.ChunkExecutor options
chunkSize int
flushInterval time.Duration
}
Expand All @@ -33,6 +37,24 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
Balancer: &kafka.LeastBytes{},
Compression: kafka.Snappy,
}

var options pushOptions
for _, opt := range opts {
opt(&options)
}

// apply kafka.Writer options
producer.AllowAutoTopicCreation = options.allowAutoTopicCreation

// apply ChunkExecutor options
var chunkOpts []executors.ChunkOption
if options.chunkSize > 0 {
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
}
if options.flushInterval > 0 {
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
}

pusher := &Pusher{
producer: producer,
topic: topic,
Expand All @@ -45,7 +67,7 @@ func NewPusher(addrs []string, topic string, opts ...PushOption) *Pusher {
if err := pusher.producer.WriteMessages(context.Background(), chunk...); err != nil {
logx.Error(err)
}
}, newOptions(opts)...)
}, chunkOpts...)

return pusher
}
Expand Down Expand Up @@ -79,30 +101,21 @@ func (p *Pusher) Push(v string) error {

// WithChunkSize customizes the Pusher with the given chunk size.
func WithChunkSize(chunkSize int) PushOption {
return func(options *chunkOptions) {
return func(options *pushOptions) {
options.chunkSize = chunkSize
}
}

// WithFlushInterval customizes the Pusher with the given flush interval.
func WithFlushInterval(interval time.Duration) PushOption {
return func(options *chunkOptions) {
return func(options *pushOptions) {
options.flushInterval = interval
}
}

func newOptions(opts []PushOption) []executors.ChunkOption {
var options chunkOptions
for _, opt := range opts {
opt(&options)
}

var chunkOpts []executors.ChunkOption
if options.chunkSize > 0 {
chunkOpts = append(chunkOpts, executors.WithChunkBytes(options.chunkSize))
}
if options.flushInterval > 0 {
chunkOpts = append(chunkOpts, executors.WithFlushInterval(options.flushInterval))
// WithAllowAutoTopicCreation allows the Pusher to create the given topic if it does not exist.
func WithAllowAutoTopicCreation() PushOption {
return func(options *pushOptions) {
options.allowAutoTopicCreation = true
}
return chunkOpts
}

0 comments on commit c223788

Please sign in to comment.