Skip to content

Commit

Permalink
Added AsyncResponseTransformer.toBlockingInputStream. (#3562)
Browse files Browse the repository at this point in the history
* Added AsyncResponseTransformer.toBlockingInputStream, allowing streaming operation responses to be read as if they're an InputStream.

* Fix sonarcloud issue.
  • Loading branch information
millems committed Nov 23, 2022
1 parent 0fd7ea5 commit 70929c3
Show file tree
Hide file tree
Showing 11 changed files with 1,088 additions and 3 deletions.
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-a887c1c.json
@@ -0,0 +1,6 @@
{
"category": "AWS SDK for Java v2",
"contributor": "",
"type": "feature",
"description": "Added AsyncResponseTransformer.toBlockingInputStream, allowing streaming operation responses to be read as if they're an InputStream."
}
Expand Up @@ -15,6 +15,7 @@

package software.amazon.awssdk.core;

import java.io.InputStream;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.io.SdkFilterInputStream;
import software.amazon.awssdk.http.Abortable;
Expand Down Expand Up @@ -43,6 +44,12 @@ public ResponseInputStream(ResponseT resp, AbortableInputStream in) {
this.abortable = Validate.paramNotNull(in, "abortableInputStream");
}

public ResponseInputStream(ResponseT resp, InputStream in) {
super(in);
this.response = Validate.paramNotNull(resp, "response");
this.abortable = in instanceof Abortable ? (Abortable) in : null;
}

/**
* @return The unmarshalled POJO response associated with this content.
*/
Expand All @@ -52,6 +59,8 @@ public ResponseT response() {

@Override
public void abort() {
abortable.abort();
if (abortable != null) {
abortable.abort();
}
}
}
Expand Up @@ -16,16 +16,19 @@
package software.amazon.awssdk.core.async;

import java.io.File;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.file.Path;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.FileTransformerConfiguration;
import software.amazon.awssdk.core.ResponseBytes;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.internal.async.ByteArrayAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.FileAsyncResponseTransformer;
import software.amazon.awssdk.core.internal.async.InputStreamResponseTransformer;
import software.amazon.awssdk.core.internal.async.PublisherAsyncResponseTransformer;
import software.amazon.awssdk.utils.Validate;

Expand Down Expand Up @@ -232,4 +235,32 @@ static <ResponseT> AsyncResponseTransformer<ResponseT, ResponseBytes<ResponseT>>
static <ResponseT extends SdkResponse> AsyncResponseTransformer<ResponseT, ResponsePublisher<ResponseT>> toPublisher() {
return new PublisherAsyncResponseTransformer<>();
}

/**
* Creates an {@link AsyncResponseTransformer} that allows reading the response body content as an
* {@link InputStream}.
* <p>
* When this transformer is used with an async client, the {@link CompletableFuture} that the client returns will
* be completed once the {@link SdkResponse} is available and the response body <i>begins</i> streaming. This
* behavior differs from some other transformers, like {@link #toFile(Path)} and {@link #toBytes()}, which only
* have their {@link CompletableFuture} completed after the entire response body has finished streaming.
* <p>
* You are responsible for performing blocking reads from this input stream and closing the stream when you are
* finished.
* <p>
* Example usage:
* <pre>
* {@code
* CompletableFuture<ResponseInputStream<GetObjectResponse>> responseFuture =
* s3AsyncClient.getObject(getObjectRequest, AsyncResponseTransformer.toBlockingInputStream());
* try (ResponseInputStream<GetObjectResponse> responseStream = responseFuture.join()) {
* responseStream.transferTo(System.out); // BLOCKS the calling thread
* }
* }
* </pre>
*/
static <ResponseT extends SdkResponse>
AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> toBlockingInputStream() {
return new InputStreamResponseTransformer<>();
}
}
@@ -0,0 +1,62 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.utils.async.InputStreamSubscriber;

/**
* A {@link AsyncResponseTransformer} that allows performing blocking reads on the response data.
* <p>
* Created with {@link AsyncResponseTransformer#toBlockingInputStream()}.
*/
@SdkInternalApi
public class InputStreamResponseTransformer<ResponseT extends SdkResponse>
implements AsyncResponseTransformer<ResponseT, ResponseInputStream<ResponseT>> {

private volatile CompletableFuture<ResponseInputStream<ResponseT>> future;
private volatile ResponseT response;

@Override
public CompletableFuture<ResponseInputStream<ResponseT>> prepare() {
CompletableFuture<ResponseInputStream<ResponseT>> result = new CompletableFuture<>();
this.future = result;
return result;
}

@Override
public void onResponse(ResponseT response) {
this.response = response;
}

@Override
public void onStream(SdkPublisher<ByteBuffer> publisher) {
InputStreamSubscriber inputStreamSubscriber = new InputStreamSubscriber();
publisher.subscribe(inputStreamSubscriber);
future.complete(new ResponseInputStream<>(response, inputStreamSubscriber));
}

@Override
public void exceptionOccurred(Throwable error) {
future.completeExceptionally(error);
}
}
@@ -0,0 +1,83 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.internal.async;

import static org.assertj.core.api.Assertions.assertThat;

import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.async.SdkPublisher;
import software.amazon.awssdk.core.protocol.VoidSdkResponse;
import software.amazon.awssdk.utils.async.SimplePublisher;

class InputStreamResponseTransformerTest {
private SimplePublisher<ByteBuffer> publisher;
private InputStreamResponseTransformer<SdkResponse> transformer;
private SdkResponse response;
private CompletableFuture<ResponseInputStream<SdkResponse>> resultFuture;

@BeforeEach
public void setup() {
publisher = new SimplePublisher<>();
transformer = new InputStreamResponseTransformer<>();
resultFuture = transformer.prepare();
response = VoidSdkResponse.builder().build();

transformer.onResponse(response);

assertThat(resultFuture).isNotDone();

transformer.onStream(SdkPublisher.adapt(publisher));

assertThat(resultFuture).isCompleted();
assertThat(resultFuture.join().response()).isEqualTo(response);
}

@Test
public void inputStreamReadsAreFromPublisher() throws IOException {
InputStream stream = resultFuture.join();

publisher.send(ByteBuffer.wrap(new byte[] { 0, 1, 2 }));
publisher.complete();

assertThat(stream.read()).isEqualTo(0);
assertThat(stream.read()).isEqualTo(1);
assertThat(stream.read()).isEqualTo(2);
assertThat(stream.read()).isEqualTo(-1);
}

@Test
public void inputStreamArrayReadsAreFromPublisher() throws IOException {
InputStream stream = resultFuture.join();

publisher.send(ByteBuffer.wrap(new byte[] { 0, 1, 2 }));
publisher.complete();

byte[] data = new byte[3];
assertThat(stream.read(data)).isEqualTo(3);

assertThat(data[0]).isEqualTo((byte) 0);
assertThat(data[1]).isEqualTo((byte) 1);
assertThat(data[2]).isEqualTo((byte) 2);
assertThat(stream.read(data)).isEqualTo(-1);
}
}
@@ -0,0 +1,89 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.services;

import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.anyUrl;
import static com.github.tomakehurst.wiremock.client.WireMock.post;
import static org.assertj.core.api.Assertions.assertThat;

import com.github.tomakehurst.wiremock.WireMockServer;
import java.io.IOException;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
import software.amazon.awssdk.core.ResponseInputStream;
import software.amazon.awssdk.core.async.AsyncResponseTransformer;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationRequest;
import software.amazon.awssdk.services.protocolrestjson.model.StreamingOutputOperationResponse;

@Timeout(5)
public class BlockingAsyncRequestResponseBodyTest {
private final WireMockServer wireMock = new WireMockServer(0);
private ProtocolRestJsonAsyncClient client;

@BeforeEach
public void setup() {
wireMock.start();
client = ProtocolRestJsonAsyncClient.builder()
.region(Region.US_WEST_2)
.credentialsProvider(AnonymousCredentialsProvider.create())
.endpointOverride(URI.create("http://localhost:" + wireMock.port()))
.build();
}

@Test
public void blockingResponseTransformer_readsRightValue() {
wireMock.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody("hello")));

CompletableFuture<ResponseInputStream<StreamingOutputOperationResponse>> responseFuture =
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
AsyncResponseTransformer.toBlockingInputStream());
ResponseInputStream<StreamingOutputOperationResponse> responseStream = responseFuture.join();

assertThat(responseStream).asString(StandardCharsets.UTF_8).isEqualTo("hello");
assertThat(responseStream.response().sdkHttpResponse().statusCode()).isEqualTo(200);
}

@Test
public void blockingResponseTransformer_abortCloseDoesNotThrow() throws IOException {
wireMock.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody("hello")));

CompletableFuture<ResponseInputStream<StreamingOutputOperationResponse>> responseFuture =
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
AsyncResponseTransformer.toBlockingInputStream());
ResponseInputStream<StreamingOutputOperationResponse> responseStream = responseFuture.join();
responseStream.abort();
responseStream.close();
}

@Test
public void blockingResponseTransformer_closeDoesNotThrow() throws IOException {
wireMock.stubFor(post(anyUrl()).willReturn(aResponse().withStatus(200).withBody("hello")));

CompletableFuture<ResponseInputStream<StreamingOutputOperationResponse>> responseFuture =
client.streamingOutputOperation(StreamingOutputOperationRequest.builder().build(),
AsyncResponseTransformer.toBlockingInputStream());
ResponseInputStream<StreamingOutputOperationResponse> responseStream = responseFuture.join();
responseStream.close();
}
}

0 comments on commit 70929c3

Please sign in to comment.