Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[bug] - Handle error when scanning s3 bucket. #969

Merged
merged 7 commits into from Dec 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
12 changes: 6 additions & 6 deletions pkg/sources/s3/s3.go
Expand Up @@ -108,20 +108,21 @@ func (s *Source) newClient(region string) (*s3.S3, error) {

// Chunks emits chunks of bytes over a channel.
func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) error {
client, err := s.newClient("us-east-1")
const defaultAWSRegion = "us-east-1"

client, err := s.newClient(defaultAWSRegion)
if err != nil {
return errors.WrapPrefix(err, "could not create s3 client", 0)
}

bucketsToScan := []string{}
var bucketsToScan []string

switch s.conn.GetCredential().(type) {
case *sourcespb.S3_AccessKey, *sourcespb.S3_CloudEnvironment:
if len(s.conn.Buckets) == 0 {
res, err := client.ListBuckets(&s3.ListBucketsInput{})
if err != nil {
s.log.Error(err, "could not list s3 buckets")
return errors.WrapPrefix(err, "could not list s3 buckets", 0)
return fmt.Errorf("could not list s3 buckets: %w", err)
}
buckets := res.Buckets
for _, bucket := range buckets {
Expand Down Expand Up @@ -150,7 +151,7 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err
continue
}
var regionalClient *s3.S3
if region != "us-east-1" {
if region != defaultAWSRegion {
regionalClient, err = s.newClient(region)
if err != nil {
s.log.Error(err, "could not make regional s3 client")
Expand All @@ -170,7 +171,6 @@ func (s *Source) Chunks(ctx context.Context, chunksChan chan *sources.Chunk) err

if err != nil {
s.log.Error(err, "could not list objects in s3 bucket", "bucket", bucket)
return errors.WrapPrefix(err, fmt.Sprintf("could not list objects in s3 bucket: %s", bucket), 0)
}
}
s.SetProgressComplete(len(bucketsToScan), len(bucketsToScan), fmt.Sprintf("Completed scanning source %s", s.name), "")
Expand Down
2 changes: 1 addition & 1 deletion pkg/sources/s3/s3_test.go
Expand Up @@ -75,7 +75,7 @@ func TestSource_Chunks(t *testing.T) {
t.Fatal(err)
}

err = s.Init(ctx, tt.init.name, 0, 0, tt.init.verify, conn, 10)
err = s.Init(ctx, tt.init.name, 0, 0, tt.init.verify, conn, 8)
if (err != nil) != tt.wantErr {
t.Errorf("Source.Init() error = %v, wantErr %v", err, tt.wantErr)
return
Expand Down