Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(firestore): adds snapshot reads impl. #6718

Merged
merged 29 commits into from Oct 17, 2022
Merged
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
018a3a6
feat(firestore): adds snapshot reads impl.
Sep 21, 2022
575c782
Merge branch 'main' into firestore-snapshot
Sep 21, 2022
21dc006
Merge branch 'main' into firestore-snapshot
Sep 21, 2022
571ec3e
feat: better comments, one unit test
Sep 21, 2022
b4053c6
more unit tests
Sep 22, 2022
1b5faa1
Merge branch 'main' into firestore-snapshot
Sep 22, 2022
8c60de4
Merge branch 'main' into firestore-snapshot
Sep 23, 2022
23e0363
Merge branch 'main' into firestore-snapshot
Sep 23, 2022
6387e46
Merge branch 'main' into firestore-snapshot
Oct 7, 2022
5c71f40
per reviewer
Oct 7, 2022
cb4cfbc
Merge branch 'main' into firestore-snapshot
Oct 7, 2022
ddc115d
linter
Oct 7, 2022
21eb732
Merge branch 'main' into firestore-snapshot
Oct 7, 2022
16fedba
test failures
Oct 7, 2022
239d5c7
more test fixes
Oct 7, 2022
7e6af29
hopefully fixes tests :/
Oct 10, 2022
7215f56
more test fixes?
Oct 10, 2022
a50a427
Merge branch 'main' into firestore-snapshot
Oct 10, 2022
3350870
Merge branch 'main' into firestore-snapshot
Oct 10, 2022
78ec8eb
fix again
Oct 10, 2022
9b003ef
Merge branch 'main' into firestore-snapshot
Oct 10, 2022
0c87ef8
Merge branch 'main' into firestore-snapshot
Oct 11, 2022
3f42c29
Merge branch 'main' into firestore-snapshot
Oct 12, 2022
10805a7
per reviewer
Oct 12, 2022
49e9880
Merge branch 'main' into firestore-snapshot
Oct 12, 2022
bd5adbb
per reviewer
Oct 12, 2022
59e529e
per reviewer
Oct 12, 2022
4d566dc
Merge branch 'main' into firestore-snapshot
Oct 17, 2022
9600e78
fix
Oct 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 69 additions & 9 deletions firestore/client.go
Expand Up @@ -36,6 +36,7 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

// resourcePrefixHeader is the name of the metadata header used to indicate
Expand All @@ -53,9 +54,10 @@ const DetectProjectID = "*detect-project-id*"

// A Client provides access to the Firestore service.
type Client struct {
c *vkit.Client
projectID string
databaseID string // A client is tied to a single database.
c *vkit.Client
projectID string
databaseID string // A client is tied to a single database.
readSettings *readSettings // readSettings allows setting a snapshot time to read the database
}

// NewClient creates a new Firestore client that uses the given project.
Expand Down Expand Up @@ -94,9 +96,10 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
}
vc.SetGoogleClientInfo("gccl", internal.Version)
c := &Client{
c: vc,
projectID: projectID,
databaseID: "(default)", // always "(default)", for now
c: vc,
projectID: projectID,
databaseID: "(default)", // always "(default)", for now
readSettings: &readSettings{},
}
return c, nil
}
Expand Down Expand Up @@ -199,10 +202,10 @@ func (c *Client) GetAll(ctx context.Context, docRefs []*DocumentRef) (_ []*Docum
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.GetAll")
defer func() { trace.EndSpan(ctx, err) }()

return c.getAll(ctx, docRefs, nil)
return c.getAll(ctx, docRefs, nil, nil)
}

func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte) (_ []*DocumentSnapshot, err error) {
func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte, rs *readSettings) (_ []*DocumentSnapshot, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.BatchGetDocuments")
defer func() { trace.EndSpan(ctx, err) }()

Expand All @@ -219,9 +222,18 @@ func (c *Client) getAll(ctx context.Context, docRefs []*DocumentRef, tid []byte)
Database: c.path(),
Documents: docNames,
}

// Note that transaction ID and other consistency selectors are mutually exclusive.
// We respect the transaction first, any read options passed by the caller second,
// and any read options stored in the client third.
if rt, hasOpts := parseReadTime(c, rs); hasOpts {
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_ReadTime{ReadTime: rt}
}

if tid != nil {
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{tid}
req.ConsistencySelector = &pb.BatchGetDocumentsRequest_Transaction{Transaction: tid}
}

streamClient, err := c.c.BatchGetDocuments(withResourceHeader(ctx, req.Database), req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -306,6 +318,18 @@ func (c *Client) BulkWriter(ctx context.Context) *BulkWriter {
return bw
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
func (c *Client) WithReadOptions(opts ...ReadOption) *Client {
for _, ro := range opts {
switch r := ro.(type) {
telpirion marked this conversation as resolved.
Show resolved Hide resolved
case readTime:
r.apply(c.readSettings)
}
}
return c
}

// commit calls the Commit RPC outside of a transaction.
func (c *Client) commit(ctx context.Context, ws []*pb.Write) (_ []*WriteResult, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/firestore.Client.commit")
Expand Down Expand Up @@ -381,3 +405,39 @@ func (ec emulatorCreds) GetRequestMetadata(ctx context.Context, uri ...string) (
func (ec emulatorCreds) RequireTransportSecurity() bool {
return false
}

// ReadTime specifies a time-specific snapshot of the database to read.
func ReadTime(t time.Time) ReadOption {
var rt readTime
rt.Time = t
return rt
telpirion marked this conversation as resolved.
Show resolved Hide resolved
}

type readTime struct {
telpirion marked this conversation as resolved.
Show resolved Hide resolved
time.Time
}

func (rt readTime) apply(rs *readSettings) {
rs.readTime = rt.Time
}

// ReadOption interface allows for abstraction of computing read time settings.
type ReadOption interface {
apply(*readSettings)
}

// readSettings contains the ReadOptions for a read operation
type readSettings struct {
readTime time.Time
}

// parseReadTime ensures that fallback order of read options is respected.
// First, if a ReadOption is set on the readOptionable
telpirion marked this conversation as resolved.
Show resolved Hide resolved
func parseReadTime(c *Client, rs *readSettings) (*timestamppb.Timestamp, bool) {
if rs != nil && !rs.readTime.IsZero() {
return &timestamppb.Timestamp{Seconds: int64(rs.readTime.Unix())}, true
} else if c.readSettings != nil && !c.readSettings.readTime.IsZero() {
telpirion marked this conversation as resolved.
Show resolved Hide resolved
return &timestamppb.Timestamp{Seconds: int64(c.readSettings.readTime.Unix())}, true
}
return nil, false
}
58 changes: 52 additions & 6 deletions firestore/client_test.go
Expand Up @@ -17,16 +17,19 @@ package firestore
import (
"context"
"testing"
"time"

tspb "github.com/golang/protobuf/ptypes/timestamp"
pb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
)

var testClient = &Client{
projectID: "projectID",
databaseID: "(default)",
projectID: "projectID",
databaseID: "(default)",
readSettings: &readSettings{},
}

func TestClientCollectionAndDoc(t *testing.T) {
Expand All @@ -45,16 +48,18 @@ func TestClientCollectionAndDoc(t *testing.T) {
path: "projects/projectID/databases/(default)/documents/X",
parentPath: db + "/documents",
},
readSettings: &readSettings{},
}
if !testEqual(coll1, wantc1) {
t.Fatalf("got\n%+v\nwant\n%+v", coll1, wantc1)
}
doc1 := testClient.Doc("X/a")
wantd1 := &DocumentRef{
Parent: coll1,
ID: "a",
Path: "projects/projectID/databases/(default)/documents/X/a",
shortPath: "X/a",
Parent: coll1,
ID: "a",
Path: "projects/projectID/databases/(default)/documents/X/a",
shortPath: "X/a",
readSettings: &readSettings{},
}

if !testEqual(doc1, wantd1) {
Expand Down Expand Up @@ -309,3 +314,44 @@ func TestGetAllErrors(t *testing.T) {
t.Error("got nil, want error")
}
}

func TestClient_WithReadOptions(t *testing.T) {
ctx := context.Background()
c, srv, cleanup := newMock(t)
defer cleanup()

const dbPath = "projects/projectID/databases/(default)"
const docPath = dbPath + "/documents/C/a"
tm := time.Date(2021, time.February, 20, 0, 0, 0, 0, time.UTC)

dr := &DocumentRef{
Parent: &CollectionRef{
c: c,
},
ID: "123",
Path: docPath,
}

srv.addRPC(&pb.BatchGetDocumentsRequest{
Database: dbPath,
Documents: []string{docPath},
ConsistencySelector: &pb.BatchGetDocumentsRequest_ReadTime{
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
},
}, []interface{}{
&pb.BatchGetDocumentsResponse{
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
Result: &pb.BatchGetDocumentsResponse_Found{
Found: &pb.Document{},
},
},
})

_, err := c.WithReadOptions(ReadTime(tm)).GetAll(ctx, []*DocumentRef{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this call fail if it doesn't match what was added with srv.addRPC above?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes

dr,
})

if err != nil {
t.Fatal(err)
}
}
20 changes: 19 additions & 1 deletion firestore/collref.go
Expand Up @@ -49,6 +49,10 @@ type CollectionRef struct {

// Use the methods of Query on a CollectionRef to create and run queries.
Query

// readSettings specifies constraints for reading documents in the collection
// e.g. read time
readSettings *readSettings
}

func newTopLevelCollRef(c *Client, dbPath, id string) *CollectionRef {
Expand All @@ -64,6 +68,7 @@ func newTopLevelCollRef(c *Client, dbPath, id string) *CollectionRef {
path: dbPath + "/documents/" + id,
parentPath: dbPath + "/documents",
},
readSettings: &readSettings{},
}
}

Expand All @@ -82,6 +87,7 @@ func newCollRefWithParent(c *Client, parent *DocumentRef, id string) *Collection
path: parent.Path + "/" + id,
parentPath: parent.Path,
},
readSettings: &readSettings{},
}
}

Expand Down Expand Up @@ -121,7 +127,7 @@ func (c *CollectionRef) Add(ctx context.Context, data interface{}) (*DocumentRef
// missing documents. A missing document is a document that does not exist but has
// sub-documents.
func (c *CollectionRef) DocumentRefs(ctx context.Context) *DocumentRefIterator {
return newDocumentRefIterator(ctx, c, nil)
return newDocumentRefIterator(ctx, c, nil, c.readSettings)
}

const alphanum = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"
Expand All @@ -136,3 +142,15 @@ func uniqueID() string {
}
return string(b)
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
func (c *CollectionRef) WithReadOptions(opts ...ReadOption) *CollectionRef {
for _, ro := range opts {
switch r := ro.(type) {
telpirion marked this conversation as resolved.
Show resolved Hide resolved
case readTime:
r.apply(c.readSettings)
}
}
return c
}
43 changes: 39 additions & 4 deletions firestore/collref_test.go
Expand Up @@ -17,19 +17,22 @@ package firestore
import (
"context"
"testing"
"time"

"github.com/golang/protobuf/proto"
pb "google.golang.org/genproto/googleapis/firestore/v1"
"google.golang.org/protobuf/types/known/timestamppb"
)

func TestDoc(t *testing.T) {
coll := testClient.Collection("C")
got := coll.Doc("d")
want := &DocumentRef{
Parent: coll,
ID: "d",
Path: "projects/projectID/databases/(default)/documents/C/d",
shortPath: "C/d",
Parent: coll,
ID: "d",
Path: "projects/projectID/databases/(default)/documents/C/d",
shortPath: "C/d",
readSettings: &readSettings{},
}
if !testEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
Expand Down Expand Up @@ -98,3 +101,35 @@ func TestNilErrors(t *testing.T) {
t.Fatalf("got <%v>, want <%v>", err, errNilDocRef)
}
}

func TestCollRef_WithReadOptions(t *testing.T) {
ctx := context.Background()
c, srv, cleanup := newMock(t)
defer cleanup()

const dbPath = "projects/projectID/databases/(default)"
const docPath = dbPath + "/documents/C/a"
tm := time.Date(2021, time.February, 20, 0, 0, 0, 0, time.UTC)

srv.addRPC(&pb.ListDocumentsRequest{
Parent: dbPath,
CollectionId: "myCollection",
ShowMissing: true,
ConsistencySelector: &pb.ListDocumentsRequest_ReadTime{
ReadTime: &timestamppb.Timestamp{Seconds: tm.Unix()},
},
}, []interface{}{
&pb.ListDocumentsResponse{
Documents: []*pb.Document{
{
Name: docPath,
},
},
},
})

_, err := c.Collection("myCollection").WithReadOptions(ReadTime(tm)).DocumentRefs(ctx).GetAll()
if err == nil {
t.Fatal(err)
}
}
31 changes: 24 additions & 7 deletions firestore/docref.go
Expand Up @@ -47,14 +47,18 @@ type DocumentRef struct {

// The ID of the document: the last component of the resource path.
ID string

// The options (only read time currently supported) for reading this document
readSettings *readSettings
}

func newDocRef(parent *CollectionRef, id string) *DocumentRef {
return &DocumentRef{
Parent: parent,
ID: id,
Path: parent.Path + "/" + id,
shortPath: parent.selfPath + "/" + id,
Parent: parent,
ID: id,
Path: parent.Path + "/" + id,
shortPath: parent.selfPath + "/" + id,
readSettings: &readSettings{},
}
}

Expand All @@ -77,7 +81,8 @@ func (d *DocumentRef) Get(ctx context.Context) (_ *DocumentSnapshot, err error)
if d == nil {
return nil, errNilDocRef
}
docsnaps, err := d.Parent.c.getAll(ctx, []*DocumentRef{d}, nil)

docsnaps, err := d.Parent.c.getAll(ctx, []*DocumentRef{d}, nil, d.readSettings)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -803,7 +808,7 @@ type DocumentSnapshotIterator struct {
// Next is not expected to return iterator.Done unless it is called after Stop.
// Rarely, networking issues may also cause iterator.Done to be returned.
func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
btree, _, readTime, err := it.ws.nextSnapshot()
btree, _, rt, err := it.ws.nextSnapshot()
if err != nil {
if err == io.EOF {
err = iterator.Done
Expand All @@ -812,7 +817,7 @@ func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
return nil, err
}
if btree.Len() == 0 { // document deleted
return &DocumentSnapshot{Ref: it.docref, ReadTime: readTime}, nil
return &DocumentSnapshot{Ref: it.docref, ReadTime: rt}, nil
}
snap, _ := btree.At(0)
return snap.(*DocumentSnapshot), nil
Expand All @@ -824,3 +829,15 @@ func (it *DocumentSnapshotIterator) Next() (*DocumentSnapshot, error) {
func (it *DocumentSnapshotIterator) Stop() {
it.ws.stop()
}

// WithReadOptions specifies constraints for accessing documents from the database,
// e.g. at what time snapshot to read the documents.
func (d *DocumentRef) WithReadOptions(opts ...ReadOption) *DocumentRef {
for _, ro := range opts {
switch r := ro.(type) {
telpirion marked this conversation as resolved.
Show resolved Hide resolved
case readTime:
r.apply(d.readSettings)
}
}
return d
}