diff --git a/bigtable/internal/testproxy/proxy.go b/bigtable/internal/testproxy/proxy.go index 9fe5bbf02bc..be5b96d6e48 100644 --- a/bigtable/internal/testproxy/proxy.go +++ b/bigtable/internal/testproxy/proxy.go @@ -356,6 +356,15 @@ type testClient struct { isOpen bool // isOpen indicates whether this client is open for new requests } +// timeout adds a timeout setting to a context if perOperationTimeout is set on +// the testClient object. +func (tc *testClient) timeout(ctx context.Context) (context.Context, context.CancelFunc) { + if tc.perOperationTimeout != nil { + return context.WithTimeout(ctx, tc.perOperationTimeout.AsDuration()) + } + return context.WithCancel(ctx) +} + // credentialsBundle implements credentials.Bundle interface // [See documentation for usage](https://pkg.go.dev/google.golang.org/grpc/credentials#Bundle). type credentialsBundle struct { @@ -617,10 +626,8 @@ func (s *goTestProxyServer) ReadRow(ctx context.Context, req *pb.ReadRowRequest) Row: &btpb.Row{}, } - if btc.perOperationTimeout != nil { - ct, _ := context.WithTimeout(ctx, btc.perOperationTimeout.AsDuration()) - ctx = ct - } + ctx, cancel := btc.timeout(ctx) + defer cancel() r, err := t.ReadRow(ctx, req.RowKey) if err != nil { @@ -674,6 +681,9 @@ func (s *goTestProxyServer) ReadRows(ctx context.Context, req *pb.ReadRowsReques rs = bigtable.InfiniteRange("") } + ctx, cancel := btc.timeout(ctx) + defer cancel() + var c int32 var rowsPb []*btpb.Row lim := req.GetCancelAfterRows() @@ -743,6 +753,9 @@ func (s *goTestProxyServer) MutateRow(ctx context.Context, req *pb.MutateRowRequ }, } + ctx, cancel := btc.timeout(ctx) + defer cancel() + err := t.Apply(ctx, string(row), m) if err != nil { res.Status = statusFromError(err) @@ -794,6 +807,9 @@ func (s *goTestProxyServer) BulkMutateRows(ctx context.Context, req *pb.MutateRo }, } + ctx, cancel := btc.timeout(ctx) + defer cancel() + errs, err := t.ApplyBulk(ctx, keys, muts) if err != nil { log.Printf("received error from Table.ApplyBulk(): %v", err) @@ -863,6 +879,9 @@ func (s *goTestProxyServer) CheckAndMutateRow(ctx context.Context, req *pb.Check var matched bool ao := bigtable.GetCondMutationResult(&matched) + ctx, cancel := btc.timeout(ctx) + defer cancel() + err := t.Apply(ctx, rowKey, c, ao) if err != nil { log.Printf("received error from Table.Apply: %v", err) @@ -902,6 +921,9 @@ func (s *goTestProxyServer) SampleRowKeys(ctx context.Context, req *pb.SampleRow }, } + ctx, cancel := btc.timeout(ctx) + defer cancel() + t := btc.c.Open(rrq.TableName) keys, err := t.SampleRowKeys(ctx) if err != nil { @@ -964,6 +986,10 @@ func (s *goTestProxyServer) ReadModifyWriteRow(ctx context.Context, req *pb.Read t := btc.c.Open(rrq.TableName) k := string(rrq.RowKey) + + ctx, cancel := btc.timeout(ctx) + defer cancel() + r, err := t.ApplyReadModifyWrite(ctx, k, rmw) if err != nil { return nil, err