Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Extract ErrConsumerDeleted in Fetch() #1125

Merged
merged 2 commits into from Nov 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.17

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.9.2
github.com/nats-io/nats-server/v2 v2.9.6
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
6 changes: 3 additions & 3 deletions go_test.sum
Expand Up @@ -20,9 +20,9 @@ github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI=
github.com/nats-io/jwt/v2 v2.3.0/go.mod h1:0tqz9Hlu6bCBFLWAASKhE5vUA4c24L9KPUUgvwumE/k=
github.com/nats-io/nats-server/v2 v2.9.2 h1:XNDgJgOYYaYlquLdbSHI3xssLipfKUOq3EmYIMNCOsE=
github.com/nats-io/nats-server/v2 v2.9.2/go.mod h1:4sq8wvrpbvSzL1n3ZfEYnH4qeUuIl5W990j3kw13rRk=
github.com/nats-io/nats.go v1.17.0/go.mod h1:BPko4oXsySz4aSWeFgOHLZs3G4Jq4ZAyE6/zMCxRT6w=
github.com/nats-io/nats-server/v2 v2.9.6 h1:RTtK+rv/4CcliOuqGsy58g7MuWkBaWmF5TUNwuUo9Uw=
github.com/nats-io/nats-server/v2 v2.9.6/go.mod h1:AB6hAnGZDlYfqb7CTAm66ZKMZy9DpfierY1/PbpvI2g=
github.com/nats-io/nats.go v1.19.0/go.mod h1:tLqubohF7t4z3du1QDPYJIQQyhb4wl6DhjxEajSI7UA=
github.com/nats-io/nkeys v0.3.0 h1:cgM5tL53EvYRU+2YLXIK0G2mJtK12Ft9oeooSZMA2G8=
github.com/nats-io/nkeys v0.3.0/go.mod h1:gvUNGjVcM2IPr5rCsRsC6Wb3Hr2CQAm08dsxtV6A5y4=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand Down
6 changes: 6 additions & 0 deletions js.go
Expand Up @@ -2575,6 +2575,12 @@ func checkMsg(msg *Msg, checkSts, isNoWait bool) (usrMsg bool, err error) {
// one message when making requests without no_wait.
err = ErrTimeout
}
case jetStream409Sts:
if strings.Contains(strings.ToLower(string(msg.Header.Get(descrHdr))), "consumer deleted") {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so how are we handling this client facing breaking change?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This status was introduced last week in the server (notifying about consumer being deleted) and we only depend on "consumer deleted" string, so think we can treat this as a new feature: nats-io/nats-server#3605

WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's also a nats.go release that went around the same time, this error could have been surfaced to real users by now right?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meantion the nats.go release being 3 days ago to highlight that the next nats.go release is probably not tomorrow - meaning the delta between current users on a current server and this change langing in a release is likely to be quite big, relatively speaking. Thus increasing the risk that this will impact real users.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right about that. To shorten the delta we could issue a patch release sooner (e.g. tomorrow), minimizing the impact.

err = ErrConsumerDeleted
break
}
fallthrough
default:
err = fmt.Errorf("nats: %s", msg.Header.Get(descrHdr))
}
Expand Down
3 changes: 3 additions & 0 deletions jserrors.go
Expand Up @@ -98,6 +98,9 @@ var (
// ErrCantAckIfConsumerAckNone is returned when attempting to ack a message for consumer with AckNone policy set.
ErrCantAckIfConsumerAckNone JetStreamError = &jsError{message: "cannot acknowledge a message for a consumer with AckNone policy"}

// ErrConsumerDeleted is returned when attempting to send pull request to a consumer which does not exist
ErrConsumerDeleted JetStreamError = &jsError{message: "consumer deleted"}

// DEPRECATED: ErrInvalidDurableName is no longer returned and will be removed in future releases.
// Use ErrInvalidConsumerName instead.
ErrInvalidDurableName = errors.New("nats: invalid durable name")
Expand Down
1 change: 1 addition & 0 deletions nats.go
Expand Up @@ -3450,6 +3450,7 @@ const (
noResponders = "503"
noMessagesSts = "404"
reqTimeoutSts = "408"
jetStream409Sts = "409"
controlMsg = "100"
statusLen = 3 // e.g. 20x, 40x, 50x
)
Expand Down
57 changes: 57 additions & 0 deletions test/js_test.go
Expand Up @@ -956,6 +956,63 @@ func TestJetStreamSubscribe(t *testing.T) {
}
}

func TestPullSubscribeConsumerDeleted(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)

nc, js := jsClient(t, s)
defer nc.Close()

_, err := js.AddStream(&nats.StreamConfig{
Name: "TEST",
Subjects: []string{"foo"},
})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if _, err := js.Publish("foo", []byte("msg")); err != nil {
t.Fatal(err)
}
t.Run("delete consumer", func(t *testing.T) {
sub, err := js.PullSubscribe("foo", "cons")
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if err != nil {
t.Fatal(err)
}

if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil {
t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
}
time.AfterFunc(50*time.Millisecond, func() { js.DeleteConsumer("TEST", "cons") })
if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) {
t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err)
}
})

t.Run("delete stream", func(t *testing.T) {
sub, err := js.PullSubscribe("foo", "cons")
if err != nil {
t.Fatal(err)
}
defer sub.Unsubscribe()
if err != nil {
t.Fatal(err)
}

if _, err = sub.Fetch(1, nats.MaxWait(10*time.Millisecond)); err != nil {
t.Fatalf("Expected error: %v; got: %v", nats.ErrTimeout, err)
}
time.AfterFunc(50*time.Millisecond, func() { js.DeleteStream("TEST") })
if _, err = sub.Fetch(1, nats.MaxWait(100*time.Millisecond)); !errors.Is(err, nats.ErrConsumerDeleted) {
t.Fatalf("Expected error: %v; got: %v", nats.ErrConsumerDeleted, err)
}
})

}

func TestJetStreamAckPending_Pull(t *testing.T) {
s := RunBasicJetStreamServer()
defer shutdownJSServerAndRemoveStorage(t, s)
Expand Down