Skip to content

Commit

Permalink
jetstream: add option to pass arbitrary headers on publish
Browse files Browse the repository at this point in the history
This will allow callers to pass custom header values without needing to
import anything beyond the jetstream sub-package.
  • Loading branch information
AlexCuse committed Sep 10, 2023
1 parent 28ec47e commit 46daa76
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 0 deletions.
24 changes: 24 additions & 0 deletions jetstream/jetstream_test.go
Expand Up @@ -16,6 +16,7 @@ package jetstream
import (
"errors"
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -116,3 +117,26 @@ func TestValidateSubject(t *testing.T) {
})
}
}

func TestOptions(t *testing.T) {
t.Run("publish", func(t *testing.T) {
t.Run("WithHeader", func(t *testing.T) {
customHeader := "test"
customHeaderValues := []string{"some", "values"}
po := pubOpts{}
sut := WithHeader(customHeader, customHeaderValues)

err := sut(&po)
if err != nil {
t.Fatalf("expected nil error got: %s", err)
}
if po.headerValues == nil {
t.Fatal("nil headerValues")
}

if got := po.headerValues.Values(customHeader); !reflect.DeepEqual(got, customHeaderValues) {
t.Fatalf("header %s not set - expected %+v got %+v", customHeader, customHeaderValues, got)
}
})
})
}
13 changes: 13 additions & 0 deletions jetstream/options.go
Expand Up @@ -16,6 +16,8 @@ package jetstream
import (
"fmt"
"time"

"github.com/nats-io/nats.go"
)

type pullOptFunc func(*consumeOpts) error
Expand Down Expand Up @@ -250,6 +252,17 @@ func WithStreamListSubject(subject string) StreamListOpt {
}
}

// WithHeader adds an arbitrary header to the underlying message.
func WithHeader(key string, value []string) PublishOpt {
return func(opts *pubOpts) error {
if opts.headerValues == nil {
opts.headerValues = nats.Header{}
}
opts.headerValues[key] = value
return nil
}
}

// WithMsgID sets the message ID used for deduplication.
func WithMsgID(id string) PublishOpt {
return func(opts *pubOpts) error {
Expand Down
9 changes: 9 additions & 0 deletions jetstream/publish.go
Expand Up @@ -52,6 +52,9 @@ type (

// stallWait is the max wait of a async pub ack.
stallWait time.Duration

// headerValues contains any arbitrary headers provided by the caller
headerValues nats.Header
}

// PubAckFuture is a future for a PubAck.
Expand Down Expand Up @@ -171,6 +174,12 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}

if o.headerValues != nil {
for k, v := range o.headerValues {
m.Header[k] = v
}
}

var resp *nats.Msg
var err error

Expand Down

0 comments on commit 46daa76

Please sign in to comment.