Skip to content

Commit

Permalink
feat: Client side in memory joins
Browse files Browse the repository at this point in the history
To simplify explanation the proposal below uses this specific example models:
```
type User struct {
  ID uuid.UUID `json:"id"`
  Name string `json:"name"`
}

type Order struct {
  ID uuid.UUID
  UserID uuid.UUID `json:"user_id"` // references User's ID field
  Quantity int
}
```

The API would allow to join two tables based on field equality:
```
userOrders, err := tigris.GetJoin[User, Order](db, "id", "user_id")
```
Now results can be read by:
```
it, err := userOrders.Read(ctx, filter.Eq("name"))
```
where filter is applied to the left table.

Results could be iterated as:
```
var user User
var orders []*Order

for it.Next(&user, &orders) {
  fmt.Printf("user=%v orders=%v\n", user, orders)
}
```

Implementation details:

First request is issued to the left table with filter provided to Read API.
Result is read into memory and request is prepared for the right table.
Which will have the following filter `filter.Or(filter.Eq("user_id", {value of id field}), ....)`.

Result from the first query is put in the map with ID as the key, while reading the result from second query we append it to the corresponding map bucket.

So as merge is done in the memory, this approach is not effective for very large results.
  • Loading branch information
efirs committed May 22, 2023
1 parent 0262afd commit 2c30323
Show file tree
Hide file tree
Showing 6 changed files with 921 additions and 4 deletions.
5 changes: 3 additions & 2 deletions driver/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ type streamReader interface {

type readIterator struct {
streamReader
eof bool
err error
eof bool
}

func (i *readIterator) Next(d *Document) bool {
Expand Down Expand Up @@ -87,8 +87,8 @@ type searchStreamReader interface {

type searchResultIterator struct {
searchStreamReader
eof bool
err error
eof bool
}

func (i *searchResultIterator) Next(r *SearchResponse) bool {
Expand All @@ -100,6 +100,7 @@ func (i *searchResultIterator) Next(r *SearchResponse) bool {
if errors.Is(err, io.EOF) {
i.eof = true
_ = i.close()

return false
}

Expand Down
3 changes: 1 addition & 2 deletions filter/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"

"github.com/tigrisdata/tigris-client-go/driver"
"github.com/tigrisdata/tigris-client-go/schema"
)

type (
Expand Down Expand Up @@ -74,7 +73,7 @@ func Not(op Expr) Expr {

// Eq composes 'equal' operation.
// Result is equivalent to: field == value.
func Eq[T schema.PrimitiveFieldType](field string, value T) Expr {
func Eq(field string, value any) Expr {
return Expr{field: comparison{Eq: value}}
}

Expand Down
39 changes: 39 additions & 0 deletions tigris/example_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"os"

"github.com/google/uuid"
"github.com/tigrisdata/tigris-client-go/code"
"github.com/tigrisdata/tigris-client-go/filter"
"github.com/tigrisdata/tigris-client-go/search"
Expand Down Expand Up @@ -269,3 +270,41 @@ func ExampleIterator_Array() {
fmt.Fprintf(os.Stderr, "doc %v = %+v\n", k, v)
}
}

func ExampleGetJoin() {
ctx := context.TODO()

type User struct {
ID uuid.UUID
Name string
}

type Order struct {
ID uuid.UUID
UserID uuid.UUID
Price float64
}

db := tigris.MustOpenDatabase(ctx, &tigris.Config{Project: "db1"}, &User{}, &Order{})

join := tigris.GetJoin[User, Order](db, "ID", "UserID")

it, err := join.Read(ctx, filter.All) // return all users
if err != nil {
panic(err)
}

var (
user User
orders []*Order
)

// iterate the users with corresponding orders
for it.Next(&user, &orders) {
fmt.Printf("User: %s", user.Name)

for _, o := range orders {
fmt.Printf("Order: %f", o.Price)
}
}
}
87 changes: 87 additions & 0 deletions tigris/iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"

"github.com/tigrisdata/tigris-client-go/driver"
"github.com/tigrisdata/tigris-client-go/schema"
"github.com/tigrisdata/tigris-client-go/search"
)

Expand Down Expand Up @@ -137,6 +138,7 @@ func (it *SearchIterator[T]) Next(res *search.Result[T]) bool {
if err := res.From(r); err != nil {
it.err = err
it.Close()

return false
}

Expand All @@ -149,10 +151,95 @@ func (it *SearchIterator[T]) Err() error {
if it.Iterator.Err() != nil {
return it.Iterator.Err()
}

return it.err
}

// Close closes Iterator stream.
func (it *SearchIterator[T]) Close() {
it.Iterator.Close()
}

// JoinIterator is used to iterate documents
// returned by streaming APIs.
type JoinIterator[P schema.Model, C schema.Model] struct {
err error

resMap map[string]*JoinResult[P, C]
sortedKeys []string
ptr int
options *JoinOptions
}

// Next populates 'doc' with the next document in the iteration order
// Returns false at the end of the stream or in the case of error.
func (it *JoinIterator[P, C]) Next(parent *P, child *[]*C) bool {
for it.ptr < len(it.sortedKeys) {
it.ptr++
k := it.sortedKeys[it.ptr-1]

if it.options == nil || it.options.Type != InnerJoin || it.resMap[k].Child != nil {
*parent = *it.resMap[k].Parent
*child = it.resMap[k].Child

return true
}
}

return false
}

// Iterate calls provided function for every document in the result.
// It's ia convenience to avoid common mistakes of not closing the
// iterator and not checking the error from the iterator.
func (it *JoinIterator[P, C]) Iterate(fn func(parent *P, child []*C) error) error {
defer it.Close()

var (
p P
c []*C
)

for it.Next(&p, &c) {
if err := fn(&p, c); err != nil {
return err
}
}

return it.Err()
}

type JoinResult[P schema.Model, C schema.Model] struct {
Parent *P
Child []*C
}

// Array returns result of iteration as an array of documents.
func (it *JoinIterator[P, C]) Array() ([]JoinResult[P, C], error) {
defer it.Close()

var (
child []*C
p P
arr []JoinResult[P, C]
)

for it.Next(&p, &child) {
arr = append(arr, JoinResult[P, C]{Parent: &p, Child: child})
}

if it.Err() != nil {
return nil, it.Err()
}

return arr, nil
}

// Err returns nil if iteration was successful,
// otherwise return error details.
func (it *JoinIterator[P, C]) Err() error {
return it.err
}

func (*JoinIterator[P, C]) Close() {
}

0 comments on commit 2c30323

Please sign in to comment.