Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client side in-memory joins #320

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 3 additions & 2 deletions driver/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type streamReader interface {

type readIterator struct {
streamReader
eof bool
err error
eof bool
}

func (i *readIterator) Next(d *Document) bool {
Expand Down Expand Up @@ -87,8 +87,8 @@ type searchStreamReader interface {

type searchResultIterator struct {
searchStreamReader
eof bool
err error
eof bool
}

func (i *searchResultIterator) Next(r *SearchResponse) bool {
Expand All @@ -100,6 +100,7 @@ func (i *searchResultIterator) Next(r *SearchResponse) bool {
if errors.Is(err, io.EOF) {
i.eof = true
_ = i.close()

return false
}

Expand Down
12 changes: 6 additions & 6 deletions driver/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ func (matcher *JSONMatcher) String() string {
func (*JSONMatcher) Got(actual any) string {
switch t := actual.(type) {
case string:
return fmt.Sprintf("JSONMatcher[string]: %v", t)
return fmt.Sprintf(" JSONMatcher: %v [string]", t)
case []byte:
return fmt.Sprintf("JSONMatcher[byte]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [byte]", string(t))
case Filter:
return fmt.Sprintf("JSONMatcher[Filter]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [Filter]", string(t))
case Projection:
return fmt.Sprintf("JSONMatcher[Projection]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [Projection]", string(t))
case Schema:
return fmt.Sprintf("JSONMatcher[Schema]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [Schema]", string(t))
default:
return fmt.Sprintf("JSONMatcher[%v]: %v", reflect.TypeOf(t), t)
return fmt.Sprintf(" JSONMatcher: %v [%v]", reflect.TypeOf(t), t)
}
}

Expand Down
9 changes: 9 additions & 0 deletions filter/eq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,26 @@ import (
)

func TestEq(t *testing.T) {
s := "test"
i := 123
cases := []struct {
name string
expr Expr
exp string
}{
{"ptr", Eq("f", &s), `{"f":{"$eq":"test"}}`},
{"ptrInt", Eq("f", &i), `{"f":{"$eq":123}}`},
{"null", Eq("f", (*string)(nil)), `{"f":{"$eq":null}}`},
{"nullInt", Eq("f", (*int)(nil)), `{"f":{"$eq":null}}`},
{"int0", EqInt("f", 0), `{"f":{"$eq":0}}`},
{"intNeg", EqInt("f", -1), `{"f":{"$eq":-1}}`},
{"int", EqInt("f", 12345), `{"f":{"$eq":12345}}`},
{"int32", EqInt32("f", 12345), `{"f":{"$eq":12345}}`},
{"int64", EqInt64("f", 123456789012), `{"f":{"$eq":123456789012}}`},
{"float32", EqFloat32("f", 12345.67), `{"f":{"$eq":12345.67}}`},
{"float64", EqFloat64("f", 123456789012.34), `{"f":{"$eq":123456789012.34}}`},
{"string", EqString("f", "1234"), `{"f":{"$eq":"1234"}}`},
{"string0", EqString("f", ""), `{"f":{"$eq":""}}`},
{"bytes", EqBytes("f", []byte("123")), `{"f":{"$eq":"MTIz"}}`},
{"time", EqTime("f", time.Time{}), `{"f":{"$eq":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 1 addition & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package filter
import (
jsoniter "github.com/json-iterator/go"
"github.com/tigrisdata/tigris-client-go/driver"
"github.com/tigrisdata/tigris-client-go/schema"
)

type (
Expand Down Expand Up @@ -73,7 +72,7 @@ func Not(op Expr) Expr {

// Eq composes 'equal' operation.
// Result is equivalent to: field == value.
func Eq[T schema.PrimitiveFieldType](field string, value T) Expr {
func Eq(field string, value any) Expr {
return Expr{field: comparison{Eq: value}}
}

Expand Down
3 changes: 3 additions & 0 deletions filter/gt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestGt(t *testing.T) {
expr Expr
exp string
}{
{"int", GtInt("f", 0), `{"f":{"$gt":0}}`},
{"int", GtInt("f", -1), `{"f":{"$gt":-1}}`},
{"int", GtInt("f", 12345), `{"f":{"$gt":12345}}`},
{"int32", GtInt32("f", 12345), `{"f":{"$gt":12345}}`},
{"int64", GtInt64("f", 123456789012), `{"f":{"$gt":123456789012}}`},
{"float32", GtFloat32("f", 12345.67), `{"f":{"$gt":12345.67}}`},
{"float64", GtFloat64("f", 123456789012.34), `{"f":{"$gt":123456789012.34}}`},
{"string", GtString("f", "1234"), `{"f":{"$gt":"1234"}}`},
{"string0", GtString("f", ""), `{"f":{"$gt":""}}`},
{"bytes", GtBytes("f", []byte("123")), `{"f":{"$gt":"MTIz"}}`},
{"time", GtTime("f", time.Time{}), `{"f":{"$gt":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 3 additions & 0 deletions filter/gte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestGte(t *testing.T) {
expr Expr
exp string
}{
{"int0", GteInt("f", 0), `{"f":{"$gte":0}}`},
{"intNeg", GteInt32("f", -1), `{"f":{"$gte":-1}}`},
{"int", GteInt("f", 12345), `{"f":{"$gte":12345}}`},
{"int32", GteInt32("f", 12345), `{"f":{"$gte":12345}}`},
{"int64", GteInt64("f", 123456789012), `{"f":{"$gte":123456789012}}`},
{"float32", GteFloat32("f", 12345.67), `{"f":{"$gte":12345.67}}`},
{"float64", GteFloat64("f", 123456789012.34), `{"f":{"$gte":123456789012.34}}`},
{"string", GteString("f", "1234"), `{"f":{"$gte":"1234"}}`},
{"string0", GteString("f", ""), `{"f":{"$gte":""}}`},
{"bytes", GteBytes("f", []byte("123")), `{"f":{"$gte":"MTIz"}}`},
{"time", GteTime("f", time.Time{}), `{"f":{"$gte":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 3 additions & 0 deletions filter/lt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestLt(t *testing.T) {
expr Expr
exp string
}{
{"int0", LtInt("f", 0), `{"f":{"$lt":0}}`},
{"intNeg", LtInt32("f", -1), `{"f":{"$lt":-1}}`},
{"int", LtInt("f", 12345), `{"f":{"$lt":12345}}`},
{"int32", LtInt32("f", 12345), `{"f":{"$lt":12345}}`},
{"int64", LtInt64("f", 123456789012), `{"f":{"$lt":123456789012}}`},
{"float32", LtFloat32("f", 12345.67), `{"f":{"$lt":12345.67}}`},
{"float64", LtFloat64("f", 123456789012.34), `{"f":{"$lt":123456789012.34}}`},
{"string", LtString("f", "1234"), `{"f":{"$lt":"1234"}}`},
{"string0", LtString("f", ""), `{"f":{"$lt":""}}`},
{"bytes", LtBytes("f", []byte("123")), `{"f":{"$lt":"MTIz"}}`},
{"time", LtTime("f", time.Time{}), `{"f":{"$lt":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 3 additions & 0 deletions filter/lte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestLte(t *testing.T) {
expr Expr
exp string
}{
{"int0", LteInt("f", 0), `{"f":{"$lte":0}}`},
{"intNeg", LteInt32("f", -1), `{"f":{"$lte":-1}}`},
{"int", LteInt("f", 12345), `{"f":{"$lte":12345}}`},
{"int32", LteInt32("f", 12345), `{"f":{"$lte":12345}}`},
{"int64", LteInt64("f", 123456789012), `{"f":{"$lte":123456789012}}`},
{"float32", LteFloat32("f", 12345.67), `{"f":{"$lte":12345.67}}`},
{"float64", LteFloat64("f", 123456789012.34), `{"f":{"$lte":123456789012.34}}`},
{"string", LteString("f", "1234"), `{"f":{"$lte":"1234"}}`},
{"string0", LteString("f", ""), `{"f":{"$lte":""}}`},
{"bytes", LteBytes("f", []byte("123")), `{"f":{"$lte":"MTIz"}}`},
{"time", LteTime("f", time.Time{}), `{"f":{"$lte":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
7 changes: 6 additions & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ type PrimitiveFieldType interface {
int | int32 | int64 |
float32 | float64 |
[]byte | time.Time | uuid.UUID |
bool
bool |
*string |
*int | *int32 | *int64 |
*float32 | *float64 |
*[]byte | *time.Time | *uuid.UUID |
*bool
}

var plural *pluralize.Client
Expand Down
56 changes: 36 additions & 20 deletions tigris/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,38 @@ func (c *Collection[T]) Read(ctx context.Context, filter filter.Filter, fields .
return &Iterator[T]{Iterator: it}, nil
}

func setReadOptions(options *ReadOptions) (*driver.ReadOptions, error) {
opt := driver.ReadOptions{}

if options != nil {
var sortOrderbytes []byte

if options.Sort != nil {
var (
sortOrder driver.SortOrder
err error
)

if sortOrder, err = options.Sort.Built(); err != nil {
return nil, err
}

sortOrderbytes, err = jsoniter.Marshal(sortOrder)
if err != nil {
return nil, err
}
}

opt.Limit = options.Limit
opt.Skip = options.Skip
opt.Offset = options.Offset
opt.Collation = (*api.Collation)(options.Collation)
opt.Sort = sortOrderbytes
}

return &opt, nil
}

// ReadWithOptions returns specific fields of the documents according to the filter.
// It allows further configure returned documents by providing options:
//
Expand All @@ -224,29 +256,13 @@ func (c *Collection[T]) ReadWithOptions(ctx context.Context, filter filter.Filte
if err != nil {
return nil, err
}
if options == nil {
return nil, fmt.Errorf("API expecting options but received null")
}

var sortOrderbytes []byte
if options.Sort != nil {
sortOrder, err := options.Sort.Built()
if err != nil {
return nil, err
}
sortOrderbytes, err = jsoniter.Marshal(sortOrder)
if err != nil {
return nil, err
}
opt, err := setReadOptions(options)
if err != nil {
return nil, err
}

it, err := getDB(ctx, c.db).Read(ctx, c.name, f, p, &driver.ReadOptions{
Limit: options.Limit,
Skip: options.Skip,
Offset: options.Offset,
Collation: (*api.Collation)(options.Collation),
Sort: sortOrderbytes,
})
it, err := getDB(ctx, c.db).Read(ctx, c.name, f, p, opt)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions tigris/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func TestCollection(t *testing.T) {
Sort: []byte(`[{"Key1":"$asc"},{"Key2":"$desc"}]`),
},
).Return(mit, nil)

_, err := c.ReadWithOptions(ctx, filter.All,
fields.All,
&ReadOptions{
Expand Down
39 changes: 39 additions & 0 deletions tigris/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

"github.com/google/uuid"
"github.com/tigrisdata/tigris-client-go/code"
"github.com/tigrisdata/tigris-client-go/filter"
"github.com/tigrisdata/tigris-client-go/search"
Expand Down Expand Up @@ -269,3 +270,41 @@ func ExampleIterator_Array() {
fmt.Fprintf(os.Stderr, "doc %v = %+v\n", k, v)
}
}

func ExampleGetJoin() {
ctx := context.TODO()

type User struct {
ID uuid.UUID
Name string
}

type Order struct {
ID uuid.UUID
UserID uuid.UUID
Price float64
}

db := tigris.MustOpenDatabase(ctx, &tigris.Config{Project: "db1"}, &User{}, &Order{})

join := tigris.GetJoin[User, Order](db, "ID", "UserID")

it, err := join.Read(ctx, filter.All) // return all users
if err != nil {
panic(err)
}

var (
user User
orders []*Order
)

// iterate the users with corresponding orders
for it.Next(&user, &orders) {
fmt.Printf("User: %s", user.Name)

for _, o := range orders {
fmt.Printf("Order: %f", o.Price)
}
}
}