S3 Select - JsonOutput from SelectObjectContent not a proper Json #2769

Closed

garry-9590 opened this issue Aug 21, 2019 · 12 comments
Labels
guidance Question that needs advice or information.

Comments

@garry-9590

Please fill out the sections below to help us address your issue.

Version of AWS SDK for Go?

v1.23.4

Version of Go (go version)?

1.12.5

What issue did you see?

When multiple records are returned by SelectObjectContent with JSONOutput as the output serialization, the payload is not valid JSON. Please refer to the example below:

sample.csv

id,name
1,John
2,George

params := &s3.SelectObjectContentInput{
		Bucket:         aws.String("bucket"),
		Key:            aws.String("key"),
		ExpressionType: aws.String(s3.ExpressionTypeSql),
		Expression:     aws.String("SELECT id, name FROM S3Object"),
		InputSerialization: &s3.InputSerialization{
			CSV: &s3.CSVInput{
				FileHeaderInfo:  aws.String(s3.FileHeaderInfoUse),
				RecordDelimiter: aws.String("\n"),
			},
		},
		OutputSerialization: &s3.OutputSerialization{
			JSON: &s3.JSONOutput{},
		},
}

resp, err := s3Datasource.client.SelectObjectContent(params)
if err != nil {
    return
}
defer resp.EventStream.Close()

for event := range resp.EventStream.Events() {
	switch v := event.(type) {
	case *s3.RecordsEvent:
		fmt.Printf(string(v.Payload))
	}
}

output is

"{\"id\":\"1",\"name\":\"John\"}\n{\"id\":\"2\",\"name\":\"George\"}\n"

which is not parse-able
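For context, the payload is a sequence of newline-delimited JSON documents rather than a single JSON value, which is why a plain json.Unmarshal rejects it. A minimal sketch of the failure (assuming encoding/json and fmt are imported; the map target is just illustrative):

	// Two newline-delimited JSON documents, as produced by JSONOutput above.
	payload := []byte("{\"id\":\"1\",\"name\":\"John\"}\n{\"id\":\"2\",\"name\":\"George\"}\n")

	var out map[string]string
	// json.Unmarshal expects exactly one top-level JSON value, so the second
	// document causes a syntax error when the whole payload is validated.
	err := json.Unmarshal(payload, &out)
	fmt.Println(err) // invalid character '{' after top-level value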

@diehlaws diehlaws self-assigned this Aug 21, 2019
@diehlaws diehlaws added the guidance Question that needs advice or information. label Aug 22, 2019
@diehlaws
Contributor

Hi @garry-9590, thanks for reaching out to us about this. Unfortunately I haven't been able to reproduce the described behavior; running your code snippet results in the records being printed without escape characters on my end:

{"id":"1","name":"John"}
{"id":"2","name":"George"}

Can you provide a more complete code sample so we can further troubleshoot this behavior?

@diehlaws diehlaws added the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. label Aug 22, 2019
@diehlaws
Contributor

For reference, here is the code I'm using to print the response and to decode the JSON into a struct, along with its corresponding output. I modified the code example for this function in our docs to handle JSON output instead of CSV and decode it into a struct.

Code
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"os"
	"runtime"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
)

type Person struct {
	Id   string
	Name string
}

func main() {
	fmt.Printf("Using AWS SDK for Go version %v on %v\n", aws.SDKVersion, runtime.Version())
	sess := session.New(&aws.Config{
		Region: aws.String("us-west-2"),
	})
	svc := s3.New(sess)
	bucket := "bucket"
	object := "key"

	params := &s3.SelectObjectContentInput{
		Bucket:         aws.String(bucket),
		Key:            aws.String(object),
		ExpressionType: aws.String(s3.ExpressionTypeSql),
		Expression:     aws.String("SELECT name, id FROM S3Object"),
		InputSerialization: &s3.InputSerialization{
			CSV: &s3.CSVInput{
				FileHeaderInfo:  aws.String(s3.FileHeaderInfoUse),
				RecordDelimiter: aws.String("\n"),
			},
		},
		OutputSerialization: &s3.OutputSerialization{
			JSON: &s3.JSONOutput{},
		},
	}

	resp, err := svc.SelectObjectContent(params)
	if err != nil {
		return
	}
	defer resp.EventStream.Close()

	results, resultWriter := io.Pipe()
	go func() {
		defer resultWriter.Close()
		for event := range resp.EventStream.Events() {
			switch e := event.(type) {
			case *s3.RecordsEvent:
				resultWriter.Write(e.Payload)
				fmt.Printf("Payload: %v\n", string(e.Payload))
			case *s3.StatsEvent:
				fmt.Printf("Processed %d bytes\n", *e.Details.BytesProcessed)
			}
		}
	}()

	var record Person
	resReader := json.NewDecoder(results)
	for {
		err := resReader.Decode(&record)
		if err == io.EOF {
			break
		} else if err != nil {
			fmt.Fprintf(os.Stderr, "decoding record failed, %v\n", err)
			break
		}
		fmt.Printf("Record:\n%v\n", record)
	}
	if err := resp.EventStream.Err(); err != nil {
		fmt.Fprintf(os.Stderr, "reading from event stream failed, %v\n", err)
	}
}
Output
$ go run s3Select.go 
Using AWS SDK for Go version 1.23.6 on go1.12.7
Payload: {"name":"John","id":"1"}
{"name":"George","id":"2"}

Record:
{1 John}
Record:
{2 George}
Processed 25 bytes

@garry-9590
Author

Thanks for the code reference. This works for me. What I was doing while streaming the events was:

for event := range resp.EventStream.Events() {
	switch v := event.(type) {
	case *s3.RecordsEvent:
		// s3.RecordsEvent.Payload is a byte slice of the selected records
		var i []Person
		err := json.Unmarshal(v.Payload, &i)
		if err != nil {
			fmt.Printf("Error Message : %+v", err)
		}
		// ...
	}
}

Output ->

Error Message : "unmarshal error: invalid character '{' after top-level value"

This was working for a single record, but failed when multiple records were present in the CSV.

@diehlaws diehlaws removed the response-requested Waiting on additional info and feedback. Will move to "closing-soon" in 7 days. label Aug 27, 2019
@garry-9590 garry-9590 reopened this Sep 17, 2019
@garry-9590
Author

Hi @diehlaws,

I am reopening this issue because I tried using your code as a reference, and while it gave me the expected output for a CSV with a few records, I ran into issues when running it for an extended period of time against a large CSV file. My code is attached below; consumption from the records channel is throttled at 5 records per second.

func scanEventStream(selectObjectContentOutput *s3.SelectObjectContentOutput) {
	records := make(chan record)

	results, resultWriter := io.Pipe()
	go func(resultWriter *io.PipeWriter, selectObjectContentOutput *s3.SelectObjectContentOutput) {
		defer selectObjectContentOutput.EventStream.Close()
		defer resultWriter.Close()
		for event := range selectObjectContentOutput.EventStream.Events() {
			switch e := event.(type) {
			case *s3.RecordsEvent:
				_, err := resultWriter.Write(e.Payload)
				if err != nil {
					fmt.Printf("Error")
				}
			case *s3.EndEvent:
				fmt.Printf("End Event received")
			}
		}
		fmt.Printf("ALL DONE")
	}(resultWriter, selectObjectContentOutput)

	go func(records chan record, results *io.PipeReader) {
		defer close(records)
		resReader := json.NewDecoder(results)
		for {
			csvRecord := record{}
			err := resReader.Decode(&csvRecord)
			if err == io.EOF {
				break
			} else if err != nil {
				fmt.Printf("Error while decoding a row from S3. Error: %+v", err)
			}
			records <- csvRecord
		}
		if err := selectObjectContentOutput.EventStream.Err(); err != nil {
			fmt.Printf("Reading from S3 Select Event Stream Failed, %+v\n", err)
		}
	}(records, results)

	throttle := time.Tick(time.Duration(200) * time.Millisecond)
	for r := range records {
		<-throttle
		fmt.Printf("%v", r)
	}
}

The issue I am observing is that after some of the CSV rows have been processed, "ALL DONE" gets printed while "End Event received" never appears in the output logs. This means the for loop over the stream of events exited without encountering an EndEvent.
As a result, the PipeWriter is closed mid-record without the reader ever seeing a clean io.EOF, so the PipeReader loop never exits and keeps throwing unexpected EOF errors because the PipeWriter is closed.
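For what it's worth, I can at least stop the reader side from spinning by breaking out of the decode loop on any error instead of only on io.EOF, along these lines (same names as in my snippet above):

	for {
		csvRecord := record{}
		if err := resReader.Decode(&csvRecord); err != nil {
			if err != io.EOF {
				fmt.Printf("Error while decoding a row from S3. Error: %+v\n", err)
			}
			break
		}
		records <- csvRecord
	}

That stops the repeated unexpected EOF logging, but it does not explain why the event loop exits before an EndEvent arrives.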

@garry-9590
Author

@diehlaws, did you get a chance to look into this?

Moreover, I found that if I give the "records" channel a very large buffer size, I don't get this error. That suggests there is some issue with the event stream that passes along the records from the CSV.

@NicolasBringas

I'm experiencing the same behaviour as @garry-9590

@jasdel
Contributor

jasdel commented Oct 9, 2019

Thanks for the update @garry-9590. How large (size in bytes and number of rows) is the file that is being used with the SelectObjectContent API?

In addition, could you describe the reasoning for the throttle? Is it to artificially slow down reading from the event stream? At 200ms per record, a significant number of records may cause the underlying HTTP response read to time out because the client isn't able to read fast enough. This is most likely why an exceptionally large records channel buffer does not exhibit the error case.

In addition, checking selectObjectContentOutput.EventStream.Err() within the record decoder goroutine is most likely preventing you from seeing any underlying error that occurred in the SDK's EventStream reader.

	if err := selectObjectContentOutput.EventStream.Err(); err != nil {
		fmt.Printf("Reading from S3 Select Event Stream Failed, %+v\n", err)
	}

Moving the EventStream.Err() check to the goroutine that ranges over EventStream.Events() will ensure that the SDK's event stream error is recorded.
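As a rough sketch of that restructuring, reusing the names from your snippet (the CloseWithError call is just the standard io.Pipe way of handing an error to the reader side, not something SDK-specific):

	go func(resultWriter *io.PipeWriter, out *s3.SelectObjectContentOutput) {
		defer out.EventStream.Close()
		for event := range out.EventStream.Events() {
			if e, ok := event.(*s3.RecordsEvent); ok {
				if _, err := resultWriter.Write(e.Payload); err != nil {
					resultWriter.CloseWithError(err)
					return
				}
			}
		}
		// Check Err in the same goroutine that consumed Events, and propagate
		// any stream error to the reader; CloseWithError(nil) behaves like Close.
		resultWriter.CloseWithError(out.EventStream.Err())
	}(resultWriter, selectObjectContentOutput)

With the error propagated through the pipe, the decoder goroutine sees it as a Decode error instead of looping on unexpected EOFs.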

jasdel added a commit to jasdel/aws-sdk-go that referenced this issue Oct 9, 2019
Fixes calling the Err method on SDK's Amazon Kinesis's SubscribeToShard
and Amazon S3's SelectObjectContent response EventStream members closing
the stream. This would cause unexpected read errors, or early
termination of the streams. Only the Close method of the streaming
members will close the streams.

Improved generated unit and integration tests for streaming API
operations.

Related to aws#2769
@jasdel
Contributor

jasdel commented Oct 9, 2019

While investigating this issue I found that the resp.EventStream.Err method was incorrectly closing the stream. PR #2882 fixes this issue.

@garry-9590
Author

Each record is about 50-80 bytes and there are about 100,000 records.

Further processing of the business logic needs to be throttled due to rate limiting at one of the downstream services. For the time being, I found a workaround: I read all the records from the event stream and put them into a large buffered channel without throttling.

Thanks for debugging the issue. Please feel free to close the issue once the fix is published.

@jasdel
Contributor

jasdel commented Oct 10, 2019

Thanks for the update @garry-9590. During our testing we did find that batching many records before forwarding them on to a downstream service helped. You'll probably want to play around with the batch size to find the best performance for your use case. You can find an example of this in PR #2882.
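As a sketch of what that batching could look like on the consuming side (batchSize and the sendBatch helper are placeholders here, not something taken from the PR):

	const batchSize = 500 // tune to the downstream service's rate limits

	batch := make([]record, 0, batchSize)
	for r := range records {
		batch = append(batch, r)
		if len(batch) == batchSize {
			sendBatch(batch) // placeholder for the downstream call
			batch = make([]record, 0, batchSize)
		}
	}
	if len(batch) > 0 {
		sendBatch(batch) // flush the remainder
	}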

@jasdel
Contributor

jasdel commented Oct 10, 2019

@NicolasBringas could you provide more information on the issue you are experiencing? Does the resp.EventStream.Err method return an error for you?

jasdel added a commit that referenced this issue Oct 21, 2019
aws-sdk-go-automation added a commit that referenced this issue Oct 22, 2019
Release v1.25.17 (2019-10-22)
===

### Service Client Updates
* `service/iotevents`: Updates service API and documentation
* `service/opsworkscm`: Updates service API and documentation
  * AWS OpsWorks for Chef Automate (OWCA) now allows customers to use a custom domain and respective certificate, for their AWS OpsWorks For Chef Automate servers. Customers can now provide a CustomDomain, CustomCertificate and CustomPrivateKey in CreateServer API to configure their Chef Automate servers with a custom domain and certificate.

### SDK Bugs
* `service/s3`,`service/kinesis`: Fix streaming APIs' Err method closing stream ([#2882](#2882))
  * Fixes calling the Err method on SDK's Amazon Kinesis's SubscribeToShard and Amazon S3's SelectObjectContent response EventStream members closing the stream. This would cause unexpected read errors, or early termination of the streams. Only the Close method of the streaming members will close the streams.
  * Related to [#2769](#2769)
@garry-9590
Author

@jasdel we recently scaled up this application. We are now processing 1 million records of approximately 150 bytes each. After a few thousand records have been processed and passed from the PipeWriter to the PipeReader, I start getting an "unexpected EOF" error on the reader side. It seems the same issue still persists.

Library versions I have checked this on: 1.25.17 and 1.29.22.

Is there a limit on how many records or bytes can be present in the CSV? We are planning to scale this up further to 10-20 million records of 200 bytes each.
