Skip to content

Commit

Permalink
feat: Client side in-memory joins
Browse files Browse the repository at this point in the history
API:

First, a join object needs to be created, which specifies the collections and
the join condition:
```
join := tigris.GetJoin[LeftCollModel, RightCollModel](db, "{left field}", "{right field}", [options])
```
This creates a join between LeftCollModel and RightCollModel on
equality of `{left field}` to `{right field}`.

The created object can then be used to issue one or multiple read requests:

```
it, err := join.Read(ctx, filter.Eq("Field1", 1))
```

The filter condition of the Read API is applied to the left table.
The iterator then returns the rows matching the condition along with the
corresponding rows from the right table which satisfy the join
condition.

var l LeftCollModel
var r []*RightCollModel

for it.Next(&l, &r) {
  fmt.Printf("l=%v r=%v\n", l, r)
}

By default, documents which don't have matching documents in the right
table are still returned in the results. These results can be skipped by providing
the `&JoinOptions{Type: tigris.InnerJoin}` option to the GetJoin API.

It is not required for the left field values or right field values to be unique.

The values of array fields are matched as is by default. By using the
`&JoinOptions{ArrayUnroll: true}` option, individual array items can be
matched in the right table.

Implementation details:

First, a request is issued to the left table with the filter provided to the Read API.
The result is read into memory and a request is prepared for the right table,
which will have the following filter: `filter.Or(filter.Eq("{right field}", {left field value fetched by left query}), ...)`.

The result of the first query is put in a map with the {left field} value as the key;
while reading the result of the second query, each row is appended to the corresponding map bucket.

Since the merge is done in memory, joins should be used for relatively small
result sets only.
  • Loading branch information
efirs committed Jun 6, 2023
1 parent d1a4583 commit ff7f73f
Show file tree
Hide file tree
Showing 15 changed files with 1,341 additions and 31 deletions.
5 changes: 3 additions & 2 deletions driver/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type streamReader interface {

type readIterator struct {
streamReader
eof bool
err error
eof bool
}

func (i *readIterator) Next(d *Document) bool {
Expand Down Expand Up @@ -87,8 +87,8 @@ type searchStreamReader interface {

type searchResultIterator struct {
searchStreamReader
eof bool
err error
eof bool
}

func (i *searchResultIterator) Next(r *SearchResponse) bool {
Expand All @@ -100,6 +100,7 @@ func (i *searchResultIterator) Next(r *SearchResponse) bool {
if errors.Is(err, io.EOF) {
i.eof = true
_ = i.close()

return false
}

Expand Down
12 changes: 6 additions & 6 deletions driver/test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,17 @@ func (matcher *JSONMatcher) String() string {
// Got renders actual as a string carrying a "JSONMatcher" label, a type tag
// and the value, selecting the format by the dynamic type of actual.
//
// NOTE(review): this span comes from a diff rendered without +/- markers, so
// every case lists two returns; presumably the first is the removed line and
// the second its replacement — confirm against the repository. As written
// here, only the first return of each case is reachable.
func (*JSONMatcher) Got(actual any) string {
switch t := actual.(type) {
case string:
return fmt.Sprintf("JSONMatcher[string]: %v", t)
return fmt.Sprintf(" JSONMatcher: %v [string]", t)
case []byte:
return fmt.Sprintf("JSONMatcher[byte]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [byte]", string(t))
case Filter:
return fmt.Sprintf("JSONMatcher[Filter]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [Filter]", string(t))
case Projection:
return fmt.Sprintf("JSONMatcher[Projection]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [Projection]", string(t))
case Schema:
return fmt.Sprintf("JSONMatcher[Schema]: %v", string(t))
return fmt.Sprintf(" JSONMatcher: %v [Schema]", string(t))
default:
return fmt.Sprintf("JSONMatcher[%v]: %v", reflect.TypeOf(t), t)
// NOTE(review): every other case in the "value [type]" layout passes the
// value first, but this call passes reflect.TypeOf(t) first, so the output
// would read "type [value]" — the arguments look swapped; confirm.
return fmt.Sprintf(" JSONMatcher: %v [%v]", reflect.TypeOf(t), t)
}
}

Expand Down
9 changes: 9 additions & 0 deletions filter/eq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,26 @@ import (
)

func TestEq(t *testing.T) {
s := "test"
i := 123
cases := []struct {
name string
expr Expr
exp string
}{
{"ptr", Eq("f", &s), `{"f":{"$eq":"test"}}`},
{"ptrInt", Eq("f", &i), `{"f":{"$eq":123}}`},
{"null", Eq("f", (*string)(nil)), `{"f":{"$eq":null}}`},
{"nullInt", Eq("f", (*int)(nil)), `{"f":{"$eq":null}}`},
{"int0", EqInt("f", 0), `{"f":{"$eq":0}}`},
{"intNeg", EqInt("f", -1), `{"f":{"$eq":-1}}`},
{"int", EqInt("f", 12345), `{"f":{"$eq":12345}}`},
{"int32", EqInt32("f", 12345), `{"f":{"$eq":12345}}`},
{"int64", EqInt64("f", 123456789012), `{"f":{"$eq":123456789012}}`},
{"float32", EqFloat32("f", 12345.67), `{"f":{"$eq":12345.67}}`},
{"float64", EqFloat64("f", 123456789012.34), `{"f":{"$eq":123456789012.34}}`},
{"string", EqString("f", "1234"), `{"f":{"$eq":"1234"}}`},
{"string0", EqString("f", ""), `{"f":{"$eq":""}}`},
{"bytes", EqBytes("f", []byte("123")), `{"f":{"$eq":"MTIz"}}`},
{"time", EqTime("f", time.Time{}), `{"f":{"$eq":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 1 addition & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ package filter
import (
jsoniter "github.com/json-iterator/go"
"github.com/tigrisdata/tigris-client-go/driver"
"github.com/tigrisdata/tigris-client-go/schema"
)

type (
Expand Down Expand Up @@ -73,7 +72,7 @@ func Not(op Expr) Expr {

// Eq composes 'equal' operation.
// Result is equivalent to: field == value.
func Eq[T schema.PrimitiveFieldType](field string, value T) Expr {
func Eq(field string, value any) Expr {
return Expr{field: comparison{Eq: value}}
}

Expand Down
3 changes: 3 additions & 0 deletions filter/gt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestGt(t *testing.T) {
expr Expr
exp string
}{
{"int", GtInt("f", 0), `{"f":{"$gt":0}}`},
{"int", GtInt("f", -1), `{"f":{"$gt":-1}}`},
{"int", GtInt("f", 12345), `{"f":{"$gt":12345}}`},
{"int32", GtInt32("f", 12345), `{"f":{"$gt":12345}}`},
{"int64", GtInt64("f", 123456789012), `{"f":{"$gt":123456789012}}`},
{"float32", GtFloat32("f", 12345.67), `{"f":{"$gt":12345.67}}`},
{"float64", GtFloat64("f", 123456789012.34), `{"f":{"$gt":123456789012.34}}`},
{"string", GtString("f", "1234"), `{"f":{"$gt":"1234"}}`},
{"string0", GtString("f", ""), `{"f":{"$gt":""}}`},
{"bytes", GtBytes("f", []byte("123")), `{"f":{"$gt":"MTIz"}}`},
{"time", GtTime("f", time.Time{}), `{"f":{"$gt":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 3 additions & 0 deletions filter/gte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestGte(t *testing.T) {
expr Expr
exp string
}{
{"int0", GteInt("f", 0), `{"f":{"$gte":0}}`},
{"intNeg", GteInt32("f", -1), `{"f":{"$gte":-1}}`},
{"int", GteInt("f", 12345), `{"f":{"$gte":12345}}`},
{"int32", GteInt32("f", 12345), `{"f":{"$gte":12345}}`},
{"int64", GteInt64("f", 123456789012), `{"f":{"$gte":123456789012}}`},
{"float32", GteFloat32("f", 12345.67), `{"f":{"$gte":12345.67}}`},
{"float64", GteFloat64("f", 123456789012.34), `{"f":{"$gte":123456789012.34}}`},
{"string", GteString("f", "1234"), `{"f":{"$gte":"1234"}}`},
{"string0", GteString("f", ""), `{"f":{"$gte":""}}`},
{"bytes", GteBytes("f", []byte("123")), `{"f":{"$gte":"MTIz"}}`},
{"time", GteTime("f", time.Time{}), `{"f":{"$gte":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 3 additions & 0 deletions filter/lt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestLt(t *testing.T) {
expr Expr
exp string
}{
{"int0", LtInt("f", 0), `{"f":{"$lt":0}}`},
{"intNeg", LtInt32("f", -1), `{"f":{"$lt":-1}}`},
{"int", LtInt("f", 12345), `{"f":{"$lt":12345}}`},
{"int32", LtInt32("f", 12345), `{"f":{"$lt":12345}}`},
{"int64", LtInt64("f", 123456789012), `{"f":{"$lt":123456789012}}`},
{"float32", LtFloat32("f", 12345.67), `{"f":{"$lt":12345.67}}`},
{"float64", LtFloat64("f", 123456789012.34), `{"f":{"$lt":123456789012.34}}`},
{"string", LtString("f", "1234"), `{"f":{"$lt":"1234"}}`},
{"string0", LtString("f", ""), `{"f":{"$lt":""}}`},
{"bytes", LtBytes("f", []byte("123")), `{"f":{"$lt":"MTIz"}}`},
{"time", LtTime("f", time.Time{}), `{"f":{"$lt":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
3 changes: 3 additions & 0 deletions filter/lte_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,15 @@ func TestLte(t *testing.T) {
expr Expr
exp string
}{
{"int0", LteInt("f", 0), `{"f":{"$lte":0}}`},
{"intNeg", LteInt32("f", -1), `{"f":{"$lte":-1}}`},
{"int", LteInt("f", 12345), `{"f":{"$lte":12345}}`},
{"int32", LteInt32("f", 12345), `{"f":{"$lte":12345}}`},
{"int64", LteInt64("f", 123456789012), `{"f":{"$lte":123456789012}}`},
{"float32", LteFloat32("f", 12345.67), `{"f":{"$lte":12345.67}}`},
{"float64", LteFloat64("f", 123456789012.34), `{"f":{"$lte":123456789012.34}}`},
{"string", LteString("f", "1234"), `{"f":{"$lte":"1234"}}`},
{"string0", LteString("f", ""), `{"f":{"$lte":""}}`},
{"bytes", LteBytes("f", []byte("123")), `{"f":{"$lte":"MTIz"}}`},
{"time", LteTime("f", time.Time{}), `{"f":{"$lte":"0001-01-01T00:00:00Z"}}`},
{
Expand Down
7 changes: 6 additions & 1 deletion schema/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,12 @@ type PrimitiveFieldType interface {
int | int32 | int64 |
float32 | float64 |
[]byte | time.Time | uuid.UUID |
bool
bool |
*string |
*int | *int32 | *int64 |
*float32 | *float64 |
*[]byte | *time.Time | *uuid.UUID |
*bool
}

var plural *pluralize.Client
Expand Down
56 changes: 36 additions & 20 deletions tigris/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,38 @@ func (c *Collection[T]) Read(ctx context.Context, filter filter.Filter, fields .
return &Iterator[T]{Iterator: it}, nil
}

// setReadOptions translates the collection-level ReadOptions into the
// driver's ReadOptions.
//
// A nil options yields an empty driver.ReadOptions. When options.Sort is set,
// the built sort order is JSON-encoded into the Sort field; an error from
// building or encoding the sort order is returned unchanged together with a
// nil result.
func setReadOptions(options *ReadOptions) (*driver.ReadOptions, error) {
	// Nothing to translate: hand back the zero-value driver options.
	if options == nil {
		return &driver.ReadOptions{}, nil
	}

	// Stays nil when no sort order was requested.
	var sortJSON []byte

	if options.Sort != nil {
		order, err := options.Sort.Built()
		if err != nil {
			return nil, err
		}

		if sortJSON, err = jsoniter.Marshal(order); err != nil {
			return nil, err
		}
	}

	return &driver.ReadOptions{
		Limit:     options.Limit,
		Skip:      options.Skip,
		Offset:    options.Offset,
		Collation: (*api.Collation)(options.Collation),
		Sort:      sortJSON,
	}, nil
}

// ReadWithOptions returns specific fields of the documents according to the filter.
// It allows further configure returned documents by providing options:
//
Expand All @@ -224,29 +256,13 @@ func (c *Collection[T]) ReadWithOptions(ctx context.Context, filter filter.Filte
if err != nil {
return nil, err
}
if options == nil {
return nil, fmt.Errorf("API expecting options but received null")
}

var sortOrderbytes []byte
if options.Sort != nil {
sortOrder, err := options.Sort.Built()
if err != nil {
return nil, err
}
sortOrderbytes, err = jsoniter.Marshal(sortOrder)
if err != nil {
return nil, err
}
opt, err := setReadOptions(options)
if err != nil {
return nil, err
}

it, err := getDB(ctx, c.db).Read(ctx, c.name, f, p, &driver.ReadOptions{
Limit: options.Limit,
Skip: options.Skip,
Offset: options.Offset,
Collation: (*api.Collation)(options.Collation),
Sort: sortOrderbytes,
})
it, err := getDB(ctx, c.db).Read(ctx, c.name, f, p, opt)
if err != nil {
return nil, err
}
Expand Down
1 change: 1 addition & 0 deletions tigris/collection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,7 @@ func TestCollection(t *testing.T) {
Sort: []byte(`[{"Key1":"$asc"},{"Key2":"$desc"}]`),
},
).Return(mit, nil)

_, err := c.ReadWithOptions(ctx, filter.All,
fields.All,
&ReadOptions{
Expand Down
39 changes: 39 additions & 0 deletions tigris/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

"github.com/google/uuid"
"github.com/tigrisdata/tigris-client-go/code"
"github.com/tigrisdata/tigris-client-go/filter"
"github.com/tigrisdata/tigris-client-go/search"
Expand Down Expand Up @@ -269,3 +270,41 @@ func ExampleIterator_Array() {
fmt.Fprintf(os.Stderr, "doc %v = %+v\n", k, v)
}
}

// ExampleGetJoin demonstrates a client-side in-memory join: users are read
// from the left collection and, for each user, the orders whose UserID
// equals the user's ID are returned alongside it.
func ExampleGetJoin() {
	ctx := context.TODO()

	type User struct {
		ID   uuid.UUID
		Name string
	}

	type Order struct {
		ID     uuid.UUID
		UserID uuid.UUID
		Price  float64
	}

	db := tigris.MustOpenDatabase(ctx, &tigris.Config{Project: "db1"}, &User{}, &Order{})

	// Join User (left) with Order (right) on User.ID == Order.UserID.
	join := tigris.GetJoin[User, Order](db, "ID", "UserID")

	it, err := join.Read(ctx, filter.All) // return all users
	if err != nil {
		panic(err)
	}

	var (
		user   User
		orders []*Order
	)

	// iterate the users with corresponding orders
	for it.Next(&user, &orders) {
		// Terminate each record with a newline so consecutive users and
		// orders do not run together on a single output line.
		fmt.Printf("User: %s\n", user.Name)

		for _, o := range orders {
			fmt.Printf("Order: %f\n", o.Price)
		}
	}
}

0 comments on commit ff7f73f

Please sign in to comment.