Skip to content

Commit

Permalink
chore(storage): implement OpenWriter, refactor gRPC write (#6099)
Browse files Browse the repository at this point in the history
  • Loading branch information
noahdietz committed Jun 13, 2022
1 parent 5377d62 commit 3c0e4e0
Show file tree
Hide file tree
Showing 6 changed files with 590 additions and 374 deletions.
52 changes: 51 additions & 1 deletion storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ package storage

import (
"context"
"io"
"time"

gax "github.com/googleapis/gax-go/v2"
"google.golang.org/api/option"
Expand Down Expand Up @@ -84,7 +86,7 @@ type storageClient interface {
RewriteObject(ctx context.Context, req *rewriteObjectRequest, opts ...storageOption) (*rewriteObjectResponse, error)

NewRangeReader(ctx context.Context, params *newRangeReaderParams, opts ...storageOption) (*Reader, error)
OpenWriter(ctx context.Context, w *Writer, opts ...storageOption) error
OpenWriter(params *openWriterParams, opts ...storageOption) (*io.PipeWriter, error)

// IAM methods.

Expand Down Expand Up @@ -211,6 +213,54 @@ type userProjectOption struct {

func (o *userProjectOption) Apply(s *settings) { s.userProject = o.project }

type openWriterParams struct {
// Writer configuration

// ctx is the context used by the writer routine to make all network calls
// and to manage the writer routine - see `Writer.ctx`.
// Required.
ctx context.Context
// chunkSize - see `Writer.ChunkSize`.
// Optional.
chunkSize int
// chunkRetryDeadline - see `Writer.ChunkRetryDeadline`.
// Optional.
chunkRetryDeadline time.Duration

// Object/request properties

// bucket - see `Writer.o.bucket`.
// Required.
bucket string
// attrs - see `Writer.ObjectAttrs`.
// Required.
attrs *ObjectAttrs
// conds - see `Writer.o.conds`.
// Optional.
conds *Conditions
// encryptionKey - see `Writer.o.encryptionKey`
// Optional.
encryptionKey []byte
// sendCRC32C - see `Writer.SendCRC32C`.
// Optional.
sendCRC32C bool

// Writer callbacks

// donec - see `Writer.donec`.
// Required.
donec chan struct{}
// setError callback for reporting errors - see `Writer.error`.
// Required.
setError func(error)
// progress callback for reporting upload progress - see `Writer.progress`.
// Required.
progress func(int64)
// setObj callback for reporting the resulting object - see `Writer.obj`.
// Required.
setObj func(*ObjectAttrs)
}

type newRangeReaderParams struct {
bucket string
conds *Conditions
Expand Down
62 changes: 62 additions & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,68 @@ func TestOpenReaderEmulated(t *testing.T) {
})
}

func TestOpenWriterEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test data.
_, err := client.CreateBucket(context.Background(), project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}
prefix := time.Now().Nanosecond()
want := &ObjectAttrs{
Bucket: bucket,
Name: fmt.Sprintf("%d-object-%d", prefix, time.Now().Nanosecond()),
Generation: defaultGen,
}

var gotAttrs *ObjectAttrs
params := &openWriterParams{
attrs: want,
bucket: bucket,
ctx: context.Background(),
donec: make(chan struct{}),
setError: func(_ error) {}, // no-op
progress: func(_ int64) {}, // no-op
setObj: func(o *ObjectAttrs) { gotAttrs = o },
}
pw, err := client.OpenWriter(params)
if err != nil {
t.Fatalf("failed to open writer: %v", err)
}
if _, err := pw.Write(randomBytesToWrite); err != nil {
t.Fatalf("failed to populate test data: %v", err)
}
if err := pw.Close(); err != nil {
t.Fatalf("closing object: %v", err)
}
select {
case <-params.donec:
}
if gotAttrs == nil {
t.Fatalf("Writer finished, but resulting object wasn't set")
}
if diff := cmp.Diff(gotAttrs.Name, want.Name); diff != "" {
t.Fatalf("Resulting object name: got(-),want(+):\n%s", diff)
}

r, err := veneerClient.Bucket(bucket).Object(want.Name).NewReader(context.Background())
if err != nil {
t.Fatalf("opening reading: %v", err)
}
wantLen := len(randomBytesToWrite)
got := make([]byte, wantLen)
n, err := r.Read(got)
if n != wantLen {
t.Fatalf("expected to read %d bytes, but got %d", wantLen, n)
}
if diff := cmp.Diff(got, randomBytesToWrite); diff != "" {
t.Fatalf("checking written content: got(-),want(+):\n%s", diff)
}
})
}

func initEmulatorClients() func() error {
noopCloser := func() error { return nil }
if !isEmulatorEnvironmentSet() {
Expand Down

0 comments on commit 3c0e4e0

Please sign in to comment.