diff --git a/.changelog/7a2940fda4304318ae91a73d3deec4e7.json b/.changelog/7a2940fda4304318ae91a73d3deec4e7.json new file mode 100644 index 00000000000..ee73acd198c --- /dev/null +++ b/.changelog/7a2940fda4304318ae91a73d3deec4e7.json @@ -0,0 +1,11 @@ +{ + "id": "7a2940fd-a430-4318-ae91-a73d3deec4e7", + "type": "bugfix", + "description": "Fixes an issue that caused the unexported constructor function names for EventStream types to be swapped for the event reader and writer respectivly.", + "modules": [ + "service/kinesis", + "service/lexruntimev2", + "service/s3", + "service/transcribestreaming" + ] +} \ No newline at end of file diff --git a/codegen/smithy-aws-go-codegen/src/main/java/software/amazon/smithy/aws/go/codegen/AwsEventStreamUtils.java b/codegen/smithy-aws-go-codegen/src/main/java/software/amazon/smithy/aws/go/codegen/AwsEventStreamUtils.java index 8f29fcb8ae1..b40a867d569 100644 --- a/codegen/smithy-aws-go-codegen/src/main/java/software/amazon/smithy/aws/go/codegen/AwsEventStreamUtils.java +++ b/codegen/smithy-aws-go-codegen/src/main/java/software/amazon/smithy/aws/go/codegen/AwsEventStreamUtils.java @@ -45,8 +45,11 @@ import software.amazon.smithy.utils.StringUtils; public final class AwsEventStreamUtils { + private static final String EVENT_STREAM_SERIALIZER_HELPER = "eventStreamSerializerHelper"; private static final String EVENT_STREAM_SIGNER_INTERFACE = "eventStreamSigner"; + private static final String CONTEXT_GET_EVENT_STREAM_INPUT = "getEventStreamInput"; + private static final String CONTEXT_WITH_EVENT_STREAM_INPUT = "withEventStreamInput"; private AwsEventStreamUtils() { } @@ -72,6 +75,9 @@ public static void generateEventStreamComponents(GenerationContext context) { if (inputEventStreams.isPresent()) { generateEventSignerInterface(settings, writer); + if (!isHttpBindingProto) { + generateInputMiddlewareHelper(settings, writer); + } inputEventStreams.get().forEach((shapeId, eventStreamInfos) -> { generateEventStreamWriter(context, model.expectShape(shapeId, UnionShape.class), eventStreamInfos, !isHttpBindingProto); @@ -89,7 +95,7 @@ public static void generateEventStreamComponents(GenerationContext context) { for (OperationShape operationShape : operationShapes) { if (streamIndex.getInputInfo(operationShape).isEmpty() - && streamIndex.getOutputInfo(operationShape).isEmpty()) { + && streamIndex.getOutputInfo(operationShape).isEmpty()) { continue; } generateEventStreamMiddleware(context, operationShape, !isHttpBindingProto); @@ -101,6 +107,48 @@ public static void generateEventStreamComponents(GenerationContext context) { generateToggleClientLogModeFinalizer(context); } + private static void generateInputMiddlewareHelper(GoSettings settings, GoWriter writer) { + writer.pushState(); + + writer.putContext("context", SymbolUtils.createValueSymbolBuilder("Context", + SmithyGoDependency.CONTEXT).build()); + writer.putContext("errorf", SymbolUtils.createValueSymbolBuilder("Errorf", + SmithyGoDependency.FMT).build()); + + var middleware = GoStackStepMiddlewareGenerator.createSerializeStepMiddleware( + EVENT_STREAM_SERIALIZER_HELPER, + MiddlewareIdentifier.builder() + .name("OperationEventStreamSerializer") + .build()); + + writer.putContext("keyType", SymbolUtils.createValueSymbolBuilder("eventStreamInputKey").build()); + writer.putContext("withValue", SymbolUtils.createValueSymbolBuilder("WithValue", + SmithyGoDependency.CONTEXT).build()); + + writer.putContext("contextGetter", CONTEXT_GET_EVENT_STREAM_INPUT); + writer.putContext("contextSetter", CONTEXT_WITH_EVENT_STREAM_INPUT); + writer.putContext("smithyRequest", getSymbol("Request", SmithyGoDependency.SMITHY_HTTP_TRANSPORT)); + + writer.write(""" + type $keyType:T struct{} + + func $contextGetter:L(ctx $context:P) interface{} { + return ctx.Value($keyType:T{}) + } + + func $contextSetter:L(ctx $context:P, value interface{}) $context:T { + return $withValue:T(ctx, $keyType:T{}, value) + } + """); + + middleware.writeMiddleware(writer, (mg, wr) -> { + wr.putContext("nextMethod", mg.getHandleMethodName()); + wr.write("return next.$nextMethod:L($contextSetter:L(ctx, in.Parameters), in)"); + }); + + writer.popState(); + } + private static void generateUnknownEventMessageError(GenerationContext context) { var writer = context.getWriter().get(); @@ -108,18 +156,18 @@ private static void generateUnknownEventMessageError(GenerationContext context) var message = getEventStreamSymbol("Message"); writer.write(""" - // $T provides an error when a message is received from the stream, - // but the reader is unable to determine what kind of message it is. - type $T struct { - Type string - Message $P - } - - // Error retruns the error message string. - func (e $P) Error() string { - return "unknown event stream message type, " + e.Type - } - """, symbol, symbol, message, symbol); + // $T provides an error when a message is received from the stream, + // but the reader is unable to determine what kind of message it is. + type $T struct { + Type string + Message $P + } + + // Error retruns the error message string. + func (e $P) Error() string { + return "unknown event stream message type, " + e.Type + } + """, symbol, symbol, message, symbol); } private static Symbol getUnknownEventMessageErrorSymbol() { @@ -143,17 +191,17 @@ private static void generateEventStreamClientLogModeFinalizer( var responseStream = streamIndex .getOutputInfo(operationShape).isPresent(); writer.write(""" - case $S: - $T(o, $L, $L) - return - """, operationShape.getId().getName(), + case $S: + $T(o, $L, $L) + return + """, operationShape.getId().getName(), getToggleEventStreamClientLogModeSymbol(), requestStream, responseStream); }); writer.write(""" - default: - return - """); + default: + return + """); }); }); } @@ -169,20 +217,20 @@ private static void generateToggleClientLogModeFinalizer(GenerationContext conte writer.openBlock("func $T(o *Options, request, response bool) {", "}", getToggleEventStreamClientLogModeSymbol(), () -> { writer.write(""" - mode := o.ClientLogMode - - if request && mode.IsRequestWithBody() { - mode.ClearRequestWithBody() - mode |= $T - } - - if response && mode.IsResponseWithBody() { - mode.ClearResponseWithBody() - mode |= $T - } - - o.ClientLogMode = mode - """, logRequest, logResponse + mode := o.ClientLogMode + + if request && mode.IsRequestWithBody() { + mode.ClearRequestWithBody() + mode |= $T + } + + if response && mode.IsResponseWithBody() { + mode.ClearResponseWithBody() + mode |= $T + } + + o.ClientLogMode = mode + """, logRequest, logResponse ); }).write(""); } @@ -252,26 +300,26 @@ defer func() { if (inputInfo.isPresent()) { w.write(""" - if err := $T(request); err != nil { - return out, metadata, err - } - """, getEventStreamApiSymbol("ApplyHTTPTransportFixes")) + if err := $T(request); err != nil { + return out, metadata, err + } + """, getEventStreamApiSymbol("ApplyHTTPTransportFixes")) .write(""); w.write(""" - requestSignature, err := $T(request.Request) - if err != nil { - return out, metadata, $T("failed to get event stream seed signature: %v", err) - } - """, getSignedRequestSignature, errorf).write("") + requestSignature, err := $T(request.Request) + if err != nil { + return out, metadata, $T("failed to get event stream seed signature: %v", err) + } + """, getSignedRequestSignature, errorf).write("") .openBlock("signer := $T(", ")", getSymbol("NewStreamSigner", AwsGoDependency.AWS_SIGNER_V4, false), () -> w .write(""" - $T(ctx), - $T(ctx), - $T(ctx), - requestSignature, - """, getSymbol("GetSigningCredentials", + $T(ctx), + $T(ctx), + $T(ctx), + requestSignature, + """, getSymbol("GetSigningCredentials", AwsGoDependency.AWS_MIDDLEWARE, false), getSymbol("GetSigningName", AwsGoDependency.AWS_MIDDLEWARE, false), @@ -291,9 +339,9 @@ defer func() { .openBlock("$T(func(options $P) {", "}),", newEncoder, encoderOptions, () -> w .write(""" - options.Logger = logger - options.LogMessages = m.LogEventStreamWrites - """)) + options.Logger = logger + options.LogMessages = m.LogEventStreamWrites + """)) .write("signer,"); if (withInitialMessages) { w.write("$L,", getEventStreamMessageRequestSerializerName( @@ -302,23 +350,31 @@ defer func() { } }) .write(""" - defer func() { - if err == nil { - return - } - _ = eventWriter.Close() - }() - """); + defer func() { + if err == nil { + return + } + _ = eventWriter.Close() + }() + """); if (withInitialMessages) { + var inputShape = model.expectShape(operationShape.getInput().get()); w.write(""" + params, ok := $L(ctx).($P) + if !ok || params == nil { + return out, metadata, $T("unexpected nil type: %T", params) + } + reqSend := make(chan error, 1) go func() { defer close(reqSend) - reqSend <- eventWriter.send(ctx, &$T{Value: request}) + sErr := eventWriter.send(ctx, &$T{Value: params}) + reqSend <- sErr }() - """, getWriterEventWrapperInitialRequestType(symbolProvider, - inputInfo.get().getEventStreamTarget().asUnionShape().get(), serviceShape)); + """, CONTEXT_GET_EVENT_STREAM_INPUT, symbolProvider.toSymbol(inputShape), + errorf, getWriterEventWrapperInitialRequestType(symbolProvider, + inputInfo.get().getEventStreamTarget().asUnionShape().get(), serviceShape)); } } @@ -326,6 +382,16 @@ defer close(reqSend) w.write("out, metadata, err = next.HandleDeserialize(ctx, in)"); writer.openBlock("if err != nil {", "}", () -> { + if (withInitialMessages && inputInfo.isPresent()) { + w.write(""" + select { + case sErr := <-reqSend: + if sErr != nil { + err = $T("%v: %w", err, sErr) + } + default: + }""", errorf); + } writer.write("return out, metadata, err"); }).write(""); @@ -338,20 +404,20 @@ defer close(reqSend) } w.write(""" - deserializeOutput, ok := out.RawResponse.($P) - if !ok { - return out, metadata, $T("unknown transport type: %T", out.RawResponse) - } - _ = deserializeOutput - - output, ok := out.Result.($P) - if out.Result != nil && !ok { - return out, metadata, $T("unexpected output result type: %T", out.Result) - } else if out.Result == nil { - output = &$T{} - out.Result = output - } - """, getSymbol("Response", SmithyGoDependency.SMITHY_HTTP_TRANSPORT), errorf, + deserializeOutput, ok := out.RawResponse.($P) + if !ok { + return out, metadata, $T("unknown transport type: %T", out.RawResponse) + } + _ = deserializeOutput + + output, ok := out.Result.($P) + if out.Result != nil && !ok { + return out, metadata, $T("unexpected output result type: %T", out.Result) + } else if out.Result == nil { + output = &$T{} + out.Result = output + } + """, getSymbol("Response", SmithyGoDependency.SMITHY_HTTP_TRANSPORT), errorf, outputSymbol, errorf, outputSymbol ); @@ -367,9 +433,9 @@ defer close(reqSend) .openBlock("$T(func(options $P) {", "}),", newDecoder, decoderOptions, () -> w .write(""" - options.Logger = logger - options.LogMessages = m.LogEventStreamReads - """)); + options.Logger = logger + options.LogMessages = m.LogEventStreamReads + """)); if (withInitialMessages) { w.write("$L,", getEventStreamMessageResponseDeserializerName( operationShape.getOutput().get(), serviceShape, @@ -377,13 +443,13 @@ defer close(reqSend) } }) .write(""" - defer func() { - if err == nil { - return - } - _ = eventReader.Close() - }() - """); + defer func() { + if err == nil { + return + } + _ = eventReader.Close() + }() + """); if (withInitialMessages) { w.write(""" @@ -415,43 +481,56 @@ defer func() { .write("return out, metadata, nil"); }, (mg, w) -> w.write(""" - LogEventStreamWrites bool - LogEventStreamReads bool - """)); + LogEventStreamWrites bool + LogEventStreamReads bool + """)); var deserializeOutput = getSymbol("DeserializeOutput", SmithyGoDependency.SMITHY_MIDDLEWARE); var httpResponse = getSymbol("Response", SmithyGoDependency.SMITHY_HTTP_TRANSPORT); var copy = getSymbol("Copy", SmithyGoDependency.IO); var discard = getSymbol("Discard", SmithyGoDependency.IOUTIL); writer.write(""" - - func ($P) closeResponseBody(out $T) { - if resp, ok := out.RawResponse.($P); ok && resp != nil && resp.Body != nil { - _, _ = $T($T, resp.Body) - _ = resp.Body.Close() - } - } - """, middleware.getMiddlewareSymbol(), deserializeOutput, httpResponse, copy, discard); + + func ($P) closeResponseBody(out $T) { + if resp, ok := out.RawResponse.($P); ok && resp != nil && resp.Body != nil { + _, _ = $T($T, resp.Body) + _ = resp.Body.Close() + } + } + """, middleware.getMiddlewareSymbol(), deserializeOutput, httpResponse, copy, discard); var stack = getSymbol("Stack", SmithyGoDependency.SMITHY_MIDDLEWARE); + var after = getSymbol("After", SmithyGoDependency.SMITHY_MIDDLEWARE); var before = getSymbol("Before", SmithyGoDependency.SMITHY_MIDDLEWARE); - writer.write(""" - func $T(stack $P, options Options) error { - return stack.Deserialize.Insert(&$T{ - LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), - LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), - }, "OperationDeserializer", $T) - } - """, getAddEventStreamOperationMiddlewareSymbol(operationShape), - stack, middleware.getMiddlewareSymbol(), before); + writer.openBlock("func $T(stack $P, options Options) error {", "}", + getAddEventStreamOperationMiddlewareSymbol(operationShape), stack, + () -> { + if (withInitialMessages && inputInfo.isPresent()) { + writer.write(""" + if err := stack.Serialize.Insert(&$T{}, "OperationSerializer", $T); err != nil { + return err + } + """, getModuleSymbol(context.getSettings(), EVENT_STREAM_SERIALIZER_HELPER), + after); + } + writer.write(""" + if err := stack.Deserialize.Insert(&$T{ + LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), + LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), + }, "OperationDeserializer", $T); err != nil { + return err + } + return nil + """, middleware.getMiddlewareSymbol(), before); + }); } private static void generateEventSignerInterface(GoSettings settings, GoWriter writer) { writer.openBlock("type $T interface {", "}", getModuleSymbol(settings, EVENT_STREAM_SIGNER_INTERFACE), () -> { writer.write("GetSignature(ctx context.Context, headers, payload []byte, signingTime time.Time, " - + "optFns ...func($P)) ([]byte, error)", + + "optFns ...func($P)) ([]byte, error)", SymbolUtils.createPointableSymbolBuilder("StreamSignerOptions", AwsGoDependency.AWS_SIGNER_V4) .build()); @@ -493,13 +572,13 @@ private static void generateEventStreamReader( var syncOnce = getSymbol("Once", SmithyGoDependency.SYNC, false); writer.write(""" - stream chan $T - decoder $P - eventStream $T - err $P - payloadBuf []byte - done chan struct{} - closeOnce $T""", eventUnionSymbol, decoderSymbol, readCloser, onceErr, syncOnce); + stream chan $T + decoder $P + eventStream $T + err $P + payloadBuf []byte + done chan struct{} + closeOnce $T""", eventUnionSymbol, decoderSymbol, readCloser, onceErr, syncOnce); if (withInitialMessages) { writer.write("initialResponseDeserializer func($P) (interface{}, error)", messageSymbol); writer.write("initialResponse chan interface{}"); @@ -517,12 +596,12 @@ private static void generateEventStreamReader( var newOnceErr = getSymbol("NewOnceErr", SmithyGoDependency.SMITHY_SYNC, false); writer.openBlock("w := &$T{", "}", readerSymbol, () -> { writer.write(""" - stream: make(chan $T), - decoder: decoder, - eventStream: readCloser, - err: $T(), - done: make(chan struct{}), - payloadBuf: make([]byte, 10*1024),""", eventUnionSymbol, newOnceErr); + stream: make(chan $T), + decoder: decoder, + eventStream: readCloser, + err: $T(), + done: make(chan struct{}), + payloadBuf: make([]byte, 10*1024),""", eventUnionSymbol, newOnceErr); if (withInitialMessages) { writer.write("initialResponseDeserializer: ird,"); writer.write("initialResponse: make(chan interface{}, 1),"); @@ -530,9 +609,9 @@ private static void generateEventStreamReader( }).write(""); writer.write(""" - go w.readEventStream() - - return w"""); + go w.readEventStream() + + return w"""); }).write(""); writer.openBlock("func (r $P) Events() <-chan $T {", "}", readerSymbol, eventUnionSymbol, () -> writer @@ -540,73 +619,73 @@ private static void generateEventStreamReader( writer.openBlock("func (r $P) readEventStream() {", "}", readerSymbol, () -> { writer.write(""" - defer r.Close() - defer close(r.stream) - """); + defer r.Close() + defer close(r.stream) + """); if (withInitialMessages) { writer.write(""" - defer close(r.initialResponse) - """); + defer close(r.initialResponse) + """); } writer.openBlock("for {", "}", () -> { writer.write(""" - r.payloadBuf = r.payloadBuf[0:0] - decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf) - if err != nil { - if err == $T { - return - } - select { - case <-r.done: - return - default: - r.err.SetError(err) - return - } - } - - event, err := r.deserializeEventMessage(&decodedMessage) - if err != nil { - r.err.SetError(err) - return - } - """, SymbolUtils.createValueSymbolBuilder("EOF", + r.payloadBuf = r.payloadBuf[0:0] + decodedMessage, err := r.decoder.Decode(r.eventStream, r.payloadBuf) + if err != nil { + if err == $T { + return + } + select { + case <-r.done: + return + default: + r.err.SetError(err) + return + } + } + + event, err := r.deserializeEventMessage(&decodedMessage) + if err != nil { + r.err.SetError(err) + return + } + """, SymbolUtils.createValueSymbolBuilder("EOF", SmithyGoDependency.IO).build()); if (withInitialMessages) { writer.write(""" - switch ev := event.(type) { - case $P: - select { - case r.initialResponse <- ev.Value: - case <-r.done: - return - default: - } - case $P: - select { - case r.stream <- ev.Value: - case <-r.done: - return - } - default: - r.err.SetError($T("unexpected event wrapper: %T", event)) - return - } - """, + switch ev := event.(type) { + case $P: + select { + case r.initialResponse <- ev.Value: + case <-r.done: + return + default: + } + case $P: + select { + case r.stream <- ev.Value: + case <-r.done: + return + } + default: + r.err.SetError($T("unexpected event wrapper: %T", event)) + return + } + """, getReaderEventWrapperInitialResponseType(symbolProvider, eventStream, service), getReaderEventWrapperMessageType(symbolProvider, eventStream, service), getSymbol("Errorf", SmithyGoDependency.FMT, false)); } else { writer.write(""" - select { - case r.stream <- event: - case <-r.done: - return - } - """); + select { + case r.stream <- event: + case <-r.done: + return + } + """); } }); } @@ -621,38 +700,38 @@ defer close(r.initialResponse) var errorMessageType = getEventStreamApiSymbol("ErrorMessageType", false); writer.write(""" - messageType := msg.Headers.Get($T) - if messageType == nil { - return nil, $T("%s event header not present", $T) - } - """, messageTypeHeader, errorf, messageTypeHeader) + messageType := msg.Headers.Get($T) + if messageType == nil { + return nil, $T("%s event header not present", $T) + } + """, messageTypeHeader, errorf, messageTypeHeader) .openBlock("switch messageType.String() {", "}", () -> writer .openBlock("case $T:", "", eventMessageType, () -> { if (withInitialMessages) { var eventTypeHeader = getEventStreamApiSymbol("EventTypeHeader", false); writer.write(""" - eventType := msg.Headers.Get($T) - if eventType == nil { - return nil, $T("%s event header not present", $T) - } - - if eventType.String() == "initial-response" { - v, err := r.initialResponseDeserializer(msg) - if err != nil { - return nil, err - } - return &$T{Value: v}, nil - } - """, eventTypeHeader, errorf, eventTypeHeader, + eventType := msg.Headers.Get($T) + if eventType == nil { + return nil, $T("%s event header not present", $T) + } + + if eventType.String() == "initial-response" { + v, err := r.initialResponseDeserializer(msg) + if err != nil { + return nil, err + } + return &$T{Value: v}, nil + } + """, eventTypeHeader, errorf, eventTypeHeader, getReaderEventWrapperInitialResponseType(symbolProvider, eventStream, service)); } writer.write(""" - var v $T - if err := $L(&v, msg); err != nil { - return nil, err - }""", + var v $T + if err := $L(&v, msg); err != nil { + return nil, err + }""", eventUnionSymbol, getEventStreamDeserializerName(eventStream, service, context.getProtocolName())); if (withInitialMessages) { @@ -669,29 +748,29 @@ eventUnionSymbol, getEventStreamDeserializerName(eventStream, context.getProtocolName()))) .openBlock("case $T:", "", errorMessageType, () -> writer .write(""" - errorCode := "UnknownError" - errorMessage := errorCode - if header := msg.Headers.Get($T); header != nil { - errorCode = header.String() - } - if header := msg.Headers.Get($T); header != nil { - errorMessage = header.String() - } - return nil, &$T{ - Code: errorCode, - Message: errorMessage, - } - """, getEventStreamApiSymbol("ErrorCodeHeader", false), + errorCode := "UnknownError" + errorMessage := errorCode + if header := msg.Headers.Get($T); header != nil { + errorCode = header.String() + } + if header := msg.Headers.Get($T); header != nil { + errorMessage = header.String() + } + return nil, &$T{ + Code: errorCode, + Message: errorMessage, + } + """, getEventStreamApiSymbol("ErrorCodeHeader", false), getEventStreamApiSymbol("ErrorMessageHeader", false), getSymbol("GenericAPIError", SmithyGoDependency.SMITHY, false))) .write(""" - default: - mc := msg.Clone() - return nil, &$T{ - Type: messageType.String(), - Message: &mc, - } - """, getUnknownEventMessageErrorSymbol())); + default: + mc := msg.Clone() + return nil, &$T{ + Type: messageType.String(), + Message: &mc, + } + """, getUnknownEventMessageErrorSymbol())); }).write(""); writer.openBlock("func (r $P) ErrorSet() <-chan struct{} {", "}", readerSymbol, () -> writer @@ -703,9 +782,9 @@ eventUnionSymbol, getEventStreamDeserializerName(eventStream, writer.openBlock("func (r $P) safeClose() {", "}", readerSymbol, () -> writer .write(""" - close(r.done) - r.eventStream.Close() - """)).write(""); + close(r.done) + r.eventStream.Close() + """)).write(""); writer.openBlock("func (r $P) Err() error {", "}", readerSymbol, () -> writer .write("return r.err.Err()")).write(""); @@ -725,30 +804,30 @@ private static void generateEventStreamReaderMessageWrapper( var interfaceMethod = "is" + StringUtils.capitalize(readerEventWrapperInterface.getName()); writer.write(""" - type $T interface { - $L() - } - """, readerEventWrapperInterface, interfaceMethod); + type $T interface { + $L() + } + """, readerEventWrapperInterface, interfaceMethod); var readerEventWrapperMessageType = getReaderEventWrapperMessageType(symbolProvider, eventStream, service); writer.write(""" - type $T struct { - Value $P - } - - func ($P) $L() {} - """, readerEventWrapperMessageType, eventUnionSymbol, readerEventWrapperMessageType, + type $T struct { + Value $P + } + + func ($P) $L() {} + """, readerEventWrapperMessageType, eventUnionSymbol, readerEventWrapperMessageType, interfaceMethod); var readerEventWrapperInitialResponseType = getReaderEventWrapperInitialResponseType(symbolProvider, eventStream, service); writer.write(""" - type $T struct { - Value interface{} - } - - func ($P) $L() {} - """, readerEventWrapperInitialResponseType, readerEventWrapperInitialResponseType, + type $T struct { + Value interface{} + } + + func ($P) $L() {} + """, readerEventWrapperInitialResponseType, readerEventWrapperInitialResponseType, interfaceMethod); } @@ -794,16 +873,16 @@ private static void generateEventStreamWriter( var onceErr = getSymbol("OnceErr", SmithyGoDependency.SMITHY_SYNC); writer.write(""" - encoder $P - signer $T - stream chan $T - serializationBuffer $P - signingBuffer $P - eventStream $T - done chan struct{} - closeOnce $T - err $P - """, encoderSymbol, signerInterface, asyncEventSymbol, bytesBufferSymbol, bytesBufferSymbol, + encoder $P + signer $T + stream chan $T + serializationBuffer $P + signingBuffer $P + eventStream $T + done chan struct{} + closeOnce $T + err $P + """, encoderSymbol, signerInterface, asyncEventSymbol, bytesBufferSymbol, bytesBufferSymbol, writeCloser, syncOnce, onceErr); if (withInitialMessages) { @@ -825,24 +904,24 @@ private static void generateEventStreamWriter( var onceErr = SymbolUtils.createValueSymbolBuilder("NewOnceErr", SmithyGoDependency.SMITHY_SYNC).build(); writer.write(""" - encoder: encoder, - signer: signer, - stream: make(chan $T), - eventStream: stream, - done: make(chan struct{}), - err: $T(), - serializationBuffer: $T(nil), - signingBuffer: $T(nil), - """, asyncEventSymbol, onceErr, bytesNewBuffer, bytesNewBuffer); + encoder: encoder, + signer: signer, + stream: make(chan $T), + eventStream: stream, + done: make(chan struct{}), + err: $T(), + serializationBuffer: $T(nil), + signingBuffer: $T(nil), + """, asyncEventSymbol, onceErr, bytesNewBuffer, bytesNewBuffer); if (withInitialMessages) { writer.write("initialRequestSerializer: irs,"); } }).write("") .write(""" - go w.writeStream() - - return w - """)).write(""); + go w.writeStream() + + return w + """)).write(""); Symbol contextSymbol = SymbolUtils.createValueSymbolBuilder("Context", SmithyGoDependency.CONTEXT).build(); @@ -859,17 +938,17 @@ private static void generateEventStreamWriter( writer.openBlock("func (w $P) send(ctx $P, event $P) error {", "}", writerSymbol, contextSymbol, eventSymbol, () -> { writer.write(""" - if err := w.err.Err(); err != nil { - return err - } - - resultCh := make(chan error) - - wrapped := $T{ - Event: event, - Result: resultCh, - } - """, asyncEventSymbol); + if err := w.err.Err(); err != nil { + return err + } + + resultCh := make(chan error) + + wrapped := $T{ + Event: event, + Result: resultCh, + } + """, asyncEventSymbol); Symbol errorfSymbol = SymbolUtils.createValueSymbolBuilder("Errorf", SmithyGoDependency.FMT) .build(); @@ -877,22 +956,22 @@ private static void generateEventStreamWriter( writer.openBlock("select {", "}", () -> writer .write(""" - case w.stream <- wrapped: - case <-ctx.Done(): - return ctx.Err() - case <-w.done: - return $T($S) - """, errorfSymbol, streamClosedError)).write(""); + case w.stream <- wrapped: + case <-ctx.Done(): + return ctx.Err() + case <-w.done: + return $T($S) + """, errorfSymbol, streamClosedError)).write(""); writer.openBlock("select {", "}", () -> writer .write(""" - case err := <-resultCh: - return err - case <-ctx.Done(): - return ctx.Err() - case <-w.done: - return $T($S) - """, errorfSymbol, streamClosedError)).write(""); + case err := <-resultCh: + return err + case <-ctx.Done(): + return ctx.Err() + case <-w.done: + return $T($S) + """, errorfSymbol, streamClosedError)).write(""); }).write(""); writer.openBlock("func (w $P) writeStream() {", "}", writerSymbol, () -> writer @@ -914,17 +993,17 @@ private static void generateEventStreamWriter( Runnable returnErr = () -> writer.openBlock("if err != nil {", "}", () -> writer.write("return err")) .write(""); writer.writeDocs(""" - serializedEvent returned bytes refers to an underlying byte buffer and must not escape - this writeEvent scope without first copying. Any previous bytes stored in the buffer - are cleared by this call. - """); + serializedEvent returned bytes refers to an underlying byte buffer and must not escape + this writeEvent scope without first copying. Any previous bytes stored in the buffer + are cleared by this call. + """); writer.write("serializedEvent, err := w.serializeEvent(event)"); returnErr.run(); writer.writeDocs(""" - signedEvent returned bytes refers to an underlying byte buffer and must not escape - this writeEvent scope without first copying. Any previous bytes stored in the buffer - are cleared by this call. - """); + signedEvent returned bytes refers to an underlying byte buffer and must not escape + this writeEvent scope without first copying. Any previous bytes stored in the buffer + are cleared by this call. + """); writer.write("signedEvent, err := w.signEvent(serializedEvent)"); returnErr.run(); writer.writeDocs("bytes are now copied to the underlying stream writer"); @@ -947,34 +1026,34 @@ private static void generateEventStreamWriter( service); var errorf = getSymbol("Errorf", SmithyGoDependency.FMT, false); writer.write(""" - switch ev := event.(type) { - case $P: - if err := w.initialRequestSerializer(ev.Value, &eventMessage); err != nil { - return nil, err - } - case $P: - if err := $L(ev.Value, &eventMessage); err != nil { - return nil, err - } - default: - return nil, $T("unknown event wrapper type: %v", event) - } - """, initialRequestType, messageEventType, eventStreamSerializerName, errorf); + switch ev := event.(type) { + case $P: + if err := w.initialRequestSerializer(ev.Value, &eventMessage); err != nil { + return nil, err + } + case $P: + if err := $L(ev.Value, &eventMessage); err != nil { + return nil, err + } + default: + return nil, $T("unknown event wrapper type: %v", event) + } + """, initialRequestType, messageEventType, eventStreamSerializerName, errorf); } else { writer.write(""" - if err := $L(event, &eventMessage); err != nil { - return nil, err - } - """, + if err := $L(event, &eventMessage); err != nil { + return nil, err + } + """, eventStreamSerializerName); } writer.write(""" - if err := w.encoder.Encode(w.serializationBuffer, eventMessage); err != nil { - return nil, err - } - - return w.serializationBuffer.Bytes(), nil"""); + if err := w.encoder.Encode(w.serializationBuffer, eventMessage); err != nil { + return nil, err + } + + return w.serializationBuffer.Bytes(), nil"""); }).write(""); writer.openBlock("func (w $P) signEvent(payload []byte) ([]byte, error) {", "}", writerSymbol, () -> { @@ -994,7 +1073,7 @@ private static void generateEventStreamWriter( getEventStreamSymbol("EncodeHeaders", false), () -> writer.write("return nil, err")).write("") .write("sig, err := w.signer.GetSignature(context.Background(), headers.Bytes(), " - + "msg.Payload, date)") + + "msg.Payload, date)") .openBlock("if err != nil {", "}", () -> writer .write("return nil, err")).write("") .write("msg.Headers.Set($T, $T(sig))", chunkSignatureHeader, bytesValue).write("") @@ -1008,9 +1087,9 @@ private static void generateEventStreamWriter( .openBlock("if cErr := w.eventStream.Close(); cErr != nil && err == nil {", "}", () -> writer.write("err = cErr"))).write("") .write(""" - // Per the protocol, a signed empty message is used to indicate the end of the stream, - // and that no subsequent events will be sent. - signedEvent, err := w.signEvent([]byte{})""") + // Per the protocol, a signed empty message is used to indicate the end of the stream, + // and that no subsequent events will be sent. + signedEvent, err := w.signEvent([]byte{})""") .openBlock("if err != nil {", "}", () -> writer.write("return err")).write("") .write("_, err = io.Copy(w.eventStream, bytes.NewReader(signedEvent))") .write("return err")).write(""); @@ -1039,30 +1118,30 @@ private static void generateEventStreamWriterMessageWrapper( var writerEventWrapperInterface = getWriterEventWrapperInterface(symbolProvider, eventStream, service); var interfaceMethod = "is" + StringUtils.capitalize(writerEventWrapperInterface.getName()); writer.write(""" - type $T interface { - $L() - } - """, writerEventWrapperInterface, interfaceMethod); + type $T interface { + $L() + } + """, writerEventWrapperInterface, interfaceMethod); var writerEventWrapperMessageType = getWriterEventWrapperMessageType(symbolProvider, eventStream, service); writer.write(""" - type $T struct { - Value $P - } - - func ($P) $L() {} - """, writerEventWrapperMessageType, eventUnionSymbol, writerEventWrapperMessageType, + type $T struct { + Value $P + } + + func ($P) $L() {} + """, writerEventWrapperMessageType, eventUnionSymbol, writerEventWrapperMessageType, interfaceMethod); var writerEventWrapperInitialRequestType = getWriterEventWrapperInitialRequestType(symbolProvider, eventStream, service); writer.write(""" - type $T struct { - Value interface{} - } - - func ($P) $L() {} - """, writerEventWrapperInitialRequestType, writerEventWrapperInitialRequestType, interfaceMethod); + type $T struct { + Value interface{} + } + + func ($P) $L() {} + """, writerEventWrapperInitialRequestType, writerEventWrapperInitialRequestType, interfaceMethod); } private static Symbol getWriterEventWrapperInterface( @@ -1146,10 +1225,10 @@ public static void generateEventStreamSerializer( getEventStreamSymbol("Message"), () -> { Symbol errof = getSymbol("Errorf", SmithyGoDependency.FMT, false); writer.write(""" - if v == nil { - return $T("unexpected serialization of nil %T", v) - } - """, errof) + if v == nil { + return $T("unexpected serialization of nil %T", v) + } + """, errof) .write("") .openBlock("switch vv := v.(type) {", "}", () -> { for (MemberShape member : eventUnion.members()) { @@ -1168,9 +1247,9 @@ public static void generateEventStreamSerializer( model.expectShape(member.getTarget()), "vv.Value"))); } writer.write(""" - default: - return $T("unexpected event message type: %v", v) - """, errof); + default: + return $T("unexpected event message type: %v", v) + """, errof); }); }); } @@ -1198,10 +1277,10 @@ public static void generateEventMessageSerializer( writer.openBlock("func $L(v $P, msg $P) error {", "}", serializerName, symbolProvider.toSymbol(targetShape), getEventStreamSymbol("Message"), () -> { writer.write(""" - if v == nil { - return $T("unexpected serialization of nil %T", v) - } - """, errorf).write("") + if v == nil { + return $T("unexpected serialization of nil %T", v) + } + """, errorf).write("") .write("msg.Headers.Set($T, $T($T))", messageTypeHeader, stringValue, eventMessageType); var headerBindings = targetShape.members().stream() @@ -1249,7 +1328,7 @@ public static void generateEventMessageSerializer( break; default: throw new CodegenException("unexpected event payload shape: " - + payloadTarget.getType()); + + payloadTarget.getType()); } } } else { @@ -1274,17 +1353,17 @@ public static void generateEventStreamDeserializer(GenerationContext context, Un var equalFold = SymbolUtils.createValueSymbolBuilder("EqualFold", SmithyGoDependency.STRINGS).build(); writer.write(""" - if v == nil { - return $T("unexpected serialization of nil %T", v) - } - """, errof) + if v == nil { + return $T("unexpected serialization of nil %T", v) + } + """, errof) .write("") .write(""" - eventType := msg.Headers.Get($T) - if eventType == nil { - return $T("%s event header not present", $T) - } - """, eventTypeHeader, errof, eventTypeHeader).write("") + eventType := msg.Headers.Get($T) + if eventType == nil { + return $T("%s event header not present", $T) + } + """, eventTypeHeader, errof, eventTypeHeader).write("") .openBlock("switch {", "}", () -> { var members = eventUnion.members().stream() .filter(ms -> ms.getMemberTrait(model, ErrorTrait.class).isEmpty()) @@ -1301,27 +1380,27 @@ public static void generateEventStreamDeserializer(GenerationContext context, Un eventUnionSymbol.getNamespace()) .build(); writer.write(""" - vv := &$T{} - if err := $L(&vv.Value, msg); err != nil { - return err - } - *v = vv - return nil - """, memberSymbol, messageDeserializerName); + vv := &$T{} + if err := $L(&vv.Value, msg); err != nil { + return err + } + *v = vv + return nil + """, memberSymbol, messageDeserializerName); }); } var newBuffer = getSymbol("NewBuffer", SmithyGoDependency.BYTES); var newEncoder = getEventStreamSymbol("NewEncoder"); writer.write(""" - default: - buffer := $T(nil) - $T().Encode(buffer, *msg) - *v = &$T{ - Tag: eventType.String(), - Value: buffer.Bytes(), - } - return nil - """, newBuffer, newEncoder, SymbolUtils. + default: + buffer := $T(nil) + $T().Encode(buffer, *msg) + *v = &$T{ + Tag: eventType.String(), + Value: buffer.Bytes(), + } + return nil + """, newBuffer, newEncoder, SymbolUtils. createValueSymbolBuilder("UnknownUnionMember", eventUnionSymbol.getNamespace()).build()); }); @@ -1348,11 +1427,11 @@ public static void generateEventStreamExceptionDeserializer( writer.openBlock("func $L(msg $P) error {", "}", deserializerName, getEventStreamSymbol("Message"), () -> { writer.write(""" - exceptionType := msg.Headers.Get($T) - if exceptionType == nil { - return $T("%s event header not present", $T) - } - """, exceptionTypeHeader, errorf, exceptionTypeHeader).write(""); + exceptionType := msg.Headers.Get($T) + if exceptionType == nil { + return $T("%s event header not present", $T) + } + """, exceptionTypeHeader, errorf, exceptionTypeHeader).write(""); var errorMemberShapes = eventUnion.members().stream() .filter(ms -> ms.getMemberTrait(model, ErrorTrait.class).isPresent()) @@ -1426,10 +1505,10 @@ public static void generateEventMessageDeserializer( writer.openBlock("func $L(v $P, msg $P) error {", "}", deserializerName, symbolProvider.toSymbol(targetShape), getEventStreamSymbol("Message"), () -> { writer.write(""" - if v == nil { - return $T("unexpected serialization of nil %T", v) - } - """, errorf).write(""); + if v == nil { + return $T("unexpected serialization of nil %T", v) + } + """, errorf).write(""); var headerBindings = targetShape.members().stream() .filter(memberShape -> memberShape.hasTrait(EventHeaderTrait.class)) @@ -1469,9 +1548,9 @@ public static void generateEventMessageDeserializer( case BLOB: writer.openBlock("if msg.Payload != nil {", "}", () -> { writer.write(""" - bsv := make([]byte, len(msg.Payload)) - copy(bsv, msg.Payload) - """); + bsv := make([]byte, len(msg.Payload)) + copy(bsv, msg.Payload) + """); var pointable = CodegenUtils.getAsPointerIfPointable(model, writer, pointableIndex, memberShape, "bsv"); writer.write("$L = $L", String.format("v.%s", @@ -1481,7 +1560,7 @@ public static void generateEventMessageDeserializer( break; default: throw new CodegenException("unexpected event payload shape: " - + payloadTarget.getType()); + + payloadTarget.getType()); } } } else { @@ -1522,12 +1601,18 @@ private static String getEventStreamMessageSerializerName( return getSerDeName(toShapeId, serviceShape, protocolName, "_serializeEventMessage"); } - private static String getEventStreamWriterImplConstructorName(UnionShape unionShape, ServiceShape serviceShape) { - return "new" + StringUtils.capitalize(getEventStreamReaderImplName(unionShape, serviceShape)); + private static String getEventStreamWriterImplConstructorName( + UnionShape unionShape, ServiceShape + serviceShape + ) { + return "new" + StringUtils.capitalize(getEventStreamWriterImplName(unionShape, serviceShape)); } - private static String getEventStreamReaderImplConstructorName(UnionShape unionShape, ServiceShape serviceShape) { - return "new" + StringUtils.capitalize(getEventStreamWriterImplName(unionShape, serviceShape)); + private static String getEventStreamReaderImplConstructorName( + UnionShape unionShape, ServiceShape + serviceShape + ) { + return "new" + StringUtils.capitalize(getEventStreamReaderImplName(unionShape, serviceShape)); } private static void generateAsyncWriteReporter(GoWriter writer, Symbol eventSymbol, Symbol asyncEventSymbol) { @@ -1620,19 +1705,19 @@ public static void generateEventMessageRequestSerializer( () -> { var inputSymbol = symbolProvider.toSymbol(inputShape); writer.write(""" - if i == nil { - return $T("event message serializer expects non-nil %T", ($P)(nil)) - } - - v, ok := i.($P) - if !ok { - return $T("unexpected serialization of %T", i) - } - """, errorf, inputSymbol, inputSymbol, errorf).write("") + if i == nil { + return $T("event message serializer expects non-nil %T", ($P)(nil)) + } + + v, ok := i.($P) + if !ok { + return $T("unexpected serialization of %T", i) + } + """, errorf, inputSymbol, inputSymbol, errorf).write("") .write(""" - msg.Headers.Set($T, $T($T)) - msg.Headers.Set($T, $T($S)) - """, + msg.Headers.Set($T, $T($T)) + msg.Headers.Set($T, $T($S)) + """, messageTypeHeader, stringValue, eventMessageType, eventTypeHeader, stringValue, "initial-request" ).write(""); @@ -1682,7 +1767,7 @@ private static String getSerDeName( ToShapeId toShapeId, ServiceShape serviceShape, String protocolName, String name ) { return StringUtils.uncapitalize(protocolName) + name - + toShapeId.toShapeId().getName(serviceShape); + + toShapeId.toShapeId().getName(serviceShape); } public static void writeOperationSerializerMiddlewareEventStreamSetup( @@ -1912,10 +1997,10 @@ private void writeTypeDeserializer(Symbol apiHeaderType, Symbol concreteType, Ru .openBlock("if headerValue != nil {", "}", () -> { writer.write("hv, ok := headerValue.($P)", apiHeaderType) .write(""" - if !ok { - return $T("unexpected event header %s with type %T:", $S, headerValue) - } - """, errorf, headerName).write("") + if !ok { + return $T("unexpected event header %s with type %T:", $S, headerValue) + } + """, errorf, headerName).write("") .write("ihv := hv.Get().($P)", concreteType); setter.run(); }); diff --git a/service/kinesis/eventstream.go b/service/kinesis/eventstream.go index c044348a4f0..39598ca82a1 100644 --- a/service/kinesis/eventstream.go +++ b/service/kinesis/eventstream.go @@ -57,7 +57,7 @@ type subscribeToShardEventStreamReader struct { initialResponse chan interface{} } -func newSubscribeToShardEventStreamWriter(readCloser io.ReadCloser, decoder *eventstream.Decoder, ird func(*eventstream.Message) (interface{}, error)) *subscribeToShardEventStreamReader { +func newSubscribeToShardEventStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder, ird func(*eventstream.Message) (interface{}, error)) *subscribeToShardEventStreamReader { w := &subscribeToShardEventStreamReader{ stream: make(chan types.SubscribeToShardEventStream), decoder: decoder, @@ -251,7 +251,7 @@ func (m *awsAwsjson11_deserializeOpEventStreamSubscribeToShard) HandleDeserializ out.Result = output } - eventReader := newSubscribeToShardEventStreamWriter( + eventReader := newSubscribeToShardEventStreamReader( deserializeOutput.Body, eventstream.NewDecoder(func(options *eventstream.DecoderOptions) { options.Logger = logger @@ -291,10 +291,14 @@ func (*awsAwsjson11_deserializeOpEventStreamSubscribeToShard) closeResponseBody( } func addEventStreamSubscribeToShardMiddleware(stack *middleware.Stack, options Options) error { - return stack.Deserialize.Insert(&awsAwsjson11_deserializeOpEventStreamSubscribeToShard{ + if err := stack.Deserialize.Insert(&awsAwsjson11_deserializeOpEventStreamSubscribeToShard{ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), - }, "OperationDeserializer", middleware.Before) + }, "OperationDeserializer", middleware.Before); err != nil { + return err + } + return nil + } // UnknownEventMessageError provides an error when a message is received from the stream, diff --git a/service/lexruntimev2/eventstream.go b/service/lexruntimev2/eventstream.go index bec14364018..2a37aaa68e8 100644 --- a/service/lexruntimev2/eventstream.go +++ b/service/lexruntimev2/eventstream.go @@ -74,7 +74,7 @@ type startConversationRequestEventStreamWriter struct { err *smithysync.OnceErr } -func newStartConversationRequestEventStreamReader(stream io.WriteCloser, encoder *eventstream.Encoder, signer eventStreamSigner) *startConversationRequestEventStreamWriter { +func newStartConversationRequestEventStreamWriter(stream io.WriteCloser, encoder *eventstream.Encoder, signer eventStreamSigner) *startConversationRequestEventStreamWriter { w := &startConversationRequestEventStreamWriter{ encoder: encoder, signer: signer, @@ -263,7 +263,7 @@ type startConversationResponseEventStreamReader struct { closeOnce sync.Once } -func newStartConversationResponseEventStreamWriter(readCloser io.ReadCloser, decoder *eventstream.Decoder) *startConversationResponseEventStreamReader { +func newStartConversationResponseEventStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder) *startConversationResponseEventStreamReader { w := &startConversationResponseEventStreamReader{ stream: make(chan types.StartConversationResponseEventStream), decoder: decoder, @@ -424,7 +424,7 @@ func (m *awsRestjson1_deserializeOpEventStreamStartConversation) HandleDeseriali requestSignature, ) - eventWriter := newStartConversationRequestEventStreamReader( + eventWriter := newStartConversationRequestEventStreamWriter( eventstreamapi.GetInputStreamWriter(ctx), eventstream.NewEncoder(func(options *eventstream.EncoderOptions) { options.Logger = logger @@ -459,7 +459,7 @@ func (m *awsRestjson1_deserializeOpEventStreamStartConversation) HandleDeseriali out.Result = output } - eventReader := newStartConversationResponseEventStreamWriter( + eventReader := newStartConversationResponseEventStreamReader( deserializeOutput.Body, eventstream.NewDecoder(func(options *eventstream.DecoderOptions) { options.Logger = logger @@ -492,10 +492,14 @@ func (*awsRestjson1_deserializeOpEventStreamStartConversation) closeResponseBody } func addEventStreamStartConversationMiddleware(stack *middleware.Stack, options Options) error { - return stack.Deserialize.Insert(&awsRestjson1_deserializeOpEventStreamStartConversation{ + if err := stack.Deserialize.Insert(&awsRestjson1_deserializeOpEventStreamStartConversation{ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), - }, "OperationDeserializer", middleware.Before) + }, "OperationDeserializer", middleware.Before); err != nil { + return err + } + return nil + } // UnknownEventMessageError provides an error when a message is received from the stream, diff --git a/service/s3/eventstream.go b/service/s3/eventstream.go index 0e267c92730..d6cdb533727 100644 --- a/service/s3/eventstream.go +++ b/service/s3/eventstream.go @@ -38,7 +38,7 @@ type selectObjectContentEventStreamReader struct { closeOnce sync.Once } -func newSelectObjectContentEventStreamWriter(readCloser io.ReadCloser, decoder *eventstream.Decoder) *selectObjectContentEventStreamReader { +func newSelectObjectContentEventStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder) *selectObjectContentEventStreamReader { w := &selectObjectContentEventStreamReader{ stream: make(chan types.SelectObjectContentEventStream), decoder: decoder, @@ -202,7 +202,7 @@ func (m *awsRestxml_deserializeOpEventStreamSelectObjectContent) HandleDeseriali out.Result = output } - eventReader := newSelectObjectContentEventStreamWriter( + eventReader := newSelectObjectContentEventStreamReader( deserializeOutput.Body, eventstream.NewDecoder(func(options *eventstream.DecoderOptions) { options.Logger = logger @@ -234,10 +234,14 @@ func (*awsRestxml_deserializeOpEventStreamSelectObjectContent) closeResponseBody } func addEventStreamSelectObjectContentMiddleware(stack *middleware.Stack, options Options) error { - return stack.Deserialize.Insert(&awsRestxml_deserializeOpEventStreamSelectObjectContent{ + if err := stack.Deserialize.Insert(&awsRestxml_deserializeOpEventStreamSelectObjectContent{ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), - }, "OperationDeserializer", middleware.Before) + }, "OperationDeserializer", middleware.Before); err != nil { + return err + } + return nil + } // UnknownEventMessageError provides an error when a message is received from the stream, diff --git a/service/transcribestreaming/eventstream.go b/service/transcribestreaming/eventstream.go index b6814886e25..6995964715d 100644 --- a/service/transcribestreaming/eventstream.go +++ b/service/transcribestreaming/eventstream.go @@ -83,7 +83,7 @@ type audioStreamWriter struct { err *smithysync.OnceErr } -func newAudioStreamReader(stream io.WriteCloser, encoder *eventstream.Encoder, signer eventStreamSigner) *audioStreamWriter { +func newAudioStreamWriter(stream io.WriteCloser, encoder *eventstream.Encoder, signer eventStreamSigner) *audioStreamWriter { w := &audioStreamWriter{ encoder: encoder, signer: signer, @@ -272,7 +272,7 @@ type medicalTranscriptResultStreamReader struct { closeOnce sync.Once } -func newMedicalTranscriptResultStreamWriter(readCloser io.ReadCloser, decoder *eventstream.Decoder) *medicalTranscriptResultStreamReader { +func newMedicalTranscriptResultStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder) *medicalTranscriptResultStreamReader { w := &medicalTranscriptResultStreamReader{ stream: make(chan types.MedicalTranscriptResultStream), decoder: decoder, @@ -400,7 +400,7 @@ type transcriptResultStreamReader struct { closeOnce sync.Once } -func newTranscriptResultStreamWriter(readCloser io.ReadCloser, decoder *eventstream.Decoder) *transcriptResultStreamReader { +func newTranscriptResultStreamReader(readCloser io.ReadCloser, decoder *eventstream.Decoder) *transcriptResultStreamReader { w := &transcriptResultStreamReader{ stream: make(chan types.TranscriptResultStream), decoder: decoder, @@ -561,7 +561,7 @@ func (m *awsRestjson1_deserializeOpEventStreamStartMedicalStreamTranscription) H requestSignature, ) - eventWriter := newAudioStreamReader( + eventWriter := newAudioStreamWriter( eventstreamapi.GetInputStreamWriter(ctx), eventstream.NewEncoder(func(options *eventstream.EncoderOptions) { options.Logger = logger @@ -596,7 +596,7 @@ func (m *awsRestjson1_deserializeOpEventStreamStartMedicalStreamTranscription) H out.Result = output } - eventReader := newMedicalTranscriptResultStreamWriter( + eventReader := newMedicalTranscriptResultStreamReader( deserializeOutput.Body, eventstream.NewDecoder(func(options *eventstream.DecoderOptions) { options.Logger = logger @@ -629,10 +629,14 @@ func (*awsRestjson1_deserializeOpEventStreamStartMedicalStreamTranscription) clo } func addEventStreamStartMedicalStreamTranscriptionMiddleware(stack *middleware.Stack, options Options) error { - return stack.Deserialize.Insert(&awsRestjson1_deserializeOpEventStreamStartMedicalStreamTranscription{ + if err := stack.Deserialize.Insert(&awsRestjson1_deserializeOpEventStreamStartMedicalStreamTranscription{ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), - }, "OperationDeserializer", middleware.Before) + }, "OperationDeserializer", middleware.Before); err != nil { + return err + } + return nil + } type awsRestjson1_deserializeOpEventStreamStartStreamTranscription struct { @@ -678,7 +682,7 @@ func (m *awsRestjson1_deserializeOpEventStreamStartStreamTranscription) HandleDe requestSignature, ) - eventWriter := newAudioStreamReader( + eventWriter := newAudioStreamWriter( eventstreamapi.GetInputStreamWriter(ctx), eventstream.NewEncoder(func(options *eventstream.EncoderOptions) { options.Logger = logger @@ -713,7 +717,7 @@ func (m *awsRestjson1_deserializeOpEventStreamStartStreamTranscription) HandleDe out.Result = output } - eventReader := newTranscriptResultStreamWriter( + eventReader := newTranscriptResultStreamReader( deserializeOutput.Body, eventstream.NewDecoder(func(options *eventstream.DecoderOptions) { options.Logger = logger @@ -746,10 +750,14 @@ func (*awsRestjson1_deserializeOpEventStreamStartStreamTranscription) closeRespo } func addEventStreamStartStreamTranscriptionMiddleware(stack *middleware.Stack, options Options) error { - return stack.Deserialize.Insert(&awsRestjson1_deserializeOpEventStreamStartStreamTranscription{ + if err := stack.Deserialize.Insert(&awsRestjson1_deserializeOpEventStreamStartStreamTranscription{ LogEventStreamWrites: options.ClientLogMode.IsRequestEventMessage(), LogEventStreamReads: options.ClientLogMode.IsResponseEventMessage(), - }, "OperationDeserializer", middleware.Before) + }, "OperationDeserializer", middleware.Before); err != nil { + return err + } + return nil + } // UnknownEventMessageError provides an error when a message is received from the stream,