Skip to content

Commit

Permalink
Handle unknown event types gracefully (#2314)
Browse files Browse the repository at this point in the history
* Handle unknown event types gracefully - return an Event with event_type
:unknown_event and add a callback for unknown_events to generated
event_streams.
Co-authored-by: Matt Muller <mamuller@amazon.com>
  • Loading branch information
alextwoods committed May 29, 2020
1 parent e9589cd commit f310d88
Show file tree
Hide file tree
Showing 8 changed files with 79 additions and 10 deletions.
Expand Up @@ -4,15 +4,15 @@
describe 'Support H2 Async' do

before(:all) do
SpecHelper.generate_service(['Async'], multiple_files: false)
SpecHelper.generate_service(['Async'], multiple_files: false)
end

if RUBY_VERSION >= '2.1' && !ENV['NO_H2']

let(:output_stream) {
[
{ message_type: 'event', event_type: :baz_result, result: { details: [ "foo" ]}}
].each
].each
}

let(:client) {
Expand Down Expand Up @@ -125,6 +125,30 @@
input_stream.signal_end_stream
}.to_not raise_error
end

it 'handles unknown events' do
input = Async::EventStreams::InputBazStream.new
output = Async::EventStreams::OutputBazStream.new
data = { details: [ "unknown" ] }
same_client = Async::AsyncClient.new(
region: 'us-west-2',
credentials: Aws::Credentials.new('akid', 'secret'),
stub_responses: {baz: {
stream: [
{ message_type: 'event', event_type: :test_unknown_event, result: data }
].each
}}
)
output.on_unknown_event do |e|
expect(e.event_type).to eq :unknown_event
expect(e.raw_event_type).to eq 'test_unknown_event'
expect(JSON.parse(e.raw_event.payload.read, symbolize_names: true)[:result].to_h).to eq(data)
end

same_client.baz(input_event_stream_handler: input,
output_event_stream_handler: output)
input.signal_end_stream
end
else

it 'raises error when initializing AsyncClient' do
Expand Down
Expand Up @@ -49,12 +49,17 @@ module {{module_name}}
@event_emitter.on(:initial_response, block) if block_given?
end

def on_unknown_event(&block)
@event_emitter.on(:unknown_event, block) if block_given?
end

def on_event(&block)
{{#types}}
on_{{.}}_event(&block)
{{/types}}
on_error_event(&block)
on_initial_response_event(&block)
on_unknown_event(&block)
end

# @api private
Expand Down
3 changes: 2 additions & 1 deletion gems/aws-sdk-core/CHANGELOG.md
@@ -1,12 +1,13 @@
Unreleased Changes
------------------

* Issue - Handle unknown and unmodeled events from event streams by ignoring them and providing a new callback rather than raising an error.

3.97.0 (2020-05-28)
------------------
* Feature - Default endpoint_discovery to `true` for services with at least one operation that requires it.
* Feature - Updated Aws::STS::Client with the latest API changes.


3.96.1 (2020-05-18)
------------------

Expand Down
6 changes: 3 additions & 3 deletions gems/aws-sdk-core/lib/aws-sdk-core/binary/event_parser.rb
Expand Up @@ -78,9 +78,9 @@ def parse_event(raw_event)

# locate event from eventstream
name, ref = @rules.shape.member_by_location_name(event_type)
unless ref.event
raise Aws::Errors::EventStreamParserError.new(
"Failed to locate event shape for the event")
unless ref && ref.event
return Struct.new(:event_type, :raw_event_type, :raw_event)
.new(:unknown_event, event_type, raw_event)
end

event = ref.shape.struct_class.new
Expand Down
32 changes: 28 additions & 4 deletions gems/aws-sdk-core/lib/aws-sdk-core/stubbing/protocols/rest.rb
Expand Up @@ -43,7 +43,7 @@ def apply_body(api, operation, resp, data)
def build_body(api, operation, data)
rules = operation.output
if head_operation(operation)
""
''
elsif streaming?(rules)
data[rules[:payload]]
elsif rules[:payload]
Expand Down Expand Up @@ -73,7 +73,7 @@ def streaming?(ref)
end

def head_operation(operation)
operation.http_method == "HEAD"
operation.http_method == 'HEAD'
end

def eventstream?(rules)
Expand Down Expand Up @@ -116,8 +116,22 @@ def encode_error(opts, event_data)
opts
end

def encode_event(opts, rules, event_data, builder)
event_ref = rules.shape.member(event_data.delete(:event_type))
def encode_unknown_event(opts, event_type, event_data)
# right now h2 events are only rest_json
opts[:payload] = StringIO.new(JSON.dump(event_data))
opts[:headers][':event-type'] = Aws::EventStream::HeaderValue.new(
value: event_type.to_s,
type: 'string'
)
opts[:headers][':message-type'] = Aws::EventStream::HeaderValue.new(
value: 'event',
type: 'string'
)
opts
end

def encode_modeled_event(opts, rules, event_type, event_data, builder)
event_ref = rules.shape.member(event_type)
explicit_payload = false
implicit_payload_members = {}
event_ref.shape.members.each do |name, ref|
Expand Down Expand Up @@ -166,6 +180,16 @@ def encode_event(opts, rules, event_data, builder)
opts
end

def encode_event(opts, rules, event_data, builder)
event_type = event_data.delete(:event_type)

if rules.shape.member?(event_type)
encode_modeled_event(opts, rules, event_type, event_data, builder)
else
encode_unknown_event(opts, event_type, event_data)
end
end

end
end
end
Expand Down
5 changes: 5 additions & 0 deletions gems/aws-sdk-kinesis/lib/aws-sdk-kinesis/event_streams.rb
Expand Up @@ -61,6 +61,10 @@ def on_initial_response_event(&block)
@event_emitter.on(:initial_response, block) if block_given?
end

def on_unknown_event(&block)
@event_emitter.on(:unknown_event, block) if block_given?
end

def on_event(&block)
on_subscribe_to_shard_event_event(&block)
on_resource_not_found_exception_event(&block)
Expand All @@ -74,6 +78,7 @@ def on_event(&block)
on_internal_failure_exception_event(&block)
on_error_event(&block)
on_initial_response_event(&block)
on_unknown_event(&block)
end

# @api private
Expand Down
5 changes: 5 additions & 0 deletions gems/aws-sdk-s3/lib/aws-sdk-s3/event_streams.rb
Expand Up @@ -41,6 +41,10 @@ def on_initial_response_event(&block)
@event_emitter.on(:initial_response, block) if block_given?
end

def on_unknown_event(&block)
@event_emitter.on(:unknown_event, block) if block_given?
end

def on_event(&block)
on_records_event(&block)
on_stats_event(&block)
Expand All @@ -49,6 +53,7 @@ def on_event(&block)
on_end_event(&block)
on_error_event(&block)
on_initial_response_event(&block)
on_unknown_event(&block)
end

# @api private
Expand Down
Expand Up @@ -69,6 +69,10 @@ def on_initial_response_event(&block)
@event_emitter.on(:initial_response, block) if block_given?
end

def on_unknown_event(&block)
@event_emitter.on(:unknown_event, block) if block_given?
end

def on_event(&block)
on_transcript_event_event(&block)
on_bad_request_exception_event(&block)
Expand All @@ -78,6 +82,7 @@ def on_event(&block)
on_service_unavailable_exception_event(&block)
on_error_event(&block)
on_initial_response_event(&block)
on_unknown_event(&block)
end

# @api private
Expand Down

0 comments on commit f310d88

Please sign in to comment.