From 209373692baeb77b50ccba220c815437da62c53b Mon Sep 17 00:00:00 2001 From: ripark Date: Fri, 26 Apr 2024 02:20:38 +0000 Subject: [PATCH 1/5] Adding in the DeleteMessages feature. This is a new feature in preview. --- sdk/messaging/azservicebus/CHANGELOG.md | 4 +- .../azservicebus/example_receiver_test.go | 36 +++ sdk/messaging/azservicebus/go.mod | 8 +- sdk/messaging/azservicebus/go.sum | 6 + .../azservicebus/internal/amqpLinks_test.go | 4 +- .../azservicebus/internal/amqpwrap/rpc.go | 6 + .../azservicebus/internal/constants.go | 2 +- .../azservicebus/internal/errors_test.go | 6 +- sdk/messaging/azservicebus/internal/mgmt.go | 35 +-- .../azservicebus/internal/utils/types.go | 18 +- .../azservicebus/internal/utils/types_test.go | 22 ++ .../azservicebus/liveTestHelpers_test.go | 4 +- sdk/messaging/azservicebus/message.go | 18 +- .../azservicebus/messageSettler_test.go | 4 +- sdk/messaging/azservicebus/receiver.go | 89 ++++++- sdk/messaging/azservicebus/receiver_test.go | 236 ++++++++++++++++++ sdk/messaging/azservicebus/sender_test.go | 2 +- .../azservicebus/session_receiver.go | 10 + .../azservicebus/session_receiver_test.go | 124 +++++++++ 19 files changed, 576 insertions(+), 58 deletions(-) diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index 84d1152cd9d1..c3cb50691cea 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,9 +1,11 @@ # Release History -## 1.7.1 (Unreleased) +## 1.8.0.beta.1 (Unreleased) ### Features Added +- Receiver.DeleteMessages can delete messages in batches, service-side. This allows you to quickly purge messages in a queue or subscription. (PR#TBD) + ### Breaking Changes ### Bugs Fixed diff --git a/sdk/messaging/azservicebus/example_receiver_test.go b/sdk/messaging/azservicebus/example_receiver_test.go index f13d7cc708a7..1cbff6051554 100644 --- a/sdk/messaging/azservicebus/example_receiver_test.go +++ b/sdk/messaging/azservicebus/example_receiver_test.go @@ -7,6 +7,8 @@ import ( "context" "errors" "fmt" + "log" + "os" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" @@ -215,3 +217,37 @@ func ExampleReceiver_ReceiveMessages_second() { } } } + +func ExampleReceiver_DeleteMessages() { + count, err := receiver.DeleteMessages(context.TODO(), &azservicebus.DeleteMessagesOptions{ + Count: 4000, + BeforeEnqueueTime: time.Now(), + }) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + fmt.Fprintf(os.Stderr, "Number of messages deleted: %d\n", count) +} + +func ExampleReceiver_DeleteMessages_loop() { + // An example of how to delete messages in a loop. + now := time.Now() + + for { + count, err := receiver.DeleteMessages(context.TODO(), &azservicebus.DeleteMessagesOptions{ + BeforeEnqueueTime: now, + }) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + log.Fatalf("ERROR: %s", err) + } + + if count == 0 { + break + } + } +} diff --git a/sdk/messaging/azservicebus/go.mod b/sdk/messaging/azservicebus/go.mod index fb992b4734b1..a2e832b56eed 100644 --- a/sdk/messaging/azservicebus/go.mod +++ b/sdk/messaging/azservicebus/go.mod @@ -5,7 +5,7 @@ go 1.18 retract v1.1.2 // Breaks customers in situations where close is slow/infinite. require ( - github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 + github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 github.com/Azure/azure-sdk-for-go/sdk/internal v1.6.0 github.com/Azure/go-amqp v1.0.5 @@ -35,9 +35,9 @@ require ( github.com/kylelemons/godebug v1.1.0 // indirect github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - golang.org/x/crypto v0.21.0 // indirect - golang.org/x/net v0.22.0 // indirect - golang.org/x/sys v0.18.0 // indirect + golang.org/x/crypto v0.22.0 // indirect + golang.org/x/net v0.24.0 // indirect + golang.org/x/sys v0.19.0 // indirect golang.org/x/text v0.14.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sdk/messaging/azservicebus/go.sum b/sdk/messaging/azservicebus/go.sum index db3c25fe20c5..6c2f942961f6 100644 --- a/sdk/messaging/azservicebus/go.sum +++ b/sdk/messaging/azservicebus/go.sum @@ -3,6 +3,8 @@ code.cloudfoundry.org/clock v1.1.0 h1:XLzC6W3Ah/Y7ht1rmZ6+QfPdt1iGWEAAtIZXgiaj57 code.cloudfoundry.org/clock v1.1.0/go.mod h1:yA3fxddT9RINQL2XHS7PS+OXxKCGhfrZmlNUCIM6AKo= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 h1:c4k2FIYIh4xtwqrQwV0Ct1v5+ehlNXj5NI/MWVsiTkQ= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2/go.mod h1:5FDJtLEO/GxwNgUxbwrY3LP0pEoThTQJtk2oysdXHxM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM= +github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo= github.com/Azure/azure-sdk-for-go/sdk/internal v1.6.0 h1:sUFnFjzDUie80h24I7mrKtwCKgLY9L8h5Tp2x9+TWqk= @@ -54,6 +56,7 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -61,6 +64,8 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= +golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -73,6 +78,7 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/sdk/messaging/azservicebus/internal/amqpLinks_test.go b/sdk/messaging/azservicebus/internal/amqpLinks_test.go index fe1558fb20e1..374f9705fb33 100644 --- a/sdk/messaging/azservicebus/internal/amqpLinks_test.go +++ b/sdk/messaging/azservicebus/internal/amqpLinks_test.go @@ -44,7 +44,7 @@ func assertFailedLinks[T error, T2 error](t *testing.T, lwid *LinksWithID, expec require.True(t, errors.Is(err, expectedErr) || errors.As(err, &expectedErr)) require.ErrorIs(t, err, expectedErr) - _, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1) + _, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1, nil) require.True(t, errors.Is(err, expectedRPCError) || errors.As(err, &expectedRPCError)) msg, err := lwid.Receiver.Receive(context.TODO(), nil) @@ -62,7 +62,7 @@ func assertLinks(t *testing.T, lwid *LinksWithID) { }, nil) require.NoError(t, err) - _, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1) + _, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1, nil) require.NoError(t, err) require.NoError(t, lwid.Receiver.IssueCredit(1)) diff --git a/sdk/messaging/azservicebus/internal/amqpwrap/rpc.go b/sdk/messaging/azservicebus/internal/amqpwrap/rpc.go index 4804f1176939..65f1de5e1663 100644 --- a/sdk/messaging/azservicebus/internal/amqpwrap/rpc.go +++ b/sdk/messaging/azservicebus/internal/amqpwrap/rpc.go @@ -13,6 +13,12 @@ import ( type RPCResponse struct { // Code is the response code - these originate from Service Bus. Some // common values are called out below, with the RPCResponseCode* constants. + // + // NOTE: These status codes are intended to mirror HTTP status codes. For instance + // peeking messages returns http.StatusOK, etc... + // + // See https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response + // for more details about the ins and outs of each operation. Code int Description string Message *amqp.Message diff --git a/sdk/messaging/azservicebus/internal/constants.go b/sdk/messaging/azservicebus/internal/constants.go index f30790173260..acdd23fd0793 100644 --- a/sdk/messaging/azservicebus/internal/constants.go +++ b/sdk/messaging/azservicebus/internal/constants.go @@ -4,4 +4,4 @@ package internal // Version is the semantic version number -const Version = "v1.7.1" +const Version = "v1.8.0.beta.1" diff --git a/sdk/messaging/azservicebus/internal/errors_test.go b/sdk/messaging/azservicebus/internal/errors_test.go index 1343417dcfbc..1e0808aaefdc 100644 --- a/sdk/messaging/azservicebus/internal/errors_test.go +++ b/sdk/messaging/azservicebus/internal/errors_test.go @@ -175,9 +175,9 @@ func Test_ServiceBusError_NoRecoveryNeeded(t *testing.T) { &amqp.Error{Condition: amqp.ErrCond("com.microsoft:operation-cancelled")}, errors.New("link is currently draining"), // not yet exposed from go-amqp // simple timeouts from the mgmt link - RPCError{Resp: &amqpwrap.RPCResponse{Code: 408}}, - RPCError{Resp: &amqpwrap.RPCResponse{Code: 503}}, - RPCError{Resp: &amqpwrap.RPCResponse{Code: 500}}, + RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusRequestTimeout}}, // 408 + RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusServiceUnavailable}}, // 503 + RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusInternalServerError}}, // 500 } for i, err := range tempErrors { diff --git a/sdk/messaging/azservicebus/internal/mgmt.go b/sdk/messaging/azservicebus/internal/mgmt.go index fe27c433f651..21ca68bbc4a1 100644 --- a/sdk/messaging/azservicebus/internal/mgmt.go +++ b/sdk/messaging/azservicebus/internal/mgmt.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "net/http" "time" "github.com/Azure/azure-sdk-for-go/sdk/internal/uuid" @@ -111,17 +112,23 @@ func ReceiveDeferred(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName str return transformedMessages, nil } -func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) { +func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, fromSequenceNumber int64, messageCount int32, sessionID *string) ([]*amqp.Message, error) { const messagesField, messageField = "messages", "message" + value := map[string]any{ + "from-sequence-number": fromSequenceNumber, + "message-count": messageCount, + } + + if sessionID != nil { + value["session-id"] = *sessionID + } + msg := &amqp.Message{ ApplicationProperties: map[string]any{ "operation": "com.microsoft:peek-message", }, - Value: map[string]any{ - "from-sequence-number": fromSequenceNumber, - "message-count": messageCount, - }, + Value: value, } addAssociatedLinkName(linkName, msg) @@ -135,7 +142,7 @@ func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string return nil, err } - if rsp.Code == 204 { + if rsp.Code == http.StatusNoContent { // no messages available return nil, nil } @@ -190,24 +197,8 @@ func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string } transformedMessages[i] = &rehydrated - - // transformedMessages[i], err = MessageFromAMQPMessage(&rehydrated) - // if err != nil { - // tab.For(ctx).Error(err) - // return nil, err - // } - - // transformedMessages[i].useSession = r.isSessionFilterSet - // transformedMessages[i].sessionID = r.sessionID } - // This sort is done to ensure that folks wanting to peek messages in sequence order may do so. - // sort.Slice(transformedMessages, func(i, j int) bool { - // iSeq := *transformedMessages[i].SystemProperties.SequenceNumber - // jSeq := *transformedMessages[j].SystemProperties.SequenceNumber - // return iSeq < jSeq - // }) - return transformedMessages, nil } diff --git a/sdk/messaging/azservicebus/internal/utils/types.go b/sdk/messaging/azservicebus/internal/utils/types.go index a73b93236c70..096039874150 100644 --- a/sdk/messaging/azservicebus/internal/utils/types.go +++ b/sdk/messaging/azservicebus/internal/utils/types.go @@ -121,10 +121,18 @@ func ISO8601StringToDuration(durationStr *string) (*time.Duration, error) { return &duration, nil } -func Int32ToPtr(val *int32) *int32 { - if val != nil && *val > 0 { - return val +// ToInt64 - allow any size signed-int to be upconverted into an int64 +func ToInt64(v any, defValue int64) (int64, bool) { + switch tmpV := v.(type) { + case int: + return int64(tmpV), true + case int8: + return int64(tmpV), true + case int32: + return int64(tmpV), true + case int64: + return int64(tmpV), true + default: + return defValue, false } - - return nil } diff --git a/sdk/messaging/azservicebus/internal/utils/types_test.go b/sdk/messaging/azservicebus/internal/utils/types_test.go index eb3b465ef6db..36955191ab79 100644 --- a/sdk/messaging/azservicebus/internal/utils/types_test.go +++ b/sdk/messaging/azservicebus/internal/utils/types_test.go @@ -33,3 +33,25 @@ func TestISO8601StringToDuration(t *testing.T) { }) } } + +func TestToInt64(t *testing.T) { + tests := []struct { + V any + Default int64 + Expected int64 + OK bool + }{ + {100, -1, 100, true}, + {int64(100), -1, 100, true}, + {int32(100), -1, 100, true}, + {int8(100), -1, 100, true}, + + {uint32(100), -1, -1, false}, + {"oops, all strings", -1, -1, false}, + } + for _, test := range tests { + v, ok := ToInt64(test.V, test.Default) + require.Equal(t, test.Expected, v) + require.Equal(t, test.OK, ok) + } +} diff --git a/sdk/messaging/azservicebus/liveTestHelpers_test.go b/sdk/messaging/azservicebus/liveTestHelpers_test.go index 422767a8e7b7..5674a4820665 100644 --- a/sdk/messaging/azservicebus/liveTestHelpers_test.go +++ b/sdk/messaging/azservicebus/liveTestHelpers_test.go @@ -184,13 +184,13 @@ func deleteSubscription(t *testing.T, ac *admin.Client, topicName string, subscr // peekSingleMessageForTest wraps a standard Receiver.Peek() call so it returns at least one message // and fails tests otherwise. -func peekSingleMessageForTest(t *testing.T, receiver *Receiver) *ReceivedMessage { +func peekSingleMessageForTest(t *testing.T, receiver *Receiver, options *PeekMessagesOptions) *ReceivedMessage { var msg *ReceivedMessage // Peek, unlike Receive, doesn't block until at least one message has arrived, so we have to poll // to get a similar effect. err := utils.Retry(context.Background(), EventReceiver, "peekSingleForTest", func(ctx context.Context, args *utils.RetryFnArgs) error { - peekedMessages, err := receiver.PeekMessages(context.Background(), 1, nil) + peekedMessages, err := receiver.PeekMessages(context.Background(), 1, options) require.NoError(t, err) if len(peekedMessages) == 1 { diff --git a/sdk/messaging/azservicebus/message.go b/sdk/messaging/azservicebus/message.go index 5fbe586d864c..6f56d4784d5b 100644 --- a/sdk/messaging/azservicebus/message.go +++ b/sdk/messaging/azservicebus/message.go @@ -10,6 +10,7 @@ import ( "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" "github.com/Azure/azure-sdk-for-go/sdk/internal/log" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils" "github.com/Azure/go-amqp" ) @@ -389,7 +390,9 @@ func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) * msg.EnqueuedSequenceNumber = to.Ptr(enqueuedSequenceNumber.(int64)) } - switch asInt64(amqpMsg.Annotations[messageStateAnnotation], 0) { + msgState, _ := utils.ToInt64(amqpMsg.Annotations[messageStateAnnotation], 0) + + switch msgState { case 1: msg.State = MessageStateDeferred case 2: @@ -475,16 +478,3 @@ func uuidFromLockTokenBytes(bytes []byte) (*amqp.UUID, error) { return &amqpUUID, nil } - -func asInt64(v any, defVal int64) int64 { - switch v2 := v.(type) { - case int32: - return int64(v2) - case int64: - return int64(v2) - case int: - return int64(v2) - default: - return defVal - } -} diff --git a/sdk/messaging/azservicebus/messageSettler_test.go b/sdk/messaging/azservicebus/messageSettler_test.go index 1af8a7b985ec..e872fe39495e 100644 --- a/sdk/messaging/azservicebus/messageSettler_test.go +++ b/sdk/messaging/azservicebus/messageSettler_test.go @@ -138,7 +138,7 @@ func TestDeferredMessages(t *testing.T) { require.NoError(t, err) // we can peek it without altering anything here. - peekedMessage := peekSingleMessageForTest(t, receiver) + peekedMessage := peekSingleMessageForTest(t, receiver, nil) require.Equal(t, originalDeferredMessage.DeliveryCount+1, peekedMessage.DeliveryCount, "Delivery count is incremented") }) @@ -155,7 +155,7 @@ func TestDeferredMessages(t *testing.T) { msg := testStuff.deferMessageForTest(t) require.EqualValues(t, MessageStateDeferred, msg.State) - peekedMsg := peekSingleMessageForTest(t, receiver) + peekedMsg := peekSingleMessageForTest(t, receiver, nil) require.EqualValues(t, MessageStateDeferred, peekedMsg.State) // double defer! diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 86012c27cbc7..9370f6031516 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -7,6 +7,7 @@ import ( "context" "errors" "fmt" + "net/http" "sync" "sync/atomic" "time" @@ -55,6 +56,11 @@ type Receiver struct { receiving bool retryOptions RetryOptions settler *messageSettler + + // sessionID is the actual session ID for this receiver - it's used in some management + // operations that are otherwise the same between session receivers and non-session + // receivers. + sessionID *string } // ReceiverOptions contains options for the `Client.NewReceiverForQueue` or `Client.NewReceiverForSubscription` @@ -268,7 +274,7 @@ func (r *Receiver) PeekMessages(ctx context.Context, maxMessageCount int, option updateInternalSequenceNumber = false } - messages, err := internal.PeekMessages(ctx, links.RPC, links.Receiver.LinkName(), sequenceNumber, int32(maxMessageCount)) + messages, err := internal.PeekMessages(ctx, links.RPC, links.Receiver.LinkName(), sequenceNumber, int32(maxMessageCount), r.sessionID) if err != nil { return err @@ -358,6 +364,83 @@ func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessa return r.settler.DeadLetterMessage(ctx, message, options) } +// DeleteMessagesOptions contains the optional parameters for the [Client.DeleteMessages] method. +type DeleteMessagesOptions struct { + // Count is the maximum number of messages to delete. + // Defaults to 4000. + Count int + + // BeforeEnqueueTime - any messages older than this time can be deleted. + // Defaults to time.Now(). + BeforeEnqueueTime time.Time +} + +// DeleteMessages deletes messages from a queue or subscription. +// Messages are deleted on the service and are not transferred locally. +func (r *Receiver) DeleteMessages(ctx context.Context, options *DeleteMessagesOptions) (int64, error) { + if options == nil { + options = &DeleteMessagesOptions{} + } + + if options.BeforeEnqueueTime.IsZero() { + options.BeforeEnqueueTime = time.Now() + } + + if options.Count == 0 { + options.Count = 4000 + } + + count := int64(0) + + err := r.amqpLinks.Retry(ctx, EventReceiver, "DeleteMessages", func(ctx context.Context, lwid *internal.LinksWithID, args *utils.RetryFnArgs) error { + value := map[string]any{ + "enqueued-time-utc": options.BeforeEnqueueTime.UTC(), + "message-count": int32(options.Count), + } + + if r.sessionID != nil { + value["session-id"] = *r.sessionID + } + + msg := &amqp.Message{ + ApplicationProperties: map[string]any{ + "operation": "com.microsoft:batch-delete-messages", + }, + Value: value, + } + + rpcResponse, err := lwid.RPC.RPC(ctx, msg) + + if err != nil { + return err + } + + switch rpcResponse.Code { + case http.StatusOK: + m, ok := rpcResponse.Message.Value.(map[string]any) + + if !ok { + return fmt.Errorf("invalid response type %T", msg.Value) + } + + tmpCount, ok := utils.ToInt64(m["message-count"], 0) + + if !ok { + return fmt.Errorf("invalid integer type %T", m["message-count"]) + } + + count = tmpCount + return nil + case http.StatusNoContent: + return nil // no messages to delete + default: + return fmt.Errorf("failed to delete messages, status code %d: %s", rpcResponse.Code, rpcResponse.Description) + } + }, r.retryOptions) + + return count, err +} + func (r *Receiver) receiveMessagesImpl(ctx context.Context, maxMessages int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) { cancelReleaser := r.cancelReleaser.Swap(emptyCancelFn).(func() string) _ = cancelReleaser() @@ -646,6 +729,10 @@ func (r *Receiver) newReleaserFunc(receiver amqpwrap.AMQPReceiver) func() { } } +func (r *Receiver) updateSessionID(sessionID string) { + r.sessionID = &sessionID +} + func getAllPrefetched(receiver amqpwrap.AMQPReceiver, max int) []*amqp.Message { var messages []*amqp.Message diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index 89425e19601e..23c1155fedc2 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -1045,6 +1045,242 @@ func TestReceiveWithDifferentWaitTime(t *testing.T) { require.Greater(t, bigger, base2) } +func TestReceiverDeleteMessages(t *testing.T) { + init := func(t *testing.T, count int, retryOptions *RetryOptions) (*Sender, *Receiver) { + if retryOptions == nil { + retryOptions = &RetryOptions{} + } + + serviceBusClient, cleanup, queueName := setupLiveTest(t, &liveTestOptions{ + ClientOptions: &ClientOptions{ + RetryOptions: *retryOptions, + }, + }) + t.Cleanup(cleanup) + + receiver, err := serviceBusClient.NewReceiverForQueue(queueName, nil) + require.NoError(t, err) + + sender, err := serviceBusClient.NewSender(queueName, nil) + require.NoError(t, err) + + if count > 0 { + var batch *MessageBatch + + for i := 0; i < count; i++ { + if batch == nil { + tmpBatch, err := sender.NewMessageBatch(context.Background(), nil) + require.NoError(t, err) + batch = tmpBatch + } + + err := batch.AddMessage(&Message{ + Body: []byte(fmt.Sprintf("[%d] %s", i, t.Name())), + }, nil) + + if err != nil { + if errors.Is(err, ErrMessageTooLarge) { + err = sender.SendMessageBatch(context.Background(), batch, nil) + require.NoError(t, err) + + batch = nil + i-- + continue + } + require.NoError(t, err) + } else if i == (count - 1) { + // last event, send whatever we have + err = sender.SendMessageBatch(context.Background(), batch, nil) + require.NoError(t, err) + batch = nil + } + } + + require.Nil(t, batch) + + // peek the last message so we can be sure the messages are available + msg := peekSingleMessageForTest(t, receiver, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(count)), + }) + require.Equal(t, fmt.Sprintf("[%d] %s", count-1, t.Name()), string(msg.Body)) + } + + return sender, receiver + } + + t.Run("EmptyQueue", func(t *testing.T) { + _, receiver := init(t, 0, nil) + + // when the queue is empty you get back a zero count (and a 204 internally) + count, err := receiver.DeleteMessages(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, int64(0), count) + }) + + t.Run("DeleteOne", func(t *testing.T) { + _, receiver := init(t, 1, nil) + + count, err := receiver.DeleteMessages(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, int64(1), count) + + // no messages should be available. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.Empty(t, messages) + require.ErrorIs(t, err, context.DeadlineExceeded) + }) + + t.Run("BeforeEnqueueTime", func(t *testing.T) { + _, receiver := init(t, 1, nil) + + messages, err := receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(0)), + }) + require.NoError(t, err) + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) + + // attempt to delete on the exact time of the message, which will skip our messages + // since DeleteMessages's time boundary is not inclusive. + count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + // this isn't inclusive so this won't delete anything + BeforeEnqueueTime: *messages[0].EnqueuedTime, + }) + require.NoError(t, err) + require.Equal(t, int64(0), count) + + messages, err = receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(0)), + }) + require.NoError(t, err) + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body), "Message still exists - BeforeEnqueueTime is not inclusive.") + + // now actually delete it. + // attempt to delete on the exact time of the message - this should + // be skipped. + count, err = receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + // include the event this time. + BeforeEnqueueTime: messages[0].EnqueuedTime.Add(time.Second), + }) + require.NoError(t, err) + require.Equal(t, int64(1), count) + + messages, err = receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(0)), + }) + require.NoError(t, err) + require.Empty(t, messages) + }) + + t.Run("CountNegative", func(t *testing.T) { + t.Skipf("Dependent on service bug fix") + + // currently this fails but it ends up retrying. For now I'm going to cut this off. + _, receiver := init(t, 1, &RetryOptions{ + MaxRetries: -1, + }) + + count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + Count: -1, + }) + + // rpc: failed, status code 408 and description: The operation did not complete within the allocated time 00:01:00 for object + require.Contains(t, err.Error(), "rpc: failed, status code 408 ") + + // -1 doesn't do anything currently but it does cause the library to do retries. + require.Equal(t, int64(0), count) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.NoError(t, err) + + // message should still be there. + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) + }) + + t.Run("CountTooHigh", func(t *testing.T) { + t.Skipf("Dependent on service bug fix") + + // currently this fails but it ends up retrying. For now I'm going to cut this off. + _, receiver := init(t, 1, &RetryOptions{ + MaxRetries: -1, + }) + + count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + Count: 4000 + 1, + }) + + // rpc: failed, status code 500 and description: The service was unable to process the request; please retry the operation. + require.Contains(t, err.Error(), "rpc: failed, status code 500 ") + + // -1 doesn't do anything currently but it does cause the library to do retries. + require.Equal(t, int64(0), count) + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.NoError(t, err) + + // message should still be there. + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) + }) + + t.Run("PurgeMessages", func(t *testing.T) { + // create just enough messages to ensure we have to loop at least twice + // (in reality we'll probably loop a few times anyways since we don't always delete 4000 on + // each call) + _, receiver := init(t, 4000+1, nil) + + rounds := 0 + total := int64(0) + + purge := func() { + // NOTE: this is similar to our example functions that shows how to just loop and + // delete all messages + + // This is a simple example showing how to delete messages in a loop. + now := time.Now() + + for { + rounds++ + + count, err := receiver.DeleteMessages(context.TODO(), &DeleteMessagesOptions{ + BeforeEnqueueTime: now, + }) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + require.NoError(t, err) + } + + if count == 0 { + break + } + + total += count // added + } + } + + purge() + + require.GreaterOrEqual(t, rounds, 2) + require.Equal(t, int64(4001), total) + + // queue should be empty + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Empty(t, messages) + }) +} + type receivedMessageSlice []*ReceivedMessage func (messages receivedMessageSlice) Len() int { diff --git a/sdk/messaging/azservicebus/sender_test.go b/sdk/messaging/azservicebus/sender_test.go index 6adc8b37b8cc..38a4e441ac80 100644 --- a/sdk/messaging/azservicebus/sender_test.go +++ b/sdk/messaging/azservicebus/sender_test.go @@ -36,7 +36,7 @@ func Test_Sender_MessageID(t *testing.T) { }, nil) require.NoError(t, err) - peekedMsg := peekSingleMessageForTest(t, receiver) + peekedMsg := peekSingleMessageForTest(t, receiver, nil) require.EqualValues(t, MessageStateActive, peekedMsg.State) messages, err := receiver.ReceiveMessages(context.Background(), 1, nil) diff --git a/sdk/messaging/azservicebus/session_receiver.go b/sdk/messaging/azservicebus/session_receiver.go index c4cac54b1bb2..aebc2cbdaafd 100644 --- a/sdk/messaging/azservicebus/session_receiver.go +++ b/sdk/messaging/azservicebus/session_receiver.go @@ -51,6 +51,8 @@ func toReceiverOptions(sropts *SessionReceiverOptions) *ReceiverOptions { } type newSessionReceiverArgs struct { + // sessionID is the Service Bus session ID the user requested. If we're just accepting + // the next available session this will be nil. sessionID *string ns internal.NamespaceForAMQPLinks entity entity @@ -87,6 +89,10 @@ func newSessionReceiver(ctx context.Context, args newSessionReceiverArgs, option return nil, err } + // this has to be set _after_ the link has been initialized to accomodate the "ask for + // next available session" case where the broker returns the actual session ID as a link + // property. + r.updateSessionID(sessionReceiver.SessionID()) return sessionReceiver, nil } @@ -271,6 +277,10 @@ func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewS return internal.TransformError(err) } +func (sr *SessionReceiver) DeleteMessages(ctx context.Context, options *DeleteMessagesOptions) (int64, error) { + return sr.inner.DeleteMessages(ctx, options) +} + // init ensures the link was created, guaranteeing that we get our expected session lock. func (sr *SessionReceiver) init(ctx context.Context) error { // initialize the links diff --git a/sdk/messaging/azservicebus/session_receiver_test.go b/sdk/messaging/azservicebus/session_receiver_test.go index 5a0c89ff877f..21c10a6e8b0b 100644 --- a/sdk/messaging/azservicebus/session_receiver_test.go +++ b/sdk/messaging/azservicebus/session_receiver_test.go @@ -8,6 +8,7 @@ import ( "errors" "fmt" "sort" + "strings" "testing" "time" @@ -571,6 +572,129 @@ func TestSessionReceiverSendFiveReceiveFive_Subscription(t *testing.T) { } } +func TestSessionReceiverPeekMessages(t *testing.T) { + serviceBusClient, cleanup, queueName := setupLiveTest(t, &liveTestOptions{ + QueueProperties: &admin.QueueProperties{ + RequiresSession: to.Ptr(true), + }, + }) + defer cleanup() + + sender, err := serviceBusClient.NewSender(queueName, nil) + require.NoError(t, err) + defer sender.Close(context.Background()) + + for i := 0; i < 10; i++ { + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte(fmt.Sprintf("%d", i)), + SessionID: to.Ptr("session-1"), + }, nil) + require.NoError(t, err) + } + + for i := 0; i < 10; i++ { + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte(fmt.Sprintf("%d", i)), + SessionID: to.Ptr("session-2"), + }, nil) + require.NoError(t, err) + } + + // check that we never cross session boundaries with our peek + { + // peek the messages for session-1 + session1Receiver, err := serviceBusClient.AcceptSessionForQueue(context.Background(), queueName, "session-1", nil) + require.NoError(t, err) + + peeked, err := session1Receiver.PeekMessages(context.Background(), 10, nil) + require.NoError(t, err) + + for _, msg := range peeked { + require.Equal(t, "session-1", *msg.SessionID) + } + + // peek the messages for session-2 + session2Receiver, err := serviceBusClient.AcceptSessionForQueue(context.Background(), queueName, "session-2", nil) + require.NoError(t, err) + + peeked, err = session2Receiver.PeekMessages(context.Background(), 10, nil) + require.NoError(t, err) + + for _, msg := range peeked { + require.Equal(t, "session-2", *msg.SessionID) + } + } +} + +func TestSessionReceiverDeleteMessages(t *testing.T) { + serviceBusClient, cleanup, queueName := setupLiveTest(t, &liveTestOptions{ + QueueProperties: &admin.QueueProperties{ + RequiresSession: to.Ptr(true), + }, + }) + defer cleanup() + + sender, err := serviceBusClient.NewSender(queueName, nil) + require.NoError(t, err) + defer sender.Close(context.Background()) + + for i := 0; i < 10; i++ { + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte(fmt.Sprintf("[%d] session-1", i)), + SessionID: to.Ptr("session-1"), + }, nil) + require.NoError(t, err) + } + + err = sender.SendMessage(context.Background(), &Message{ + Body: []byte("the only message for session-2"), + SessionID: to.Ptr("session-2"), + }, nil) + require.NoError(t, err) + + session2Receiver, err := serviceBusClient.AcceptSessionForQueue(context.Background(), queueName, "session-2", nil) + require.NoError(t, err) + + defer session2Receiver.Close(context.Background()) + + count, err := session2Receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + Count: 10, // note we're specifying enough that we could delete more than just our sessions messages. + }) + require.NoError(t, err) + require.Equal(t, int64(1), count, "only one message available for session-2") + + err = session2Receiver.Close(context.Background()) + require.NoError(t, err) + + // validate that all of session-1's messages are still there. + { + session1Receiver, err := serviceBusClient.AcceptSessionForQueue(context.Background(), queueName, "session-1", nil) + require.NoError(t, err) + + defer session1Receiver.Close(context.Background()) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + total := 0 + + for total < 10 { + messages, err := session1Receiver.ReceiveMessages(ctx, 10, nil) + require.NoError(t, err) + + total += len(messages) + + for _, m := range messages { + require.Equal(t, "session-1", *m.SessionID) + require.Truef(t, strings.HasSuffix(string(m.Body), "session-1"), "%s has suffix session-1", string(m.Body)) + + err := session1Receiver.CompleteMessage(context.Background(), m, nil) + require.NoError(t, err) + } + } + } +} + func mustReceiveMessages(t *testing.T, receiver interface { ReceiveMessages(ctx context.Context, count int, options *ReceiveMessagesOptions) ([]*ReceivedMessage, error) }, count int, waitTime time.Duration) []*ReceivedMessage { From c7eb443ab7fe1708872423b15885ea333e098775 Mon Sep 17 00:00:00 2001 From: ripark Date: Fri, 26 Apr 2024 19:12:21 +0000 Subject: [PATCH 2/5] Adding in test for partitioned entities and skipping them for now since there's a service bug. --- sdk/messaging/azservicebus/go.sum | 10 +- .../azservicebus/internal/utils/types_test.go | 1 + sdk/messaging/azservicebus/receiver_test.go | 273 +++++++++--------- 3 files changed, 147 insertions(+), 137 deletions(-) diff --git a/sdk/messaging/azservicebus/go.sum b/sdk/messaging/azservicebus/go.sum index 6c2f942961f6..c7128b323bef 100644 --- a/sdk/messaging/azservicebus/go.sum +++ b/sdk/messaging/azservicebus/go.sum @@ -1,8 +1,6 @@ code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= code.cloudfoundry.org/clock v1.1.0 h1:XLzC6W3Ah/Y7ht1rmZ6+QfPdt1iGWEAAtIZXgiaj57c= code.cloudfoundry.org/clock v1.1.0/go.mod h1:yA3fxddT9RINQL2XHS7PS+OXxKCGhfrZmlNUCIM6AKo= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 h1:c4k2FIYIh4xtwqrQwV0Ct1v5+ehlNXj5NI/MWVsiTkQ= -github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2/go.mod h1:5FDJtLEO/GxwNgUxbwrY3LP0pEoThTQJtk2oysdXHxM= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ= @@ -54,16 +52,13 @@ github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955u github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30= golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M= golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= -golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc= -golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w= golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -76,8 +71,7 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4= -golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o= golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/sdk/messaging/azservicebus/internal/utils/types_test.go b/sdk/messaging/azservicebus/internal/utils/types_test.go index 36955191ab79..724046525315 100644 --- a/sdk/messaging/azservicebus/internal/utils/types_test.go +++ b/sdk/messaging/azservicebus/internal/utils/types_test.go @@ -49,6 +49,7 @@ func TestToInt64(t *testing.T) { {uint32(100), -1, -1, false}, {"oops, all strings", -1, -1, false}, } + for _, test := range tests { v, ok := ToInt64(test.V, test.Default) require.Equal(t, test.Expected, v) diff --git a/sdk/messaging/azservicebus/receiver_test.go b/sdk/messaging/azservicebus/receiver_test.go index 23c1155fedc2..fb56d162b399 100644 --- a/sdk/messaging/azservicebus/receiver_test.go +++ b/sdk/messaging/azservicebus/receiver_test.go @@ -1046,12 +1046,18 @@ func TestReceiveWithDifferentWaitTime(t *testing.T) { } func TestReceiverDeleteMessages(t *testing.T) { - init := func(t *testing.T, count int, retryOptions *RetryOptions) (*Sender, *Receiver) { + init := func(t *testing.T, count int, queueProperties *admin.QueueProperties, retryOptions *RetryOptions) (*Sender, *Receiver) { if retryOptions == nil { retryOptions = &RetryOptions{} } + if *queueProperties.EnablePartitioning { + t.Skipf("Dependent on service bug fix") + return nil, nil + } + serviceBusClient, cleanup, queueName := setupLiveTest(t, &liveTestOptions{ + QueueProperties: queueProperties, ClientOptions: &ClientOptions{ RetryOptions: *retryOptions, }, @@ -1108,177 +1114,186 @@ func TestReceiverDeleteMessages(t *testing.T) { return sender, receiver } - t.Run("EmptyQueue", func(t *testing.T) { - _, receiver := init(t, 0, nil) + props := []*admin.QueueProperties{ + {EnablePartitioning: to.Ptr(false)}, + {EnablePartitioning: to.Ptr(true)}, + } - // when the queue is empty you get back a zero count (and a 204 internally) - count, err := receiver.DeleteMessages(context.Background(), nil) - require.NoError(t, err) - require.Equal(t, int64(0), count) - }) + for _, qp := range props { + testSuffix := fmt.Sprintf("(partitioned:%t)", *qp.EnablePartitioning) - t.Run("DeleteOne", func(t *testing.T) { - _, receiver := init(t, 1, nil) + t.Run("EmptyQueue"+testSuffix, func(t *testing.T) { + _, receiver := init(t, 0, qp, nil) - count, err := receiver.DeleteMessages(context.Background(), nil) - require.NoError(t, err) - require.Equal(t, int64(1), count) + // when the queue is empty you get back a zero count (and a 204 internally) + count, err := receiver.DeleteMessages(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, int64(0), count) + }) - // no messages should be available. - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + t.Run("DeleteOne"+testSuffix, func(t *testing.T) { + _, receiver := init(t, 1, qp, nil) - messages, err := receiver.ReceiveMessages(ctx, 1, nil) - require.Empty(t, messages) - require.ErrorIs(t, err, context.DeadlineExceeded) - }) + count, err := receiver.DeleteMessages(context.Background(), nil) + require.NoError(t, err) + require.Equal(t, int64(1), count) - t.Run("BeforeEnqueueTime", func(t *testing.T) { - _, receiver := init(t, 1, nil) + // no messages should be available. + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - messages, err := receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ - FromSequenceNumber: to.Ptr(int64(0)), + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.Empty(t, messages) + require.ErrorIs(t, err, context.DeadlineExceeded) }) - require.NoError(t, err) - require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) - // attempt to delete on the exact time of the message, which will skip our messages - // since DeleteMessages's time boundary is not inclusive. - count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ - // this isn't inclusive so this won't delete anything - BeforeEnqueueTime: *messages[0].EnqueuedTime, - }) - require.NoError(t, err) - require.Equal(t, int64(0), count) + t.Run("BeforeEnqueueTime"+testSuffix, func(t *testing.T) { + _, receiver := init(t, 1, qp, nil) - messages, err = receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ - FromSequenceNumber: to.Ptr(int64(0)), - }) - require.NoError(t, err) - require.Equal(t, "[0] "+t.Name(), string(messages[0].Body), "Message still exists - BeforeEnqueueTime is not inclusive.") - - // now actually delete it. - // attempt to delete on the exact time of the message - this should - // be skipped. - count, err = receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ - // include the event this time. - BeforeEnqueueTime: messages[0].EnqueuedTime.Add(time.Second), - }) - require.NoError(t, err) - require.Equal(t, int64(1), count) + messages, err := receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(0)), + }) + require.NoError(t, err) + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) - messages, err = receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ - FromSequenceNumber: to.Ptr(int64(0)), - }) - require.NoError(t, err) - require.Empty(t, messages) - }) + // attempt to delete on the exact time of the message, which will skip our messages + // since DeleteMessages's time boundary is not inclusive. + count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + // this isn't inclusive so this won't delete anything + BeforeEnqueueTime: *messages[0].EnqueuedTime, + }) + require.NoError(t, err) + require.Equal(t, int64(0), count) - t.Run("CountNegative", func(t *testing.T) { - t.Skipf("Dependent on service bug fix") + messages, err = receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(0)), + }) + require.NoError(t, err) + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body), "Message still exists - BeforeEnqueueTime is not inclusive.") + + // now actually delete it. + // attempt to delete on the exact time of the message - this should + // be skipped. + count, err = receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + // include the event this time. + BeforeEnqueueTime: messages[0].EnqueuedTime.Add(time.Second), + }) + require.NoError(t, err) + require.Equal(t, int64(1), count) - // currently this fails but it ends up retrying. For now I'm going to cut this off. - _, receiver := init(t, 1, &RetryOptions{ - MaxRetries: -1, + messages, err = receiver.PeekMessages(context.Background(), 1, &PeekMessagesOptions{ + FromSequenceNumber: to.Ptr(int64(0)), + }) + require.NoError(t, err) + require.Empty(t, messages) }) - count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ - Count: -1, - }) + t.Run("CountNegative"+testSuffix, func(t *testing.T) { + t.Skipf("Dependent on service bug fix") - // rpc: failed, status code 408 and description: The operation did not complete within the allocated time 00:01:00 for object - require.Contains(t, err.Error(), "rpc: failed, status code 408 ") + // currently this fails but it ends up retrying. For now I'm going to cut this off. + _, receiver := init(t, 1, qp, &RetryOptions{ + MaxRetries: -1, + }) - // -1 doesn't do anything currently but it does cause the library to do retries. - require.Equal(t, int64(0), count) + count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + Count: -1, + }) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + // rpc: failed, status code 408 and description: The operation did not complete within the allocated time 00:01:00 for object + require.Contains(t, err.Error(), "rpc: failed, status code 408 ") - messages, err := receiver.ReceiveMessages(ctx, 1, nil) - require.NoError(t, err) + // -1 doesn't do anything currently but it does cause the library to do retries. + require.Equal(t, int64(0), count) - // message should still be there. - require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) - }) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - t.Run("CountTooHigh", func(t *testing.T) { - t.Skipf("Dependent on service bug fix") + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.NoError(t, err) - // currently this fails but it ends up retrying. For now I'm going to cut this off. - _, receiver := init(t, 1, &RetryOptions{ - MaxRetries: -1, + // message should still be there. + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) }) - count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ - Count: 4000 + 1, - }) + t.Run("CountTooHigh"+testSuffix, func(t *testing.T) { + t.Skipf("Dependent on service bug fix") - // rpc: failed, status code 500 and description: The service was unable to process the request; please retry the operation. - require.Contains(t, err.Error(), "rpc: failed, status code 500 ") + // currently this fails but it ends up retrying. For now I'm going to cut this off. + _, receiver := init(t, 1, qp, &RetryOptions{ + MaxRetries: -1, + }) - // -1 doesn't do anything currently but it does cause the library to do retries. - require.Equal(t, int64(0), count) + count, err := receiver.DeleteMessages(context.Background(), &DeleteMessagesOptions{ + Count: 4000 + 1, + }) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + // rpc: failed, status code 500 and description: The service was unable to process the request; please retry the operation. + require.Contains(t, err.Error(), "rpc: failed, status code 500 ") - messages, err := receiver.ReceiveMessages(ctx, 1, nil) - require.NoError(t, err) + // -1 doesn't do anything currently but it does cause the library to do retries. + require.Equal(t, int64(0), count) - // message should still be there. - require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) - }) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - t.Run("PurgeMessages", func(t *testing.T) { - // create just enough messages to ensure we have to loop at least twice - // (in reality we'll probably loop a few times anyways since we don't always delete 4000 on - // each call) - _, receiver := init(t, 4000+1, nil) + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.NoError(t, err) - rounds := 0 - total := int64(0) + // message should still be there. + require.Equal(t, "[0] "+t.Name(), string(messages[0].Body)) + }) - purge := func() { - // NOTE: this is similar to our example functions that shows how to just loop and - // delete all messages + t.Run("PurgeMessages"+testSuffix, func(t *testing.T) { + // create just enough messages to ensure we have to loop at least twice + // (in reality we'll probably loop a few times anyways since we don't always delete 4000 on + // each call) + _, receiver := init(t, 4000+1, qp, nil) - // This is a simple example showing how to delete messages in a loop. - now := time.Now() + rounds := 0 + total := int64(0) - for { - rounds++ + purge := func() { + // NOTE: this is similar to our example functions that shows how to just loop and + // delete all messages - count, err := receiver.DeleteMessages(context.TODO(), &DeleteMessagesOptions{ - BeforeEnqueueTime: now, - }) + // This is a simple example showing how to delete messages in a loop. + now := time.Now() - if err != nil { - // TODO: Update the following line with your application specific error handling logic - require.NoError(t, err) - } + for { + rounds++ - if count == 0 { - break - } + count, err := receiver.DeleteMessages(context.TODO(), &DeleteMessagesOptions{ + BeforeEnqueueTime: now, + }) + + if err != nil { + // TODO: Update the following line with your application specific error handling logic + require.NoError(t, err) + } + + if count == 0 { + break + } - total += count // added + total += count // added + } } - } - purge() + purge() - require.GreaterOrEqual(t, rounds, 2) - require.Equal(t, int64(4001), total) + require.GreaterOrEqual(t, rounds, 2) + require.Equal(t, int64(4001), total) - // queue should be empty - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() + // queue should be empty + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() - messages, err := receiver.ReceiveMessages(ctx, 1, nil) - require.ErrorIs(t, err, context.DeadlineExceeded) - require.Empty(t, messages) - }) + messages, err := receiver.ReceiveMessages(ctx, 1, nil) + require.ErrorIs(t, err, context.DeadlineExceeded) + require.Empty(t, messages) + }) + } } type receivedMessageSlice []*ReceivedMessage From a91fc7a644c0783ffec03c015182b04f0a0c53dd Mon Sep 17 00:00:00 2001 From: ripark Date: Fri, 26 Apr 2024 21:10:42 +0000 Subject: [PATCH 3/5] Fix doc error --- sdk/messaging/azservicebus/session_receiver.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/messaging/azservicebus/session_receiver.go b/sdk/messaging/azservicebus/session_receiver.go index aebc2cbdaafd..4979f61078ea 100644 --- a/sdk/messaging/azservicebus/session_receiver.go +++ b/sdk/messaging/azservicebus/session_receiver.go @@ -277,6 +277,8 @@ func (sr *SessionReceiver) RenewSessionLock(ctx context.Context, options *RenewS return internal.TransformError(err) } +// DeleteMessages deletes messages from the session for this SessionReceiver. +// Messages are deleted on the service and are not transferred locally. func (sr *SessionReceiver) DeleteMessages(ctx context.Context, options *DeleteMessagesOptions) (int64, error) { return sr.inner.DeleteMessages(ctx, options) } From 99240c5219f2e933083ba3f8ad4bc9bfa7ac9a25 Mon Sep 17 00:00:00 2001 From: ripark Date: Fri, 26 Apr 2024 22:52:16 +0000 Subject: [PATCH 4/5] Fix constant format (should have a dash, not a dot, between the version and -beta) --- sdk/messaging/azservicebus/CHANGELOG.md | 2 +- sdk/messaging/azservicebus/internal/constants.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/messaging/azservicebus/CHANGELOG.md b/sdk/messaging/azservicebus/CHANGELOG.md index c3cb50691cea..828a5618e10a 100644 --- a/sdk/messaging/azservicebus/CHANGELOG.md +++ b/sdk/messaging/azservicebus/CHANGELOG.md @@ -1,6 +1,6 @@ # Release History -## 1.8.0.beta.1 (Unreleased) +## 1.8.0-beta.1 (Unreleased) ### Features Added diff --git a/sdk/messaging/azservicebus/internal/constants.go b/sdk/messaging/azservicebus/internal/constants.go index acdd23fd0793..5b9a93dfe2ff 100644 --- a/sdk/messaging/azservicebus/internal/constants.go +++ b/sdk/messaging/azservicebus/internal/constants.go @@ -4,4 +4,4 @@ package internal // Version is the semantic version number -const Version = "v1.8.0.beta.1" +const Version = "v1.8.0-beta.1" From 30af9bf6a5085e987f42a795515177beac87c60a Mon Sep 17 00:00:00 2001 From: ripark Date: Mon, 29 Apr 2024 23:07:44 +0000 Subject: [PATCH 5/5] Make it int32 throughout. --- sdk/messaging/azservicebus/receiver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/messaging/azservicebus/receiver.go b/sdk/messaging/azservicebus/receiver.go index 9370f6031516..cf09d90edbd5 100644 --- a/sdk/messaging/azservicebus/receiver.go +++ b/sdk/messaging/azservicebus/receiver.go @@ -368,7 +368,7 @@ func (r *Receiver) DeadLetterMessage(ctx context.Context, message *ReceivedMessa type DeleteMessagesOptions struct { // Count is the maximum number of messages to delete. // Defaults to 4000. - Count int + Count int32 // BeforeEnqueueTime - any messages older than this time can be deleted. // Defaults to time.Now().