Skip to content

Commit

Permalink
Rework nsq components to new APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
Jeffail committed Nov 28, 2023
1 parent fd2c1ed commit f893114
Show file tree
Hide file tree
Showing 10 changed files with 261 additions and 205 deletions.
2 changes: 0 additions & 2 deletions internal/component/input/config.go
Expand Up @@ -17,7 +17,6 @@ type Config struct {
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
Generate GenerateConfig `json:"generate" yaml:"generate"`
Inproc InprocConfig `json:"inproc" yaml:"inproc"`
NSQ NSQConfig `json:"nsq" yaml:"nsq"`
Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"`
ReadUntil ReadUntilConfig `json:"read_until" yaml:"read_until"`
Resource string `json:"resource" yaml:"resource"`
Expand All @@ -41,7 +40,6 @@ func NewConfig() Config {
Dynamic: NewDynamicConfig(),
Generate: NewGenerateConfig(),
Inproc: NewInprocConfig(),
NSQ: NewNSQConfig(),
Plugin: nil,
ReadUntil: NewReadUntilConfig(),
Resource: "",
Expand Down
31 changes: 0 additions & 31 deletions internal/component/input/config_nsq.go

This file was deleted.

2 changes: 0 additions & 2 deletions internal/component/output/config.go
Expand Up @@ -21,7 +21,6 @@ type Config struct {
Dynamic DynamicConfig `json:"dynamic" yaml:"dynamic"`
Fallback TryConfig `json:"fallback" yaml:"fallback"`
Inproc string `json:"inproc" yaml:"inproc"`
NSQ NSQConfig `json:"nsq" yaml:"nsq"`
Plugin any `json:"plugin,omitempty" yaml:"plugin,omitempty"`
Reject string `json:"reject" yaml:"reject"`
Resource string `json:"resource" yaml:"resource"`
Expand Down Expand Up @@ -50,7 +49,6 @@ func NewConfig() Config {
Dynamic: NewDynamicConfig(),
Fallback: NewTryConfig(),
Inproc: "",
NSQ: NewNSQConfig(),
Plugin: nil,
Reject: "",
Resource: "",
Expand Down
25 changes: 0 additions & 25 deletions internal/component/output/config_nsq.go

This file was deleted.

28 changes: 28 additions & 0 deletions internal/impl/nsq/docker-compose.yaml
@@ -0,0 +1,28 @@
# Surprisingly, there still seems to be absolutely no options available for
# running a single node set up of NSQ for testing purposes, which means it's
# extremely awkward to write real integration tests. Instead, we have this
# docker-compose set up where if you run it and then execute unit tests for this
# package it'll run them.
version: '3'
services:
nsqlookupd:
image: nsqio/nsq
command: /nsqlookupd
ports:
- "4160:4160"
- "4161"
nsqd:
image: nsqio/nsq
command: /nsqd --lookupd-tcp-address=nsqlookupd:4160
depends_on:
- nsqlookupd
ports:
- "4150:4150"
- "4151"
nsqadmin:
image: nsqio/nsq
command: /nsqadmin --lookupd-http-address=nsqlookupd:4161
depends_on:
- nsqlookupd
ports:
- "4171"
155 changes: 94 additions & 61 deletions internal/impl/nsq/input.go
Expand Up @@ -11,62 +11,70 @@ import (

"github.com/nsqio/go-nsq"

"github.com/benthosdev/benthos/v4/internal/bundle"
"github.com/benthosdev/benthos/v4/internal/component"
"github.com/benthosdev/benthos/v4/internal/component/input"
"github.com/benthosdev/benthos/v4/internal/component/input/processors"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/log"
"github.com/benthosdev/benthos/v4/internal/message"
btls "github.com/benthosdev/benthos/v4/internal/tls"
"github.com/benthosdev/benthos/v4/public/service"
)

func init() {
err := bundle.AllInputs.Add(processors.WrapConstructor(newNSQInput), docs.ComponentSpec{
Name: "nsq",
Summary: `Subscribe to an NSQ instance topic and channel.`,
Description: `
const (
niFieldNSQDAddrs = "nsqd_tcp_addresses"
niFieldLookupDAddrs = "lookupd_http_addresses"
niFieldTLS = "tls"
niFieldMaxInFlight = "max_in_flight"
niFieldTopic = "topic"
niFieldChannel = "channel"
niFieldUserAgent = "user_agent"
niFieldMaxAttempts = "max_attempts"
)

func inputConfigSpec() *service.ConfigSpec {
return service.NewConfigSpec().
Stable().
Categories("Services").
Summary(`Subscribe to an NSQ instance topic and channel.`).
Description(`
### Metadata
This input adds the following metadata fields to each message:
` + "``` text" + `
`+"``` text"+`
- nsq_attempts
- nsq_id
- nsq_nsqd_address
- nsq_timestamp
` + "```" + `
`+"```"+`
You can access these metadata fields using [function interpolation](/docs/configuration/interpolation#bloblang-queries).
`,
Config: docs.FieldComponent().WithChildren(
docs.FieldString("nsqd_tcp_addresses", "A list of nsqd addresses to connect to.").Array(),
docs.FieldString("lookupd_http_addresses", "A list of nsqlookupd addresses to connect to.").Array(),
btls.FieldSpec(),
docs.FieldString("topic", "The topic to consume from."),
docs.FieldString("channel", "The channel to consume from."),
docs.FieldString("user_agent", "A user agent to assume when connecting."),
docs.FieldInt("max_in_flight", "The maximum number of pending messages to consume at any given time."),
docs.FieldInt("max_attempts", "The maximum number of attempts to successfully consume a messages."),
).ChildDefaultAndTypesFromStruct(input.NewNSQConfig()),
Categories: []string{
"Services",
},
`).
Fields(
service.NewStringListField(niFieldNSQDAddrs).
Description("A list of nsqd addresses to connect to."),
service.NewStringListField(niFieldLookupDAddrs).
Description("A list of nsqlookupd addresses to connect to."),
service.NewTLSToggledField(niFieldTLS),
service.NewStringField(niFieldTopic).
Description("The topic to consume from."),
service.NewStringField(niFieldChannel).
Description("The channel to consume from."),
service.NewStringField(niFieldUserAgent).
Description("A user agent to assume when connecting.").
Optional(),
service.NewIntField(niFieldMaxInFlight).
Description("The maximum number of pending messages to consume at any given time.").
Default(100),
service.NewIntField(niFieldMaxAttempts).
Description("The maximum number of attempts to successfully consume a messages.").
Default(5),
)
}

func init() {
err := service.RegisterInput("nsq", inputConfigSpec(), func(conf *service.ParsedConfig, mgr *service.Resources) (service.Input, error) {
return newNSQReaderFromParsed(conf, mgr)
})
if err != nil {
panic(err)
}
}

func newNSQInput(conf input.Config, mgr bundle.NewManagement) (input.Streamed, error) {
var n input.Async
var err error
if n, err = newNSQReader(conf.NSQ, mgr); err != nil {
return nil, err
}
return input.NewAsyncReader("nsq", n, mgr)
}

type nsqReader struct {
consumer *nsq.Consumer
cMut sync.Mutex
Expand All @@ -76,42 +84,68 @@ type nsqReader struct {
tlsConf *tls.Config
addresses []string
lookupAddresses []string
conf input.NSQConfig
log log.Modular
topic string
channel string
userAgent string
maxInFlight int
maxAttempts uint16
log *service.Logger

internalMessages chan *nsq.Message
interruptChan chan struct{}
interruptOnce sync.Once
}

func newNSQReader(conf input.NSQConfig, mgr bundle.NewManagement) (*nsqReader, error) {
n := nsqReader{
conf: conf,
func newNSQReaderFromParsed(conf *service.ParsedConfig, mgr *service.Resources) (n *nsqReader, err error) {
n = &nsqReader{
log: mgr.Logger(),
internalMessages: make(chan *nsq.Message),
interruptChan: make(chan struct{}),
}
for _, addr := range conf.Addresses {

var addresses []string
if addresses, err = conf.FieldStringList(niFieldNSQDAddrs); err != nil {
return
}
for _, addr := range addresses {
for _, splitAddr := range strings.Split(addr, ",") {
if len(splitAddr) > 0 {
n.addresses = append(n.addresses, splitAddr)
}
}
}
for _, addr := range conf.LookupAddresses {

if addresses, err = conf.FieldStringList(niFieldLookupDAddrs); err != nil {
return
}
for _, addr := range addresses {
for _, splitAddr := range strings.Split(addr, ",") {
if len(splitAddr) > 0 {
n.lookupAddresses = append(n.lookupAddresses, splitAddr)
}
}
}
if conf.TLS.Enabled {
var err error
if n.tlsConf, err = conf.TLS.Get(mgr.FS()); err != nil {
return nil, err
}

if n.tlsConf, _, err = conf.FieldTLSToggled(niFieldTLS); err != nil {
return
}

if n.topic, err = conf.FieldString(niFieldTopic); err != nil {
return
}
return &n, nil
if n.channel, err = conf.FieldString(niFieldChannel); err != nil {
return
}
n.userAgent, _ = conf.FieldString(niFieldUserAgent)
if n.maxInFlight, err = conf.FieldMaxInFlight(); err != nil {
return
}
var tmpMA int
if tmpMA, err = conf.FieldInt(niFieldMaxAttempts); err != nil {
return
}
n.maxAttempts = uint16(tmpMA)
return
}

func (n *nsqReader) HandleMessage(message *nsq.Message) error {
Expand All @@ -134,16 +168,16 @@ func (n *nsqReader) Connect(ctx context.Context) (err error) {
}

cfg := nsq.NewConfig()
cfg.UserAgent = n.conf.UserAgent
cfg.MaxInFlight = n.conf.MaxInFlight
cfg.MaxAttempts = n.conf.MaxAttempts
cfg.UserAgent = n.userAgent
cfg.MaxInFlight = n.maxInFlight
cfg.MaxAttempts = n.maxAttempts
if n.tlsConf != nil {
cfg.TlsV1 = true
cfg.TlsConfig = n.tlsConf
}

var consumer *nsq.Consumer
if consumer, err = nsq.NewConsumer(n.conf.Topic, n.conf.Channel, cfg); err != nil {
if consumer, err = nsq.NewConsumer(n.topic, n.channel, cfg); err != nil {
return
}

Expand Down Expand Up @@ -181,33 +215,32 @@ func (n *nsqReader) read(ctx context.Context) (*nsq.Message, error) {
case msg = <-n.internalMessages:
return msg, nil
case <-ctx.Done():
return nil, ctx.Err()
case <-n.interruptChan:
for _, m := range n.unAckMsgs {
m.Requeue(-1)
m.Finish()
}
n.unAckMsgs = nil
_ = n.disconnect()
return nil, component.ErrTypeClosed
return nil, service.ErrEndOfInput
}
return nil, component.ErrTimeout
}

func (n *nsqReader) ReadBatch(ctx context.Context) (message.Batch, input.AsyncAckFn, error) {
func (n *nsqReader) Read(ctx context.Context) (*service.Message, service.AckFunc, error) {
msg, err := n.read(ctx)
if err != nil {
return nil, nil, err
}
n.unAckMsgs = append(n.unAckMsgs, msg)

bmsg := message.QuickBatch([][]byte{msg.Body})
part := bmsg.Get(0)
part := service.NewMessage(msg.Body)
part.MetaSetMut("nsq_attempts", strconv.Itoa(int(msg.Attempts)))
part.MetaSetMut("nsq_id", string(msg.ID[:]))
part.MetaSetMut("nsq_timestamp", strconv.FormatInt(msg.Timestamp, 10))
part.MetaSetMut("nsq_nsqd_address", msg.NSQDAddress)

return bmsg, func(rctx context.Context, res error) error {
return part, func(rctx context.Context, res error) error {
if res != nil {
msg.Requeue(-1)
}
Expand Down

0 comments on commit f893114

Please sign in to comment.