Skip to content

Commit

Permalink
Extract ErrConsumerDeleted in Fetch()
Browse files Browse the repository at this point in the history
  • Loading branch information
piotrpio committed Nov 8, 2022
1 parent 980f955 commit fd29cc5
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 4 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.9.2
github.com/nats-io/nats-server/v2 v2.9.6
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -20,9 +20,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.2 h1:XNDgJgOYYaYlquLdbSHI3xssLipfKUOq3EmYIMNCOsE=
github.com/nats-io/nats-server/v2 v2.9.2/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk=
github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.9.6 h1:RTtK+rv/4CcliOuqGsy58g7MuWkBaWmF5TUNwuUo9Uw=
github.com/nats-io/nats-server/v2 v2.9.6/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
5 changes: 5 additions & 0 deletions js.go
Expand Up @@ -2575,6 +2575,11 @@ func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
// one message when making requests without no_wait.
err = ErrTimeout
}
case jetStream409Sts:
if strings.Contains(strings.ToLower(string(msg.Header.Get(descrHdr))), "consumer deleted") {
err = ErrConsumerDeleted
}
fallthrough
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
Expand Down
3 changes: 3 additions & 0 deletions jserrors.go
Expand Up @@ -98,6 +98,9 @@ var (
// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist
ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -3450,6 +3450,7 @@ const (
noResponders = "503"
noMessagesSts = "404"
reqTimeoutSts = "408"
jetStream409Sts = "409"
controlMsg = "100"
statusLen = 3 // e.g. 20x, 40x, 50x
)
Expand Down
57 changes: 57 additions & 0 deletions test/js_test.go
Expand Up @@ -956,6 +956,63 @@ func TestJetStreamSubscribe(t *testing.T) {
}
}

func TestPullSubsctibeConsumerDeleted(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatal(err)
}
t.Run("delete consumer", func(t *testing.T) {
sub, err := js.PullSubscribe("foo", "cons")
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if err != nil {
t.Fatal(err)
}

if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil {
t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
}
time.AfterFunc(50*time.Millisecond, func() { js.DeleteConsumer("TEST", "cons") })
if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) {
t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err)
}
})

t.Run("delete stream", func(t *testing.T) {
sub, err := js.PullSubscribe("foo", "cons")
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if err != nil {
t.Fatal(err)
}

if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil {
t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
}
time.AfterFunc(50*time.Millisecond, func() { js.DeleteStream("TEST") })
if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) {
t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err)
}
})

}

func TestJetStreamAckPending_Pull(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down

0 comments on commit fd29cc5

Please sign in to comment.