diff --git a/private/model/api/codegentest/service/restjsonservice/api.go b/private/model/api/codegentest/service/restjsonservice/api.go index c0117afbb9..7cdc21a791 100644 --- a/private/model/api/codegentest/service/restjsonservice/api.go +++ b/private/model/api/codegentest/service/restjsonservice/api.go @@ -309,6 +309,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -318,8 +320,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -720,6 +720,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -729,8 +731,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/restjsonservice/eventstream_test.go b/private/model/api/codegentest/service/restjsonservice/eventstream_test.go index cba6539ca8..bf3f2c4914 100644 --- a/private/model/api/codegentest/service/restjsonservice/eventstream_test.go +++ b/private/model/api/codegentest/service/restjsonservice/eventstream_test.go @@ -84,6 +84,16 @@ func TestEmptyStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() @@ -214,6 +224,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/private/model/api/codegentest/service/restxmlservice/api.go b/private/model/api/codegentest/service/restxmlservice/api.go index 409e6a5957..b9b9b3ca01 100644 --- a/private/model/api/codegentest/service/restxmlservice/api.go +++ b/private/model/api/codegentest/service/restxmlservice/api.go @@ -309,6 +309,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -318,8 +320,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -720,6 +720,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -729,8 +731,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/restxmlservice/eventstream_test.go b/private/model/api/codegentest/service/restxmlservice/eventstream_test.go index bdb05860b5..44019f529e 100644 --- a/private/model/api/codegentest/service/restxmlservice/eventstream_test.go +++ b/private/model/api/codegentest/service/restxmlservice/eventstream_test.go @@ -84,6 +84,16 @@ func TestEmptyStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() @@ -214,6 +224,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/private/model/api/codegentest/service/rpcservice/api.go b/private/model/api/codegentest/service/rpcservice/api.go index 66538328d6..b8f39f30ee 100644 --- a/private/model/api/codegentest/service/rpcservice/api.go +++ b/private/model/api/codegentest/service/rpcservice/api.go @@ -311,6 +311,8 @@ type EmptyStreamEventStream struct { // may result in resource leaks. func (es *EmptyStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -320,8 +322,6 @@ func (es *EmptyStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } @@ -769,6 +769,8 @@ type GetEventStreamEventStream struct { // may result in resource leaks. func (es *GetEventStreamEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -778,8 +780,6 @@ func (es *GetEventStreamEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/private/model/api/codegentest/service/rpcservice/eventstream_test.go b/private/model/api/codegentest/service/rpcservice/eventstream_test.go index 8b87a54d00..929715da57 100644 --- a/private/model/api/codegentest/service/rpcservice/eventstream_test.go +++ b/private/model/api/codegentest/service/rpcservice/eventstream_test.go @@ -86,6 +86,16 @@ func TestEmptyStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() @@ -238,6 +248,16 @@ func TestGetEventStream_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.AEventStreamRef.Err() + select { + case _, ok := <-resp.AEventStreamRef.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.AEventStreamRef.Close() <-resp.AEventStreamRef.Events() diff --git a/private/model/api/eventstream.go b/private/model/api/eventstream.go index 25d8e391a9..b8bc2db70c 100644 --- a/private/model/api/eventstream.go +++ b/private/model/api/eventstream.go @@ -335,6 +335,8 @@ func (es *{{ $.ShapeName }}) Close() (err error) { es.Writer.Close() {{ end -}} + es.StreamCloser.Close() + return es.Err() } @@ -353,8 +355,6 @@ func (es *{{ $.ShapeName }}) Err() error { } {{ end -}} - es.StreamCloser.Close() - return nil } @@ -974,6 +974,16 @@ func (c *loopReader) Read(p []byte) (int, error) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.{{ $esMemberName }}.Err() + select { + case _, ok := <-resp.{{ $esMemberName }}.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.{{ $esMemberName }}.Close() <-resp.{{ $esMemberName }}.Events() diff --git a/service/kinesis/api.go b/service/kinesis/api.go index 888f7a4363..b282cf645b 100644 --- a/service/kinesis/api.go +++ b/service/kinesis/api.go @@ -7379,6 +7379,8 @@ type SubscribeToShardEventStream struct { // may result in resource leaks. func (es *SubscribeToShardEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -7388,8 +7390,6 @@ func (es *SubscribeToShardEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/service/kinesis/eventstream_test.go b/service/kinesis/eventstream_test.go index de2ae093ff..798db08ea3 100644 --- a/service/kinesis/eventstream_test.go +++ b/service/kinesis/eventstream_test.go @@ -86,6 +86,16 @@ func TestSubscribeToShard_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events() diff --git a/service/s3/api.go b/service/s3/api.go index b4a4e8c4ad..2c2f204000 100644 --- a/service/s3/api.go +++ b/service/s3/api.go @@ -22503,6 +22503,8 @@ type SelectObjectContentEventStream struct { // may result in resource leaks. func (es *SelectObjectContentEventStream) Close() (err error) { es.Reader.Close() + es.StreamCloser.Close() + return es.Err() } @@ -22512,8 +22514,6 @@ func (es *SelectObjectContentEventStream) Err() error { if err := es.Reader.Err(); err != nil { return err } - es.StreamCloser.Close() - return nil } diff --git a/service/s3/cust_integ_eventstream_test.go b/service/s3/cust_integ_eventstream_test.go index 53f6cc1019..067faab724 100644 --- a/service/s3/cust_integ_eventstream_test.go +++ b/service/s3/cust_integ_eventstream_test.go @@ -5,21 +5,31 @@ package s3_test import ( "bytes" "encoding/csv" - "fmt" "io" - "os" - "path/filepath" "strings" "testing" "github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/internal/sdkio" "github.com/aws/aws-sdk-go/service/s3" ) func TestInteg_SelectObjectContent(t *testing.T) { keyName := "selectObject.csv" - putTestFile(t, filepath.Join("testdata", "positive_select.csv"), keyName) + + var header = []byte("A,B,C,D,E,F,G,H,I,J\n") + var recordRow = []byte("0,0,0.5,217.371,217.658,218.002,269.445,487.447,2.106,489.554\n") + + buf := make([]byte, 0, 6*sdkio.MebiByte) + buf = append(buf, []byte(header)...) + for i := 0; i < (cap(buf)/len(recordRow))-1; i++ { + buf = append(buf, recordRow...) + } + + // Put a mock CSV file to the S3 bucket so that its contents can be + // selected. + putTestContent(t, bytes.NewReader(buf), keyName) resp, err := svc.SelectObjectContent(&s3.SelectObjectContentInput{ Bucket: bucketName, @@ -43,15 +53,85 @@ func TestInteg_SelectObjectContent(t *testing.T) { } defer resp.EventStream.Close() + recReader, recWriter := io.Pipe() + var sum int64 var processed int64 - for event := range resp.EventStream.Events() { - switch tv := event.(type) { - case *s3.RecordsEvent: - sum += int64(len(tv.Payload)) - case *s3.StatsEvent: - processed = *tv.Details.BytesProcessed + + var gotEndEvent bool + go func(w *io.PipeWriter, resp *s3.SelectObjectContentOutput) { + defer recWriter.Close() + var numRecordEvents int64 + for event := range resp.EventStream.Events() { + switch tv := event.(type) { + case *s3.RecordsEvent: + n, err := recWriter.Write(tv.Payload) + if err != nil { + t.Logf("failed to write to record writer, %v, %v", n, err) + } + sum += int64(n) + numRecordEvents++ + case *s3.StatsEvent: + processed = *tv.Details.BytesProcessed + case *s3.EndEvent: + gotEndEvent = true + t.Logf("s3.EndEvent received") + } + } + t.Logf("received %d record events", numRecordEvents) + }(recWriter, resp) + + type Record []string + + records := make(chan []Record) + go func(r io.Reader, records chan<- []Record, batchSize int) { + defer close(records) + + csvReader := csv.NewReader(r) + var count int64 + + batch := make([]Record, 0, batchSize) + for { + count++ + record, err := csvReader.Read() + if err != nil { + if _, ok := err.(*csv.ParseError); ok { + t.Logf("failed to decode record row, %v, %v", count, err) + continue + } + if err != io.EOF { + t.Logf("csv decode failed, %v", err) + } + err = nil + break + } + batch = append(batch, record) + if len(batch) >= batchSize { + records <- batch + batch = batch[0:0] + } } + if len(batch) != 0 { + records <- batch + } + }(recReader, records, 10) + + var count int64 + for batch := range records { + // To simulate processing of a batch, add sleep delay. + count += int64(len(batch)) + + if err := resp.EventStream.Err(); err != nil { + t.Errorf("exect no error, got %v", err) + } + } + + if !gotEndEvent { + t.Errorf("expected EndEvent, did not receive") + } + + if e, a := int64(101474), count; e != a { + t.Errorf("expect %d records, got %d", e, a) } if sum == 0 { @@ -63,14 +143,14 @@ func TestInteg_SelectObjectContent(t *testing.T) { } if err := resp.EventStream.Err(); err != nil { - t.Fatalf("exect no error, %v", err) + t.Fatalf("expect no error, got %v", err) } } func TestInteg_SelectObjectContent_Error(t *testing.T) { keyName := "negativeSelect.csv" - buf := make([]byte, 0, 1024*1024*6) + buf := make([]byte, 0, 6*sdkio.MebiByte) buf = append(buf, []byte("name,number\n")...) line := []byte("jj,0\n") for i := 0; i < (cap(buf)/len(line))-2; i++ { @@ -154,8 +234,7 @@ gopher,0 }, }) if err != nil { - fmt.Fprintf(os.Stderr, "failed making API request, %v\n", err) - return + t.Fatalf("failed making API request, %v\n", err) } defer resp.EventStream.Close() @@ -167,7 +246,7 @@ gopher,0 case *s3.RecordsEvent: resultWriter.Write(e.Payload) case *s3.StatsEvent: - fmt.Printf("Processed %d bytes\n", *e.Details.BytesProcessed) + t.Logf("Processed %d bytes\n", *e.Details.BytesProcessed) } } }() @@ -179,10 +258,10 @@ gopher,0 if err == io.EOF { break } - fmt.Println(record) + t.Log(record) } if err := resp.EventStream.Err(); err != nil { - fmt.Fprintf(os.Stderr, "reading from event stream failed, %v\n", err) + t.Fatalf("reading from event stream failed, %v\n", err) } } diff --git a/service/s3/cust_integ_shared_test.go b/service/s3/cust_integ_shared_test.go index 937fdc8a23..a7a424b7e7 100644 --- a/service/s3/cust_integ_shared_test.go +++ b/service/s3/cust_integ_shared_test.go @@ -53,10 +53,10 @@ func putTestFile(t *testing.T, filename, key string) { } func putTestContent(t *testing.T, reader io.ReadSeeker, key string) { - fmt.Println(bucketName, key, svc) + t.Logf("uploading test file %s/%s", *bucketName, key) _, err := svc.PutObject(&s3.PutObjectInput{ Bucket: bucketName, - Key: aws.String(key), + Key: &key, Body: reader, }) if err != nil { diff --git a/service/s3/eventstream_test.go b/service/s3/eventstream_test.go index 0863a4ff90..35f30565ec 100644 --- a/service/s3/eventstream_test.go +++ b/service/s3/eventstream_test.go @@ -84,6 +84,16 @@ func TestSelectObjectContent_ReadClose(t *testing.T) { t.Fatalf("expect no error got, %v", err) } + // Assert calling Err before close does not close the stream. + resp.EventStream.Err() + select { + case _, ok := <-resp.EventStream.Events(): + if !ok { + t.Fatalf("expect stream not to be closed, but was") + } + default: + } + resp.EventStream.Close() <-resp.EventStream.Events()