Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Move Azure Service Bus to track2 sdk #1648

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
138 changes: 76 additions & 62 deletions bindings/azure/servicebusqueues/servicebusqueues.go
Expand Up @@ -21,7 +21,8 @@ import (
"sync/atomic"
"time"

servicebus "github.com/Azure/azure-service-bus-go"
servicebus "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
admin "github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/cenkalti/backoff/v4"

azauth "github.com/dapr/components-contrib/authentication/azure"
Expand All @@ -43,8 +44,8 @@ const (
// AzureServiceBusQueues is an input/output binding reading from and sending events to Azure Service Bus queues.
type AzureServiceBusQueues struct {
metadata *serviceBusQueuesMetadata
ns *servicebus.Namespace
queue *servicebus.QueueEntity
client *servicebus.Client
adminClient *admin.Client
shutdownSignal int32
logger logger.Logger
ctx context.Context
Expand Down Expand Up @@ -72,10 +73,18 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
userAgent := "dapr-" + logger.DaprVersion
a.metadata = meta

var ns *servicebus.Namespace
var client *servicebus.Client
var adminClient *admin.Client
if a.metadata.ConnectionString != "" {
ns, err = servicebus.NewNamespace(servicebus.NamespaceWithConnectionString(a.metadata.ConnectionString),
servicebus.NamespaceWithUserAgent(userAgent))
client, err = servicebus.NewClientFromConnectionString(a.metadata.ConnectionString, &servicebus.ClientOptions{
ApplicationID: userAgent,
})

if err != nil {
return err
}

adminClient, err = admin.NewClientFromConnectionString(a.metadata.ConnectionString, &admin.ClientOptions{})
if err != nil {
return err
}
Expand All @@ -86,45 +95,31 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
return sErr
}

tokenProvider, tErr := settings.GetAADTokenProvider()
token, tErr := settings.GetTokenCredential()
if tErr != nil {
return tErr
}

ns, err = servicebus.NewNamespace(servicebus.NamespaceWithTokenProvider(tokenProvider),
servicebus.NamespaceWithUserAgent(userAgent))
client, err = servicebus.NewClient(a.metadata.NamespaceName, token, &servicebus.ClientOptions{
ApplicationID: userAgent,
})

if err != nil {
return err
}

// We set these separately as the ServiceBus SDK does not provide a way to pass the environment via the options
// pattern unless you allow it to recreate the entire environment which seems wasteful.
ns.Name = a.metadata.NamespaceName
ns.Environment = *settings.AzureEnvironment
ns.Suffix = settings.AzureEnvironment.ServiceBusEndpointSuffix
adminClient, err = admin.NewClient(a.metadata.NamespaceName, token, &admin.ClientOptions{})
if err != nil {
return err
}
}
a.ns = ns

qm := ns.NewQueueManager()
a.client = client
a.adminClient = adminClient

ctx := context.Background()

queues, err := qm.List(ctx)
_, err = adminClient.GetQueue(ctx, a.metadata.QueueName, nil)
if err != nil {
return err
}

var entity *servicebus.QueueEntity
for _, q := range queues {
if q.Name == a.metadata.QueueName {
entity = q

break
}
}

// Create queue if it does not exist
if entity == nil {
var ttl time.Duration
var ok bool
ttl, ok, err = contrib_metadata.TryGetTTL(metadata.Properties)
Expand All @@ -135,12 +130,15 @@ func (a *AzureServiceBusQueues) Init(metadata bindings.Metadata) error {
if !ok {
ttl = a.metadata.ttl
}
entity, err = qm.Put(ctx, a.metadata.QueueName, servicebus.QueueEntityWithMessageTimeToLive(&ttl))

properties := &admin.QueueProperties{}
properties.DefaultMessageTimeToLive = &ttl

_, err := adminClient.CreateQueue(ctx, a.metadata.QueueName, properties, nil)
if err != nil {
return err
}
}
a.queue = entity

a.clearShutdown()

Expand Down Expand Up @@ -191,18 +189,18 @@ func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.I
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
defer cancel()

client, err := a.ns.NewQueue(a.queue.Name)
sender, err := a.client.NewSender(a.metadata.QueueName, nil)
if err != nil {
return nil, err
}
defer client.Close(context.Background())
defer sender.Close(context.Background())

msg := servicebus.NewMessage(req.Data)
msg := &servicebus.Message{Body: req.Data}
if val, ok := req.Metadata[id]; ok && val != "" {
msg.ID = val
msg.MessageID = &val
}
if val, ok := req.Metadata[correlationID]; ok && val != "" {
msg.CorrelationID = val
msg.CorrelationID = &val
}

ttl, ok, err := contrib_metadata.TryGetTTL(req.Metadata)
Expand All @@ -211,55 +209,71 @@ func (a *AzureServiceBusQueues) Invoke(req *bindings.InvokeRequest) (*bindings.I
}

if ok {
msg.TTL = &ttl
msg.TimeToLive = &ttl
}

return nil, client.Send(ctx, msg)
return nil, sender.SendMessage(ctx, msg)
}

func (a *AzureServiceBusQueues) Read(handler func(*bindings.ReadResponse) ([]byte, error)) error {
var sbHandler servicebus.HandlerFunc = func(ctx context.Context, msg *servicebus.Message) error {
_, err := handler(&bindings.ReadResponse{
Data: msg.Data,
Metadata: map[string]string{id: msg.ID, correlationID: msg.CorrelationID, label: msg.Label},
})
if err == nil {
return msg.Complete(ctx)
}

return msg.Abandon(ctx)
}

// Connections need to retry forever with a maximum backoff of 5 minutes and exponential scaling.
connConfig := retry.DefaultConfig()
connConfig.Policy = retry.PolicyExponential
connConfig.MaxInterval, _ = time.ParseDuration("5m")
connBackoff := connConfig.NewBackOffWithContext(a.ctx)

for !a.isShutdown() {
client := a.attemptConnectionForever(connBackoff)
receiver := a.attemptConnectionForever(connBackoff)

if client == nil {
if receiver == nil {
a.logger.Errorf("Failed to connect to Azure Service Bus Queue.")
continue
}
defer client.Close(context.Background())
defer receiver.Close(context.Background())

if err := client.Receive(a.ctx, sbHandler); err != nil {
msgs, err := receiver.ReceiveMessages(a.ctx, 10, nil)
if err != nil {
a.logger.Warnf("Error reading from Azure Service Bus Queue binding: %s", err.Error())
}

for _, msg := range msgs {
body, err := msg.Body()
if err != nil {
a.logger.Warnf("Error reading message body: %s", err.Error())
receiver.AbandonMessage(a.ctx, msg, nil)
}

metadata := make(map[string]string)
metadata[id] = msg.MessageID
if msg.CorrelationID != nil {
metadata[correlationID] = *msg.CorrelationID
}
if msg.Subject != nil {
metadata[label] = *msg.Subject
}

_, err = handler(&bindings.ReadResponse{
Data: body,
Metadata: metadata,
})
if err == nil {
return receiver.CompleteMessage(a.ctx, msg)
}

receiver.AbandonMessage(a.ctx, msg, nil)
}
}
return nil
}

func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Queue {
var client *servicebus.Queue
func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff) *servicebus.Receiver {
var receiver *servicebus.Receiver
retry.NotifyRecover(func() error {
clientAttempt, err := a.ns.NewQueue(a.queue.Name)
clientAttempt, err := a.client.NewReceiverForQueue(a.metadata.QueueName, nil)
if err != nil {
return err
}
client = clientAttempt
receiver = clientAttempt
return nil
}, backoff,
func(err error, d time.Duration) {
Expand All @@ -269,7 +283,7 @@ func (a *AzureServiceBusQueues) attemptConnectionForever(backoff backoff.BackOff
a.logger.Debug("Successfully reconnected to Azure Service Bus.")
backoff.Reset()
})
return client
return receiver
}

func (a *AzureServiceBusQueues) Close() error {
Expand Down
12 changes: 7 additions & 5 deletions go.mod
Expand Up @@ -7,18 +7,19 @@ require (
cloud.google.com/go/datastore v1.1.0
cloud.google.com/go/pubsub v1.12.2
cloud.google.com/go/storage v1.10.0
github.com/Azure/azure-amqp-common-go/v3 v3.1.0
github.com/Azure/azure-event-hubs-go/v3 v3.3.10
github.com/Azure/azure-amqp-common-go/v3 v3.2.3
github.com/Azure/azure-event-hubs-go/v3 v3.3.17
github.com/Azure/azure-sdk-for-go v59.3.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v0.20.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v0.12.0
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.3.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v0.3.3-0.20211210013014-7a0cef22a2ef
github.com/Azure/azure-service-bus-go v0.10.10
github.com/Azure/azure-storage-blob-go v0.10.0
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd
github.com/Azure/go-amqp v0.13.1
github.com/Azure/go-amqp v0.17.0
github.com/Azure/go-autorest/autorest v0.11.23
github.com/Azure/go-autorest/autorest/adal v0.9.16
github.com/Azure/go-autorest/autorest/adal v0.9.17
github.com/Azure/go-autorest/autorest/azure/auth v0.5.8
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.23.1
Expand Down Expand Up @@ -177,8 +178,9 @@ require (
github.com/99designs/keyring v1.2.0 // indirect
github.com/AthenZ/athenz v1.10.39 // indirect
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v0.8.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/keyvault/internal v0.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/messaging/internal v0.0.0-20211208010914-2b10e91d237e // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
Expand Down