Skip to content

Commit

Permalink
Retry streaming S3 object downloads
Browse files Browse the repository at this point in the history
  • Loading branch information
janko committed Sep 20, 2017
1 parent 16e5b83 commit d099329
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 10 deletions.
9 changes: 7 additions & 2 deletions gems/aws-sdk-core/lib/aws-sdk-core/plugins/retry_errors.rb
Expand Up @@ -70,6 +70,11 @@ def networking?
NETWORKING_ERRORS.include?(@name)
end

def invalid_bytes?
@error.is_a?(Seahorse::Client::NetworkingError) &&
@error.original_error.is_a?(Seahorse::Client::NetHttp::Handler::TruncatedBodyError)
end

def server?
(500..599).include?(@http_status_code)
end
Expand Down Expand Up @@ -119,7 +124,7 @@ def retry_request(context, error)
context.retries += 1
context.config.credentials.refresh! if error.expired_credentials?
context.http_request.body.rewind
context.http_response.reset
context.http_response.reset(truncate: error.invalid_bytes?)
call(context)
end

Expand All @@ -130,7 +135,7 @@ def delay_retry(context)
def should_retry?(context, error)
retryable?(context, error) and
context.retries < retry_limit(context) and
response_truncatable?(context)
(response_truncatable?(context) or !error.invalid_bytes?)
end

def retryable?(context, error)
Expand Down
4 changes: 2 additions & 2 deletions gems/aws-sdk-core/lib/seahorse/client/http/response.rb
Expand Up @@ -149,10 +149,10 @@ def on_error(&callback)
end
end

def reset
def reset(options = {})
@status_code = 0
@headers.clear
@body.truncate(0)
@body.truncate(0) if options[:truncate]
@error = nil
end

Expand Down
14 changes: 11 additions & 3 deletions gems/aws-sdk-core/lib/seahorse/client/net_http/handler.rb
Expand Up @@ -44,7 +44,7 @@ class InvalidHttpVerbError < StandardError; end
# @param [RequestContext] context
# @return [Response]
def call(context)
transmit(context.config, context.http_request, context.http_response)
transmit(context.config, context.http_request, context.http_response, context)
Response.new(context: context)
end

Expand All @@ -69,7 +69,7 @@ def error_message(req, error)
# @param [Http::Request] req
# @param [Http::Response] resp
# @return [void]
def transmit(config, req, resp)
def transmit(config, req, resp, context)
session(config, req) do |http|
http.request(build_net_request(req)) do |net_resp|

Expand All @@ -79,8 +79,16 @@ def transmit(config, req, resp)
bytes_received = 0
resp.signal_headers(status_code, headers)
net_resp.read_body do |chunk|
if bytes_received < resp.body.size
chunk_index = resp.body.size - bytes_received
next_chunk = chunk.byteslice(chunk_index..-1)
else
next_chunk = chunk
end

resp.signal_data(next_chunk) if next_chunk

bytes_received += chunk.bytesize
resp.signal_data(chunk)
end
complete_response(req, resp, bytes_received)

Expand Down
16 changes: 13 additions & 3 deletions gems/aws-sdk-core/spec/aws/plugins/retry_errors_spec.rb
Expand Up @@ -224,17 +224,27 @@ def handle(send_handler = nil, &block)
handle { |context| resp }
end

it 'truncates the response body before each retry attempt' do
it 'truncates the response body before retry attempt of invalid bytes' do
body = double('truncatable-body', pos: 100, truncate: 0)
resp.context.http_response.body = body
expect(body).to receive(:truncate).with(0).exactly(3).times
resp.error = Seahorse::Client::NetworkingError.new(
Seahorse::Client::NetHttp::Handler::TruncatedBodyError.new(11, 10))
handle { |context| resp }
end

it 'does not truncate response body before retry attempt of other errors' do
body = double('truncatable-body', pos: 100, truncate: 0)
resp.context.http_response.body = body
expect(body).to receive(:truncate).never
resp.error = RetryErrorsSvc::Errors::RequestLimitExceeded.new(nil,nil)
handle { |context| resp }
end

it 'skips retry if un-truncatable response body has received data' do
it 'skips retry on invalid bytes if response body is un-truncatable' do
resp.context.http_response.body = double('write-once-body', pos: 100)
resp.error = RetryErrorsSvc::Errors::RequestLimitExceeded.new(nil,nil)
resp.error = Seahorse::Client::NetworkingError.new(
Seahorse::Client::NetHttp::Handler::TruncatedBodyError.new(11, 10))
handle { |context| resp }
expect(resp.context.retries).to eq(0)
end
Expand Down
16 changes: 16 additions & 0 deletions gems/aws-sdk-core/spec/seahorse/client/net_http/handler_spec.rb
Expand Up @@ -251,6 +251,22 @@ def endpoint
expect(resp_body.read).to eq('response-body')
end

it 'populates part of response body that has not been written yet' do
context.http_response.body.write('response')
stub_request(:any, endpoint).to_return(body: 'response-body')
resp_body = make_request.context.http_response.body
resp_body.rewind
expect(resp_body.read).to eq('response-body')
end

it 'skips part of response body that has already been written' do
context.http_response.body.write('response-body')
stub_request(:any, endpoint).to_return(body: 'response-body')
resp_body = make_request.context.http_response.body
resp_body.rewind
expect(resp_body.read).to eq('response-body')
end

it 'wraps errors with a NetworkingError' do
stub_request(:any, endpoint).to_raise(EOFError)
resp = make_request
Expand Down
4 changes: 4 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/encryption/io_decrypter.rb
Expand Up @@ -23,6 +23,10 @@ def finalize
@io.write(@cipher.final)
end

def size
@io.size
end

end
end
end
Expand Down

0 comments on commit d099329

Please sign in to comment.