Skip to content

Commit

Permalink
use gapic client and rely on gax for retries
Browse files Browse the repository at this point in the history
This change contains the following global changes:
1. spanner.Client uses the generated gapic client for gRPC
calls, instead of a gRPC connection and a
spannerpb.SpannerClient.
2. The gapic client uses the default gax retry logic.
3. Most custom retry logic has been removed, except:
   * retry on aborted transactions
   * retry for resumableStreamDecoder.next()

The change also includes an in-memory Spanner server for test
purposes. The server requires the user to mock the result of queries
and update statements. Sessions and transactions are handled
automatically. It also allows the user to register specific errors
to be returned for each gRPC function.
This test server makes it easier to develop test cases that verify
the behavior of the client library for an entire transaction for
situations that cannot easily be created in an integration test
using a real Cloud Spanner instance, such as aborted transactions
or temporary retryable errors.
The test cases can use the standard Spanner client withouth the
need to mock any of the server functions, other than specifying the
results for queries and updates.

Fixes #1418 and #1384

Change-Id: If0a8bbed50b512b32d73a8ef7ad74cdb1192294b
Reviewed-on: https://code-review.googlesource.com/c/gocloud/+/41131
Reviewed-by: kokoro <noreply+kokoro@google.com>
Reviewed-by: Jean de Klerk <deklerk@google.com>
  • Loading branch information
olavloite committed Jul 3, 2019
1 parent d6bee4c commit 9f33eb1
Show file tree
Hide file tree
Showing 14 changed files with 2,258 additions and 518 deletions.
5 changes: 1 addition & 4 deletions spanner/batch.go
Expand Up @@ -221,10 +221,7 @@ func (t *BatchReadOnlyTransaction) Cleanup(ctx context.Context) {
}
t.sh = nil
sid, client := sh.getID(), sh.getClient()
err := runRetryable(ctx, func(ctx context.Context) error {
_, e := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid})
return e
})
err := client.DeleteSession(ctx, &sppb.DeleteSessionRequest{Name: sid})
if err != nil {
log.Printf("Failed to delete session %v. Error: %v", sid, err)
}
Expand Down
97 changes: 63 additions & 34 deletions spanner/client.go
Expand Up @@ -25,8 +25,9 @@ import (

"cloud.google.com/go/internal/trace"
"cloud.google.com/go/internal/version"
vkit "cloud.google.com/go/spanner/apiv1"
"cloud.google.com/go/spanner/internal/backoff"
"google.golang.org/api/option"
gtransport "google.golang.org/api/transport/grpc"
sppb "google.golang.org/genproto/googleapis/spanner/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
Expand Down Expand Up @@ -71,8 +72,7 @@ func validDatabaseName(db string) error {
type Client struct {
// rr must be accessed through atomic operations.
rr uint32
conns []*grpc.ClientConn
clients []sppb.SpannerClient
clients []*vkit.Client

database string
// Metadata to be sent with each request.
Expand Down Expand Up @@ -170,19 +170,18 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf

// TODO(deklerk): This should be replaced with a balancer with
// config.NumChannels connections, instead of config.NumChannels
// clientconns.
// clients.
for i := 0; i < config.NumChannels; i++ {
conn, err := gtransport.Dial(ctx, allOpts...)
client, err := vkit.NewClient(ctx, allOpts...)
if err != nil {
return nil, errDial(i, err)
}
c.conns = append(c.conns, conn)
c.clients = append(c.clients, sppb.NewSpannerClient(conn))
c.clients = append(c.clients, client)
}

// Prepare session pool.
config.SessionPoolConfig.getRPCClient = func() (sppb.SpannerClient, error) {
// TODO: support more loadbalancing options.
// TODO: support more loadbalancing options.
config.SessionPoolConfig.getRPCClient = func() (*vkit.Client, error) {
return c.rrNext(), nil
}
config.SessionPoolConfig.sessionLabels = c.sessionLabels
Expand All @@ -195,9 +194,9 @@ func NewClientWithConfig(ctx context.Context, database string, config ClientConf
return c, nil
}

// rrNext returns the next available Cloud Spanner RPC client in a round-robin
// manner.
func (c *Client) rrNext() sppb.SpannerClient {
// rrNext returns the next available vkit Cloud Spanner RPC client in a
// round-robin manner.
func (c *Client) rrNext() *vkit.Client {
return c.clients[atomic.AddUint32(&c.rr, 1)%uint32(len(c.clients))]
}

Expand All @@ -206,8 +205,8 @@ func (c *Client) Close() {
if c.idleSessions != nil {
c.idleSessions.close()
}
for _, conn := range c.conns {
conn.Close()
for _, gpc := range c.clients {
gpc.Close()
}
}

Expand Down Expand Up @@ -279,26 +278,20 @@ func (c *Client) BatchReadOnlyTransaction(ctx context.Context, tb TimestampBound
sh = &sessionHandle{session: s}

// Begin transaction.
err = runRetryable(contextWithOutgoingMetadata(ctx, sh.getMetadata()), func(ctx context.Context) error {
res, e := sh.getClient().BeginTransaction(ctx, &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: buildTransactionOptionsReadOnly(tb, true),
},
res, err := sh.getClient().BeginTransaction(contextWithOutgoingMetadata(ctx, sh.getMetadata()), &sppb.BeginTransactionRequest{
Session: sh.getID(),
Options: &sppb.TransactionOptions{
Mode: &sppb.TransactionOptions_ReadOnly_{
ReadOnly: buildTransactionOptionsReadOnly(tb, true),
},
})
if e != nil {
return e
}
tx = res.Id
if res.ReadTimestamp != nil {
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
}
return nil
},
})
if err != nil {
return nil, err
return nil, toSpannerError(err)
}
tx = res.Id
if res.ReadTimestamp != nil {
rts = time.Unix(res.ReadTimestamp.Seconds, int64(res.ReadTimestamp.Nanos))
}

t := &BatchReadOnlyTransaction{
Expand Down Expand Up @@ -377,7 +370,7 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
ts time.Time
sh *sessionHandle
)
err = runRetryableNoWrap(ctx, func(ctx context.Context) error {
err = runWithRetryOnAborted(ctx, func(ctx context.Context) error {
var (
err error
t *ReadWriteTransaction
Expand All @@ -402,8 +395,7 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
trace.TracePrintf(ctx, map[string]interface{}{"transactionID": string(sh.getTransactionID())},
"Starting transaction attempt")
if err = t.begin(ctx); err != nil {
// Mask error from begin operation as retryable error.
return errRetry(err)
return err
}
ts, err = t.runInTransaction(ctx, f)
return err
Expand All @@ -414,6 +406,43 @@ func (c *Client) ReadWriteTransaction(ctx context.Context, f func(context.Contex
return ts, err
}

func runWithRetryOnAborted(ctx context.Context, f func(context.Context) error) error {
var funcErr error
retryCount := 0
for {
select {
case <-ctx.Done():
// Do context check here so that even f() failed to do so (for
// example, gRPC implementation bug), the loop can still have a
// chance to exit as expected.
return errContextCanceled(ctx, funcErr)
default:
}
funcErr = f(ctx)
if funcErr == nil {
return nil
}
// Only retry on ABORTED.
if isAbortErr(funcErr) {
// Aborted, do exponential backoff and continue.
b, ok := extractRetryDelay(funcErr)
if !ok {
b = backoff.DefaultBackoff.Delay(retryCount)
}
trace.TracePrintf(ctx, nil, "Backing off after ABORTED for %s, then retrying", b)
select {
case <-ctx.Done():
return errContextCanceled(ctx, funcErr)
case <-time.After(b):
}
retryCount++
continue
}
// Error isn't ABORTED / no error, return immediately.
return funcErr
}
}

// applyOption controls the behavior of Client.Apply.
type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
Expand Down

0 comments on commit 9f33eb1

Please sign in to comment.