Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[azservicebus] Adding in the DeleteMessages feature. This is a new feature in preview. #22784

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
4 changes: 3 additions & 1 deletion sdk/messaging/azservicebus/CHANGELOG.md
@@ -1,9 +1,11 @@
# Release History

## 1.7.1 (Unreleased)
## 1.8.0-beta.1 (Unreleased)

### Features Added

- Receiver.DeleteMessages can delete messages in batches, service-side. This allows you to quickly purge messages in a queue or subscription. (PR#TBD)

### Breaking Changes

### Bugs Fixed
Expand Down
36 changes: 36 additions & 0 deletions sdk/messaging/azservicebus/example_receiver_test.go
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"errors"
"fmt"
"log"
"os"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
Expand Down Expand Up @@ -215,3 +217,37 @@ func ExampleReceiver_ReceiveMessages_second() {
}
}
}

func ExampleReceiver_DeleteMessages() {
count, err := receiver.DeleteMessages(context.TODO(), &azservicebus.DeleteMessagesOptions{
Count: 4000,
BeforeEnqueueTime: time.Now(),
})

if err != nil {
// TODO: Update the following line with your application specific error handling logic
log.Fatalf("ERROR: %s", err)
}

fmt.Fprintf(os.Stderr, "Number of messages deleted: %d\n", count)
}

func ExampleReceiver_DeleteMessages_loop() {
// An example of how to delete messages in a loop.
now := time.Now()

for {
count, err := receiver.DeleteMessages(context.TODO(), &azservicebus.DeleteMessagesOptions{
BeforeEnqueueTime: now,
})

if err != nil {
// TODO: Update the following line with your application specific error handling logic
log.Fatalf("ERROR: %s", err)
}

if count == 0 {
break
}
}
}
8 changes: 4 additions & 4 deletions sdk/messaging/azservicebus/go.mod
Expand Up @@ -5,7 +5,7 @@ go 1.18
retract v1.1.2 // Breaks customers in situations where close is slow/infinite.

require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
github.com/Azure/azure-sdk-for-go/sdk/internal v1.6.0
github.com/Azure/go-amqp v1.0.5
Expand Down Expand Up @@ -35,9 +35,9 @@ require (
github.com/kylelemons/godebug v1.1.0 // indirect
github.com/pkg/browser v0.0.0-20240102092130-5ac0b6a4141c // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/crypto v0.22.0 // indirect
golang.org/x/net v0.24.0 // indirect
golang.org/x/sys v0.19.0 // indirect
golang.org/x/text v0.14.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
16 changes: 8 additions & 8 deletions sdk/messaging/azservicebus/go.sum
@@ -1,8 +1,8 @@
code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8=
code.cloudfoundry.org/clock v1.1.0 h1:XLzC6W3Ah/Y7ht1rmZ6+QfPdt1iGWEAAtIZXgiaj57c=
code.cloudfoundry.org/clock v1.1.0/go.mod h1:yA3fxddT9RINQL2XHS7PS+OXxKCGhfrZmlNUCIM6AKo=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2 h1:c4k2FIYIh4xtwqrQwV0Ct1v5+ehlNXj5NI/MWVsiTkQ=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2/go.mod h1:5FDJtLEO/GxwNgUxbwrY3LP0pEoThTQJtk2oysdXHxM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 h1:E+OJmp2tPvt1W+amx48v1eqbjDYsgN+RzP4q16yV5eM=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1/go.mod h1:a6xsAQUZg+VsS3TJ05SRp524Hs4pZ/AeFSr5ENf0Yjo=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+6GTssUXdANk6aJ7T1ZxnsQ=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.6.0 h1:sUFnFjzDUie80h24I7mrKtwCKgLY9L8h5Tp2x9+TWqk=
Expand Down Expand Up @@ -52,15 +52,15 @@ github.com/tedsuo/ifrit v0.0.0-20180802180643-bea94bb476cc/go.mod h1:eyZnKCc955u
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.22.0 h1:g1v0xeRhjcugydODzvb3mEM9SQ0HGp9s/nh3COQ/C30=
golang.org/x/crypto v0.22.0/go.mod h1:vr6Su+7cTlO45qkww3VDJlzDn0ctJvRgYbC2NvXHt+M=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.22.0 h1:9sGLhx7iRIHEiX0oAJ3MRZMUCElJgy7Br1nO+AMN3Tc=
golang.org/x/net v0.22.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.24.0 h1:1PcaxkF854Fu3+lvBIx5SYn9wRlBzzcnHZSiaFFAb0w=
golang.org/x/net v0.24.0/go.mod h1:2Q7sJY5mzlzWjKtYUEXSlBWCdyaioyXzRB2RtU8KVE8=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand All @@ -71,8 +71,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.18.0 h1:DBdB3niSjOA/O0blCZBqDefyWNYveAYMNF1Wum0DYQ4=
golang.org/x/sys v0.18.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.19.0 h1:q5f1RH2jigJ1MoAWp2KTp3gm5zAGFUTarQZ5U386+4o=
golang.org/x/sys v0.19.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
Expand Down
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/internal/amqpLinks_test.go
Expand Up @@ -44,7 +44,7 @@ func assertFailedLinks[T error, T2 error](t *testing.T, lwid *LinksWithID, expec
require.True(t, errors.Is(err, expectedErr) || errors.As(err, &expectedErr))
require.ErrorIs(t, err, expectedErr)

_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1, nil)
require.True(t, errors.Is(err, expectedRPCError) || errors.As(err, &expectedRPCError))

msg, err := lwid.Receiver.Receive(context.TODO(), nil)
Expand All @@ -62,7 +62,7 @@ func assertLinks(t *testing.T, lwid *LinksWithID) {
}, nil)
require.NoError(t, err)

_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1)
_, err = PeekMessages(context.TODO(), lwid.RPC, lwid.Receiver.LinkName(), 0, 1, nil)
require.NoError(t, err)

require.NoError(t, lwid.Receiver.IssueCredit(1))
Expand Down
6 changes: 6 additions & 0 deletions sdk/messaging/azservicebus/internal/amqpwrap/rpc.go
Expand Up @@ -13,6 +13,12 @@ import (
type RPCResponse struct {
// Code is the response code - these originate from Service Bus. Some
// common values are called out below, with the RPCResponseCode* constants.
//
// NOTE: These status codes are intended to mirror HTTP status codes. For instance
// peeking messages returns http.StatusOK, etc...
//
// See https://learn.microsoft.com/en-us/azure/service-bus-messaging/service-bus-amqp-request-response
// for more details about the ins and outs of each operation.
Code int
Description string
Message *amqp.Message
Expand Down
2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/internal/constants.go
Expand Up @@ -4,4 +4,4 @@
package internal

// Version is the semantic version number
const Version = "v1.7.1"
const Version = "v1.8.0-beta.1"
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/internal/errors_test.go
Expand Up @@ -175,9 +175,9 @@ func Test_ServiceBusError_NoRecoveryNeeded(t *testing.T) {
&amqp.Error{Condition: amqp.ErrCond("com.microsoft:operation-cancelled")},
errors.New("link is currently draining"), // not yet exposed from go-amqp
// simple timeouts from the mgmt link
RPCError{Resp: &amqpwrap.RPCResponse{Code: 408}},
RPCError{Resp: &amqpwrap.RPCResponse{Code: 503}},
RPCError{Resp: &amqpwrap.RPCResponse{Code: 500}},
RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusRequestTimeout}}, // 408
RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusServiceUnavailable}}, // 503
RPCError{Resp: &amqpwrap.RPCResponse{Code: http.StatusInternalServerError}}, // 500
}

for i, err := range tempErrors {
Expand Down
35 changes: 13 additions & 22 deletions sdk/messaging/azservicebus/internal/mgmt.go
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"net/http"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/internal/uuid"
Expand Down Expand Up @@ -111,17 +112,23 @@ func ReceiveDeferred(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName str
return transformedMessages, nil
}

func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, fromSequenceNumber int64, messageCount int32) ([]*amqp.Message, error) {
func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string, fromSequenceNumber int64, messageCount int32, sessionID *string) ([]*amqp.Message, error) {
const messagesField, messageField = "messages", "message"

value := map[string]any{
"from-sequence-number": fromSequenceNumber,
"message-count": messageCount,
}

if sessionID != nil {
value["session-id"] = *sessionID
}

msg := &amqp.Message{
ApplicationProperties: map[string]any{
"operation": "com.microsoft:peek-message",
},
Value: map[string]any{
"from-sequence-number": fromSequenceNumber,
"message-count": messageCount,
},
Value: value,
}

addAssociatedLinkName(linkName, msg)
Expand All @@ -135,7 +142,7 @@ func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string
return nil, err
}

if rsp.Code == 204 {
if rsp.Code == http.StatusNoContent {
// no messages available
return nil, nil
}
Expand Down Expand Up @@ -190,24 +197,8 @@ func PeekMessages(ctx context.Context, rpcLink amqpwrap.RPCLink, linkName string
}

transformedMessages[i] = &rehydrated

// transformedMessages[i], err = MessageFromAMQPMessage(&rehydrated)
// if err != nil {
// tab.For(ctx).Error(err)
// return nil, err
// }

// transformedMessages[i].useSession = r.isSessionFilterSet
// transformedMessages[i].sessionID = r.sessionID
}

// This sort is done to ensure that folks wanting to peek messages in sequence order may do so.
// sort.Slice(transformedMessages, func(i, j int) bool {
// iSeq := *transformedMessages[i].SystemProperties.SequenceNumber
// jSeq := *transformedMessages[j].SystemProperties.SequenceNumber
// return iSeq < jSeq
// })

return transformedMessages, nil
}

Expand Down
18 changes: 13 additions & 5 deletions sdk/messaging/azservicebus/internal/utils/types.go
Expand Up @@ -121,10 +121,18 @@ func ISO8601StringToDuration(durationStr *string) (*time.Duration, error) {
return &duration, nil
}

func Int32ToPtr(val *int32) *int32 {
if val != nil && *val > 0 {
return val
// ToInt64 - allow any size signed-int to be upconverted into an int64
func ToInt64(v any, defValue int64) (int64, bool) {
switch tmpV := v.(type) {
case int:
return int64(tmpV), true
case int8:
return int64(tmpV), true
case int32:
return int64(tmpV), true
case int64:
return int64(tmpV), true
default:
return defValue, false
}

return nil
}
23 changes: 23 additions & 0 deletions sdk/messaging/azservicebus/internal/utils/types_test.go
Expand Up @@ -33,3 +33,26 @@ func TestISO8601StringToDuration(t *testing.T) {
})
}
}

func TestToInt64(t *testing.T) {
tests := []struct {
V any
Default int64
Expected int64
OK bool
}{
{100, -1, 100, true},
{int64(100), -1, 100, true},
{int32(100), -1, 100, true},
{int8(100), -1, 100, true},

{uint32(100), -1, -1, false},
{"oops, all strings", -1, -1, false},
}

for _, test := range tests {
v, ok := ToInt64(test.V, test.Default)
require.Equal(t, test.Expected, v)
require.Equal(t, test.OK, ok)
}
}
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/liveTestHelpers_test.go
Expand Up @@ -184,13 +184,13 @@ func deleteSubscription(t *testing.T, ac *admin.Client, topicName string, subscr

// peekSingleMessageForTest wraps a standard Receiver.Peek() call so it returns at least one message
// and fails tests otherwise.
func peekSingleMessageForTest(t *testing.T, receiver *Receiver) *ReceivedMessage {
func peekSingleMessageForTest(t *testing.T, receiver *Receiver, options *PeekMessagesOptions) *ReceivedMessage {
var msg *ReceivedMessage

// Peek, unlike Receive, doesn't block until at least one message has arrived, so we have to poll
// to get a similar effect.
err := utils.Retry(context.Background(), EventReceiver, "peekSingleForTest", func(ctx context.Context, args *utils.RetryFnArgs) error {
peekedMessages, err := receiver.PeekMessages(context.Background(), 1, nil)
peekedMessages, err := receiver.PeekMessages(context.Background(), 1, options)
require.NoError(t, err)

if len(peekedMessages) == 1 {
Expand Down
18 changes: 4 additions & 14 deletions sdk/messaging/azservicebus/message.go
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
"github.com/Azure/go-amqp"
)

Expand Down Expand Up @@ -389,7 +390,9 @@ func newReceivedMessage(amqpMsg *amqp.Message, receiver amqpwrap.AMQPReceiver) *
msg.EnqueuedSequenceNumber = to.Ptr(enqueuedSequenceNumber.(int64))
}

switch asInt64(amqpMsg.Annotations[messageStateAnnotation], 0) {
msgState, _ := utils.ToInt64(amqpMsg.Annotations[messageStateAnnotation], 0)

switch msgState {
case 1:
msg.State = MessageStateDeferred
case 2:
Expand Down Expand Up @@ -475,16 +478,3 @@ func uuidFromLockTokenBytes(bytes []byte) (*amqp.UUID, error) {

return &amqpUUID, nil
}

func asInt64(v any, defVal int64) int64 {
switch v2 := v.(type) {
case int32:
return int64(v2)
case int64:
return int64(v2)
case int:
return int64(v2)
default:
return defVal
}
}
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/messageSettler_test.go
Expand Up @@ -138,7 +138,7 @@ func TestDeferredMessages(t *testing.T) {
require.NoError(t, err)

// we can peek it without altering anything here.
peekedMessage := peekSingleMessageForTest(t, receiver)
peekedMessage := peekSingleMessageForTest(t, receiver, nil)
require.Equal(t, originalDeferredMessage.DeliveryCount+1, peekedMessage.DeliveryCount, "Delivery count is incremented")
})

Expand All @@ -155,7 +155,7 @@ func TestDeferredMessages(t *testing.T) {
msg := testStuff.deferMessageForTest(t)
require.EqualValues(t, MessageStateDeferred, msg.State)

peekedMsg := peekSingleMessageForTest(t, receiver)
peekedMsg := peekSingleMessageForTest(t, receiver, nil)
require.EqualValues(t, MessageStateDeferred, peekedMsg.State)

// double defer!
Expand Down