
draft(storage/transfermanager): prototype #10045

Draft · wants to merge 3 commits into main

Conversation

BrennaEpp (Contributor):

No description provided.

The product-auto-label bot added the api: storage label (Issues related to the Cloud Storage API) on Apr 25, 2024.
tritone (Contributor) left a comment:

A few initial comments; overall this looks like a good start.

type Downloader struct {
	client *storage.Client
	config *transferManagerConfig
	work   chan *DownloadObjectInput // Piece of work to be executed.
tritone (Contributor): Presumably this should be send-only and output should be receive-only?

BrennaEpp (Contributor, Author): We send and receive on both channels in different places in the downloader. Unidirectional channels could be used in subcomponents, or if we were providing the channel to the user, but I don't see how we could implement this with unidirectional channels: if we only received from output, who would send us the output (and vice versa for work)?
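For illustration, a minimal sketch of that point (type and function names here are hypothetical, not the prototype's actual API): the struct fields must stay bidirectional because the Downloader both sends and receives on them, but internal helpers can still accept unidirectional views for compile-time safety:

type workItem struct{ object string }
type workResult struct{ err error }

func process(it *workItem) *workResult { return &workResult{} }

// The fields are bidirectional: enqueueing sends on work and workers receive
// from it; workers send on output and result processing receives from it.
type downloader struct {
	work   chan *workItem
	output chan *workResult
}

// A worker only ever receives work and sends results, so it can take
// unidirectional views of the same channels.
func (d *downloader) worker(work <-chan *workItem, output chan<- *workResult) {
	for item := range work {
		output <- process(item)
	}
}

func (d *downloader) startWorkers(n int) {
	for i := 0; i < n; i++ {
		// Bidirectional channels convert implicitly to directional types.
		go d.worker(d.work, d.output)
	}
}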

"google.golang.org/api/iterator"
)

// Downloader manages parallel download operations from a Cloud Storage bucket.
tritone (Contributor): Technically, the bucket can be specified per object. Let's just say that it manages a set of parallelized downloads.

BrennaEpp (Author): Makes sense; that wording is clearer.

}

// DownloadObject queues the download of a single object. If it's larger than
// the specified part size, the download will automatically be broken up into
tritone (Contributor): We can leave off the part about sharding for the initial PR and add it when we actually implement that.

// This will initiate the download but is non-blocking; wait on Downloader.Results
// to process the result. Results will be split into individual objects.
// NOTE: Do not use, DownloadDirectory is not implemented.
func (d *Downloader) DownloadDirectory(ctx context.Context, input *DownloadDirectoryInput) {
tritone (Contributor): Can leave this out of the PR for now.

// Choice of transport, etc. is configured on the client that's passed in.
func NewDownloader(c *storage.Client, opts ...TransferManagerOption) (*Downloader, error) {
	const (
		chanBufferSize = 1000 // how big is it reasonable to make this?
tritone (Contributor): I think this should be unbuffered, and we should probably buffer in a slice instead. Does that make sense to you?

BrennaEpp (Author): Not sure... does that mean we'd have something in the background checking the slice and sending the work through?

tritone (Contributor): Yeah, that sounds right. A background routine listens on the results channel and writes to a slice, which functions as a queue but with no risk of blocking from hitting a maximum length. It might make Next a little trickier, though.

BrennaEpp (Author): Hmm, I'm not sure I follow your solution and how Next plays into it.

Rather than having a slice, I came up with a solution that uses errgroup. Let's take it offline to discuss pros and cons.
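For reference, a minimal sketch of the slice-backed queue idea discussed above (all names here are illustrative): a background goroutine drains an unbuffered results channel into a mutex-guarded slice, so workers never block on a fixed channel buffer, at the cost of a slightly more involved Next:

import "sync"

type downloadResult struct {
	object string
	err    error
}

type resultQueue struct {
	mu      sync.Mutex
	results []*downloadResult
	done    chan struct{} // closed once the channel is fully drained
}

// drain moves results from the (unbuffered) channel into the slice; the slice
// acts as a queue with no maximum length to block on.
func (q *resultQueue) drain(ch <-chan *downloadResult) {
	for r := range ch {
		q.mu.Lock()
		q.results = append(q.results, r)
		q.mu.Unlock()
	}
	close(q.done)
}

// next pops the oldest result. This is where Next gets trickier: it must
// distinguish "nothing yet" from "drained and closed", which is omitted here.
func (q *resultQueue) next() (*downloadResult, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.results) == 0 {
		return nil, false
	}
	r := q.results[0]
	q.results = q.results[1:]
	return r, true
}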

return
}

type DownloadDirectoryInput struct {
tritone (Contributor): Remove for the initial PR.


// Waits for all outstanding downloads to complete. The Downloader must not be
// used to download more objects or directories after this has been called.
func (d *Downloader) WaitAndClose() error {
tritone (Contributor): IMO this should return an error if any individual download returned an error, so we'll have to collect those.

BrennaEpp (Author): Are you thinking it will return an error that collects all the errors that occurred, or just something that indicates there was a failure somewhere?

tritone (Contributor): I think we can do the latter to start, but eventually I guess it would be nice to offer one large multierror as well.

BrennaEpp (Author): Agreed. I'll mark it as a to-do for when we can use errors.Join. I think it makes sense to wait until then, to avoid having to keep supporting a custom multierror rather than the native one.
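Once the module can depend on Go 1.20, the "one large multierror" case is straightforward with errors.Join; a rough sketch on a simplified struct (the fields here are illustrative, not the prototype's):

import (
	"errors"
	"sync"
)

type downloads struct {
	wg   sync.WaitGroup
	mu   sync.Mutex
	errs []error // one entry per failed download
}

func (d *downloads) recordErr(err error) {
	if err == nil {
		return
	}
	d.mu.Lock()
	d.errs = append(d.errs, err)
	d.mu.Unlock()
}

// waitAndClose blocks until all queued downloads finish, then reports every
// failure as a single joined error. errors.Join returns nil when the slice is
// empty, which doubles as the success case.
func (d *downloads) waitAndClose() error {
	d.wg.Wait()
	d.mu.Lock()
	defer d.mu.Unlock()
	return errors.Join(d.errs...) // Go 1.20+
}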

return crc32cHash.Sum32(), w.Close()
}

type testWriter struct {
tritone (Contributor): This doesn't have to be in this PR, but we should add a version of this (well, some kind of DownloaderBuffer that implements WriterAt) to the library.

BrennaEpp (Author): Yeah, I'd have to look more into that. This is a very barebones implementation that is likely not at all efficient (and doesn't really work as a WriterAt yet).

tritone (Contributor): Yeah, I think that's fine for now.
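For context, a growable in-memory io.WriterAt along the lines of what's being discussed might look like this (the name and shape are hypothetical; a real DownloaderBuffer would need to be more careful about allocation):

import (
	"errors"
	"io"
	"sync"
)

// downloaderBuffer grows its backing slice on demand so that shards can be
// written at arbitrary offsets.
type downloaderBuffer struct {
	mu  sync.Mutex
	buf []byte
}

var _ io.WriterAt = (*downloaderBuffer)(nil)

func (b *downloaderBuffer) WriteAt(p []byte, off int64) (int, error) {
	if off < 0 {
		return 0, errors.New("downloaderBuffer: negative offset")
	}
	b.mu.Lock()
	defer b.mu.Unlock()
	if end := int(off) + len(p); end > len(b.buf) {
		// Grow so the write fits; an efficient version would grow
		// geometrically to amortize allocations.
		grown := make([]byte, end)
		copy(grown, b.buf)
		b.buf = grown
	}
	return copy(b.buf[off:], p), nil
}

func (b *downloaderBuffer) Bytes() []byte {
	b.mu.Lock()
	defer b.mu.Unlock()
	return b.buf
}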

// shards to transfer; that is, if the object is larger than this size, it will
// be uploaded or downloaded in concurrent pieces.
// The default is 32 MiB for downloads.
// NOTE: Sharding is not yet implemented.
tritone (Contributor): Leave this off for this PR.

}

// Start workers in background.
for i := 0; i < d.config.numWorkers; i++ {
tritone (Contributor): Presumably we could optimize this by spinning up workers as needed when there are objects enqueued? Doesn't have to be in this PR, though.

BrennaEpp (Author): Sure, though I'm not sure how much that would actually optimize things... I guess it depends on the number of workers.

tritone (Contributor): Yeah, something we can test out later.
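A rough sketch of the lazy alternative (illustrative only; the mu, running, and downloadWorker names are assumed here, not part of the prototype): start a worker per enqueued object until the configured cap is reached, rather than launching numWorkers goroutines up front:

// addWork enqueues an input and lazily spins up a worker if we're still under
// the cap, so an idle Downloader holds no goroutines. mu and running are
// assumed fields guarding the worker count; downloadWorker stands in for the
// existing worker loop.
func (d *Downloader) addWork(in *DownloadObjectInput) {
	d.mu.Lock()
	if d.running < d.config.numWorkers {
		d.running++
		go d.downloadWorker()
	}
	d.mu.Unlock()
	d.work <- in
}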
