diff --git a/js.go b/js.go index 08a8212d7..072940907 100644 --- a/js.go +++ b/js.go @@ -1529,7 +1529,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { - stream, err = js.lookupStreamBySubject(subj) + stream, err = js.StreamNameBySubject(subj) if err != nil { return nil, err } @@ -2144,30 +2144,6 @@ type streamNamesResponse struct { Streams []string `json:"streams"` } -func (js *js) lookupStreamBySubject(subj string) (string, error) { - var slr streamNamesResponse - req := &streamRequest{subj} - j, err := json.Marshal(req) - if err != nil { - return _EMPTY_, err - } - resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait) - if err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled - } - return _EMPTY_, err - } - if err := json.Unmarshal(resp.Data, &slr); err != nil { - return _EMPTY_, err - } - - if slr.Error != nil || len(slr.Streams) != 1 { - return _EMPTY_, ErrNoMatchingStream - } - return slr.Streams[0], nil -} - type subOpts struct { // For attaching. stream, consumer string diff --git a/js_test.go b/js_test.go index 7eadba65a..fd7a87b3e 100644 --- a/js_test.go +++ b/js_test.go @@ -1230,3 +1230,42 @@ func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) { t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects)) } } + +func TestStreamNameBySubject(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"test.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + for _, test := range []struct { + name string + streamName string + err error + }{ + + {name: "valid wildcard lookup", streamName: "test.*", err: nil}, + {name: "valid explicit lookup", streamName: "test.a", err: nil}, + {name: "lookup on not existing stream", streamName: "not.existing", err: ErrNoMatchingStream}, + } { + + stream, err := js.StreamNameBySubject(test.streamName) + if err != test.err { + t.Fatalf("expected %v, got %v", test.err, err) + } + + if stream != "TEST" && err == nil { + t.Fatalf("returned stream name should be 'TEST'") + } + } +} diff --git a/jsm.go b/jsm.go index cd4c79ab9..e71d37fca 100644 --- a/jsm.go +++ b/jsm.go @@ -93,6 +93,9 @@ type JetStreamManager interface { // AccountInfo retrieves info about the JetStream usage from an account. AccountInfo(opts ...JSOpt) (*AccountInfo, error) + + // StreamNameBySubjec rteturns a stream matching given subject. + StreamNameBySubject(string, ...JSOpt) (string, error) } // StreamConfig will determine the properties for a stream. @@ -1515,6 +1518,40 @@ func (jsc *js) StreamNames(opts ...JSOpt) <-chan string { return ch } +// StreamNameBySubject returns a stream name that matches the subject. +func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return "", err + } + if cancel != nil { + defer cancel() + } + + var slr streamNamesResponse + req := &streamRequest{subj} + j, err := json.Marshal(req) + if err != nil { + return _EMPTY_, err + } + + resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j) + if err != nil { + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled + } + return _EMPTY_, err + } + if err := json.Unmarshal(resp.Data, &slr); err != nil { + return _EMPTY_, err + } + + if slr.Error != nil || len(slr.Streams) != 1 { + return _EMPTY_, ErrNoMatchingStream + } + return slr.Streams[0], nil +} + func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) { var o jsOpts for _, opt := range opts {