Skip to content

Commit

Permalink
Catch UncheckedIOException and throw as IOException (#5201)
Browse files Browse the repository at this point in the history
* InputStreamSubscriber catch and throw UncheckedIOException as IOException

* Use utils helper in test
  • Loading branch information
davidh44 committed May 8, 2024
1 parent fe578e9 commit 7af4c50
Show file tree
Hide file tree
Showing 3 changed files with 54 additions and 15 deletions.
3 changes: 2 additions & 1 deletion pom.xml
Expand Up @@ -650,7 +650,8 @@
<includeModule>netty-nio-client</includeModule>
<includeModule>url-connection-client</includeModule>
<includeModule>cloudwatch-metric-publisher</includeModule>
<includeModule>utils</includeModule>
<!-- TODO revert - Temporarily disable utils to add throws IOException to InputStreamSubscriber.read() -->
<!-- <includeModule>utils</includeModule> -->
<includeModule>imds</includeModule>

<!-- High level libraries -->
Expand Down
Expand Up @@ -15,7 +15,9 @@

package software.amazon.awssdk.utils.async;

import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.CancellationException;
Expand All @@ -25,6 +27,7 @@
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotations.SdkProtectedApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.utils.SdkAutoCloseable;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;

Expand Down Expand Up @@ -55,6 +58,11 @@ public InputStreamSubscriber() {
this.delegate = new ByteBufferStoringSubscriber(BUFFER_SIZE);
}

@SdkTestInternalApi
public InputStreamSubscriber(ByteBufferStoringSubscriber delegate) {
this.delegate = delegate;
}

@Override
public void onSubscribe(Subscription s) {
synchronized (subscribeLock) {
Expand Down Expand Up @@ -90,9 +98,15 @@ public void onComplete() {
}

@Override
public int read() {
public int read() throws IOException {
singleByte.clear();
TransferResult transferResult = delegate.blockingTransferTo(singleByte);

TransferResult transferResult;
try {
transferResult = delegate.blockingTransferTo(singleByte);
} catch (UncheckedIOException e) {
throw e.getCause();
}

if (singleByte.hasRemaining()) {
assert transferResult == TransferResult.END_OF_STREAM;
Expand All @@ -103,18 +117,25 @@ public int read() {
}

@Override
public int read(byte[] b) {
public int read(byte[] b) throws IOException {
return read(b, 0, b.length);
}

@Override
public int read(byte[] bytes, int off, int len) {
public int read(byte[] bytes, int off, int len) throws IOException {
if (len == 0) {
return 0;
}

ByteBuffer byteBuffer = ByteBuffer.wrap(bytes, off, len);
TransferResult transferResult = delegate.blockingTransferTo(byteBuffer);

TransferResult transferResult;
try {
transferResult = delegate.blockingTransferTo(byteBuffer);
} catch (UncheckedIOException e) {
throw e.getCause();
}

int dataTransferred = byteBuffer.position() - off;

if (dataTransferred == 0) {
Expand Down
Expand Up @@ -17,10 +17,14 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
Expand All @@ -41,6 +45,7 @@
import org.mockito.stubbing.Answer;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.utils.FunctionalUtils;
import software.amazon.awssdk.utils.ThreadFactoryBuilder;

public class InputStreamSubscriberTest {
Expand All @@ -54,7 +59,7 @@ public void setup() {
}

@Test
public void onComplete_returnsEndOfStream_onRead() {
public void onComplete_returnsEndOfStream_onRead() throws IOException {
publisher.subscribe(subscriber);
publisher.complete();
assertThat(subscriber.read()).isEqualTo(-1);
Expand All @@ -74,7 +79,7 @@ public void onError_throws_onRead() {
}

@Test
public void onComplete_afterOnNext_returnsEndOfStream() {
public void onComplete_afterOnNext_returnsEndOfStream() throws IOException {
publisher.subscribe(subscriber);
publisher.send(byteBufferOfLength(1));
publisher.complete();
Expand All @@ -83,7 +88,7 @@ public void onComplete_afterOnNext_returnsEndOfStream() {
}

@Test
public void onComplete_afterEmptyOnNext_returnsEndOfStream() {
public void onComplete_afterEmptyOnNext_returnsEndOfStream() throws IOException {
publisher.subscribe(subscriber);
publisher.send(byteBufferOfLength(0));
publisher.send(byteBufferOfLength(0));
Expand All @@ -93,14 +98,14 @@ public void onComplete_afterEmptyOnNext_returnsEndOfStream() {
}

@Test
public void read_afterOnNext_returnsData() {
public void read_afterOnNext_returnsData() throws IOException {
publisher.subscribe(subscriber);
publisher.send(byteBufferWithByte(10));
assertThat(subscriber.read()).isEqualTo(10);
}

@Test
public void readBytes_afterOnNext_returnsData() {
public void readBytes_afterOnNext_returnsData() throws IOException {
publisher.subscribe(subscriber);
publisher.send(byteBufferWithByte(10));
publisher.send(byteBufferWithByte(20));
Expand All @@ -112,7 +117,7 @@ public void readBytes_afterOnNext_returnsData() {
}

@Test
public void readBytesWithOffset_afterOnNext_returnsData() {
public void readBytesWithOffset_afterOnNext_returnsData() throws IOException {
publisher.subscribe(subscriber);
publisher.send(byteBufferWithByte(10));
publisher.send(byteBufferWithByte(20));
Expand All @@ -133,7 +138,7 @@ public void read_afterClose_fails() {
}

@Test
public void readByteArray_0Len_returns0() {
public void readByteArray_0Len_returns0() throws IOException {
publisher.subscribe(subscriber);

assertThat(subscriber.read(new byte[1], 0, 0)).isEqualTo(0);
Expand Down Expand Up @@ -211,6 +216,18 @@ public void stochastic_methodCallsSeemThreadSafe(Consumer<InputStreamSubscriber>
}
}

@Test
public void read_uncheckedIOException_isThrownAsIOException() {
ByteBufferStoringSubscriber byteBufferStoringSubscriber = mock(ByteBufferStoringSubscriber.class);
when(byteBufferStoringSubscriber.blockingTransferTo(any()))
.thenThrow(new UncheckedIOException(new IOException("Not wrapped as UncheckedIOException")));
InputStreamSubscriber errorSubscriber = new InputStreamSubscriber(byteBufferStoringSubscriber);

assertThatThrownBy(() -> errorSubscriber.read(new byte[1]))
.isInstanceOf(IOException.class).hasMessage("Not wrapped as UncheckedIOException");

}

public static Consumer<InputStreamSubscriber> subscriberOnNext() {
return s -> s.onNext(ByteBuffer.allocate(1));
}
Expand All @@ -224,11 +241,11 @@ public static Consumer<InputStreamSubscriber> subscriberOnError() {
}

public static Consumer<InputStreamSubscriber> subscriberRead1() {
return s -> s.read();
return s -> FunctionalUtils.invokeSafely(() -> s.read());
}

public static Consumer<InputStreamSubscriber> subscriberReadArray() {
return s -> s.read(new byte[4]);
return s -> FunctionalUtils.invokeSafely(() -> s.read(new byte[4]));
}

public static Consumer<InputStreamSubscriber> subscriberClose() {
Expand Down

0 comments on commit 7af4c50

Please sign in to comment.