diff --git a/go_test.mod b/go_test.mod index 1d94d2bcf..961cceaec 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.17 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.9.2 + github.com/nats-io/nats-server/v2 v2.9.6 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index dab6e1c47..23c41c442 100644 --- a/go_test.sum +++ b/go_test.sum @@ -20,9 +20,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY= github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k= -github.com/nats-io/nats-server/v2 v2.9.2 h1:XNDgJgOYYaYlquLdbSHI3xssLipfKUOq3EmYIMNCOsE= -github.com/nats-io/nats-server/v2 v2.9.2/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk= -github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w= +github.com/nats-io/nats-server/v2 v2.9.6 h1:RTtK+rv/4CcliOuqGsy58g7MuWkBaWmF5TUNwuUo9Uw= +github.com/nats-io/nats-server/v2 v2.9.6/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g= +github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA= github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8= github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4= github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= diff --git a/js.go b/js.go index 072940907..607204406 100644 --- a/js.go +++ b/js.go @@ -2575,6 +2575,12 @@ func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) { // one message when making requests without no_wait. err = ErrTimeout } + case jetStream409Sts: + if strings.Contains(strings.ToLower(string(msg.Header.Get(descrHdr))), "consumer deleted") { + err = ErrConsumerDeleted + break + } + fallthrough default: err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr)) } diff --git a/jserrors.go b/jserrors.go index a97344c0c..6ef90f6f2 100644 --- a/jserrors.go +++ b/jserrors.go @@ -98,6 +98,9 @@ var ( // ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set. ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"} + // ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist + ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"} + // DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases. // Use ErrInvalidConsumerName instead. ErrInvalidDurableName = errors.New("nats: invalid durable name") diff --git a/nats.go b/nats.go index 27ce44371..206dedd60 100644 --- a/nats.go +++ b/nats.go @@ -3450,6 +3450,7 @@ const ( noResponders = "503" noMessagesSts = "404" reqTimeoutSts = "408" + jetStream409Sts = "409" controlMsg = "100" statusLen = 3 // e.g. 20x, 40x, 50x ) diff --git a/test/js_test.go b/test/js_test.go index 124b47073..e53dcbcc5 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -956,6 +956,63 @@ func TestJetStreamSubscribe(t *testing.T) { } } +func TestPullSubscribeConsumerDeleted(t *testing.T) { + s := RunBasicJetStreamServer() + defer shutdownJSServerAndRemoveStorage(t, s) + + nc, js := jsClient(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if _, err := js.Publish("foo", []byte("msg")); err != nil { + t.Fatal(err) + } + t.Run("delete consumer", func(t *testing.T) { + sub, err := js.PullSubscribe("foo", "cons") + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + if err != nil { + t.Fatal(err) + } + + if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil { + t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err) + } + time.AfterFunc(50*time.Millisecond, func() { js.DeleteConsumer("TEST", "cons") }) + if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) { + t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err) + } + }) + + t.Run("delete stream", func(t *testing.T) { + sub, err := js.PullSubscribe("foo", "cons") + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + if err != nil { + t.Fatal(err) + } + + if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil { + t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err) + } + time.AfterFunc(50*time.Millisecond, func() { js.DeleteStream("TEST") }) + if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) { + t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err) + } + }) + +} + func TestJetStreamAckPending_Pull(t *testing.T) { s := RunBasicJetStreamServer() defer shutdownJSServerAndRemoveStorage(t, s)