From 8fe4dad687a19ed16c633fb54f53c8caaa430e50 Mon Sep 17 00:00:00 2001 From: Jason Del Ponte Date: Wed, 9 Oct 2019 15:31:26 -0700 Subject: [PATCH] Fix streaming APIs' Err method closing stream Fixes calling the Err method on SDK's Amazon Kinesis's SubscribeToShared and Amazon S3's SelectObjectContent response EventStream members closing the stream. This would cause unexpected read errors, or early termination of the streams. Only the Close method of the streaming members will close the streams. Improved generated unit and integration tests for streaming API operations. Related to #2769 --- .../service/restjsonservice/api.go | 8 +- .../restjsonservice/eventstream_test.go | 20 ++++ .../codegentest/service/restxmlservice/api.go | 8 +- .../restxmlservice/eventstream_test.go | 20 ++++ .../api/codegentest/service/rpcservice/api.go | 8 +- .../service/rpcservice/eventstream_test.go | 20 ++++ private/model/api/eventstream.go | 14 ++- service/kinesis/api.go | 4 +- service/kinesis/eventstream_test.go | 10 ++ service/s3/api.go | 4 +- service/s3/cust_integ_eventstream_test.go | 113 +++++++++++++++--- service/s3/cust_integ_shared_test.go | 4 +- service/s3/eventstream_test.go | 10 ++ 13 files changed, 206 insertions(+), 37 deletions(-) diff --git a/private/model/api/codegentest/service/restjsonservice/api.go b/private/model/api/codegentest/service/restjsonservice/api.go index c0117afbb96..7cdc21a791d 100644 --- a/private/model/api/codegentest/service/restjsonservice/api.go +++ b/private/model/api/codegentest/service/restjsonservice/api.go @@ -309,6 +309,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -318,8 +320,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -720,6 +720,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -729,8 +731,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/restjsonservice/eventstream_test.go b/private/model/api/codegentest/service/restjsonservice/eventstream_test.go index cba6539ca8a..bf3f2c4914e 100644 --- a/private/model/api/codegentest/service/restjsonservice/eventstream_test.go +++ b/private/model/api/codegentest/service/restjsonservice/eventstream_test.go @@ -84,6 +84,16 @@ func TestEmptyStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() @@ -214,6 +224,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/private/model/api/codegentest/service/restxmlservice/api.go b/private/model/api/codegentest/service/restxmlservice/api.go index 409e6a59576..b9b9b3ca01c 100644 --- a/private/model/api/codegentest/service/restxmlservice/api.go +++ b/private/model/api/codegentest/service/restxmlservice/api.go @@ -309,6 +309,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -318,8 +320,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -720,6 +720,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -729,8 +731,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/restxmlservice/eventstream_test.go b/private/model/api/codegentest/service/restxmlservice/eventstream_test.go index bdb05860b5c..44019f529e8 100644 --- a/private/model/api/codegentest/service/restxmlservice/eventstream_test.go +++ b/private/model/api/codegentest/service/restxmlservice/eventstream_test.go @@ -84,6 +84,16 @@ func TestEmptyStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() @@ -214,6 +224,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/private/model/api/codegentest/service/rpcservice/api.go b/private/model/api/codegentest/service/rpcservice/api.go index 66538328d69..b8f39f30ee9 100644 --- a/private/model/api/codegentest/service/rpcservice/api.go +++ b/private/model/api/codegentest/service/rpcservice/api.go @@ -311,6 +311,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -320,8 +322,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -769,6 +769,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -778,8 +780,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/rpcservice/eventstream_test.go b/private/model/api/codegentest/service/rpcservice/eventstream_test.go index 8b87a54d00d..929715da576 100644 --- a/private/model/api/codegentest/service/rpcservice/eventstream_test.go +++ b/private/model/api/codegentest/service/rpcservice/eventstream_test.go @@ -86,6 +86,16 @@ func TestEmptyStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() @@ -238,6 +248,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.AEventStreamRef.Err() + select { + case _, ok := <-resp.AEventStreamRef.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.AEventStreamRef.Close() <-resp.AEventStreamRef.Events() diff --git a/private/model/api/eventstream.go b/private/model/api/eventstream.go index 25d8e391a9f..b8bc2db70c0 100644 --- a/private/model/api/eventstream.go +++ b/private/model/api/eventstream.go @@ -335,6 +335,8 @@ func (es *{{ $.ShapeName }}) Close() (err error) { es.Writer.Close() {{ end -}} + es.StreamCloser.Close() + return es.Err() } @@ -353,8 +355,6 @@ func (es *{{ $.ShapeName }}) Err() error { } {{ end -}} - es.StreamCloser.Close() - return nil } @@ -974,6 +974,16 @@ func (c *loopReader) Read(p []byte) (int, error) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.{{ $esMemberName }}.Err() + select { + case _, ok := <-resp.{{ $esMemberName }}.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.{{ $esMemberName }}.Close() <-resp.{{ $esMemberName }}.Events() diff --git a/service/kinesis/api.go b/service/kinesis/api.go index 888f7a4363f..b282cf645b7 100644 --- a/service/kinesis/api.go +++ b/service/kinesis/api.go @@ -7379,6 +7379,8 @@ type SubscribeToShardEventStream struct { // may result in resource leaks. func (es *SubscribeToShardEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -7388,8 +7390,6 @@ func (es *SubscribeToShardEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/service/kinesis/eventstream_test.go b/service/kinesis/eventstream_test.go index de2ae093ff4..798db08ea30 100644 --- a/service/kinesis/eventstream_test.go +++ b/service/kinesis/eventstream_test.go @@ -86,6 +86,16 @@ func TestSubscribeToShard_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/service/s3/api.go b/service/s3/api.go index b4a4e8c4ad7..2c2f204000c 100644 --- a/service/s3/api.go +++ b/service/s3/api.go @@ -22503,6 +22503,8 @@ type SelectObjectContentEventStream struct { // may result in resource leaks. func (es *SelectObjectContentEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -22512,8 +22514,6 @@ func (es *SelectObjectContentEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/service/s3/cust_integ_eventstream_test.go b/service/s3/cust_integ_eventstream_test.go index 53f6cc10197..067faab724e 100644 --- a/service/s3/cust_integ_eventstream_test.go +++ b/service/s3/cust_integ_eventstream_test.go @@ -5,21 +5,31 @@ package s3_test import ( "bytes" "encoding/csv" - "fmt" "io" - "os" - "path/filepath" "strings" "testing" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/internal/sdkio" "github.com/aws/aws-sdk-go/service/s3" ) func TestInteg_SelectObjectContent(t *testing.T) { keyName := "selectObject.csv" - putTestFile(t, filepath.Join("testdata", "positive_select.csv"), keyName) + + var header = []byte("A,B,C,D,E,F,G,H,I,J\n") + var recordRow = []byte("0,0,0.5,217.371,217.658,218.002,269.445,487.447,2.106,489.554\n") + + buf := make([]byte, 0, 6*sdkio.MebiByte) + buf = append(buf, []byte(header)...) + for i := 0; i < (cap(buf)/len(recordRow))-1; i++ { + buf = append(buf, recordRow...) + } + + // Put a mock CSV file to the S3 bucket so that its contents can be + // selected. + putTestContent(t, bytes.NewReader(buf), keyName) resp, err := svc.SelectObjectContent(&s3.SelectObjectContentInput{ Bucket: bucketName, @@ -43,15 +53,85 @@ func TestInteg_SelectObjectContent(t *testing.T) { } defer resp.EventStream.Close() + recReader, recWriter := io.Pipe() + var sum int64 var processed int64 - for event := range resp.EventStream.Events() { - switch tv := event.(type) { - case *s3.RecordsEvent: - sum += int64(len(tv.Payload)) - case *s3.StatsEvent: - processed = *tv.Details.BytesProcessed + + var gotEndEvent bool + go func(w *io.PipeWriter, resp *s3.SelectObjectContentOutput) { + defer recWriter.Close() + var numRecordEvents int64 + for event := range resp.EventStream.Events() { + switch tv := event.(type) { + case *s3.RecordsEvent: + n, err := recWriter.Write(tv.Payload) + if err != nil { + t.Logf("failed to write to record writer, %v, %v", n, err) + } + sum += int64(n) + numRecordEvents++ + case *s3.StatsEvent: + processed = *tv.Details.BytesProcessed + case *s3.EndEvent: + gotEndEvent = true + t.Logf("s3.EndEvent received") + } + } + t.Logf("received %d record events", numRecordEvents) + }(recWriter, resp) + + type Record []string + + records := make(chan []Record) + go func(r io.Reader, records chan<- []Record, batchSize int) { + defer close(records) + + csvReader := csv.NewReader(r) + var count int64 + + batch := make([]Record, 0, batchSize) + for { + count++ + record, err := csvReader.Read() + if err != nil { + if _, ok := err.(*csv.ParseError); ok { + t.Logf("failed to decode record row, %v, %v", count, err) + continue + } + if err != io.EOF { + t.Logf("csv decode failed, %v", err) + } + err = nil + break + } + batch = append(batch, record) + if len(batch) >= batchSize { + records <- batch + batch = batch[0:0] + } } + if len(batch) != 0 { + records <- batch + } + }(recReader, records, 10) + + var count int64 + for batch := range records { + // To simulate processing of a batch, add sleep delay. + count += int64(len(batch)) + + if err := resp.EventStream.Err(); err != nil { + t.Errorf("exect no error, got %v", err) + } + } + + if !gotEndEvent { + t.Errorf("expected EndEvent, did not receive") + } + + if e, a := int64(101474), count; e != a { + t.Errorf("expect %d records, got %d", e, a) } if sum == 0 { @@ -63,14 +143,14 @@ func TestInteg_SelectObjectContent(t *testing.T) { } if err := resp.EventStream.Err(); err != nil { - t.Fatalf("exect no error, %v", err) + t.Fatalf("expect no error, got %v", err) } } func TestInteg_SelectObjectContent_Error(t *testing.T) { keyName := "negativeSelect.csv" - buf := make([]byte, 0, 1024*1024*6) + buf := make([]byte, 0, 6*sdkio.MebiByte) buf = append(buf, []byte("name,number\n")...) line := []byte("jj,0\n") for i := 0; i < (cap(buf)/len(line))-2; i++ { @@ -154,8 +234,7 @@ gopher,0 }, }) if err != nil { - fmt.Fprintf(os.Stderr, "failed making API request, %v\n", err) - return + t.Fatalf("failed making API request, %v\n", err) } defer resp.EventStream.Close() @@ -167,7 +246,7 @@ gopher,0 case *s3.RecordsEvent: resultWriter.Write(e.Payload) case *s3.StatsEvent: - fmt.Printf("Processed %d bytes\n", *e.Details.BytesProcessed) + t.Logf("Processed %d bytes\n", *e.Details.BytesProcessed) } } }() @@ -179,10 +258,10 @@ gopher,0 if err == io.EOF { break } - fmt.Println(record) + t.Log(record) } if err := resp.EventStream.Err(); err != nil { - fmt.Fprintf(os.Stderr, "reading from event stream failed, %v\n", err) + t.Fatalf("reading from event stream failed, %v\n", err) } } diff --git a/service/s3/cust_integ_shared_test.go b/service/s3/cust_integ_shared_test.go index 937fdc8a23c..a7a424b7e78 100644 --- a/service/s3/cust_integ_shared_test.go +++ b/service/s3/cust_integ_shared_test.go @@ -53,10 +53,10 @@ func putTestFile(t *testing.T, filename, key string) { } func putTestContent(t *testing.T, reader io.ReadSeeker, key string) { - fmt.Println(bucketName, key, svc) + t.Logf("uploading test file %s/%s", *bucketName, key) _, err := svc.PutObject(&s3.PutObjectInput{ Bucket: bucketName, - Key: aws.String(key), + Key: &key, Body: reader, }) if err != nil { diff --git a/service/s3/eventstream_test.go b/service/s3/eventstream_test.go index 0863a4ff908..35f30565ecf 100644 --- a/service/s3/eventstream_test.go +++ b/service/s3/eventstream_test.go @@ -84,6 +84,16 @@ func TestSelectObjectContent_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events()