Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

jetstream: add option to pass arbitrary headers on publish #1392

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
24 changes: 24 additions & 0 deletions jetstream/jetstream_test.go
Expand Up @@ -16,6 +16,7 @@ package jetstream
import (
"errors"
"fmt"
"reflect"
"testing"
"time"

Expand Down Expand Up @@ -274,3 +275,26 @@ func TestRetryWithBackoff(t *testing.T) {
})
}
}

func TestOptions(t *testing.T) {
t.Run("publish", func(t *testing.T) {
t.Run("WithHeader", func(t *testing.T) {
customHeader := "test"
customHeaderValues := []string{"some", "values"}
po := pubOpts{}
sut := WithHeader(customHeader, customHeaderValues)

err := sut(&po)
if err != nil {
t.Fatalf("expected nil error got: %s", err)
}
if po.headerValues == nil {
t.Fatal("nil headerValues")
}

if got := po.headerValues.Values(customHeader); !reflect.DeepEqual(got, customHeaderValues) {
t.Fatalf("header %s not set - expected %+v got %+v", customHeader, customHeaderValues, got)
}
})
})
}
13 changes: 13 additions & 0 deletions jetstream/options.go
Expand Up @@ -16,6 +16,8 @@ package jetstream
import (
"fmt"
"time"

"github.com/nats-io/nats.go"
)

type pullOptFunc func(*consumeOpts) error
Expand Down Expand Up @@ -281,6 +283,17 @@ func WithStreamListSubject(subject string) StreamListOpt {
}
}

// WithHeader adds an arbitrary header to the underlying message.
func WithHeader(key string, value []string) PublishOpt {
return func(opts *pubOpts) error {
if opts.headerValues == nil {
opts.headerValues = nats.Header{}
}
opts.headerValues[key] = value
return nil
}
}

// WithMsgID sets the message ID used for deduplication.
func WithMsgID(id string) PublishOpt {
return func(opts *pubOpts) error {
Expand Down
15 changes: 15 additions & 0 deletions jetstream/publish.go
Expand Up @@ -52,6 +52,9 @@ type (

// stallWait is the max wait of a async pub ack.
stallWait time.Duration

// headerValues contains any arbitrary headers provided by the caller
headerValues nats.Header
}

// PubAckFuture is a future for a PubAck.
Expand Down Expand Up @@ -171,6 +174,12 @@ func (js *jetStream) PublishMsg(ctx context.Context, m *nats.Msg, opts ...Publis
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}

if o.headerValues != nil {
for k, v := range o.headerValues {
m.Header[k] = v
}
}

var resp *nats.Msg
var err error

Expand Down Expand Up @@ -245,6 +254,12 @@ func (js *jetStream) PublishMsgAsync(m *nats.Msg, opts ...PublishOpt) (PubAckFut
m.Header.Set(ExpectedLastSubjSeqHeader, strconv.FormatUint(*o.lastSubjectSeq, 10))
}

if o.headerValues != nil {
for k, v := range o.headerValues {
m.Header[k] = v
}
}

// Reply
if m.Reply != "" {
return nil, ErrAsyncPublishReplySubjectSet
Expand Down