Skip to content

Commit

Permalink
[azservicebus] Allow link creation to be cancelled (Azure#17598)
Browse files Browse the repository at this point in the history
AcceptNextSessionFor(Queue|Subscription) can block for a long time (server dependent) if there are no available sessions. HOWEVER, it was intended to be cancellable, which wasn't working.

This is a simpler workaround until we get context support plumbed through go-amqp itself.

Fixes Azure#17565
  • Loading branch information
richardpark-msft committed Apr 16, 2022
1 parent 4413113 commit 63195b9
Show file tree
Hide file tree
Showing 11 changed files with 151 additions and 43 deletions.
3 changes: 3 additions & 0 deletions sdk/messaging/azservicebus/CHANGELOG.md
Expand Up @@ -12,6 +12,9 @@

### Bugs Fixed

- Fixing issue where the AcceptNextSessionForQueue and AcceptNextSessionForSubscription
couldn't be cancelled, forcing the user to wait for the service to timeout. (#17598)

### Other Changes

## 0.4.0 (2022-04-06)
Expand Down
27 changes: 26 additions & 1 deletion sdk/messaging/azservicebus/client_test.go
Expand Up @@ -13,7 +13,9 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/azidentity"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/internal/sas"
Expand Down Expand Up @@ -204,7 +206,7 @@ func TestNewClientNewReceiverNotFound(t *testing.T) {
assertRPCNotFound(t, err)
}

func TestNewClientNewSessionReceiverNotFound(t *testing.T) {
func TestClientNewSessionReceiverNotFound(t *testing.T) {
connectionString := test.GetConnectionString(t)
client, err := NewClientFromConnectionString(connectionString, nil)
require.NoError(t, err)
Expand Down Expand Up @@ -258,6 +260,29 @@ func TestClientCloseVsClosePermanently(t *testing.T) {
require.Nil(t, sessionReceiver)
}

func TestClientNewSessionReceiverCancel(t *testing.T) {
// Both the session APIs create the receiver immediately however AcceptNextSession() has a quirk
// where it takes an excessively long time.
connectionString := test.GetConnectionString(t)

queue, cleanup := createQueue(t, connectionString, &admin.QueueProperties{
RequiresSession: to.Ptr(true),
})

defer cleanup()

client, err := NewClientFromConnectionString(connectionString, nil)
require.NoError(t, err)

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

// non-cancelled version
receiver, err := client.AcceptNextSessionForQueue(ctx, queue, nil)
require.ErrorIs(t, err, context.DeadlineExceeded)
require.Nil(t, receiver)
}

func TestNewClientUnitTests(t *testing.T) {
t.Run("WithTokenCredential", func(t *testing.T) {
fakeTokenCredential := struct{ azcore.TokenCredential }{}
Expand Down
5 changes: 3 additions & 2 deletions sdk/messaging/azservicebus/internal/amqpLinks.go
Expand Up @@ -11,6 +11,7 @@ import (
"sync"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/tracing"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
Expand Down Expand Up @@ -85,7 +86,7 @@ type AMQPLinksImpl struct {
RPCLink RPCLink

// the AMQP session for either the 'sender' or 'receiver' link
session AMQPSessionCloser
session amqpwrap.AMQPSession

// these are populated by your `createLinkFunc` when you construct
// the amqpLinks
Expand All @@ -104,7 +105,7 @@ type AMQPLinksImpl struct {

// CreateLinkFunc creates the links, using the given session. Typically you'll only create either an
// *amqp.Sender or a *amqp.Receiver. AMQPLinks handles it either way.
type CreateLinkFunc func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)
type CreateLinkFunc func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error)

type NewAMQPLinksArgs struct {
NS NamespaceForAMQPLinks
Expand Down
33 changes: 17 additions & 16 deletions sdk/messaging/azservicebus/internal/amqpLinks_test.go
Expand Up @@ -12,6 +12,7 @@ import (
"time"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/test"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
Expand Down Expand Up @@ -77,7 +78,7 @@ func TestAMQPLinksBasic(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return newLinksForAMQPLinksTest(entityPath, session)
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -112,7 +113,7 @@ func TestAMQPLinksLive(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand Down Expand Up @@ -185,7 +186,7 @@ func TestAMQPLinksLiveRecoverLink(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand Down Expand Up @@ -223,7 +224,7 @@ func TestAMQPLinksLiveRace(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand Down Expand Up @@ -275,7 +276,7 @@ func TestAMQPLinksLiveRaceLink(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand Down Expand Up @@ -319,7 +320,7 @@ func TestAMQPLinksRetry(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand Down Expand Up @@ -361,7 +362,7 @@ func TestAMQPLinksMultipleWithSameConnection(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand All @@ -377,7 +378,7 @@ func TestAMQPLinksMultipleWithSameConnection(t *testing.T) {
links2 := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: entityPath,
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
createLinksCalled2++
return newLinksForAMQPLinksTest(entityPath, session)
},
Expand Down Expand Up @@ -456,7 +457,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return sender, receiver, nil
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -486,7 +487,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return sender, receiver, nil
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -515,7 +516,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return sender, receiver, nil
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -544,7 +545,7 @@ func TestAMQPLinksCloseIfNeeded(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return sender, receiver, nil
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -607,7 +608,7 @@ func TestAMQPLinksRetriesUnit(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return sender, receiver, nil
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -651,7 +652,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return nil, receiver, nil
},
GetRecoveryKindFunc: GetRecoveryKind,
Expand Down Expand Up @@ -684,7 +685,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
links := NewAMQPLinks(NewAMQPLinksArgs{
NS: ns,
EntityPath: "entityPath",
CreateLinkFunc: func(ctx context.Context, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
CreateLinkFunc: func(ctx context.Context, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
return nil, receiver, nil
}, GetRecoveryKindFunc: GetRecoveryKind,
})
Expand All @@ -710,7 +711,7 @@ func TestAMQPLinks_Logging(t *testing.T) {
})
}

func newLinksForAMQPLinksTest(entityPath string, session AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
func newLinksForAMQPLinksTest(entityPath string, session amqpwrap.AMQPSession) (AMQPSenderCloser, AMQPReceiverCloser, error) {
receiveMode := amqp.ModeSecond

opts := []amqp.LinkOption{
Expand Down
22 changes: 18 additions & 4 deletions sdk/messaging/azservicebus/internal/amqp_test_utils.go
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/Azure/azure-sdk-for-go/sdk/internal/log"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/amqpwrap"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/exported"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/utils"
"github.com/Azure/go-amqp"
Expand All @@ -18,7 +19,7 @@ type FakeNS struct {
recovered uint64
clientRevisions []uint64
RPCLink RPCLink
Session AMQPSessionCloser
Session amqpwrap.AMQPSession
AMQPLinks *FakeAMQPLinks

CloseCalled int
Expand All @@ -30,7 +31,10 @@ type FakeAMQPSender struct {
}

type FakeAMQPSession struct {
AMQPSessionCloser
amqpwrap.AMQPSession

NewReceiverFn func(opts ...amqp.LinkOption) (AMQPReceiverCloser, error)

closed int
}

Expand All @@ -54,7 +58,8 @@ type FakeAMQPLinks struct {

type FakeAMQPReceiver struct {
AMQPReceiver
Closed int
Closed int
CloseFn func(ctx context.Context) error

DrainCalled int
DrainCreditImpl func(ctx context.Context) error
Expand Down Expand Up @@ -139,6 +144,11 @@ func (r *FakeAMQPReceiver) Prefetched(ctx context.Context) (*amqp.Message, error

func (r *FakeAMQPReceiver) Close(ctx context.Context) error {
r.Closed++

if r.CloseFn != nil {
return r.CloseFn(ctx)
}

return nil
}

Expand Down Expand Up @@ -189,6 +199,10 @@ func (s *FakeAMQPSender) Close(ctx context.Context) error {
return nil
}

func (s *FakeAMQPSession) NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error) {
return s.NewReceiverFn(opts...)
}

func (s *FakeAMQPSession) Close(ctx context.Context) error {
s.closed++
return nil
Expand All @@ -207,7 +221,7 @@ func (ns *FakeNS) GetEntityAudience(entityPath string) string {
return fmt.Sprintf("audience: %s", entityPath)
}

func (ns *FakeNS) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) {
func (ns *FakeNS) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) {
return ns.Session, ns.recovered + 100, nil
}

Expand Down
10 changes: 5 additions & 5 deletions sdk/messaging/azservicebus/internal/amqpwrap/amqpwrap.go
Expand Up @@ -78,22 +78,22 @@ func (w *AMQPClientWrapper) NewSession(opts ...amqp.SessionOption) (AMQPSession,
}

return &AMQPSessionWrapper{
inner: sess,
Inner: sess,
}, nil
}

type AMQPSessionWrapper struct {
inner *amqp.Session
Inner *amqp.Session
}

func (w *AMQPSessionWrapper) Close(ctx context.Context) error {
return w.inner.Close(ctx)
return w.Inner.Close(ctx)
}

func (w *AMQPSessionWrapper) NewReceiver(opts ...amqp.LinkOption) (AMQPReceiverCloser, error) {
return w.inner.NewReceiver(opts...)
return w.Inner.NewReceiver(opts...)
}

func (w *AMQPSessionWrapper) NewSender(opts ...amqp.LinkOption) (AMQPSenderCloser, error) {
return w.inner.NewSender(opts...)
return w.Inner.NewSender(opts...)
}
6 changes: 3 additions & 3 deletions sdk/messaging/azservicebus/internal/namespace.go
Expand Up @@ -70,7 +70,7 @@ type NamespaceWithNewAMQPLinks interface {
// NamespaceForAMQPLinks is the Namespace surface needed for the internals of AMQPLinks.
type NamespaceForAMQPLinks interface {
NegotiateClaim(ctx context.Context, entityPath string) (context.CancelFunc, <-chan struct{}, error)
NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error)
NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error)
NewRPCLink(ctx context.Context, managementPath string) (RPCLink, error)
GetEntityAudience(entityPath string) string
Recover(ctx context.Context, clientRevision uint64) (bool, error)
Expand Down Expand Up @@ -192,7 +192,7 @@ func (ns *Namespace) newClient(ctx context.Context) (*amqp.Client, error) {

// NewAMQPSession creates a new AMQP session with the internally cached *amqp.Client.
// Returns a closeable AMQP session and the current client revision.
func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uint64, error) {
func (ns *Namespace) NewAMQPSession(ctx context.Context) (amqpwrap.AMQPSession, uint64, error) {
client, clientRevision, err := ns.GetAMQPClientImpl(ctx)

if err != nil {
Expand All @@ -205,7 +205,7 @@ func (ns *Namespace) NewAMQPSession(ctx context.Context) (AMQPSessionCloser, uin
return nil, 0, err
}

return session, clientRevision, err
return &amqpwrap.AMQPSessionWrapper{Inner: session}, clientRevision, err
}

// NewRPCLink creates a new amqp-common *rpc.Link with the internally cached *amqp.Client.
Expand Down

0 comments on commit 63195b9

Please sign in to comment.