From 91c39e123c0794277f5e20edc50f0671455109e5 Mon Sep 17 00:00:00 2001 From: Yevgeniy Firsov Date: Tue, 20 Sep 2022 11:40:06 -0700 Subject: [PATCH 1/2] feat: High level pub/sub APIs --- schema/schema.go | 26 ++++-- schema/schema_test.go | 63 +++++++++++--- tigris/client_test.go | 2 +- tigris/collection_test.go | 18 ++-- tigris/database.go | 23 ++++- tigris/topic.go | 84 +++++++++++++++++++ tigris/topic_test.go | 172 ++++++++++++++++++++++++++++++++++++++ tigris/types.go | 5 ++ 8 files changed, 365 insertions(+), 28 deletions(-) create mode 100644 tigris/topic.go create mode 100644 tigris/topic_test.go diff --git a/schema/schema.go b/schema/schema.go index 623017d..d1dbd00 100644 --- a/schema/schema.go +++ b/schema/schema.go @@ -52,6 +52,12 @@ const ( id = "ID" ) +// Collection types +const ( + Documents = "documents" // Regular collection containing documents + Messages = "messages" // Publish/subscribe collection containing messages +) + // Model represents types supported as collection models type Model interface{} @@ -106,6 +112,8 @@ type Schema struct { Desc string `json:"description,omitempty"` Fields map[string]*Field `json:"properties,omitempty"` PrimaryKey []string `json:"primary_key,omitempty"` + + CollectionType string `json:"collection_type,omitempty"` } // DatabaseModelName returns name of the database derived from the given database model @@ -356,7 +364,7 @@ func traverseFields(prefix string, t reflect.Type, fields map[string]*Field, pk return nil } -func fromCollectionModel(model interface{}) (*Schema, error) { +func fromCollectionModel(model interface{}, typ string) (*Schema, error) { t := reflect.TypeOf(model) if t.Kind() == reflect.Ptr { t = t.Elem() @@ -392,7 +400,7 @@ func fromCollectionModel(model interface{}) (*Schema, error) { } // No explicit primary key defined and no tigris.Model embedded - if len(pk) == 0 { + if len(pk) == 0 && typ == Documents { var p *Field var ok bool name := id @@ -411,22 +419,28 @@ func fromCollectionModel(model interface{}) (*Schema, error) { sch.PrimaryKey = append(sch.PrimaryKey, name) } + if len(pk) != 0 && typ == Messages { + return nil, fmt.Errorf("primary key should not be defined for `messages` collection type") + } + + sch.CollectionType = typ + return &sch, nil } // FromCollectionModels converts provided model(s) to the schema structure -func FromCollectionModels(model Model, models ...Model) (map[string]*Schema, error) { +func FromCollectionModels(tp string, model Model, models ...Model) (map[string]*Schema, error) { //models parameter added to require at least one schema to migrate schemas := make(map[string]*Schema) - schema, err := fromCollectionModel(model) + schema, err := fromCollectionModel(model, tp) if err != nil { return nil, err } schemas[schema.Name] = schema for _, m := range models { - schema, err := fromCollectionModel(m) + schema, err := fromCollectionModel(m, tp) if err != nil { return nil, err } @@ -472,7 +486,7 @@ func FromDatabaseModel(dbModel interface{}) (string, map[string]*Schema, error) tt = field.Type.Elem() } - sch, err := fromCollectionModel(reflect.New(tt).Elem().Interface()) + sch, err := fromCollectionModel(reflect.New(tt).Elem().Interface(), Documents) if err != nil { return "", nil, err } diff --git a/schema/schema_test.go b/schema/schema_test.go index 335dcae..acdd604 100644 --- a/schema/schema_test.go +++ b/schema/schema_test.go @@ -345,16 +345,23 @@ func TestCollectionSchema(t *testing.T) { for _, c := range cases { t.Run(reflect.TypeOf(c.input).Name(), func(t *testing.T) { - schema, err := fromCollectionModel(c.input) + schema, err := fromCollectionModel(c.input, Documents) assert.Equal(t, c.err, err) + if schema != nil { + assert.Equal(t, Documents, schema.CollectionType) + c.output.CollectionType = Documents + } assert.Equal(t, c.output, schema) }) } t.Run("build", func(t *testing.T) { - s, err := fromCollectionModel(allTypes{}) + s, err := fromCollectionModel(allTypes{}, Documents) require.NoError(t, err) + assert.Equal(t, Documents, s.CollectionType) + s.CollectionType = "" + b, err := s.Build() require.NoError(t, err) @@ -362,15 +369,17 @@ func TestCollectionSchema(t *testing.T) { }) t.Run("multiple_models", func(t *testing.T) { - s, err := FromCollectionModels(pk{}, pk1{}) + s, err := FromCollectionModels(Documents, pk{}, pk1{}) require.NoError(t, err) - assert.Equal(t, map[string]*Schema{"pks": {Name: "pks", Fields: map[string]*Field{"key_1": {Type: typeString}}, PrimaryKey: []string{"key_1"}}, - "pk_1": {Name: "pk_1", Fields: map[string]*Field{"key_1": {Type: typeString}}, PrimaryKey: []string{"key_1"}}}, s) + assert.Equal(t, map[string]*Schema{ + "pks": {Name: "pks", Fields: map[string]*Field{"key_1": {Type: typeString}}, PrimaryKey: []string{"key_1"}, CollectionType: Documents}, + "pk_1": {Name: "pk_1", Fields: map[string]*Field{"key_1": {Type: typeString}}, PrimaryKey: []string{"key_1"}, CollectionType: Documents}, + }, s) }) t.Run("duplicate_pk_index", func(t *testing.T) { - _, err := FromCollectionModels(pkDup{}) + _, err := FromCollectionModels(Documents, pkDup{}) if err.Error() == "duplicate primary key index 1 set for key_1 and key_2" { require.Equal(t, err, fmt.Errorf("duplicate primary key index 1 set for key_1 and key_2")) } else { @@ -408,11 +417,11 @@ func TestDatabaseSchema(t *testing.T) { _ = Db4{1} - coll1 := Schema{Name: "Coll1", Fields: map[string]*Field{"Key1": {Type: "integer"}}, PrimaryKey: []string{"Key1"}} - c1 := Schema{Name: "c1", Fields: map[string]*Field{"Key1": {Type: "integer"}}, PrimaryKey: []string{"Key1"}} - c2 := Schema{Name: "c2", Fields: map[string]*Field{"Key2": {Type: "integer"}}, PrimaryKey: []string{"Key2"}} - c3 := Schema{Name: "c3", Fields: map[string]*Field{"Key2": {Type: "integer"}}, PrimaryKey: []string{"Key2"}} - c4 := Schema{Name: "coll_4", Fields: map[string]*Field{"Key2": {Type: "integer"}}, PrimaryKey: []string{"Key2"}} + coll1 := Schema{Name: "Coll1", Fields: map[string]*Field{"Key1": {Type: "integer"}}, PrimaryKey: []string{"Key1"}, CollectionType: Documents} + c1 := Schema{Name: "c1", Fields: map[string]*Field{"Key1": {Type: "integer"}}, PrimaryKey: []string{"Key1"}, CollectionType: Documents} + c2 := Schema{Name: "c2", Fields: map[string]*Field{"Key2": {Type: "integer"}}, PrimaryKey: []string{"Key2"}, CollectionType: Documents} + c3 := Schema{Name: "c3", Fields: map[string]*Field{"Key2": {Type: "integer"}}, PrimaryKey: []string{"Key2"}, CollectionType: Documents} + c4 := Schema{Name: "coll_4", Fields: map[string]*Field{"Key2": {Type: "integer"}}, PrimaryKey: []string{"Key2"}, CollectionType: Documents} var i int64 @@ -436,3 +445,35 @@ func TestDatabaseSchema(t *testing.T) { }) } } + +func TestMessagesSchema(t *testing.T) { + type valid struct { + Key1 string `json:"key_1"` + } + + type invalid struct { + Key1 string `json:"key_1" tigris:"primary_key:1"` + } + + cases := []struct { + input interface{} + output *Schema + err error + }{ + {valid{}, &Schema{Name: "valids", Fields: map[string]*Field{ + "key_1": {Type: typeString}}, PrimaryKey: []string{}}, nil}, + {invalid{}, nil, fmt.Errorf("primary key should not be defined for `messages` collection type")}, + } + + for _, c := range cases { + t.Run(reflect.TypeOf(c.input).Name(), func(t *testing.T) { + schema, err := fromCollectionModel(c.input, Messages) + assert.Equal(t, c.err, err) + if schema != nil { + assert.Equal(t, Messages, schema.CollectionType) + c.output.CollectionType = Messages + } + assert.Equal(t, c.output, schema) + }) + } +} diff --git a/tigris/client_test.go b/tigris/client_test.go index d116760..d98c61f 100644 --- a/tigris/client_test.go +++ b/tigris/client_test.go @@ -45,7 +45,7 @@ func TestClient(t *testing.T) { mc.EXPECT().CreateOrUpdateCollection(gomock.Any(), pm(&api.CreateOrUpdateCollectionRequest{ Db: "db1", Collection: "coll_1", - Schema: []byte(`{"title":"coll_1","properties":{"Key1":{"type":"string"}},"primary_key":["Key1"]}`), + Schema: []byte(`{"title":"coll_1","properties":{"Key1":{"type":"string"}},"primary_key":["Key1"],"collection_type":"documents"}`), Options: &api.CollectionOptions{}, })).Do(func(ctx context.Context, r *api.CreateOrUpdateCollectionRequest) { }).Return(&api.CreateOrUpdateCollectionResponse{}, nil) diff --git a/tigris/collection_test.go b/tigris/collection_test.go index 6955471..89730cb 100644 --- a/tigris/collection_test.go +++ b/tigris/collection_test.go @@ -163,8 +163,8 @@ func TestCollectionBasic(t *testing.T) { m.EXPECT().BeginTx(gomock.Any(), "db1").Return(mtx, nil) - mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_1", jm(t, `{"title":"coll_1","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"]}`)) - mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_2", jm(t, `{"title":"coll_2","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"]}`)) + mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_1", jm(t, `{"title":"coll_1","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"],"collection_type":"documents"}`)) + mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_2", jm(t, `{"title":"coll_2","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"],"collection_type":"documents"}`)) mtx.EXPECT().Commit(ctx) mtx.EXPECT().Rollback(ctx) @@ -284,7 +284,7 @@ func TestCollection_Search(t *testing.T) { m.EXPECT().BeginTx(gomock.Any(), "db1").Return(mtx, nil) - mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_1", jm(t, `{"title":"coll_1","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"]}`)) + mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_1", jm(t, `{"title":"coll_1","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"],"collection_type":"documents"}`)) mtx.EXPECT().Commit(ctx) mtx.EXPECT().Rollback(ctx) @@ -571,7 +571,7 @@ func TestClientSchemaMigration(t *testing.T) { mc.EXPECT().CreateOrUpdateCollection(gomock.Any(), pm(&api.CreateOrUpdateCollectionRequest{ Db: "db1", Collection: "test_schema_1", - Schema: []byte(`{"title":"test_schema_1","properties":{"key_1":{"type":"string"}},"primary_key":["key_1"]}`), + Schema: []byte(`{"title":"test_schema_1","properties":{"key_1":{"type":"string"}},"primary_key":["key_1"],"collection_type":"documents"}`), Options: &api.CollectionOptions{}, })).DoAndReturn( func(ctx context.Context, r *api.CreateOrUpdateCollectionRequest) (*api.CreateOrUpdateCollectionResponse, error) { @@ -606,7 +606,7 @@ func TestClientSchemaMigration(t *testing.T) { mc.EXPECT().CreateOrUpdateCollection(gomock.Any(), pm(&api.CreateOrUpdateCollectionRequest{ Db: "db1", Collection: "test_schema_2", - Schema: []byte(`{"title":"test_schema_2","properties":{"key_2":{"type":"string"}},"primary_key":["key_2"]}`), + Schema: []byte(`{"title":"test_schema_2","properties":{"key_2":{"type":"string"}},"primary_key":["key_2"],"collection_type":"documents"}`), Options: &api.CollectionOptions{}, })).DoAndReturn( func(ctx context.Context, _ *api.CreateOrUpdateCollectionRequest) (*api.CreateOrUpdateCollectionResponse, error) { @@ -617,7 +617,7 @@ func TestClientSchemaMigration(t *testing.T) { mc.EXPECT().CreateOrUpdateCollection(gomock.Any(), pm(&api.CreateOrUpdateCollectionRequest{ Db: "db1", Collection: "test_schema_1", - Schema: []byte(`{"title":"test_schema_1","properties":{"key_1":{"type":"string"}},"primary_key":["key_1"]}`), + Schema: []byte(`{"title":"test_schema_1","properties":{"key_1":{"type":"string"}},"primary_key":["key_1"],"collection_type":"documents"}`), Options: &api.CollectionOptions{}, })).DoAndReturn( func(ctx context.Context, _ *api.CreateOrUpdateCollectionRequest) (*api.CreateOrUpdateCollectionResponse, error) { @@ -638,13 +638,13 @@ func TestClientSchemaMigration(t *testing.T) { require.NoError(t, err) var m map[string]string - _, err = schema.FromCollectionModels(&m) + _, err = schema.FromCollectionModels(schema.Documents, &m) require.Error(t, err) _, _, err = schema.FromDatabaseModel(&m) require.Error(t, err) var i int - _, err = schema.FromCollectionModels(&i) + _, err = schema.FromCollectionModels(schema.Documents, &i) require.Error(t, err) _, _, err = schema.FromDatabaseModel(&i) require.Error(t, err) @@ -668,7 +668,7 @@ func TestClientSchemaMigration(t *testing.T) { mc.EXPECT().CreateOrUpdateCollection(gomock.Any(), pm(&api.CreateOrUpdateCollectionRequest{ Db: "db1", Collection: "coll_1", - Schema: []byte(`{"title":"coll_1","properties":{"Key1":{"type":"string"}},"primary_key":["Key1"]}`), + Schema: []byte(`{"title":"coll_1","properties":{"Key1":{"type":"string"}},"primary_key":["Key1"],"collection_type":"documents"}`), Options: &api.CollectionOptions{}, })).Do(func(ctx context.Context, r *api.CreateOrUpdateCollectionRequest) { }).Return(&api.CreateOrUpdateCollectionResponse{}, nil) diff --git a/tigris/database.go b/tigris/database.go index aca7f19..136b2ed 100644 --- a/tigris/database.go +++ b/tigris/database.go @@ -49,7 +49,17 @@ func newDatabase(name string, driver driver.Driver) *Database { // This method is only needed if collections need to be created dynamically, // all static collections are created by OpenDatabase func (db *Database) CreateCollections(ctx context.Context, model schema.Model, models ...schema.Model) error { - schemas, err := schema.FromCollectionModels(model, models...) + schemas, err := schema.FromCollectionModels(schema.Documents, model, models...) + if err != nil { + return fmt.Errorf("error parsing model schema: %w", err) + } + + return db.createCollectionsFromSchemas(ctx, db.name, schemas) +} + +// CreateTopics creates message type collections +func (db *Database) CreateTopics(ctx context.Context, model schema.Model, models ...schema.Model) error { + schemas, err := schema.FromCollectionModels(schema.Messages, model, models...) if err != nil { return fmt.Errorf("error parsing model schema: %w", err) } @@ -160,3 +170,14 @@ func GetCollection[T schema.Model](db *Database) *Collection[T] { func getNamedCollection[T schema.Model](db *Database, name string) *Collection[T] { return &Collection[T]{name: name, db: db.driver.UseDatabase(db.name)} } + +// GetTopic returns topic object corresponding to topic model T +func GetTopic[T schema.Model](db *Database) *Topic[T] { + var m T + name := schema.ModelName(&m) + return getNamedTopic[T](db, name) +} + +func getNamedTopic[T schema.Model](db *Database, name string) *Topic[T] { + return &Topic[T]{name: name, db: db.driver.UseDatabase(db.name)} +} diff --git a/tigris/topic.go b/tigris/topic.go new file mode 100644 index 0000000..1de959b --- /dev/null +++ b/tigris/topic.go @@ -0,0 +1,84 @@ +// Copyright 2022 Tigris Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tigris + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/tigrisdata/tigris-client-go/driver" + "github.com/tigrisdata/tigris-client-go/filter" + "github.com/tigrisdata/tigris-client-go/schema" +) + +// Topic provides an interface for publishing and subscribing for the messages. +type Topic[T schema.Model] struct { + name string + db driver.Database +} + +// Drop drops the topic +func (c *Topic[T]) Drop(ctx context.Context) error { + return getDB(ctx, c.db).DropCollection(ctx, c.name) +} + +// Publish publishes messages to the topic +func (c *Topic[T]) Publish(ctx context.Context, docs ...*T) (*PublishResponse, error) { + var err error + + bdocs := make([]driver.Message, len(docs)) + for k, v := range docs { + if bdocs[k], err = json.Marshal(v); err != nil { + return nil, err + } + } + + md, err := getDB(ctx, c.db).Publish(ctx, c.name, bdocs) + if err != nil { + return nil, err + } + + if md == nil { + return &PublishResponse{}, nil + } + + if len(md.Keys) > 0 && len(md.Keys) != len(docs) { + return nil, fmt.Errorf("broken response. number of published messages is not the same as number of provided messages") + } + + for k, v := range md.Keys { + if err := populateModelMetadata(docs[k], md.Metadata, v); err != nil { + return nil, err + } + } + + return &PublishResponse{Keys: md.Keys}, nil +} + +// Subscribe returns published messages from the topic +func (c *Topic[T]) Subscribe(ctx context.Context, filter filter.Filter) (*Iterator[T], error) { + f, err := filter.Build() + if err != nil { + return nil, err + } + + it, err := getDB(ctx, c.db).Subscribe(ctx, c.name, f) + if err != nil { + return nil, err + } + + return &Iterator[T]{Iterator: it}, nil +} diff --git a/tigris/topic_test.go b/tigris/topic_test.go new file mode 100644 index 0000000..1b4ee68 --- /dev/null +++ b/tigris/topic_test.go @@ -0,0 +1,172 @@ +// Copyright 2022 Tigris Data, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package tigris + +import ( + "context" + "encoding/json" + "fmt" + "testing" + "time" + + "github.com/golang/mock/gomock" + "github.com/stretchr/testify/require" + "github.com/tigrisdata/tigris-client-go/config" + "github.com/tigrisdata/tigris-client-go/driver" + "github.com/tigrisdata/tigris-client-go/filter" + "github.com/tigrisdata/tigris-client-go/mock" +) + +func toMessage(t *testing.T, doc interface{}) driver.Message { + b, err := json.Marshal(doc) + require.NoError(t, err) + return b +} + +func TestTopicBasic(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ctrl := gomock.NewController(t) + m := mock.NewMockDriver(ctrl) + + m.EXPECT().CreateDatabase(gomock.Any(), "db1") + + type Coll1 struct { + Key1 string `tigris:"primary_key"` + Field1 int64 + } + + type Coll2 struct { + Key1 string + Field1 int64 + } + + mdb := mock.NewMockDatabase(ctrl) + mtx := mock.NewMockTx(ctrl) + + m.EXPECT().BeginTx(gomock.Any(), "db1").Return(mtx, nil) + + mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_1", jm(t, `{"title":"coll_1","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"primary_key":["Key1"],"collection_type":"documents"}`)) + mtx.EXPECT().Commit(ctx) + mtx.EXPECT().Rollback(ctx) + + db, err := openDatabaseFromModels(ctx, m, &config.Database{}, "db1", &Coll1{}) + require.NoError(t, err) + + m.EXPECT().BeginTx(gomock.Any(), "db1").Return(mtx, nil) + + mtx.EXPECT().CreateOrUpdateCollection(gomock.Any(), "coll_2", jm(t, `{"title":"coll_2","properties":{"Key1":{"type":"string"},"Field1":{"type":"integer"}},"collection_type":"messages"}`)) + mtx.EXPECT().Commit(ctx) + mtx.EXPECT().Rollback(ctx) + + err = db.CreateTopics(ctx, &Coll2{}) + require.NoError(t, err) + + m.EXPECT().UseDatabase("db1").Return(mdb) + + c := GetTopic[Coll2](db) + + d1 := &Coll2{Key1: "aaa", Field1: 123} + d2 := &Coll2{Key1: "bbb", Field1: 123} + + mdb.EXPECT().Publish(ctx, "coll_2", []driver.Message{toMessage(t, d1), toMessage(t, d2)}) + + _, err = c.Publish(ctx, d1, d2) + require.NoError(t, err) + + mit := mock.NewMockIterator(ctrl) + + mdb.EXPECT().Subscribe(ctx, "coll_2", + driver.Filter(`{"$or":[{"Key1":{"$eq":"aaa"}},{"Key1":{"$eq":"ccc"}}]}`), + ).Return(mit, nil) + + it, err := c.Subscribe(ctx, filter.Or( + filter.Eq("Key1", "aaa"), + filter.Eq("Key1", "ccc")), + ) + require.NoError(t, err) + + var d Coll2 + var dd driver.Document + + mit.EXPECT().Next(&dd).SetArg(0, toDocument(t, d1)).Return(true) + mit.EXPECT().Next(&dd).Return(false) + mit.EXPECT().Err().Return(nil) + + for it.Next(&d) { + require.Equal(t, *d1, d) + } + + require.NoError(t, it.Err()) + + mit.EXPECT().Close() + it.Close() + +} + +func TestTopicNegative(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + ctrl := gomock.NewController(t) + m := mock.NewMockDriver(ctrl) + mdb := mock.NewMockDatabase(ctrl) + mit := mock.NewMockIterator(ctrl) + + m.EXPECT().UseDatabase("db1").Return(mdb) + + db := newDatabase("db1", m) + + type Coll1 struct { + Key1 string `tigris:"primary_key"` + } + + c := GetTopic[Coll1](db) + + // Iterator error + var dd driver.Document + mdb.EXPECT().Subscribe(ctx, "coll_1", driver.Filter(nil)).Return(mit, nil) + mit.EXPECT().Next(&dd).Return(false) + mit.EXPECT().Err().Return(fmt.Errorf("error0")) + mit.EXPECT().Err().Return(fmt.Errorf("error0")) + + mit.EXPECT().Close() + + it, err := c.Subscribe(ctx, nil) + require.NoError(t, err) + + var d Coll1 + require.False(t, it.Next(&d)) + require.Error(t, it.Err()) + + mit.EXPECT().Err().Return(fmt.Errorf("error1")) + mit.EXPECT().Err().Return(fmt.Errorf("error1")) + + require.Error(t, it.Err()) + + it.err = fmt.Errorf("error2") + require.False(t, it.Next(nil)) + + it.Close() + + type Coll2 struct { + Key1 string `tigris:"primary_key"` + Field1 int64 + } + + err = db.CreateTopics(ctx, &Coll2{}) + require.Error(t, err) +} diff --git a/tigris/types.go b/tigris/types.go index 87953d3..86844d7 100644 --- a/tigris/types.go +++ b/tigris/types.go @@ -40,6 +40,11 @@ type DeleteResponse struct{} // Returned by Update-documents collection API. type UpdateResponse struct{} +type PublishResponse struct { + // JSON documents, which contain autogenerated fields + Keys [][]byte +} + // Error contains Tigris server error type Error driver.Error From 9caad1cc8fa1c6efdf951807c9f57a184c98bfd8 Mon Sep 17 00:00:00 2001 From: Adil Ansari Date: Wed, 21 Sep 2022 21:36:40 -0700 Subject: [PATCH 2/2] fix: rename appId to clientId (#155) * fix: rename appId to clientId * fix: using clientID instead of clientId --- config/config.go | 10 +++++----- driver/driver.go | 2 +- driver/driver_test.go | 2 +- driver/grpc.go | 8 ++++---- driver/http.go | 14 +++++++------- driver/internal.go | 12 ++++++------ driver/management.go | 4 ++-- driver/management_test.go | 16 ++++++++-------- 8 files changed, 34 insertions(+), 34 deletions(-) diff --git a/config/config.go b/config/config.go index 3e4f038..017d2c3 100644 --- a/config/config.go +++ b/config/config.go @@ -19,11 +19,11 @@ import "crypto/tls" // Driver contains connection and transport configuration type Driver struct { - TLS *tls.Config `json:"tls,omitempty"` - ApplicationId string `json:"application_id,omitempty"` - ApplicationSecret string `json:"application_secret,omitempty"` - Token string `json:"token,omitempty"` - URL string `json:"url,omitempty"` + TLS *tls.Config `json:"tls,omitempty"` + ClientID string `json:"client_id,omitempty"` + ClientSecret string `json:"client_secret,omitempty"` + Token string `json:"token,omitempty"` + URL string `json:"url,omitempty"` } // Database contains database and connection config diff --git a/driver/driver.go b/driver/driver.go index d3f50a0..6f5bfc5 100644 --- a/driver/driver.go +++ b/driver/driver.go @@ -285,7 +285,7 @@ func initConfig(cfg *config.Driver) *config.Driver { cfg = &config.Driver{} } - if cfg.TLS == nil && (cfg.ApplicationId != "" || cfg.ApplicationSecret != "" || cfg.Token != "") { + if cfg.TLS == nil && (cfg.ClientID != "" || cfg.ClientSecret != "" || cfg.Token != "") { cfg.TLS = &tls.Config{MinVersion: tls.VersionTLS12} } diff --git a/driver/driver_test.go b/driver/driver_test.go index 3ae2fd7..0ef6f71 100644 --- a/driver/driver_test.go +++ b/driver/driver_test.go @@ -1009,7 +1009,7 @@ func TestNewDriver(t *testing.T) { _, err = NewDriver(context.Background(), nil) require.Error(t, err) - cfg1 := &config.Driver{URL: test.GRPCURL(4), ApplicationSecret: "aaaa"} + cfg1 := &config.Driver{URL: test.GRPCURL(4), ClientSecret: "aaaa"} cfg1 = initConfig(cfg1) require.NotNil(t, cfg1.TLS) } diff --git a/driver/grpc.go b/driver/grpc.go index bd3fbd7..2f7d450 100644 --- a/driver/grpc.go +++ b/driver/grpc.go @@ -464,7 +464,7 @@ func (c *grpcDriver) ListApplications(ctx context.Context) ([]*Application, erro return applications, nil } -func (c *grpcDriver) RotateApplicationSecret(ctx context.Context, id string) (*Application, error) { +func (c *grpcDriver) RotateClientSecret(ctx context.Context, id string) (*Application, error) { r, err := c.mgmt.RotateApplicationSecret(ctx, &api.RotateApplicationSecretRequest{Id: id}) if err != nil { return nil, GRPCError(err) @@ -477,7 +477,7 @@ func (c *grpcDriver) RotateApplicationSecret(ctx context.Context, id string) (*A return (*Application)(r.Application), nil } -func (c *grpcDriver) GetAccessToken(ctx context.Context, applicationID string, applicationSecret string, refreshToken string) (*TokenResponse, error) { +func (c *grpcDriver) GetAccessToken(ctx context.Context, clientId string, clientSecret string, refreshToken string) (*TokenResponse, error) { tp := api.GrantType_CLIENT_CREDENTIALS if refreshToken != "" { tp = api.GrantType_REFRESH_TOKEN @@ -486,8 +486,8 @@ func (c *grpcDriver) GetAccessToken(ctx context.Context, applicationID string, a r, err := c.auth.GetAccessToken(ctx, &api.GetAccessTokenRequest{ GrantType: tp, RefreshToken: refreshToken, - ClientId: applicationID, - ClientSecret: applicationSecret, + ClientId: clientId, + ClientSecret: clientSecret, }) if err != nil { return nil, GRPCError(err) diff --git a/driver/http.go b/driver/http.go index 7a797e7..0ba8009 100644 --- a/driver/http.go +++ b/driver/http.go @@ -718,7 +718,7 @@ func (c *httpDriver) ListApplications(ctx context.Context) ([]*Application, erro return apps.Applications, nil } -func (c *httpDriver) RotateApplicationSecret(ctx context.Context, id string) (*Application, error) { +func (c *httpDriver) RotateClientSecret(ctx context.Context, id string) (*Application, error) { resp, err := c.api.ManagementRotateApplicationSecret(ctx, apiHTTP.ManagementRotateApplicationSecretJSONBody{Id: &id}) if err := HTTPError(err, resp); err != nil { return nil, err @@ -735,21 +735,21 @@ func (c *httpDriver) RotateApplicationSecret(ctx context.Context, id string) (*A return &app.Application, nil } -func (c *httpDriver) GetAccessToken(ctx context.Context, applicationID string, applicationSecret string, refreshToken string) (*TokenResponse, error) { - return getAccessToken(ctx, c.tokenURL, c.cfg, applicationID, applicationSecret, refreshToken) +func (c *httpDriver) GetAccessToken(ctx context.Context, clientId string, clientSecret string, refreshToken string) (*TokenResponse, error) { + return getAccessToken(ctx, c.tokenURL, c.cfg, clientId, clientSecret, refreshToken) } -func getAccessToken(ctx context.Context, tokenURL string, cfg *config.Driver, applicationID string, applicationSecret string, refreshToken string) (*TokenResponse, error) { +func getAccessToken(ctx context.Context, tokenURL string, cfg *config.Driver, clientId string, clientSecret string, refreshToken string) (*TokenResponse, error) { data := url.Values{ - "client_id": {applicationID}, - "client_secret": {applicationSecret}, + "client_id": {clientId}, + "client_secret": {clientSecret}, "grant_type": {grantTypeClientCredentials}, "scope": {scope}, } if refreshToken != "" { data = url.Values{ "refresh_token": {refreshToken}, - "client_id": {applicationID}, + "client_id": {clientId}, "grant_type": {grantTypeRefreshToken}, "scope": {scope}, } diff --git a/driver/internal.go b/driver/internal.go index 2f686b0..46d2180 100644 --- a/driver/internal.go +++ b/driver/internal.go @@ -60,14 +60,14 @@ type txWithOptions interface { //func configAuth(config *config.Driver) (*clientcredentials.Config, context.Context) { func configAuth(config *config.Driver) (oauth2.TokenSource, *http.Client, string) { - appID := config.ApplicationId + clientId := config.ClientID if os.Getenv(ApplicationID) != "" { - appID = os.Getenv(ApplicationID) + clientId = os.Getenv(ApplicationID) } - appSecret := config.ApplicationSecret + clientSecret := config.ClientSecret if os.Getenv(ApplicationSecret) != "" { - appSecret = os.Getenv(ApplicationSecret) + clientSecret = os.Getenv(ApplicationSecret) } token := config.Token @@ -101,8 +101,8 @@ func configAuth(config *config.Driver) (oauth2.TokenSource, *http.Client, string ocfg1 := &oauth2.Config{Endpoint: oauth2.Endpoint{TokenURL: tokenURL}} client = ocfg1.Client(ctxClient, t) ts = ocfg1.TokenSource(ctxClient, t) - } else if appID != "" || appSecret != "" { - oCfg := &clientcredentials.Config{TokenURL: tokenURL, ClientID: appID, ClientSecret: appSecret, AuthStyle: oauth2.AuthStyleInParams} + } else if clientId != "" || clientSecret != "" { + oCfg := &clientcredentials.Config{TokenURL: tokenURL, ClientID: clientId, ClientSecret: clientSecret, AuthStyle: oauth2.AuthStyleInParams} client = oCfg.Client(ctxClient) ts = oCfg.TokenSource(ctxClient) } diff --git a/driver/management.go b/driver/management.go index 5579c01..0828c77 100644 --- a/driver/management.go +++ b/driver/management.go @@ -13,8 +13,8 @@ type Management interface { DeleteApplication(ctx context.Context, id string) error UpdateApplication(ctx context.Context, id string, name string, description string) (*Application, error) ListApplications(ctx context.Context) ([]*Application, error) - RotateApplicationSecret(ctx context.Context, id string) (*Application, error) - GetAccessToken(ctx context.Context, applicationID string, applicationSecret string, refreshToken string) (*TokenResponse, error) + RotateClientSecret(ctx context.Context, id string) (*Application, error) + GetAccessToken(ctx context.Context, clientId string, clientSecret string, refreshToken string) (*TokenResponse, error) Close() error } diff --git a/driver/management_test.go b/driver/management_test.go index 1124c76..171164f 100644 --- a/driver/management_test.go +++ b/driver/management_test.go @@ -52,7 +52,7 @@ func testDriverAuthNegative(t *testing.T, c Management, mc *mock.MockManagementS pm(&api.RotateApplicationSecretRequest{Id: "ras_id1"})).Return( &api.RotateApplicationSecretResponse{Application: nil}, nil) - _, err = c.RotateApplicationSecret(ctx, "ras_id1") + _, err = c.RotateClientSecret(ctx, "ras_id1") require.Error(t, err) } @@ -117,7 +117,7 @@ func testDriverAuth(t *testing.T, c Management, mc *mock.MockManagementServer, m pm(&api.RotateApplicationSecretRequest{Id: "ras_id1"})).Return( &api.RotateApplicationSecretResponse{Application: lapp2}, nil) - app, err = c.RotateApplicationSecret(ctx, "ras_id1") + app, err = c.RotateClientSecret(ctx, "ras_id1") require.NoError(t, err) appEqual(t, lapp2, app) @@ -208,9 +208,9 @@ func TestGRPCDriverCredentials(t *testing.T) { t.Run("config", func(t *testing.T) { TokenURLOverride = testTokenURLOverride client, _, mockServers, cancel := SetupMgmtGRPCTests(t, &config.Driver{ - URL: test.GRPCURL(0), - ApplicationId: "client_id_test", - ApplicationSecret: "client_secret_test", + URL: test.GRPCURL(0), + ClientID: "client_id_test", + ClientSecret: "client_secret_test", }) defer cancel() @@ -249,9 +249,9 @@ func TestHTTPDriverCredentials(t *testing.T) { t.Run("config", func(t *testing.T) { TokenURLOverride = "" client, _, mockServers, cancel := SetupMgmtHTTPTests(t, &config.Driver{ - URL: test.HTTPURL(2), - ApplicationId: "client_id_test", - ApplicationSecret: "client_secret_test", + URL: test.HTTPURL(2), + ClientID: "client_id_test", + ClientSecret: "client_secret_test", }) defer cancel()