Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: remove replace mqtt package #2216

Closed
wants to merge 9 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 2 additions & 0 deletions .build-tools/go.mod
Expand Up @@ -17,3 +17,5 @@ require (
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

replace github.com/dapr/dapr => github.com/1046102779/dapr v0.0.0-20221021130037-635b70c24259
16 changes: 16 additions & 0 deletions Makefile
Expand Up @@ -23,6 +23,7 @@ GIT_COMMIT = $(shell git rev-list -1 HEAD)
GIT_VERSION = $(shell git describe --always --abbrev=7 --dirty)
# By default, disable CGO_ENABLED. See the details on https://golang.org/cmd/cgo
CGO ?= 0
DAPR_PACKAGE ?= $(dapr_package)

LOCAL_ARCH := $(shell uname -m)
ifeq ($(LOCAL_ARCH),x86_64)
Expand Down Expand Up @@ -129,19 +130,34 @@ modtidy-$(1):
cd $(shell dirname $(1)); go mod tidy -compat=1.19; cd -
endef

define replaceruntime-dapr
.PHONY: replaceruntime-$(1)
replaceruntime-$(1):
cd $(shell dirname $(1)); go mod edit -replace github.com/dapr/dapr=$(DAPR_PACKAGE); cd -
endef

# Generate modtidy target action for each go.mod file
$(foreach MODFILE,$(MODFILES),$(eval $(call modtidy-target,$(MODFILE))))

# Go get dapr package to tests/.../go.mod.
$(foreach MODFILE,$(MODFILES),$(eval $(call replaceruntime-dapr,$(MODFILE))))

# Enumerate all generated modtidy targets
# Note that the order of execution matters: root and tests/certification go.mod
# are dependencies in each certification test. This order is preserved by the
# tree walk when finding the go.mod files.
TIDY_MODFILES:=$(foreach ITEM,$(MODFILES),modtidy-$(ITEM))

REPLACERUNTIME_MODFILES:=$(foreach ITEM,$(MODFILES),replaceruntime-$(ITEM))

# Define modtidy-all action trigger to run make on all generated modtidy targets
.PHONY: modtidy-all
modtidy-all: $(TIDY_MODFILES)

# Define replaceruntime-all action trigger to go get dapr package specified.
.PHONY: replaceruntime-all
replaceruntime-all: $(REPLACERUNTIME_MODFILES)

################################################################################
# Target: modtidy #
################################################################################
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Expand Up @@ -364,3 +364,5 @@ replace github.com/toolkits/concurrent => github.com/niean/gotools v0.0.0-201512

// this is a fork which addresses a performance issues due to go routines
replace dubbo.apache.org/dubbo-go/v3 => dubbo.apache.org/dubbo-go/v3 v3.0.3-0.20220610080020-48691a404537

replace github.com/dapr/dapr => github.com/1046102779/dapr v0.0.0-20221021130037-635b70c24259
2 changes: 2 additions & 0 deletions middleware/http/wasm/basic/example/go.mod
Expand Up @@ -3,3 +3,5 @@ module github.com/dapr/components-contrib/middleware/wasm/example
go 1.19

require github.com/wapc/wapc-guest-tinygo v0.3.3

replace github.com/dapr/dapr => github.com/1046102779/dapr v0.0.0-20221021130037-635b70c24259
2 changes: 2 additions & 0 deletions middleware/http/wasm/basic/internal/e2e-guests/go.mod
Expand Up @@ -3,3 +3,5 @@ module github.com/dapr/components-contrib/middleware/wasm/internal
go 1.19

require github.com/wapc/wapc-guest-tinygo v0.3.3

replace github.com/dapr/dapr => github.com/1046102779/dapr v0.0.0-20221021130037-635b70c24259
7 changes: 4 additions & 3 deletions state/aerospike/aerospike.go
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package aerospike

import (
"context"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -110,7 +111,7 @@ func (aspike *Aerospike) Features() []state.Feature {
}

// Set stores value for a key to Aerospike. It honors ETag (for concurrency) and consistency settings.
func (aspike *Aerospike) Set(req *state.SetRequest) error {
func (aspike *Aerospike) Set(ctx context.Context, req *state.SetRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down Expand Up @@ -162,7 +163,7 @@ func (aspike *Aerospike) Set(req *state.SetRequest) error {
}

// Get retrieves state from Aerospike with a key.
func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (aspike *Aerospike) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
asKey, err := as.NewKey(aspike.namespace, aspike.set, req.Key)
if err != nil {
return nil, err
Expand Down Expand Up @@ -196,7 +197,7 @@ func (aspike *Aerospike) Get(req *state.GetRequest) (*state.GetResponse, error)
}

// Delete performs a delete operation.
func (aspike *Aerospike) Delete(req *state.DeleteRequest) error {
func (aspike *Aerospike) Delete(ctx context.Context, req *state.DeleteRequest) error {
err := state.CheckRequestOptions(req.Options)
if err != nil {
return err
Expand Down
19 changes: 10 additions & 9 deletions state/alicloud/tablestore/tablestore.go
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tablestore

import (
"context"
"encoding/json"

"github.com/agrea/ptr"
Expand Down Expand Up @@ -68,7 +69,7 @@ func (s *AliCloudTableStore) Features() []state.Feature {
return s.features
}

func (s *AliCloudTableStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (s *AliCloudTableStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
criteria := &tablestore.SingleRowQueryCriteria{
PrimaryKey: s.primaryKey(req.Key),
TableName: s.metadata.TableName,
Expand Down Expand Up @@ -103,7 +104,7 @@ func (s *AliCloudTableStore) getResp(columns []*tablestore.AttributeColumn) *sta
return getResp
}

func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
func (s *AliCloudTableStore) BulkGet(ctx context.Context, reqs []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// "len == 0": empty request, directly return empty response
if len(reqs) == 0 {
return true, []state.BulkGetResponse{}, nil
Expand Down Expand Up @@ -139,7 +140,7 @@ func (s *AliCloudTableStore) BulkGet(reqs []state.GetRequest) (bool, []state.Bul
return true, responseList, nil
}

func (s *AliCloudTableStore) Set(req *state.SetRequest) error {
func (s *AliCloudTableStore) Set(ctx context.Context, req *state.SetRequest) error {
change := s.updateRowChange(req)

request := &tablestore.UpdateRowRequest{
Expand Down Expand Up @@ -183,7 +184,7 @@ func unmarshal(val interface{}) []byte {
return []byte(output)
}

func (s *AliCloudTableStore) Delete(req *state.DeleteRequest) error {
func (s *AliCloudTableStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
change := s.deleteRowChange(req)

deleteRowReq := &tablestore.DeleteRowRequest{
Expand All @@ -205,15 +206,15 @@ func (s *AliCloudTableStore) deleteRowChange(req *state.DeleteRequest) *tablesto
return change
}

func (s *AliCloudTableStore) BulkSet(reqs []state.SetRequest) error {
return s.batchWrite(reqs, nil)
func (s *AliCloudTableStore) BulkSet(ctx context.Context, reqs []state.SetRequest) error {
return s.batchWrite(ctx, reqs, nil)
}

func (s *AliCloudTableStore) BulkDelete(reqs []state.DeleteRequest) error {
return s.batchWrite(nil, reqs)
func (s *AliCloudTableStore) BulkDelete(ctx context.Context, reqs []state.DeleteRequest) error {
return s.batchWrite(ctx, nil, reqs)
}

func (s *AliCloudTableStore) batchWrite(setReqs []state.SetRequest, deleteReqs []state.DeleteRequest) error {
func (s *AliCloudTableStore) batchWrite(ctx context.Context, setReqs []state.SetRequest, deleteReqs []state.DeleteRequest) error {
bathReq := &tablestore.BatchWriteRowRequest{
IsAtomic: true,
}
Expand Down
17 changes: 9 additions & 8 deletions state/alicloud/tablestore/tablestore_test.go
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package tablestore

import (
"context"
"testing"

"github.com/agrea/ptr"
Expand Down Expand Up @@ -63,15 +64,15 @@ func TestReadAndWrite(t *testing.T) {
Value: "value of key",
ETag: ptr.String("the etag"),
}
err := store.Set(setReq)
err := store.Set(context.TODO(), setReq)
assert.Nil(t, err)
})

t.Run("test get 1", func(t *testing.T) {
getReq := &state.GetRequest{
Key: "theFirstKey",
}
resp, err := store.Get(getReq)
resp, err := store.Get(context.TODO(), getReq)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, "value of key", string(resp.Data))
Expand All @@ -83,22 +84,22 @@ func TestReadAndWrite(t *testing.T) {
Value: "1234",
ETag: ptr.String("the etag"),
}
err := store.Set(setReq)
err := store.Set(context.TODO(), setReq)
assert.Nil(t, err)
})

t.Run("test get 2", func(t *testing.T) {
getReq := &state.GetRequest{
Key: "theSecondKey",
}
resp, err := store.Get(getReq)
resp, err := store.Get(context.TODO(), getReq)
assert.Nil(t, err)
assert.NotNil(t, resp)
assert.Equal(t, "1234", string(resp.Data))
})

t.Run("test BulkSet", func(t *testing.T) {
err := store.BulkSet([]state.SetRequest{{
err := store.BulkSet(context.TODO(), []state.SetRequest{{
Key: "theFirstKey",
Value: "666",
}, {
Expand All @@ -110,7 +111,7 @@ func TestReadAndWrite(t *testing.T) {
})

t.Run("test BulkGet", func(t *testing.T) {
_, resp, err := store.BulkGet([]state.GetRequest{{
_, resp, err := store.BulkGet(context.TODO(), []state.GetRequest{{
Key: "theFirstKey",
}, {
Key: "theSecondKey",
Expand All @@ -126,12 +127,12 @@ func TestReadAndWrite(t *testing.T) {
req := &state.DeleteRequest{
Key: "theFirstKey",
}
err := store.Delete(req)
err := store.Delete(context.TODO(), req)
assert.Nil(t, err)
})

t.Run("test BulkGet2", func(t *testing.T) {
_, resp, err := store.BulkGet([]state.GetRequest{{
_, resp, err := store.BulkGet(context.TODO(), []state.GetRequest{{
Key: "theFirstKey",
}, {
Key: "theSecondKey",
Expand Down
27 changes: 14 additions & 13 deletions state/aws/dynamodb/dynamodb.go
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package dynamodb

import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/json"
Expand Down Expand Up @@ -79,7 +80,7 @@ func (d *StateStore) Features() []state.Feature {
}

// Get retrieves a dynamoDB item.
func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
func (d *StateStore) Get(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
input := &dynamodb.GetItemInput{
ConsistentRead: aws.Bool(req.Options.Consistency == state.Strong),
TableName: aws.String(d.table),
Expand All @@ -90,7 +91,7 @@ func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
},
}

result, err := d.client.GetItem(input)
result, err := d.client.GetItemWithContext(ctx, input)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -133,13 +134,13 @@ func (d *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
}

// BulkGet performs a bulk get operations.
func (d *StateStore) BulkGet(req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
func (d *StateStore) BulkGet(ctx context.Context, req []state.GetRequest) (bool, []state.BulkGetResponse, error) {
// TODO: replace with dynamodb.BatchGetItem for performance
return false, nil, nil
}

// Set saves a dynamoDB item.
func (d *StateStore) Set(req *state.SetRequest) error {
func (d *StateStore) Set(ctx context.Context, req *state.SetRequest) error {
item, err := d.getItemFromReq(req)
if err != nil {
return err
Expand All @@ -165,7 +166,7 @@ func (d *StateStore) Set(req *state.SetRequest) error {
input.ConditionExpression = &condExpr
}

_, err = d.client.PutItem(input)
_, err = d.client.PutItemWithContext(ctx, input)
if err != nil && haveEtag {
switch cErr := err.(type) {
case *dynamodb.ConditionalCheckFailedException:
Expand All @@ -177,11 +178,11 @@ func (d *StateStore) Set(req *state.SetRequest) error {
}

// BulkSet performs a bulk set operation.
func (d *StateStore) BulkSet(req []state.SetRequest) error {
func (d *StateStore) BulkSet(ctx context.Context, req []state.SetRequest) error {
writeRequests := []*dynamodb.WriteRequest{}

if len(req) == 1 {
return d.Set(&req[0])
return d.Set(ctx, &req[0])
}

for _, r := range req {
Expand Down Expand Up @@ -210,15 +211,15 @@ func (d *StateStore) BulkSet(req []state.SetRequest) error {
requestItems := map[string][]*dynamodb.WriteRequest{}
requestItems[d.table] = writeRequests

_, e := d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
_, e := d.client.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: requestItems,
})

return e
}

// Delete performs a delete operation.
func (d *StateStore) Delete(req *state.DeleteRequest) error {
func (d *StateStore) Delete(ctx context.Context, req *state.DeleteRequest) error {
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
"key": {
Expand All @@ -238,7 +239,7 @@ func (d *StateStore) Delete(req *state.DeleteRequest) error {
input.ExpressionAttributeValues = exprAttrValues
}

_, err := d.client.DeleteItem(input)
_, err := d.client.DeleteItemWithContext(ctx, input)
if err != nil {
switch cErr := err.(type) {
case *dynamodb.ConditionalCheckFailedException:
Expand All @@ -250,11 +251,11 @@ func (d *StateStore) Delete(req *state.DeleteRequest) error {
}

// BulkDelete performs a bulk delete operation.
func (d *StateStore) BulkDelete(req []state.DeleteRequest) error {
func (d *StateStore) BulkDelete(ctx context.Context, req []state.DeleteRequest) error {
writeRequests := []*dynamodb.WriteRequest{}

if len(req) == 1 {
return d.Delete(&req[0])
return d.Delete(ctx, &req[0])
}

for _, r := range req {
Expand All @@ -277,7 +278,7 @@ func (d *StateStore) BulkDelete(req []state.DeleteRequest) error {
requestItems := map[string][]*dynamodb.WriteRequest{}
requestItems[d.table] = writeRequests

_, e := d.client.BatchWriteItem(&dynamodb.BatchWriteItemInput{
_, e := d.client.BatchWriteItemWithContext(ctx, &dynamodb.BatchWriteItemInput{
RequestItems: requestItems,
})

Expand Down