From e4df87dc82a8d99110afc41cab2b5d8a3aabc349 Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 14:28:49 +0200 Subject: [PATCH 1/8] Make lookupStreamBySubject public Signed-off-by: Jarema --- js.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/js.go b/js.go index 08a8212d7..86384538b 100644 --- a/js.go +++ b/js.go @@ -1529,7 +1529,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { - stream, err = js.lookupStreamBySubject(subj) + stream, err = js.LookupStreamBySubject(subj) if err != nil { return nil, err } @@ -2144,7 +2144,8 @@ type streamNamesResponse struct { Streams []string `json:"streams"` } -func (js *js) lookupStreamBySubject(subj string) (string, error) { +// LookupStreamBySubject returns a stream name that matches the subject. +func (js *js) LookupStreamBySubject(subj string) (string, error) { var slr streamNamesResponse req := &streamRequest{subj} j, err := json.Marshal(req) From e34d8e8baf6ba625138331745b5c4e1a66f0f598 Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 14:43:18 +0200 Subject: [PATCH 2/8] Add StreamBySubject to the jsm interface Signed-off-by: Jarema --- js.go | 6 +++--- js_test.go | 31 +++++++++++++++++++++++++++++++ jsm.go | 3 +++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/js.go b/js.go index 86384538b..54a9d4bb2 100644 --- a/js.go +++ b/js.go @@ -1529,7 +1529,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // Find the stream mapped to the subject if not bound to a stream already. if o.stream == _EMPTY_ { - stream, err = js.LookupStreamBySubject(subj) + stream, err = js.StreamNameBySubject(subj) if err != nil { return nil, err } @@ -2144,8 +2144,8 @@ type streamNamesResponse struct { Streams []string `json:"streams"` } -// LookupStreamBySubject returns a stream name that matches the subject. -func (js *js) LookupStreamBySubject(subj string) (string, error) { +// StreamNameBySubject returns a stream name that matches the subject. +func (js *js) StreamNameBySubject(subj string) (string, error) { var slr streamNamesResponse req := &streamRequest{subj} j, err := json.Marshal(req) diff --git a/js_test.go b/js_test.go index 7eadba65a..4bbdb4d03 100644 --- a/js_test.go +++ b/js_test.go @@ -1230,3 +1230,34 @@ func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) { t.Fatalf("expected 0 subjects details from StreamInfo, but got %d instead", len(result.State.Subjects)) } } + +func StreamNameBySubject(t *testing.T) { + + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + var err error + + _, err = js.AddStream(&StreamConfig{ + Name: "TEST", + Subjects: []string{"test.*"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + stream, err := js.StreamNameBySubject("test.*") + if err != nil { + t.Fatalf("lookup stream should succeed for %s", "test.*") + } + if stream != "TEST" { + t.Fatalf("returned stream should be 'TEST'") + } + + if _, err := js.StreamNameBySubject("bad"); err == nil { + t.Fatalf("error should be nil, no stream with name 'bad'") + } +} diff --git a/jsm.go b/jsm.go index cd4c79ab9..04e064212 100644 --- a/jsm.go +++ b/jsm.go @@ -93,6 +93,9 @@ type JetStreamManager interface { // AccountInfo retrieves info about the JetStream usage from an account. AccountInfo(opts ...JSOpt) (*AccountInfo, error) + + // Returns a stream matching given subject. + StreamNameBySubject(string) (string, error) } // StreamConfig will determine the properties for a stream. From 96f38334be22ed05afe45b9e815c51fbaa31a231 Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 15:00:44 +0200 Subject: [PATCH 3/8] Add test prefix --- js_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/js_test.go b/js_test.go index 4bbdb4d03..b4cb25bef 100644 --- a/js_test.go +++ b/js_test.go @@ -1231,7 +1231,7 @@ func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) { } } -func StreamNameBySubject(t *testing.T) { +func TestStreamNameBySubject(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) From de487d5a1b19485f5b51d401808573aed777bc07 Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 15:06:24 +0200 Subject: [PATCH 4/8] Move StreanNameBySubject to jsm.go Signed-off-by: Jarema --- go.mod | 3 ++- go.sum | 8 +++++++- js.go | 25 ------------------------- jsm.go | 27 ++++++++++++++++++++++++++- 4 files changed, 35 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index 0f9e0b911..7c79e2ec2 100644 --- a/go.mod +++ b/go.mod @@ -5,4 +5,5 @@ go 1.16 require ( github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 -) \ No newline at end of file + google.golang.org/protobuf v1.28.1 // indirect +) diff --git a/go.sum b/go.sum index 00fe58ccd..fba42fd38 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -8,4 +10,8 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= \ No newline at end of file +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= diff --git a/js.go b/js.go index 54a9d4bb2..072940907 100644 --- a/js.go +++ b/js.go @@ -2144,31 +2144,6 @@ type streamNamesResponse struct { Streams []string `json:"streams"` } -// StreamNameBySubject returns a stream name that matches the subject. -func (js *js) StreamNameBySubject(subj string) (string, error) { - var slr streamNamesResponse - req := &streamRequest{subj} - j, err := json.Marshal(req) - if err != nil { - return _EMPTY_, err - } - resp, err := js.nc.Request(js.apiSubj(apiStreams), j, js.opts.wait) - if err != nil { - if err == ErrNoResponders { - err = ErrJetStreamNotEnabled - } - return _EMPTY_, err - } - if err := json.Unmarshal(resp.Data, &slr); err != nil { - return _EMPTY_, err - } - - if slr.Error != nil || len(slr.Streams) != 1 { - return _EMPTY_, ErrNoMatchingStream - } - return slr.Streams[0], nil -} - type subOpts struct { // For attaching. stream, consumer string diff --git a/jsm.go b/jsm.go index 04e064212..f5ecb23e0 100644 --- a/jsm.go +++ b/jsm.go @@ -94,7 +94,7 @@ type JetStreamManager interface { // AccountInfo retrieves info about the JetStream usage from an account. AccountInfo(opts ...JSOpt) (*AccountInfo, error) - // Returns a stream matching given subject. + // StreamNameBySubjec rteturns a stream matching given subject. StreamNameBySubject(string) (string, error) } @@ -1518,6 +1518,31 @@ func (jsc *js) StreamNames(opts ...JSOpt) <-chan string { return ch } +// StreamNameBySubject returns a stream name that matches the subject. +func (jsc *js) StreamNameBySubject(subj string) (string, error) { + var slr streamNamesResponse + req := &streamRequest{subj} + j, err := json.Marshal(req) + if err != nil { + return _EMPTY_, err + } + resp, err := jsc.nc.Request(jsc.apiSubj(apiStreams), j, jsc.opts.wait) + if err != nil { + if err == ErrNoResponders { + err = ErrJetStreamNotEnabled + } + return _EMPTY_, err + } + if err := json.Unmarshal(resp.Data, &slr); err != nil { + return _EMPTY_, err + } + + if slr.Error != nil || len(slr.Streams) != 1 { + return _EMPTY_, ErrNoMatchingStream + } + return slr.Streams[0], nil +} + func getJSContextOpts(defs *jsOpts, opts ...JSOpt) (*jsOpts, context.CancelFunc, error) { var o jsOpts for _, opt := range opts { From ac3ef92f2e74c1c5ddf47256eb9899ba3bd540d1 Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 15:30:52 +0200 Subject: [PATCH 5/8] Refactor test into test table --- js_test.go | 27 ++++++++++++++++++--------- 1 file changed, 18 insertions(+), 9 deletions(-) diff --git a/js_test.go b/js_test.go index b4cb25bef..583723629 100644 --- a/js_test.go +++ b/js_test.go @@ -1249,15 +1249,24 @@ func TestStreamNameBySubject(t *testing.T) { t.Fatalf("Unexpected error: %v", err) } - stream, err := js.StreamNameBySubject("test.*") - if err != nil { - t.Fatalf("lookup stream should succeed for %s", "test.*") - } - if stream != "TEST" { - t.Fatalf("returned stream should be 'TEST'") - } + for _, test := range []struct { + name string + streamName string + err error + }{ - if _, err := js.StreamNameBySubject("bad"); err == nil { - t.Fatalf("error should be nil, no stream with name 'bad'") + {name: "valid wildcard lookup", streamName: "test.*", err: nil}, + {name: "valid explicit lookup", streamName: "test.a", err: nil}, + {name: "lookup on not existing stream", streamName: "not.existing", err: ErrNoMatchingStream}, + } { + + stream, err := js.StreamNameBySubject(test.streamName) + if err != test.err { + t.Fatalf("expected %v, got %v", test.err, err) + } + + if stream != "TEST" && err == nil { + t.Fatalf("returned stream name should be 'TEST'") + } } } From 06dad433c9d161f2a8e46fe52264beae5a6e0dc3 Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 15:31:28 +0200 Subject: [PATCH 6/8] Get rid of newline --- js_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/js_test.go b/js_test.go index 583723629..fd7a87b3e 100644 --- a/js_test.go +++ b/js_test.go @@ -1232,7 +1232,6 @@ func TestJetStreamStreamInfoWithSubjectDetails(t *testing.T) { } func TestStreamNameBySubject(t *testing.T) { - s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s) From bff7b16516d8068fc133a6caba0941b5b5565e1d Mon Sep 17 00:00:00 2001 From: Jarema Date: Fri, 28 Oct 2022 15:42:17 +0200 Subject: [PATCH 7/8] Revert go.mod and go.sum --- go.mod | 3 +-- go.sum | 8 +------- 2 files changed, 2 insertions(+), 9 deletions(-) diff --git a/go.mod b/go.mod index 7c79e2ec2..0f9e0b911 100644 --- a/go.mod +++ b/go.mod @@ -5,5 +5,4 @@ go 1.16 require ( github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 - google.golang.org/protobuf v1.28.1 // indirect -) +) \ No newline at end of file diff --git a/go.sum b/go.sum index fba42fd38..00fe58ccd 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,3 @@ -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= @@ -10,8 +8,4 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= -golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= -google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= \ No newline at end of file From a1ff3704ba3ed1fe2e1b2ad62ead59423f9950d3 Mon Sep 17 00:00:00 2001 From: Jarema Date: Mon, 31 Oct 2022 10:51:12 +0100 Subject: [PATCH 8/8] Add JSOpt to StreamNameBySubject Signed-off-by: Jarema --- jsm.go | 15 ++++++++++++--- 1 file changed, 12 insertions(+), 3 deletions(-) diff --git a/jsm.go b/jsm.go index f5ecb23e0..e71d37fca 100644 --- a/jsm.go +++ b/jsm.go @@ -95,7 +95,7 @@ type JetStreamManager interface { AccountInfo(opts ...JSOpt) (*AccountInfo, error) // StreamNameBySubjec rteturns a stream matching given subject. - StreamNameBySubject(string) (string, error) + StreamNameBySubject(string, ...JSOpt) (string, error) } // StreamConfig will determine the properties for a stream. @@ -1519,14 +1519,23 @@ func (jsc *js) StreamNames(opts ...JSOpt) <-chan string { } // StreamNameBySubject returns a stream name that matches the subject. -func (jsc *js) StreamNameBySubject(subj string) (string, error) { +func (jsc *js) StreamNameBySubject(subj string, opts ...JSOpt) (string, error) { + o, cancel, err := getJSContextOpts(jsc.opts, opts...) + if err != nil { + return "", err + } + if cancel != nil { + defer cancel() + } + var slr streamNamesResponse req := &streamRequest{subj} j, err := json.Marshal(req) if err != nil { return _EMPTY_, err } - resp, err := jsc.nc.Request(jsc.apiSubj(apiStreams), j, jsc.opts.wait) + + resp, err := jsc.apiRequestWithContext(o.ctx, jsc.apiSubj(apiStreams), j) if err != nil { if err == ErrNoResponders { err = ErrJetStreamNotEnabled