Skip to content

Commit

Permalink
[CHANGED] Make lookupStreamBySubject public (#1114)
Browse files Browse the repository at this point in the history
  • Loading branch information
Jarema committed Oct 31, 2022
1 parent 6e4828a commit 7ac1087
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 25 deletions.
26 changes: 1 addition & 25 deletions js.go
Expand Up @@ -1529,7 +1529,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,

// Find the stream mapped to the subject if not bound to a stream already.
if o.stream == _EMPTY_ {
stream, err = js.lookupStreamBySubject(subj)
stream, err = js.StreamNameBySubject(subj)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -2144,30 +2144,6 @@ type streamNamesResponse struct {
Streams []string `json:"streams"`
}

func (js *js) lookupStreamBySubject(subj string) (string, error) {
var slr streamNamesResponse
req := &streamRequest{subj}
j, err := json.Marshal(req)
if err != nil {
return _EMPTY_, err
}
resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return _EMPTY_, err
}
if err := json.Unmarshal(resp.Data, &slr); err != nil {
return _EMPTY_, err
}

if slr.Error != nil || len(slr.Streams) != 1 {
return _EMPTY_, ErrNoMatchingStream
}
return slr.Streams[0], nil
}

type subOpts struct {
// For attaching.
stream, consumer string
Expand Down
39 changes: 39 additions & 0 deletions js_test.go
Expand Up @@ -1230,3 +1230,42 @@ func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) {
t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects))
}
}

func TestStreamNameBySubject(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

var err error

_, err = js.AddStream(&StreamConfig{
Name: "TEST",
Subjects: []string{"test.*"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

for _, test := range []struct {
name string
streamName string
err error
}{

{name: "valid wildcard lookup", streamName: "test.*", err: nil},
{name: "valid explicit lookup", streamName: "test.a", err: nil},
{name: "lookup on not existing stream", streamName: "not.existing", err: ErrNoMatchingStream},
} {

stream, err := js.StreamNameBySubject(test.streamName)
if err != test.err {
t.Fatalf("expected %v, got %v", test.err, err)
}

if stream != "TEST" && err == nil {
t.Fatalf("returned stream name should be 'TEST'")
}
}
}
37 changes: 37 additions & 0 deletions jsm.go
Expand Up @@ -93,6 +93,9 @@ type JetStreamManager interface {

// AccountInfo retrieves info about the JetStream usage from an account.
AccountInfo(opts ...JSOpt) (*AccountInfo, error)

// StreamNameBySubjec rteturns a stream matching given subject.
StreamNameBySubject(string, ...JSOpt) (string, error)
}

// StreamConfig will determine the properties for a stream.
Expand Down Expand Up @@ -1515,6 +1518,40 @@ func (jsc *js) StreamNames(opts ...JSOpt) <-chan string {
return ch
}

// StreamNameBySubject returns a stream name that matches the subject.
func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) {
o, cancel, err := getJSContextOpts(jsc.opts, opts...)
if err != nil {
return "", err
}
if cancel != nil {
defer cancel()
}

var slr streamNamesResponse
req := &streamRequest{subj}
j, err := json.Marshal(req)
if err != nil {
return _EMPTY_, err
}

resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j)
if err != nil {
if err == ErrNoResponders {
err = ErrJetStreamNotEnabled
}
return _EMPTY_, err
}
if err := json.Unmarshal(resp.Data, &slr); err != nil {
return _EMPTY_, err
}

if slr.Error != nil || len(slr.Streams) != 1 {
return _EMPTY_, ErrNoMatchingStream
}
return slr.Streams[0], nil
}

func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) {
var o jsOpts
for _, opt := range opts {
Expand Down

0 comments on commit 7ac1087

Please sign in to comment.