Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(spanner): support request and transaction tags #4336

Merged
merged 1 commit into from Jun 29, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
14 changes: 12 additions & 2 deletions spanner/client.go
Expand Up @@ -497,6 +497,8 @@ type applyOption struct {
// If atLeastOnce == true, Client.Apply will execute the mutations on Cloud
// Spanner at least once.
atLeastOnce bool
// transactionTag will be included with the CommitRequest.
transactionTag string
// priority is the RPC priority that is used for the commit operation.
priority sppb.RequestOptions_Priority
}
Expand All @@ -521,6 +523,14 @@ func ApplyAtLeastOnce() ApplyOption {
}
}

// TransactionTag returns an ApplyOption that will include the given tag as a
// transaction tag for a write-only transaction.
func TransactionTag(tag string) ApplyOption {
return func(ao *applyOption) {
ao.transactionTag = tag
}
}

// Priority returns an ApplyOptions that sets the RPC priority to use for the
// commit operation.
func Priority(priority sppb.RequestOptions_Priority) ApplyOption {
Expand All @@ -542,10 +552,10 @@ func (c *Client) Apply(ctx context.Context, ms []*Mutation, opts ...ApplyOption)
if !ao.atLeastOnce {
resp, err := c.ReadWriteTransactionWithOptions(ctx, func(ctx context.Context, t *ReadWriteTransaction) error {
return t.BufferWrite(ms)
}, TransactionOptions{CommitPriority: ao.priority})
}, TransactionOptions{CommitPriority: ao.priority, TransactionTag: ao.transactionTag})
return resp.CommitTs, err
}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority}
t := &writeOnlyTransaction{sp: c.idleSessions, commitPriority: ao.priority, transactionTag: ao.transactionTag}
return t.applyAtLeastOnce(ctx, ms...)
}

Expand Down
154 changes: 154 additions & 0 deletions spanner/client_test.go
Expand Up @@ -2530,6 +2530,141 @@ func TestClient_Apply_Priority(t *testing.T) {
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{Priority: sppb.RequestOptions_PRIORITY_MEDIUM})
}

func TestClient_ReadOnlyTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, qo := range []QueryOptions{
{},
{RequestTag: "tag-1"},
} {
for _, tx := range []*ReadOnlyTransaction{
client.Single(),
client.ReadOnlyTransaction(),
} {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

if tx.singleUse {
tx = client.Single()
}
iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 2, sppb.RequestOptions{RequestTag: qo.RequestTag})
tx.Close()
}
}
}

func TestClient_ReadWriteTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{TransactionTag: "tx-tag-1"},
} {
for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
client.ReadWriteTransactionWithOptions(context.Background(), func(ctx context.Context, tx *ReadWriteTransaction) error {
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)

// Check for SQL requests inside the transaction to prevent the check to
// drain the commit request from the server.
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag})
return nil
}, to)
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag})
}
}
}

func TestClient_StmtBasedReadWriteTransaction_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()
for _, to := range []TransactionOptions{
{},
{TransactionTag: "tx-tag-1"},
} {
for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
tx, _ := NewReadWriteStmtBasedTransactionWithOptions(context.Background(), client, to)
iter := tx.QueryWithOptions(context.Background(), NewStatement(SelectSingerIDAlbumIDAlbumTitleFromAlbums), qo)
iter.Next()
iter.Stop()

iter = tx.ReadWithOptions(context.Background(), "FOO", AllKeys(), []string{"BAR"}, &ReadOptions{RequestTag: qo.RequestTag})
iter.Next()
iter.Stop()

tx.UpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
tx.BatchUpdateWithOptions(context.Background(), []Statement{
NewStatement(UpdateBarSetFoo),
}, qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 4, sppb.RequestOptions{RequestTag: qo.RequestTag, TransactionTag: to.TransactionTag})

tx.Commit(context.Background())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: to.TransactionTag})
}
}
}

func TestClient_PDML_Tag(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

for _, qo := range []QueryOptions{
{},
{RequestTag: "request-tag-1"},
} {
client.PartitionedUpdateWithOptions(context.Background(), NewStatement(UpdateBarSetFoo), qo)
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: qo.RequestTag})
}
}

func TestClient_Apply_Tagging(t *testing.T) {
t.Parallel()

server, client, teardown := setupMockedTestServer(t)
defer teardown()

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})})
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce())
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{})

client.Apply(context.Background(), []*Mutation{Insert("foo", []string{"col1"}, []interface{}{"val1"})}, ApplyAtLeastOnce(), TransactionTag("tx-tag"))
checkCommitForExpectedRequestOptions(t, server.TestSpanner, sppb.RequestOptions{TransactionTag: "tx-tag"})
}

func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerServer, reqCount int, ro sppb.RequestOptions) {
reqs := drainRequestsFromServer(server)
reqOptions := []*sppb.RequestOptions{}
Expand Down Expand Up @@ -2559,6 +2694,12 @@ func checkRequestsForExpectedRequestOptions(t *testing.T, server InMemSpannerSer
if got != want {
t.Fatalf("Request priority mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := opts.RequestTag, ro.RequestTag; got != want {
t.Fatalf("Request tag mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := opts.TransactionTag, ro.TransactionTag; got != want {
t.Fatalf("Transaction tag mismatch\nGot: %v\nWant: %v", got, want)
}
}
}

Expand All @@ -2585,6 +2726,19 @@ func checkCommitForExpectedRequestOptions(t *testing.T, server InMemSpannerServe
if got != want {
t.Fatalf("Commit priority mismatch\nGot: %v\nWant: %v", got, want)
}

var requestTag string
var transactionTag string
if commit.RequestOptions != nil {
requestTag = commit.RequestOptions.RequestTag
transactionTag = commit.RequestOptions.TransactionTag
}
if got, want := requestTag, ro.RequestTag; got != want {
t.Fatalf("Commit request tag mismatch\nGot: %v\nWant: %v", got, want)
}
if got, want := transactionTag, ro.TransactionTag; got != want {
t.Fatalf("Commit transaction tag mismatch\nGot: %v\nWant: %v", got, want)
}
}

func TestClient_Single_Read_WithNumericKey(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion spanner/pdml.go
Expand Up @@ -69,7 +69,7 @@ func (c *Client) partitionedUpdate(ctx context.Context, statement Statement, opt
Params: params,
ParamTypes: paramTypes,
QueryOptions: options.Options,
RequestOptions: createRequestOptions(&options),
RequestOptions: createRequestOptions(options.Priority, options.RequestTag, ""),
}

// Make a retryer for Aborted and certain Internal errors.
Expand Down
12 changes: 12 additions & 0 deletions spanner/pdml_test.go
Expand Up @@ -166,3 +166,15 @@ func TestPartitionedUpdate_QueryOptions(t *testing.T) {
})
}
}

func TestPartitionedUpdate_Tagging(t *testing.T) {
ctx := context.Background()
server, client, teardown := setupMockedTestServer(t)
defer teardown()

_, err := client.PartitionedUpdateWithOptions(ctx, NewStatement(UpdateBarSetFoo), QueryOptions{RequestTag: "pdml-tag"})
if err != nil {
t.Fatalf("expect no errors, but got %v", err)
}
checkRequestsForExpectedRequestOptions(t, server.TestSpanner, 1, sppb.RequestOptions{RequestTag: "pdml-tag"})
}