diff --git a/CHANGELOG_PENDING.md b/CHANGELOG_PENDING.md index 8a1927a39c..0f7b6cf07e 100644 --- a/CHANGELOG_PENDING.md +++ b/CHANGELOG_PENDING.md @@ -3,3 +3,7 @@ ### SDK Enhancements ### SDK Bugs +* `service/s3`,`service/kinesis`: Fix streaming APIs' Err method closing stream ([#2882](https://github.com/aws/aws-sdk-go/pull/2882)) + * Fixes calling the Err method on SDK's Amazon Kinesis's SubscribeToShared and Amazon S3's SelectObjectContent response EventStream members closing the stream. This would cause unexpected read errors, or early termination of the streams. Only the Close method of the streaming members will close the streams. + * Related to [#2769](https://github.com/aws/aws-sdk-go/issues/2769) + diff --git a/private/model/api/codegentest/service/restjsonservice/api.go b/private/model/api/codegentest/service/restjsonservice/api.go index c0117afbb9..7cdc21a791 100644 --- a/private/model/api/codegentest/service/restjsonservice/api.go +++ b/private/model/api/codegentest/service/restjsonservice/api.go @@ -309,6 +309,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -318,8 +320,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -720,6 +720,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -729,8 +731,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/restjsonservice/eventstream_test.go b/private/model/api/codegentest/service/restjsonservice/eventstream_test.go index cba6539ca8..b9a2023862 100644 --- a/private/model/api/codegentest/service/restjsonservice/eventstream_test.go +++ b/private/model/api/codegentest/service/restjsonservice/eventstream_test.go @@ -214,6 +214,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/private/model/api/codegentest/service/restxmlservice/api.go b/private/model/api/codegentest/service/restxmlservice/api.go index 409e6a5957..b9b9b3ca01 100644 --- a/private/model/api/codegentest/service/restxmlservice/api.go +++ b/private/model/api/codegentest/service/restxmlservice/api.go @@ -309,6 +309,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -318,8 +320,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -720,6 +720,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -729,8 +731,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/restxmlservice/eventstream_test.go b/private/model/api/codegentest/service/restxmlservice/eventstream_test.go index bdb05860b5..5d357b85d5 100644 --- a/private/model/api/codegentest/service/restxmlservice/eventstream_test.go +++ b/private/model/api/codegentest/service/restxmlservice/eventstream_test.go @@ -214,6 +214,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/private/model/api/codegentest/service/rpcservice/api.go b/private/model/api/codegentest/service/rpcservice/api.go index 66538328d6..b8f39f30ee 100644 --- a/private/model/api/codegentest/service/rpcservice/api.go +++ b/private/model/api/codegentest/service/rpcservice/api.go @@ -311,6 +311,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -320,8 +322,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -769,6 +769,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -778,8 +780,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/rpcservice/eventstream_test.go b/private/model/api/codegentest/service/rpcservice/eventstream_test.go index 8b87a54d00..f3169b4560 100644 --- a/private/model/api/codegentest/service/rpcservice/eventstream_test.go +++ b/private/model/api/codegentest/service/rpcservice/eventstream_test.go @@ -238,6 +238,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.AEventStreamRef.Err() + select { + case _, ok := <-resp.AEventStreamRef.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.AEventStreamRef.Close() <-resp.AEventStreamRef.Events() diff --git a/private/model/api/eventstream.go b/private/model/api/eventstream.go index 25d8e391a9..34be423441 100644 --- a/private/model/api/eventstream.go +++ b/private/model/api/eventstream.go @@ -208,7 +208,7 @@ func setupEventStream(topShape *Shape) *EventStream { eventRef.Shape.EventFor = append(eventRef.Shape.EventFor, eventStream) // Exceptions and events are two different lists to allow the SDK - // to easly generate code with the two handled differently. + // to easily generate code with the two handled differently. event := &Event{ Name: eventRefName, Shape: eventRef.Shape, @@ -335,6 +335,8 @@ func (es *{{ $.ShapeName }}) Close() (err error) { es.Writer.Close() {{ end -}} + es.StreamCloser.Close() + return es.Err() } @@ -353,8 +355,6 @@ func (es *{{ $.ShapeName }}) Err() error { } {{ end -}} - es.StreamCloser.Close() - return nil } @@ -974,6 +974,18 @@ func (c *loopReader) Read(p []byte) (int, error) { t.Fatalf("expect no error got, %v", err) } + {{ if gt (len $.Inbound.Events) 0 -}} + // Assert calling Err before close does not close the stream. + resp.{{ $esMemberName }}.Err() + select { + case _, ok := <-resp.{{ $esMemberName }}.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + {{- end }} + resp.{{ $esMemberName }}.Close() <-resp.{{ $esMemberName }}.Events() diff --git a/service/kinesis/api.go b/service/kinesis/api.go index 888f7a4363..b282cf645b 100644 --- a/service/kinesis/api.go +++ b/service/kinesis/api.go @@ -7379,6 +7379,8 @@ type SubscribeToShardEventStream struct { // may result in resource leaks. func (es *SubscribeToShardEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -7388,8 +7390,6 @@ func (es *SubscribeToShardEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/service/kinesis/eventstream_test.go b/service/kinesis/eventstream_test.go index de2ae093ff..798db08ea3 100644 --- a/service/kinesis/eventstream_test.go +++ b/service/kinesis/eventstream_test.go @@ -86,6 +86,16 @@ func TestSubscribeToShard_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/service/s3/api.go b/service/s3/api.go index b4a4e8c4ad..2c2f204000 100644 --- a/service/s3/api.go +++ b/service/s3/api.go @@ -22503,6 +22503,8 @@ type SelectObjectContentEventStream struct { // may result in resource leaks. func (es *SelectObjectContentEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -22512,8 +22514,6 @@ func (es *SelectObjectContentEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/service/s3/cust_integ_eventstream_test.go b/service/s3/cust_integ_eventstream_test.go index 53f6cc1019..067faab724 100644 --- a/service/s3/cust_integ_eventstream_test.go +++ b/service/s3/cust_integ_eventstream_test.go @@ -5,21 +5,31 @@ package s3_test import ( "bytes" "encoding/csv" - "fmt" "io" - "os" - "path/filepath" "strings" "testing" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/internal/sdkio" "github.com/aws/aws-sdk-go/service/s3" ) func TestInteg_SelectObjectContent(t *testing.T) { keyName := "selectObject.csv" - putTestFile(t, filepath.Join("testdata", "positive_select.csv"), keyName) + + var header = []byte("A,B,C,D,E,F,G,H,I,J\n") + var recordRow = []byte("0,0,0.5,217.371,217.658,218.002,269.445,487.447,2.106,489.554\n") + + buf := make([]byte, 0, 6*sdkio.MebiByte) + buf = append(buf, []byte(header)...) + for i := 0; i < (cap(buf)/len(recordRow))-1; i++ { + buf = append(buf, recordRow...) + } + + // Put a mock CSV file to the S3 bucket so that its contents can be + // selected. + putTestContent(t, bytes.NewReader(buf), keyName) resp, err := svc.SelectObjectContent(&s3.SelectObjectContentInput{ Bucket: bucketName, @@ -43,15 +53,85 @@ func TestInteg_SelectObjectContent(t *testing.T) { } defer resp.EventStream.Close() + recReader, recWriter := io.Pipe() + var sum int64 var processed int64 - for event := range resp.EventStream.Events() { - switch tv := event.(type) { - case *s3.RecordsEvent: - sum += int64(len(tv.Payload)) - case *s3.StatsEvent: - processed = *tv.Details.BytesProcessed + + var gotEndEvent bool + go func(w *io.PipeWriter, resp *s3.SelectObjectContentOutput) { + defer recWriter.Close() + var numRecordEvents int64 + for event := range resp.EventStream.Events() { + switch tv := event.(type) { + case *s3.RecordsEvent: + n, err := recWriter.Write(tv.Payload) + if err != nil { + t.Logf("failed to write to record writer, %v, %v", n, err) + } + sum += int64(n) + numRecordEvents++ + case *s3.StatsEvent: + processed = *tv.Details.BytesProcessed + case *s3.EndEvent: + gotEndEvent = true + t.Logf("s3.EndEvent received") + } + } + t.Logf("received %d record events", numRecordEvents) + }(recWriter, resp) + + type Record []string + + records := make(chan []Record) + go func(r io.Reader, records chan<- []Record, batchSize int) { + defer close(records) + + csvReader := csv.NewReader(r) + var count int64 + + batch := make([]Record, 0, batchSize) + for { + count++ + record, err := csvReader.Read() + if err != nil { + if _, ok := err.(*csv.ParseError); ok { + t.Logf("failed to decode record row, %v, %v", count, err) + continue + } + if err != io.EOF { + t.Logf("csv decode failed, %v", err) + } + err = nil + break + } + batch = append(batch, record) + if len(batch) >= batchSize { + records <- batch + batch = batch[0:0] + } } + if len(batch) != 0 { + records <- batch + } + }(recReader, records, 10) + + var count int64 + for batch := range records { + // To simulate processing of a batch, add sleep delay. + count += int64(len(batch)) + + if err := resp.EventStream.Err(); err != nil { + t.Errorf("exect no error, got %v", err) + } + } + + if !gotEndEvent { + t.Errorf("expected EndEvent, did not receive") + } + + if e, a := int64(101474), count; e != a { + t.Errorf("expect %d records, got %d", e, a) } if sum == 0 { @@ -63,14 +143,14 @@ func TestInteg_SelectObjectContent(t *testing.T) { } if err := resp.EventStream.Err(); err != nil { - t.Fatalf("exect no error, %v", err) + t.Fatalf("expect no error, got %v", err) } } func TestInteg_SelectObjectContent_Error(t *testing.T) { keyName := "negativeSelect.csv" - buf := make([]byte, 0, 1024*1024*6) + buf := make([]byte, 0, 6*sdkio.MebiByte) buf = append(buf, []byte("name,number\n")...) line := []byte("jj,0\n") for i := 0; i < (cap(buf)/len(line))-2; i++ { @@ -154,8 +234,7 @@ gopher,0 }, }) if err != nil { - fmt.Fprintf(os.Stderr, "failed making API request, %v\n", err) - return + t.Fatalf("failed making API request, %v\n", err) } defer resp.EventStream.Close() @@ -167,7 +246,7 @@ gopher,0 case *s3.RecordsEvent: resultWriter.Write(e.Payload) case *s3.StatsEvent: - fmt.Printf("Processed %d bytes\n", *e.Details.BytesProcessed) + t.Logf("Processed %d bytes\n", *e.Details.BytesProcessed) } } }() @@ -179,10 +258,10 @@ gopher,0 if err == io.EOF { break } - fmt.Println(record) + t.Log(record) } if err := resp.EventStream.Err(); err != nil { - fmt.Fprintf(os.Stderr, "reading from event stream failed, %v\n", err) + t.Fatalf("reading from event stream failed, %v\n", err) } } diff --git a/service/s3/cust_integ_shared_test.go b/service/s3/cust_integ_shared_test.go index 937fdc8a23..a7a424b7e7 100644 --- a/service/s3/cust_integ_shared_test.go +++ b/service/s3/cust_integ_shared_test.go @@ -53,10 +53,10 @@ func putTestFile(t *testing.T, filename, key string) { } func putTestContent(t *testing.T, reader io.ReadSeeker, key string) { - fmt.Println(bucketName, key, svc) + t.Logf("uploading test file %s/%s", *bucketName, key) _, err := svc.PutObject(&s3.PutObjectInput{ Bucket: bucketName, - Key: aws.String(key), + Key: &key, Body: reader, }) if err != nil { diff --git a/service/s3/eventstream_test.go b/service/s3/eventstream_test.go index 0863a4ff90..35f30565ec 100644 --- a/service/s3/eventstream_test.go +++ b/service/s3/eventstream_test.go @@ -84,6 +84,16 @@ func TestSelectObjectContent_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events()