Skip to content

Commit

Permalink
Merge branch 'master' into azblobbindingtrack2
Browse files Browse the repository at this point in the history
  • Loading branch information
berndverst committed Nov 17, 2022
2 parents e3abe5b + bd534a3 commit b0bcb14
Show file tree
Hide file tree
Showing 13 changed files with 119 additions and 213 deletions.
58 changes: 32 additions & 26 deletions bindings/rabbitmq/rabbitmq_integration_test.go
Expand Up @@ -130,12 +130,14 @@ func TestPublishingWithTTL(t *testing.T) {
const maxGetDuration = ttlInSeconds * time.Second

metadata := bindings.Metadata{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
Base: contribMetadata.Base{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
},
},
}

Expand All @@ -162,7 +164,7 @@ func TestPublishingWithTTL(t *testing.T) {
},
}

_, err = rabbitMQBinding1.Invoke(context.Backgound(), &writeRequest)
_, err = rabbitMQBinding1.Invoke(context.Background(), &writeRequest)
assert.Nil(t, err)

time.Sleep(time.Second + (ttlInSeconds * time.Second))
Expand All @@ -183,7 +185,7 @@ func TestPublishingWithTTL(t *testing.T) {
contribMetadata.TTLMetadataKey: strconv.Itoa(ttlInSeconds * 1000),
},
}
_, err = rabbitMQBinding2.Invoke(context.Backgound(), &writeRequest)
_, err = rabbitMQBinding2.Invoke(context.Background(), &writeRequest)
assert.Nil(t, err)

msg, ok, err := getMessageWithRetries(ch, queueName, maxGetDuration)
Expand All @@ -204,14 +206,16 @@ func TestExclusiveQueue(t *testing.T) {
const maxGetDuration = ttlInSeconds * time.Second

metadata := bindings.Metadata{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
"exclusive": strconv.FormatBool(exclusive),
contribMetadata.TTLMetadataKey: strconv.FormatInt(ttlInSeconds, 10),
Base: contribMetadata.Base{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
"exclusive": strconv.FormatBool(exclusive),
contribMetadata.TTLMetadataKey: strconv.FormatInt(ttlInSeconds, 10),
},
},
}

Expand Down Expand Up @@ -257,13 +261,15 @@ func TestPublishWithPriority(t *testing.T) {
const maxPriority = 10

metadata := bindings.Metadata{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
"maxPriority": strconv.FormatInt(maxPriority, 10),
Base: contribMetadata.Base{
Name: "testQueue",
Properties: map[string]string{
"queueName": queueName,
"host": rabbitmqHost,
"deleteWhenUnused": strconv.FormatBool(exclusive),
"durable": strconv.FormatBool(durable),
"maxPriority": strconv.FormatInt(maxPriority, 10),
},
},
}

Expand All @@ -283,7 +289,7 @@ func TestPublishWithPriority(t *testing.T) {
defer ch.Close()

const middlePriorityMsgContent = "middle"
_, err = r.Invoke(context.Backgound(), &bindings.InvokeRequest{
_, err = r.Invoke(context.Background(), &bindings.InvokeRequest{
Metadata: map[string]string{
contribMetadata.PriorityMetadataKey: "5",
},
Expand All @@ -292,7 +298,7 @@ func TestPublishWithPriority(t *testing.T) {
assert.Nil(t, err)

const lowPriorityMsgContent = "low"
_, err = r.Invoke(context.Backgound(), &bindings.InvokeRequest{
_, err = r.Invoke(context.Background(), &bindings.InvokeRequest{
Metadata: map[string]string{
contribMetadata.PriorityMetadataKey: "1",
},
Expand All @@ -301,7 +307,7 @@ func TestPublishWithPriority(t *testing.T) {
assert.Nil(t, err)

const highPriorityMsgContent = "high"
_, err = r.Invoke(context.Backgound(), &bindings.InvokeRequest{
_, err = r.Invoke(context.Background(), &bindings.InvokeRequest{
Metadata: map[string]string{
contribMetadata.PriorityMetadataKey: "10",
},
Expand Down
10 changes: 5 additions & 5 deletions pubsub/azure/eventhubs/eventhubs_integration_test.go
Expand Up @@ -52,11 +52,11 @@ func createIotHubPubsubMetadata() pubsub.Metadata {
metadata := pubsub.Metadata{
Base: metadata.Base{
Properties: map[string]string{
connectionString: os.Getenv(iotHubConnectionStringEnvKey),
consumerID: os.Getenv(iotHubConsumerGroupEnvKey),
storageAccountName: os.Getenv(storageAccountNameEnvKey),
storageAccountKey: os.Getenv(storageAccountKeyEnvKey),
storageContainerName: testStorageContainerName,
"connectionString": os.Getenv(iotHubConnectionStringEnvKey),
"consumerID": os.Getenv(iotHubConsumerGroupEnvKey),
"storageAccountName": os.Getenv(storageAccountNameEnvKey),
"storageAccountKey": os.Getenv(storageAccountKeyEnvKey),
"storageContainerName": testStorageContainerName,
},
},
}
Expand Down
10 changes: 0 additions & 10 deletions state/cockroachdb/cockroachdb_access.go
Expand Up @@ -114,11 +114,6 @@ func (p *cockroachDBAccess) Init(metadata state.Metadata) error {

// Set makes an insert or update to the database.
func (p *cockroachDBAccess) Set(req *state.SetRequest) error {
return state.SetWithOptions(p.setValue, req)
}

// setValue is an internal implementation of set to enable passing the logic to state.SetWithRetries as a func.
func (p *cockroachDBAccess) setValue(req *state.SetRequest) error {
p.logger.Debug("Setting state value in CockroachDB")

value, isBinary, err := validateAndReturnValue(req)
Expand Down Expand Up @@ -240,11 +235,6 @@ func (p *cockroachDBAccess) Get(req *state.GetRequest) (*state.GetResponse, erro

// Delete removes an item from the state store.
func (p *cockroachDBAccess) Delete(req *state.DeleteRequest) error {
return state.DeleteWithOptions(p.deleteValue, req)
}

// deleteValue is an internal implementation of delete to enable passing the logic to state.DeleteWithRetries as a func.
func (p *cockroachDBAccess) deleteValue(req *state.DeleteRequest) error {
p.logger.Debug("Deleting state value from CockroachDB")
if req.Key == "" {
return fmt.Errorf("missing key in delete operation")
Expand Down
16 changes: 4 additions & 12 deletions state/gcp/firestore/firestore.go
Expand Up @@ -113,7 +113,8 @@ func (f *Firestore) Get(req *state.GetRequest) (*state.GetResponse, error) {
}, nil
}

func (f *Firestore) setValue(req *state.SetRequest) error {
// Set saves state into Firestore.
func (f *Firestore) Set(req *state.SetRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down Expand Up @@ -142,12 +143,8 @@ func (f *Firestore) setValue(req *state.SetRequest) error {
return nil
}

// Set saves state into Firestore with retry.
func (f *Firestore) Set(req *state.SetRequest) error {
return state.SetWithOptions(f.setValue, req)
}

func (f *Firestore) deleteValue(req *state.DeleteRequest) error {
// Delete performs a delete operation.
func (f *Firestore) Delete(req *state.DeleteRequest) error {
ctx := context.Background()
key := datastore.NameKey(f.entityKind, req.Key, nil)

Expand All @@ -159,11 +156,6 @@ func (f *Firestore) deleteValue(req *state.DeleteRequest) error {
return nil
}

// Delete performs a delete operation.
func (f *Firestore) Delete(req *state.DeleteRequest) error {
return state.DeleteWithOptions(f.deleteValue, req)
}

func getFirestoreMetadata(meta state.Metadata) (*firestoreMetadata, error) {
m := firestoreMetadata{
EntityKind: defaultEntityKind,
Expand Down
6 changes: 1 addition & 5 deletions state/memcached/memcached.go
Expand Up @@ -146,7 +146,7 @@ func (m *Memcached) parseTTL(req *state.SetRequest) (*int32, error) {
return nil, nil
}

func (m *Memcached) setValue(req *state.SetRequest) error {
func (m *Memcached) Set(req *state.SetRequest) error {
var bt []byte
ttl, err := m.parseTTL(req)
if err != nil {
Expand Down Expand Up @@ -194,10 +194,6 @@ func (m *Memcached) Get(req *state.GetRequest) (*state.GetResponse, error) {
}, nil
}

func (m *Memcached) Set(req *state.SetRequest) error {
return state.SetWithOptions(m.setValue, req)
}

func (m *Memcached) GetComponentMetadata() map[string]string {
metadataStruct := memcachedMetadata{}
metadataInfo := map[string]string{}
Expand Down
9 changes: 2 additions & 7 deletions state/mysql/mysql.go
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/dapr/components-contrib/metadata"
"github.com/dapr/components-contrib/state"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)

// Optimistic Concurrency is implemented using a string column that stores a UUID.
Expand Down Expand Up @@ -337,8 +336,6 @@ func (m *MySQL) Delete(req *state.DeleteRequest) error {
return m.deleteValue(m.db, req)
}

// deleteValue is an internal implementation of delete to enable passing the
// logic to state.DeleteWithRetries as a func.
func (m *MySQL) deleteValue(querier querier, req *state.DeleteRequest) error {
m.logger.Debug("Deleting state value from MySql")

Expand Down Expand Up @@ -458,14 +455,14 @@ func (m *MySQL) Get(req *state.GetRequest) (*state.GetResponse, error) {

return &state.GetResponse{
Data: data,
ETag: ptr.Of(eTag),
ETag: &eTag,
Metadata: req.Metadata,
}, nil
}

return &state.GetResponse{
Data: value,
ETag: ptr.Of(eTag),
ETag: &eTag,
Metadata: req.Metadata,
}, nil
}
Expand All @@ -476,8 +473,6 @@ func (m *MySQL) Set(req *state.SetRequest) error {
return m.setValue(m.db, req)
}

// setValue is an internal implementation of set to enable passing the logic
// to state.SetWithRetries as a func.
func (m *MySQL) setValue(querier querier, req *state.SetRequest) error {
m.logger.Debug("Setting state value in MySql")

Expand Down
24 changes: 4 additions & 20 deletions state/oracledatabase/oracledatabaseaccess.go
Expand Up @@ -114,11 +114,6 @@ func (o *oracleDatabaseAccess) Init(metadata state.Metadata) error {
return nil
}

// Set makes an insert or update to the database.
func (o *oracleDatabaseAccess) Set(req *state.SetRequest) error {
return state.SetWithOptions(o.setValue, req)
}

func parseTTL(requestMetadata map[string]string) (*int, error) {
if val, found := requestMetadata[metadataTTLKey]; found && val != "" {
parsedVal, err := strconv.ParseInt(val, 10, 0)
Expand All @@ -133,8 +128,8 @@ func parseTTL(requestMetadata map[string]string) (*int, error) {
return nil, nil
}

// setValue is an internal implementation of set to enable passing the logic to state.SetWithRetries as a func.
func (o *oracleDatabaseAccess) setValue(req *state.SetRequest) error {
// Set makes an insert or update to the database.
func (o *oracleDatabaseAccess) Set(req *state.SetRequest) error {
o.logger.Debug("Setting state value in OracleDatabase")
err := state.CheckRequestOptions(req.Options)
if err != nil {
Expand Down Expand Up @@ -204,6 +199,7 @@ func (o *oracleDatabaseAccess) setValue(req *state.SetRequest) error {
result, err = tx.Exec(mergeStatement, req.Key, value, binaryYN, etag, ttlSeconds)
} else {
// when first write policy is indicated, an existing record has to be updated - one that has the etag provided.
// TODO: Needs to update ttl_in_seconds
updateStatement := fmt.Sprintf(
`UPDATE %s SET value = :value, binary_yn = :binary_yn, etag = :new_etag
WHERE key = :key AND etag = :etag`,
Expand Down Expand Up @@ -273,11 +269,6 @@ func (o *oracleDatabaseAccess) Get(req *state.GetRequest) (*state.GetResponse, e

// Delete removes an item from the state store.
func (o *oracleDatabaseAccess) Delete(req *state.DeleteRequest) error {
return state.DeleteWithOptions(o.deleteValue, req)
}

// deleteValue is an internal implementation of delete to enable passing the logic to state.DeleteWithRetries as a func.
func (o *oracleDatabaseAccess) deleteValue(req *state.DeleteRequest) error {
o.logger.Debug("Deleting state value from OracleDatabase")
if req.Key == "" {
return fmt.Errorf("missing key in delete operation")
Expand Down Expand Up @@ -354,7 +345,7 @@ func (o *oracleDatabaseAccess) ExecuteMulti(sets []state.SetRequest, deletes []s
return err
}

// Close implements io.Close.
// Close implements io.Closer.
func (o *oracleDatabaseAccess) Close() error {
if o.db != nil {
return o.db.Close()
Expand Down Expand Up @@ -391,10 +382,3 @@ func tableExists(db *sql.DB, tableName string) (bool, error) {
exists := tblCount > 0
return exists, err
}

// func handleError(msg string, err error) {
// if err != nil {
// fmt.Println(msg, err)

// }
// }

0 comments on commit b0bcb14

Please sign in to comment.