Skip to content

Commit

Permalink
Merge branch 'master' into regen_gocloud
Browse files Browse the repository at this point in the history
  • Loading branch information
codyoss committed Jun 8, 2021
2 parents 411054a + 445b003 commit cbe1c83
Show file tree
Hide file tree
Showing 9 changed files with 92 additions and 46 deletions.
16 changes: 16 additions & 0 deletions pubsublite/CHANGES.md
@@ -1,5 +1,21 @@
# Changes

## [0.9.0](https://www.github.com/googleapis/google-cloud-go/compare/pubsublite/v0.8.0...pubsublite/v0.9.0) (2021-06-08)


### Features

* **pubsublite:** Add initial_cursor field to InitialSubscribeRequest ([6f9c8b0](https://www.github.com/googleapis/google-cloud-go/commit/6f9c8b0a5d6e4509f056a146cb586f310f3336a9))
* **pubsublite:** Add Pub/Sub Lite Reservation APIs ([18375e5](https://www.github.com/googleapis/google-cloud-go/commit/18375e50e8f16e63506129b8927a7b62f85e407b))
* **pubsublite:** ComputeTimeCursor RPC for Pub/Sub Lite ([d089dda](https://www.github.com/googleapis/google-cloud-go/commit/d089dda0089acb9aaef9b3da40b219476af9fc06))
* **pubsublite:** detect stream reset signal ([#4144](https://www.github.com/googleapis/google-cloud-go/issues/4144)) ([ff5f8c9](https://www.github.com/googleapis/google-cloud-go/commit/ff5f8c989cba2751dcc77745483ef3828e6df78c))
* **pubsublite:** flush and reset committer ([#4143](https://www.github.com/googleapis/google-cloud-go/issues/4143)) ([0ecd732](https://www.github.com/googleapis/google-cloud-go/commit/0ecd732e3f57928e7999ae4e78871be070c184d9))


### Bug Fixes

* **pubsublite:** prevent subscriber flow control token races ([#4060](https://www.github.com/googleapis/google-cloud-go/issues/4060)) ([dc0103b](https://www.github.com/googleapis/google-cloud-go/commit/dc0103baeaf168474b9e163f0aa5f7555170ffc4))

## [0.8.0](https://www.github.com/googleapis/google-cloud-go/compare/pubsublite/v0.7.0...pubsublite/v0.8.0) (2021-03-25)


Expand Down
7 changes: 1 addition & 6 deletions pubsublite/internal/wire/subscriber.go
Expand Up @@ -307,16 +307,11 @@ func (s *subscribeStream) unsafeOnMessageResponse(response *pb.MessageResponse)
}

func (s *subscribeStream) onAck(ac *ackConsumer) {
// Don't block the user's goroutine with potentially expensive ack processing.
go s.onAckAsync(ac.MsgBytes)
}

func (s *subscribeStream) onAckAsync(msgBytes int64) {
s.mu.Lock()
defer s.mu.Unlock()

if s.status == serviceActive {
s.unsafeAllowFlow(flowControlTokens{Bytes: msgBytes, Messages: 1})
s.unsafeAllowFlow(flowControlTokens{Bytes: ac.MsgBytes, Messages: 1})
}
}

Expand Down
10 changes: 5 additions & 5 deletions pubsublite/internal/wire/subscriber_test.go
Expand Up @@ -339,8 +339,8 @@ func TestSubscribeStreamFlowControlBatching(t *testing.T) {
}
sub.Receiver.ValidateMsg(msg1)
sub.Receiver.ValidateMsg(msg2)
sub.sub.onAckAsync(msg1.SizeBytes)
sub.sub.onAckAsync(msg2.SizeBytes)
sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
sub.sub.sendBatchFlowControl()
if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
Expand Down Expand Up @@ -373,8 +373,8 @@ func TestSubscribeStreamExpediteFlowControl(t *testing.T) {
}
sub.Receiver.ValidateMsg(msg1)
sub.Receiver.ValidateMsg(msg2)
sub.sub.onAckAsync(msg1.SizeBytes)
sub.sub.onAckAsync(msg2.SizeBytes)
sub.sub.onAck(&ackConsumer{MsgBytes: msg1.SizeBytes})
sub.sub.onAck(&ackConsumer{MsgBytes: msg2.SizeBytes})
// Note: the ack for msg2 automatically triggers sending the flow control.
if gotErr := sub.FinalError(); !test.ErrorEqual(gotErr, serverErr) {
t.Errorf("Final err: (%v), want: (%v)", gotErr, serverErr)
Expand Down Expand Up @@ -419,7 +419,7 @@ func TestSubscribeStreamDisableBatchFlowControl(t *testing.T) {
barrier.ReleaseAfter(func() {
// While the stream is not connected, the pending flow control request
// should not be released and sent to the stream.
sub.sub.onAckAsync(msg.SizeBytes)
sub.sub.onAck(&ackConsumer{MsgBytes: msg.SizeBytes})
if sub.PendingFlowControlRequest() == nil {
t.Errorf("Pending flow control request should not be cleared")
}
Expand Down
7 changes: 7 additions & 0 deletions spanner/CHANGES.md
@@ -1,5 +1,12 @@
# Changes

## [1.20.0](https://www.github.com/googleapis/google-cloud-go/compare/spanner/v1.19.0...spanner/v1.20.0) (2021-06-08)


### Features

* **spanner:** add the support of optimizer statistics package ([#2717](https://www.github.com/googleapis/google-cloud-go/issues/2717)) ([29c7247](https://www.github.com/googleapis/google-cloud-go/commit/29c724771f0b19849c76e62d4bc8e9342922bf75))

## [1.19.0](https://www.github.com/googleapis/google-cloud-go/compare/spanner/v1.18.0...spanner/v1.19.0) (2021-06-03)


Expand Down
8 changes: 5 additions & 3 deletions spanner/batch_test.go
Expand Up @@ -18,7 +18,6 @@ package spanner

import (
"context"
"os"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -80,8 +79,8 @@ func TestPartitionQuery_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
if tt.env.Options != nil {
os.Setenv("SPANNER_OPTIMIZER_VERSION", tt.env.Options.OptimizerVersion)
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
unset := setQueryOptionsEnvVars(tt.env.Options)
defer unset()
}

ctx := context.Background()
Expand Down Expand Up @@ -113,6 +112,9 @@ func TestPartitionQuery_QueryOptions(t *testing.T) {
if got, want := p.qreq.QueryOptions.OptimizerVersion, tt.want.Options.OptimizerVersion; got != want {
t.Fatalf("Incorrect optimizer version: got %v, want %v", got, want)
}
if got, want := p.qreq.QueryOptions.OptimizerStatisticsPackage, tt.want.Options.OptimizerStatisticsPackage; got != want {
t.Fatalf("Incorrect optimizer statistics package: got %v, want %v", got, want)
}
}
})
}
Expand Down
10 changes: 7 additions & 3 deletions spanner/client.go
Expand Up @@ -227,13 +227,17 @@ func allClientOpts(numChannels int, userOpts ...option.ClientOption) []option.Cl
// via application-level configuration. If the environment variables are set,
// this will return the overwritten query options.
func getQueryOptions(opts QueryOptions) QueryOptions {
if opts.Options == nil {
opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
}
opv := os.Getenv("SPANNER_OPTIMIZER_VERSION")
if opv != "" {
if opts.Options == nil {
opts.Options = &sppb.ExecuteSqlRequest_QueryOptions{}
}
opts.Options.OptimizerVersion = opv
}
opsp := os.Getenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE")
if opsp != "" {
opts.Options.OptimizerStatisticsPackage = opsp
}
return opts
}

Expand Down
71 changes: 45 additions & 26 deletions spanner/client_test.go
Expand Up @@ -397,8 +397,8 @@ func TestClient_Single_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
if tt.env.Options != nil {
os.Setenv("SPANNER_OPTIMIZER_VERSION", tt.env.Options.OptimizerVersion)
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
unset := setQueryOptionsEnvVars(tt.env.Options)
defer unset()
}

ctx := context.Background()
Expand Down Expand Up @@ -458,6 +458,9 @@ func checkReqsForQueryOptions(t *testing.T, server InMemSpannerServer, qo QueryO
if got, want := reqQueryOptions.OptimizerVersion, qo.Options.OptimizerVersion; got != want {
t.Fatalf("Optimizer version mismatch, got %v, want %v", got, want)
}
if got, want := reqQueryOptions.OptimizerStatisticsPackage, qo.Options.OptimizerStatisticsPackage; got != want {
t.Fatalf("Optimizer statistics package mismatch, got %v, want %v", got, want)
}
}

func testSingleQuery(t *testing.T, serverError error) error {
Expand Down Expand Up @@ -623,8 +626,8 @@ func TestClient_ReadOnlyTransaction_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
if tt.env.Options != nil {
os.Setenv("SPANNER_OPTIMIZER_VERSION", tt.env.Options.OptimizerVersion)
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
unset := setQueryOptionsEnvVars(tt.env.Options)
defer unset()
}

ctx := context.Background()
Expand All @@ -645,6 +648,15 @@ func TestClient_ReadOnlyTransaction_QueryOptions(t *testing.T) {
}
}

func setQueryOptionsEnvVars(opts *sppb.ExecuteSqlRequest_QueryOptions) func() {
os.Setenv("SPANNER_OPTIMIZER_VERSION", opts.OptimizerVersion)
os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", opts.OptimizerStatisticsPackage)
return func() {
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
defer os.Setenv("SPANNER_OPTIMIZER_STATISTICS_PACKAGE", "")
}
}

func testReadOnlyTransaction(t *testing.T, executionTimes map[string]SimulatedExecutionTime) error {
server, client, teardown := setupMockedTestServer(t)
defer teardown()
Expand Down Expand Up @@ -784,8 +796,8 @@ func TestClient_ReadWriteTransaction_Query_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
if tt.env.Options != nil {
os.Setenv("SPANNER_OPTIMIZER_VERSION", tt.env.Options.OptimizerVersion)
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
unset := setQueryOptionsEnvVars(tt.env.Options)
defer unset()
}

ctx := context.Background()
Expand Down Expand Up @@ -813,8 +825,8 @@ func TestClient_ReadWriteTransaction_Update_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
if tt.env.Options != nil {
os.Setenv("SPANNER_OPTIMIZER_VERSION", tt.env.Options.OptimizerVersion)
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
unset := setQueryOptionsEnvVars(tt.env.Options)
defer unset()
}

ctx := context.Background()
Expand Down Expand Up @@ -2240,7 +2252,10 @@ func TestClient_EmulatorWithCredentialsFile(t *testing.T) {

func TestBatchReadOnlyTransaction_QueryOptions(t *testing.T) {
ctx := context.Background()
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
OptimizerVersion: "1",
OptimizerStatisticsPackage: "auto_20191128_14_47_22UTC",
}}
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: qo})
defer teardown()

Expand All @@ -2256,7 +2271,10 @@ func TestBatchReadOnlyTransaction_QueryOptions(t *testing.T) {
}

func TestBatchReadOnlyTransactionFromID_QueryOptions(t *testing.T) {
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
OptimizerVersion: "1",
OptimizerStatisticsPackage: "auto_20191128_14_47_22UTC",
}}
_, client, teardown := setupMockedTestServerWithConfig(t, ClientConfig{QueryOptions: qo})
defer teardown()

Expand All @@ -2276,48 +2294,49 @@ type QueryOptionsTestCase struct {
}

func queryOptionsTestCases() []QueryOptionsTestCase {
statsPkg := "auto_20191128_14_47_22UTC"
return []QueryOptionsTestCase{
{
"Client level",
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: nil},
QueryOptions{Options: nil},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
},
{
"Environment level",
QueryOptions{Options: nil},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: nil},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
},
{
"Query level",
QueryOptions{Options: nil},
QueryOptions{Options: nil},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
},
{
"Environment level has precedence",
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: nil},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}},
},
{
"Query level has precedence than client level",
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: nil},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
},
{
"Query level has highest precedence",
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3"}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "2", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "3", OptimizerStatisticsPackage: statsPkg}},
},
}
}
Expand Down
5 changes: 4 additions & 1 deletion spanner/integration_test.go
Expand Up @@ -635,7 +635,10 @@ func TestIntegration_SingleUse_WithQueryOptions(t *testing.T) {
t.Fatal(err)
}
}
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{OptimizerVersion: "1"}}
qo := QueryOptions{Options: &sppb.ExecuteSqlRequest_QueryOptions{
OptimizerVersion: "1",
OptimizerStatisticsPackage: "auto_20191128_14_47_22UTC",
}}
got, err := readAll(client.Single().QueryWithOptions(ctx, Statement{
"SELECT SingerId, FirstName, LastName FROM Singers WHERE SingerId IN (@id1, @id3, @id4)",
map[string]interface{}{"id1": int64(1), "id3": int64(3), "id4": int64(4)},
Expand Down
4 changes: 2 additions & 2 deletions spanner/pdml_test.go
Expand Up @@ -145,8 +145,8 @@ func TestPartitionedUpdate_QueryOptions(t *testing.T) {
for _, tt := range queryOptionsTestCases() {
t.Run(tt.name, func(t *testing.T) {
if tt.env.Options != nil {
os.Setenv("SPANNER_OPTIMIZER_VERSION", tt.env.Options.OptimizerVersion)
defer os.Setenv("SPANNER_OPTIMIZER_VERSION", "")
unset := setQueryOptionsEnvVars(tt.env.Options)
defer unset()
}

ctx := context.Background()
Expand Down

0 comments on commit cbe1c83

Please sign in to comment.