Skip to content

Commit

Permalink
Merge branch 'master' into update-microgen
Browse files Browse the repository at this point in the history
  • Loading branch information
noahdietz committed Jun 28, 2021
2 parents 809b8dc + 12f3042 commit 2bc3462
Show file tree
Hide file tree
Showing 11 changed files with 447 additions and 29 deletions.
39 changes: 39 additions & 0 deletions bigquery/bigquery.go
Expand Up @@ -16,6 +16,7 @@ package bigquery

import (
"context"
"errors"
"fmt"
"io"
"net/http"
Expand All @@ -29,6 +30,7 @@ import (
bq "google.golang.org/api/bigquery/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
"google.golang.org/api/transport"
)

const (
Expand Down Expand Up @@ -56,8 +58,20 @@ type Client struct {
bqs *bq.Service
}

// DetectProjectID is a sentinel value that instructs NewClient to detect the
// project ID. It is given in place of the projectID argument. NewClient will
// use the project ID from the given credentials or the default credentials
// (https://developers.google.com/accounts/docs/application-default-credentials)
// if no credentials were provided. When providing credentials, not all
// options will allow NewClient to extract the project ID. Specifically a JWT
// does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"

// NewClient constructs a new Client which can perform BigQuery operations.
// Operations performed via the client are billed to the specified GCP project.
//
// If the project ID is set to DetectProjectID, NewClient will attempt to detect
// the project ID from credentials.
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
o := []option.ClientOption{
option.WithScopes(Scope),
Expand All @@ -68,20 +82,45 @@ func NewClient(ctx context.Context, projectID string, opts ...option.ClientOptio
if err != nil {
return nil, fmt.Errorf("bigquery: constructing client: %v", err)
}

if projectID == DetectProjectID {
projectID, err = detectProjectID(ctx, opts...)
if err != nil {
return nil, fmt.Errorf("failed to detect project: %v", err)
}
}

c := &Client{
projectID: projectID,
bqs: bqs,
}
return c, nil
}

// Project returns the project ID or number for this instance of the client, which may have
// either been explicitly specified or autodetected.
func (c *Client) Project() string {
return c.projectID
}

// Close closes any resources held by the client.
// Close should be called when the client is no longer needed.
// It need not be called at program exit.
func (c *Client) Close() error {
return nil
}

func detectProjectID(ctx context.Context, opts ...option.ClientOption) (string, error) {
creds, err := transport.Creds(ctx, opts...)
if err != nil {
return "", fmt.Errorf("fetching creds: %v", err)
}
if creds.ProjectID == "" {
return "", errors.New("credentials did not provide a valid ProjectID")
}
return creds.ProjectID, nil
}

// Calls the Jobs.Insert RPC and returns a Job.
func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader) (*Job, error) {
call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx)
Expand Down
26 changes: 23 additions & 3 deletions bigquery/integration_test.go
Expand Up @@ -228,6 +228,24 @@ func initTestState(client *Client, t time.Time) func() {
}
}

func TestIntegration_DetectProjectID(t *testing.T) {
ctx := context.Background()
testCreds := testutil.Credentials(ctx)
if testCreds == nil {
t.Skip("test credentials not present, skipping")
}

if _, err := NewClient(ctx, DetectProjectID, option.WithCredentials(testCreds)); err != nil {
t.Errorf("test NewClient: %v", err)
}

badTS := testutil.ErroringTokenSource{}

if badClient, err := NewClient(ctx, DetectProjectID, option.WithTokenSource(badTS)); err == nil {
t.Errorf("expected error from bad token source, NewClient succeeded with project: %s", badClient.Project())
}
}

func TestIntegration_TableCreate(t *testing.T) {
// Check that creating a record field with an empty schema is an error.
if client == nil {
Expand Down Expand Up @@ -1360,9 +1378,11 @@ func TestIntegration_InsertErrors(t *testing.T) {
if !ok {
t.Errorf("Wanted googleapi.Error, got: %v", err)
}
want := "Request payload size exceeds the limit"
if !strings.Contains(e.Message, want) {
t.Errorf("Error didn't contain expected message (%s): %s", want, e.Message)
if e.Code != http.StatusRequestEntityTooLarge {
want := "Request payload size exceeds the limit"
if !strings.Contains(e.Message, want) {
t.Errorf("Error didn't contain expected message (%s): %#v", want, e)
}
}
// Case 2: Very Large Request
// Request so large it gets rejected by intermediate infra (3x 10MB rows)
Expand Down
5 changes: 5 additions & 0 deletions bigquery/job.go
Expand Up @@ -63,6 +63,11 @@ func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j
return bqToJob(bqjob, c)
}

// Project returns the job's project.
func (j *Job) Project() string {
return j.projectID
}

// ID returns the job's ID.
func (j *Job) ID() string {
return j.jobID
Expand Down
148 changes: 148 additions & 0 deletions firestore/collgroupref.go
Expand Up @@ -14,6 +14,16 @@

package firestore

import (
"context"
"errors"
"fmt"
"sort"

"google.golang.org/api/iterator"
firestorepb "google.golang.org/genproto/googleapis/firestore/v1"
)

// A CollectionGroupRef is a reference to a group of collections sharing the
// same ID.
type CollectionGroupRef struct {
Expand All @@ -36,3 +46,141 @@ func newCollectionGroupRef(c *Client, dbPath, collectionID string) *CollectionGr
},
}
}

// GetPartitionedQueries returns a slice of Query objects, each containing a
// partition of a collection group. partitionCount must be a positive value and
// the number of returned partitions may be less than the requested number if
// providing the desired number would result in partitions with very few documents.
//
// If a Collection Group Query would return a large number of documents, this
// can help to subdivide the query to smaller working units that can be distributed.
func (cgr CollectionGroupRef) GetPartitionedQueries(ctx context.Context, partitionCount int) ([]Query, error) {
qp, err := cgr.getPartitions(ctx, partitionCount)
if err != nil {
return nil, err
}
queries := make([]Query, len(qp))
for _, part := range qp {
queries = append(queries, part.toQuery())
}
return queries, nil
}

// getPartitions returns a slice of queryPartition objects, describing a start
// and end range to query a subsection of the collection group. partitionCount
// must be a positive value and the number of returned partitions may be less
// than the requested number if providing the desired number would result in
// partitions with very few documents.
func (cgr CollectionGroupRef) getPartitions(ctx context.Context, partitionCount int) ([]queryPartition, error) {
orderedQuery := cgr.query().OrderBy(DocumentID, Asc)

if partitionCount <= 0 {
return nil, errors.New("a positive partitionCount must be provided")
} else if partitionCount == 1 {
return []queryPartition{{CollectionGroupQuery: orderedQuery}}, nil
}

db := cgr.c.path()
ctx = withResourceHeader(ctx, db)

// CollectionGroup Queries need to be ordered by __name__ ASC.
query, err := orderedQuery.toProto()
if err != nil {
return nil, err
}
structuredQuery := &firestorepb.PartitionQueryRequest_StructuredQuery{
StructuredQuery: query,
}

// Uses default PageSize
pbr := &firestorepb.PartitionQueryRequest{
Parent: db + "/documents",
PartitionCount: int64(partitionCount),
QueryType: structuredQuery,
}
cursorReferences := make([]*firestorepb.Value, 0, partitionCount)
iter := cgr.c.c.PartitionQuery(ctx, pbr)
for {
cursor, err := iter.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, fmt.Errorf("GetPartitions: %v", err)
}
cursorReferences = append(cursorReferences, cursor.GetValues()...)
}

// From Proto documentation:
// To obtain a complete result set ordered with respect to the results of the
// query supplied to PartitionQuery, the results sets should be merged:
// cursor A, cursor B, cursor M, cursor Q, cursor U, cursor W
// Once we have exhausted the pages, the cursor values need to be sorted in
// lexicographical order by segment (areas between '/').
sort.Sort(byFirestoreValue(cursorReferences))

queryPartitions := make([]queryPartition, 0, len(cursorReferences))
previousCursor := ""

for _, cursor := range cursorReferences {
cursorRef := cursor.GetReferenceValue()

// remove the root path from the reference, as queries take cursors
// relative to a collection
cursorRef = cursorRef[len(orderedQuery.path)+1:]

qp := queryPartition{
CollectionGroupQuery: orderedQuery,
StartAt: previousCursor,
EndBefore: cursorRef,
}
queryPartitions = append(queryPartitions, qp)
previousCursor = cursorRef
}

// In the case there were no partitions, we still add a single partition to
// the result, that covers the complete range.
lastPart := queryPartition{CollectionGroupQuery: orderedQuery}
if len(cursorReferences) > 0 {
cursorRef := cursorReferences[len(cursorReferences)-1].GetReferenceValue()
lastPart.StartAt = cursorRef[len(orderedQuery.path)+1:]
}
queryPartitions = append(queryPartitions, lastPart)

return queryPartitions, nil
}

// queryPartition provides a Collection Group Reference and start and end split
// points allowing for a section of a collection group to be queried. This is
// used by GetPartitions which, given a CollectionGroupReference returns smaller
// sub-queries or partitions
type queryPartition struct {
// CollectionGroupQuery is an ordered query on a CollectionGroupReference.
// This query must be ordered Asc on __name__.
// Example: client.CollectionGroup("collectionID").query().OrderBy(DocumentID, Asc)
CollectionGroupQuery Query

// StartAt is a document reference value, relative to the collection, not
// a complete parent path.
// Example: "documents/collectionName/documentName"
StartAt string

// EndBefore is a document reference value, relative to the collection, not
// a complete parent path.
// Example: "documents/collectionName/documentName"
EndBefore string
}

// toQuery converts a queryPartition object to a Query object
func (qp queryPartition) toQuery() Query {
q := *qp.CollectionGroupQuery.query()

// Remove the leading path before calling StartAt, EndBefore
if qp.StartAt != "" {
q = q.StartAt(qp.StartAt)
}
if qp.EndBefore != "" {
q = q.EndBefore(qp.EndBefore)
}
return q
}
73 changes: 73 additions & 0 deletions firestore/collgroupref_test.go
@@ -0,0 +1,73 @@
// Copyright 2021 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package firestore

import (
"context"
"testing"
)

func TestCGR_TestQueryPartition_ToQuery(t *testing.T) {
cgr := newCollectionGroupRef(testClient, testClient.path(), "collectionID")
qp := queryPartition{
CollectionGroupQuery: cgr.Query.OrderBy(DocumentID, Asc),
StartAt: "documents/start/at",
EndBefore: "documents/end/before",
}

got := qp.toQuery()

want := Query{
c: testClient,
path: "projects/projectID/databases/(default)",
parentPath: "projects/projectID/databases/(default)/documents",
collectionID: "collectionID",
startVals: []interface{}{"documents/start/at"},
endVals: []interface{}{"documents/end/before"},
startBefore: true,
endBefore: true,
allDescendants: true,
orders: []order{{fieldPath: []string{"__name__"}, dir: 1}},
}

if !testEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
}
}

func TestCGR_TestGetPartitions(t *testing.T) {
cgr := newCollectionGroupRef(testClient, testClient.path(), "collectionID")
_, err := cgr.getPartitions(context.Background(), 0)
if err == nil {
t.Error("Expected an error when requested partition count is < 1")
}

parts, err := cgr.getPartitions(context.Background(), 1)
if err != nil {
t.Error("Didn't expect an error when requested partition count is 1")
}
if len(parts) != 1 {
t.Fatal("Expected 1 queryPartition")
}
got := parts[0]
want := queryPartition{
CollectionGroupQuery: cgr.Query.OrderBy(DocumentID, Asc),
StartAt: "",
EndBefore: "",
}
if !testEqual(got, want) {
t.Errorf("got %+v, want %+v", got, want)
}
}

0 comments on commit 2bc3462

Please sign in to comment.