AzBlob State: Migrate to Track2 SDK
Signed-off-by: Bernd Verst <4535280+berndverst@users.noreply.github.com>
berndverst committed Nov 18, 2022
1 parent 17d38d7 commit 7ccca4d
Showing 4 changed files with 139 additions and 84 deletions.
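
For orientation, the core of this change is replacing the Track1 pipeline/ContainerURL pattern with a Track2 container.Client. The sketch below is illustrative only — account name, key, and container name are placeholders rather than values from this repository — and shows the construction pattern this commit adopts:

// Illustrative sketch of the Track2 construction pattern; placeholders throughout.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
)

func main() {
	accountName, accountKey, containerName := "myaccount", "bXlrZXk=", "dapr" // placeholders

	// Track2: a shared-key credential plus a container-scoped client replaces
	// the Track1 azblob.NewPipeline / azblob.NewContainerURL pair.
	cred, err := azblob.NewSharedKeyCredential(accountName, accountKey)
	if err != nil {
		panic(err)
	}
	u := fmt.Sprintf("https://%s.blob.core.windows.net/%s", accountName, containerName)
	client, err := container.NewClientWithSharedKeyCredential(u, cred, nil)
	if err != nil {
		panic(err)
	}

	// Create the container; an already-existing container is not treated as fatal.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if _, err := client.Create(ctx, nil); err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
		panic(err)
	}
}
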
208 changes: 130 additions & 78 deletions state/azure/blobstorage/blobstorage.go
@@ -36,16 +36,21 @@ Concurrency is supported with ETags according to https://docs.microsoft.com/en-u
package blobstorage

import (
"bytes"
"context"
b64 "encoding/base64"
"fmt"
"io"
"net"
"net/url"
"reflect"
"strings"

"github.com/Azure/azure-storage-blob-go/azblob"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
jsoniter "github.com/json-iterator/go"

azauth "github.com/dapr/components-contrib/internal/authentication/azure"
@@ -63,13 +68,14 @@ const (
contentLanguage = "ContentLanguage"
contentDisposition = "ContentDisposition"
cacheControl = "CacheControl"
endpointKey = "endpoint"
)

// StateStore Type.
type StateStore struct {
state.DefaultBulkStore
containerURL azblob.ContainerURL
json jsoniter.API
containerClient *container.Client
json jsoniter.API

features []state.Feature
logger logger.Logger
@@ -78,49 +84,77 @@ type StateStore struct {
type blobStorageMetadata struct {
AccountName string
ContainerName string
AccountKey string
}

// Init the connection to blob storage, optionally creates a blob container if it doesn't exist.
func (r *StateStore) Init(metadata state.Metadata) error {
meta, err := getBlobStorageMetadata(metadata.Properties)
m, err := getBlobStorageMetadata(metadata.Properties)
if err != nil {
return err
}

credential, env, err := azauth.GetAzureStorageBlobCredentials(r.logger, meta.AccountName, metadata.Properties)
if err != nil {
return fmt.Errorf("invalid credentials with error: %s", err.Error())
}

userAgent := "dapr-" + logger.DaprVersion
options := azblob.PipelineOptions{
Telemetry: azblob.TelemetryOptions{Value: userAgent},
options := container.ClientOptions{
ClientOptions: azcore.ClientOptions{
Telemetry: policy.TelemetryOptions{
ApplicationID: userAgent,
},
},
}
p := azblob.NewPipeline(credential, options)

settings, err := azauth.NewEnvironmentSettings("storage", metadata.Properties)
if err != nil {
return err
}
customEndpoint, ok := metadata.Properties[endpointKey]
var URL *url.URL
customEndpoint, ok := mdutils.GetMetadataProperty(metadata.Properties, azauth.StorageEndpointKeys...)
if ok && customEndpoint != "" {
URL, err = url.Parse(fmt.Sprintf("%s/%s/%s", customEndpoint, meta.AccountName, meta.ContainerName))
var parseErr error
URL, parseErr = url.Parse(fmt.Sprintf("%s/%s/%s", customEndpoint, m.AccountName, m.ContainerName))
if parseErr != nil {
return parseErr
}
} else {
URL, err = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", meta.AccountName, env.StorageEndpointSuffix, meta.ContainerName))
env := settings.AzureEnvironment
URL, _ = url.Parse(fmt.Sprintf("https://%s.blob.%s/%s", m.AccountName, env.StorageEndpointSuffix, m.ContainerName))
}
if err != nil {
return err
}
containerURL := azblob.NewContainerURL(*URL, p)

_, err = net.LookupHost(URL.Hostname())
if err != nil {
return err
var clientErr error
var client *container.Client
// Try using shared key credentials first
if m.AccountKey != "" {
credential, newSharedKeyErr := azblob.NewSharedKeyCredential(m.AccountName, m.AccountKey)
if newSharedKeyErr != nil {
return fmt.Errorf("invalid credentials with error: %w", newSharedKeyErr)
}
client, clientErr = container.NewClientWithSharedKeyCredential(URL.String(), credential, &options)
if clientErr != nil {
return fmt.Errorf("cannot init Blobstorage container client: %w", clientErr)
}
r.containerClient = client
} else {
// fallback to AAD
credential, tokenErr := settings.GetTokenCredential()
if tokenErr != nil {
return fmt.Errorf("invalid credentials with error: %w", tokenErr)
}
client, clientErr = container.NewClient(URL.String(), credential, &options)
}
if clientErr != nil {
return fmt.Errorf("cannot init Blobstorage client: %w", clientErr)
}

ctx := context.Background()
_, err = containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)
r.logger.Debugf("error creating container: %s", err)

r.containerURL = containerURL
r.logger.Debugf("using container '%s'", meta.ContainerName)
createContainerOptions := container.CreateOptions{
Access: nil,
}
timeoutCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
_, err = client.Create(timeoutCtx, &createContainerOptions)
cancel()
// Don't return the error: the container might already exist
if err != nil {
r.logger.Debugf("error creating container: %v", err)
}
r.containerClient = client

return nil
}
@@ -149,10 +183,11 @@ func (r *StateStore) Set(req *state.SetRequest) error {
}

func (r *StateStore) Ping() error {
accessConditions := azblob.BlobAccessConditions{}

if _, err := r.containerURL.GetProperties(context.Background(), accessConditions.LeaseAccessConditions); err != nil {
return fmt.Errorf("blob storage: error connecting to Blob storage at %s: %s", r.containerURL.URL().Host, err)
getPropertiesOptions := container.GetPropertiesOptions{
LeaseAccessConditions: &container.LeaseAccessConditions{},
}
if _, err := r.containerClient.GetProperties(context.Background(), &getPropertiesOptions); err != nil {
return fmt.Errorf("blob storage: error connecting to Blob storage at %s: %s", r.containerClient.URL(), err)
}

return nil
@@ -197,9 +232,13 @@ func getBlobStorageMetadata(meta map[string]string) (*blobStorageMetadata, error
}

func (r *StateStore) readFile(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))

downloadOptions := azblob.DownloadStreamOptions{
AccessConditions: &blob.AccessConditions{},
}

resp, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
blobDownloadResponse, err := blockBlobClient.DownloadStream(ctx, &downloadOptions)
if err != nil {
r.logger.Debugf("download file %s, err %s", req.Key, err)

@@ -210,107 +249,124 @@ func (r *StateStore) readFile(ctx context.Context, req *state.GetRequest) (*stat
return &state.GetResponse{}, err
}

bodyStream := resp.Body(azblob.RetryReaderOptions{})
data, err := io.ReadAll(bodyStream)
blobData := &bytes.Buffer{}
reader := blobDownloadResponse.Body
_, err = blobData.ReadFrom(reader)
if err != nil {
r.logger.Debugf("read file %s, err %s", req.Key, err)
return &state.GetResponse{}, err
return &state.GetResponse{}, fmt.Errorf("error reading az blob: %w", err)
}
err = reader.Close()
if err != nil {
return &state.GetResponse{}, fmt.Errorf("error closing az blob reader: %w", err)
}

contentType := resp.ContentType()
contentType := blobDownloadResponse.ContentType

return &state.GetResponse{
Data: data,
ETag: ptr.Of(string(resp.ETag())),
ContentType: &contentType,
Data: blobData.Bytes(),
ETag: ptr.Of(string(*blobDownloadResponse.ETag)),
ContentType: contentType,
}, nil
}

func (r *StateStore) writeFile(ctx context.Context, req *state.SetRequest) error {
accessConditions := azblob.BlobAccessConditions{}
modifiedAccessConditions := blob.ModifiedAccessConditions{}

if req.ETag != nil && *req.ETag != "" {
accessConditions.IfMatch = azblob.ETag(*req.ETag)
modifiedAccessConditions.IfMatch = ptr.Of(azcore.ETag(*req.ETag))
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
accessConditions.IfNoneMatch = azblob.ETag("*")
modifiedAccessConditions.IfNoneMatch = ptr.Of(azcore.ETagAny)
}

blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
accessConditions := blob.AccessConditions{
ModifiedAccessConditions: &modifiedAccessConditions,
}

blobHTTPHeaders, err := r.createBlobHTTPHeadersFromRequest(req)
if err != nil {
return err
}
_, err = azblob.UploadBufferToBlockBlob(ctx, r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{

uploadOptions := azblob.UploadBufferOptions{
AccessConditions: &accessConditions,
Metadata: req.Metadata,
AccessConditions: accessConditions,
BlobHTTPHeaders: blobHTTPHeaders,
})
if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err)
HTTPHeaders: &blobHTTPHeaders,
Concurrency: 16,
}

blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))
_, err = blockBlobClient.UploadBuffer(ctx, r.marshal(req), &uploadOptions)

if err != nil {
// Check if the error is due to ETag conflict
if req.ETag != nil && isETagConflictError(err) {
return state.NewETagError(state.ETagMismatch, err)
}

return err
return fmt.Errorf("error uploading az blob: %w", err)
}

return nil
}

func (r *StateStore) createBlobHTTPHeadersFromRequest(req *state.SetRequest) (azblob.BlobHTTPHeaders, error) {
var blobHTTPHeaders azblob.BlobHTTPHeaders
func (r *StateStore) createBlobHTTPHeadersFromRequest(req *state.SetRequest) (blob.HTTPHeaders, error) {
blobHTTPHeaders := blob.HTTPHeaders{}
if val, ok := req.Metadata[contentType]; ok && val != "" {
blobHTTPHeaders.ContentType = val
blobHTTPHeaders.BlobContentType = &val
delete(req.Metadata, contentType)
}

if req.ContentType != nil {
if blobHTTPHeaders.ContentType != "" {
r.logger.Warnf("ContentType received from request Metadata %s, as well as ContentType property %s, choosing value from contentType property", blobHTTPHeaders.ContentType, *req.ContentType)
if blobHTTPHeaders.BlobContentType != nil {
r.logger.Warnf("ContentType received from request Metadata %s, as well as ContentType property %s, choosing value from contentType property", blobHTTPHeaders.BlobContentType, req.ContentType)
}
blobHTTPHeaders.ContentType = *req.ContentType
blobHTTPHeaders.BlobContentType = req.ContentType
}

if val, ok := req.Metadata[contentMD5]; ok && val != "" {
sDec, err := b64.StdEncoding.DecodeString(val)
if err != nil || len(sDec) != 16 {
return azblob.BlobHTTPHeaders{}, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
return blob.HTTPHeaders{}, fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
}
blobHTTPHeaders.ContentMD5 = sDec
blobHTTPHeaders.BlobContentMD5 = sDec
delete(req.Metadata, contentMD5)
}
if val, ok := req.Metadata[contentEncoding]; ok && val != "" {
blobHTTPHeaders.ContentEncoding = val
blobHTTPHeaders.BlobContentEncoding = &val
delete(req.Metadata, contentEncoding)
}
if val, ok := req.Metadata[contentLanguage]; ok && val != "" {
blobHTTPHeaders.ContentLanguage = val
blobHTTPHeaders.BlobContentLanguage = &val
delete(req.Metadata, contentLanguage)
}
if val, ok := req.Metadata[contentDisposition]; ok && val != "" {
blobHTTPHeaders.ContentDisposition = val
blobHTTPHeaders.BlobContentDisposition = &val
delete(req.Metadata, contentDisposition)
}
if val, ok := req.Metadata[cacheControl]; ok && val != "" {
blobHTTPHeaders.CacheControl = val
blobHTTPHeaders.BlobCacheControl = &val
delete(req.Metadata, cacheControl)
}
return blobHTTPHeaders, nil
}

func (r *StateStore) deleteFile(ctx context.Context, req *state.DeleteRequest) error {
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
accessConditions := azblob.BlobAccessConditions{}
blockBlobClient := r.containerClient.NewBlockBlobClient(getFileName(req.Key))

modifiedAccessConditions := blob.ModifiedAccessConditions{}
if req.ETag != nil && *req.ETag != "" {
accessConditions.IfMatch = azblob.ETag(*req.ETag)
modifiedAccessConditions.IfMatch = ptr.Of(azcore.ETag(*req.ETag))
}

_, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, accessConditions)
deleteOptions := blob.DeleteOptions{
DeleteSnapshots: nil,
AccessConditions: &blob.AccessConditions{
ModifiedAccessConditions: &modifiedAccessConditions,
},
}

_, err := blockBlobClient.Delete(ctx, &deleteOptions)
if err != nil {
r.logger.Debugf("delete file %s, err %s", req.Key, err)

@@ -349,13 +405,9 @@ func (r *StateStore) marshal(req *state.SetRequest) []byte {
}

func isNotFoundError(err error) bool {
azureError, ok := err.(azblob.StorageError)

return ok && azureError.ServiceCode() == azblob.ServiceCodeBlobNotFound
return bloberror.HasCode(err, bloberror.BlobNotFound)
}

func isETagConflictError(err error) bool {
azureError, ok := err.(azblob.StorageError)

return ok && azureError.ServiceCode() == azblob.ServiceCodeConditionNotMet
return bloberror.HasCode(err, bloberror.ConditionNotMet)
}
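
Before the test updates below, a note on how the ETag handling above maps to Track2: first-write concurrency sends If-None-Match: *, updates with a caller-supplied ETag send If-Match, and a ConditionNotMet response is translated back into an ETag mismatch. The following is a stand-alone sketch of that pattern under the assumption of an already constructed blockblob.Client; it is not the component's API.

// Sketch of the optimistic-concurrency pattern used by writeFile above.
package blobsketch

import (
	"context"
	"fmt"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
	"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
)

func uploadWithETag(ctx context.Context, client *blockblob.Client, data []byte, etag string, firstWrite bool) error {
	cond := blob.ModifiedAccessConditions{}
	switch {
	case etag != "":
		// Update only if the blob still carries the ETag the caller last saw.
		match := azcore.ETag(etag)
		cond.IfMatch = &match
	case firstWrite:
		// Create only if the blob does not exist yet.
		etagAny := azcore.ETagAny
		cond.IfNoneMatch = &etagAny
	}

	opts := blockblob.UploadBufferOptions{
		AccessConditions: &blob.AccessConditions{ModifiedAccessConditions: &cond},
	}
	_, err := client.UploadBuffer(ctx, data, &opts)
	if bloberror.HasCode(err, bloberror.ConditionNotMet) {
		return fmt.Errorf("etag mismatch: %w", err)
	}
	return err
}
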
12 changes: 6 additions & 6 deletions state/azure/blobstorage/blobstorage_test.go
@@ -34,8 +34,7 @@ func TestInit(t *testing.T) {
}
err := s.Init(m)
assert.Nil(t, err)
assert.Equal(t, "acc.blob.core.windows.net", s.containerURL.URL().Host)
assert.Equal(t, "/dapr", s.containerURL.URL().Path)
assert.Equal(t, "https://acc.blob.core.windows.net/dapr", s.containerClient.URL())
})

t.Run("Init with missing metadata", func(t *testing.T) {
@@ -53,7 +52,8 @@ func TestInit(t *testing.T) {
"accountKey": "e+Dnvl8EOxYxV94nurVaRQ==",
"containerName": "dapr",
}
err := s.Init(m)
s.Init(m)
err := s.Ping()
assert.NotNil(t, err)
})
}
@@ -100,7 +100,7 @@ func TestBlobHTTPHeaderGeneration(t *testing.T) {

blobHeaders, err := s.createBlobHTTPHeadersFromRequest(req)
assert.Nil(t, err)
assert.Equal(t, "application/json", blobHeaders.ContentType)
assert.Equal(t, "application/json", *blobHeaders.BlobContentType)
})
t.Run("Content type and metadata provided (conflict), content type chosen", func(t *testing.T) {
contentType := "application/json"
@@ -113,7 +113,7 @@ func TestBlobHTTPHeaderGeneration(t *testing.T) {

blobHeaders, err := s.createBlobHTTPHeadersFromRequest(req)
assert.Nil(t, err)
assert.Equal(t, "application/json", blobHeaders.ContentType)
assert.Equal(t, "application/json", *blobHeaders.BlobContentType)
})
t.Run("ContentType not provided, metadata provided set backward compatibility", func(t *testing.T) {
req := &state.SetRequest{
@@ -124,6 +124,6 @@ func TestBlobHTTPHeaderGeneration(t *testing.T) {

blobHeaders, err := s.createBlobHTTPHeadersFromRequest(req)
assert.Nil(t, err)
assert.Equal(t, "text/plain", blobHeaders.ContentType)
assert.Equal(t, "text/plain", *blobHeaders.BlobContentType)
})
}
