From 26b4f7781568217c4e20b3e00f2b0ad5c83b8c5e Mon Sep 17 00:00:00 2001 From: Michiel Vermeersch Date: Mon, 24 Feb 2020 14:52:32 +0100 Subject: [PATCH] Added RxJava3 adapter --- build.gradle | 1 + retrofit-adapters/rxjava3/README.md | 64 ++++ retrofit-adapters/rxjava3/build.gradle | 20 ++ retrofit-adapters/rxjava3/gradle.properties | 2 + .../adapter/rxjava3/BodyObservable.java | 89 +++++ .../rxjava3/CallEnqueueObservable.java | 106 ++++++ .../rxjava3/CallExecuteObservable.java | 88 +++++ .../adapter/rxjava3/HttpException.java | 26 ++ .../retrofit2/adapter/rxjava3/Result.java | 68 ++++ .../adapter/rxjava3/ResultObservable.java | 76 ++++ .../adapter/rxjava3/RxJava3CallAdapter.java | 97 +++++ .../rxjava3/RxJava3CallAdapterFactory.java | 151 ++++++++ .../adapter/rxjava3/package-info.java | 2 + .../retrofit2/adapter/rxjava3/AsyncTest.java | 196 ++++++++++ .../adapter/rxjava3/CancelDisposeTest.java | 82 +++++ .../rxjava3/CancelDisposeTestSync.java | 58 +++ .../adapter/rxjava3/CompletableTest.java | 97 +++++ .../rxjava3/CompletableThrowingTest.java | 141 ++++++++ .../rxjava3/CompletableWithSchedulerTest.java | 64 ++++ .../adapter/rxjava3/FlowableTest.java | 222 ++++++++++++ .../adapter/rxjava3/FlowableThrowingTest.java | 337 ++++++++++++++++++ .../rxjava3/FlowableWithSchedulerTest.java | 93 +++++ .../retrofit2/adapter/rxjava3/MaybeTest.java | 164 +++++++++ .../adapter/rxjava3/MaybeThrowingTest.java | 279 +++++++++++++++ .../rxjava3/MaybeWithSchedulerTest.java | 93 +++++ .../adapter/rxjava3/ObservableTest.java | 180 ++++++++++ .../rxjava3/ObservableThrowingTest.java | 337 ++++++++++++++++++ .../rxjava3/ObservableWithSchedulerTest.java | 93 +++++ .../rxjava3/RecordingCompletableObserver.java | 117 ++++++ .../rxjava3/RecordingMaybeObserver.java | 133 +++++++ .../adapter/rxjava3/RecordingObserver.java | 140 ++++++++ .../rxjava3/RecordingSingleObserver.java | 128 +++++++ .../adapter/rxjava3/RecordingSubscriber.java | 161 +++++++++ .../retrofit2/adapter/rxjava3/ResultTest.java | 63 ++++ .../RxJava3CallAdapterFactoryTest.java | 282 +++++++++++++++ .../rxjava3/RxJavaPluginsResetRule.java | 38 ++ .../retrofit2/adapter/rxjava3/SingleTest.java | 164 +++++++++ .../adapter/rxjava3/SingleThrowingTest.java | 276 ++++++++++++++ .../rxjava3/SingleWithSchedulerTest.java | 93 +++++ .../rxjava3/StringConverterFactory.java | 41 +++ settings.gradle | 1 + 41 files changed, 4863 insertions(+) create mode 100644 retrofit-adapters/rxjava3/README.md create mode 100644 retrofit-adapters/rxjava3/build.gradle create mode 100644 retrofit-adapters/rxjava3/gradle.properties create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/BodyObservable.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallEnqueueObservable.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallExecuteObservable.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/HttpException.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/Result.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/ResultObservable.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapter.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactory.java create mode 100644 retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/package-info.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/AsyncTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTestSync.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableThrowingTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableWithSchedulerTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableThrowingTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableWithSchedulerTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeThrowingTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeWithSchedulerTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableThrowingTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableWithSchedulerTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingCompletableObserver.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingMaybeObserver.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingObserver.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSingleObserver.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSubscriber.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ResultTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactoryTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJavaPluginsResetRule.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleThrowingTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleWithSchedulerTest.java create mode 100644 retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/StringConverterFactory.java diff --git a/build.gradle b/build.gradle index d3dd0930dc..82d08c59ad 100644 --- a/build.gradle +++ b/build.gradle @@ -22,6 +22,7 @@ buildscript { 'androidxTestRunner': 'androidx.test:runner:1.1.0', 'rxjava': 'io.reactivex:rxjava:1.3.8', 'rxjava2': 'io.reactivex.rxjava2:rxjava:2.0.0', + 'rxjava3': 'io.reactivex.rxjava3:rxjava:3.0.0', 'reactiveStreams': 'org.reactivestreams:reactive-streams:1.0.3', 'scalaLibrary': 'org.scala-lang:scala-library:2.13.1', 'gson': 'com.google.code.gson:gson:2.8.5', diff --git a/retrofit-adapters/rxjava3/README.md b/retrofit-adapters/rxjava3/README.md new file mode 100644 index 0000000000..1ee2a0a232 --- /dev/null +++ b/retrofit-adapters/rxjava3/README.md @@ -0,0 +1,64 @@ +RxJava3 Adapter +============== + +An `Adapter` for adapting [RxJava 3.x][1] types. + +Available types: + + * `Observable`, `Observable>`, and `Observable>` where `T` is the body type. + * `Flowable`, `Flowable>` and `Flowable>` where `T` is the body type. + * `Single`, `Single>`, and `Single>` where `T` is the body type. + * `Maybe`, `Maybe>`, and `Maybe>` where `T` is the body type. + * `Completable` where response bodies are discarded. + + +Usage +----- + +Add `RxJava3CallAdapterFactory` as a `Call` adapter when building your `Retrofit` instance: +```java +Retrofit retrofit = new Retrofit.Builder() + .baseUrl("https://example.com/") + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); +``` + +Your service methods can now use any of the above types as their return type. +```java +interface MyService { + @GET("/user") + Observable getUser(); +} +``` + +By default all reactive types execute their requests synchronously. There are multiple ways to +control the threading on which a request occurs: + + * Call `subscribeOn` on the returned reactive type with a `Scheduler` of your choice. + * Use `createAsync()` when creating the factory which will use OkHttp's internal thread pool. + * Use `createWithScheduler(Scheduler)` to supply a default subscription `Scheduler`. + +Download +-------- + +Download [the latest JAR][2] or grab via [Maven][3]: +```xml + + com.squareup.retrofit2 + adapter-rxjava3 + latest.version + +``` +or [Gradle][3]: +```groovy +implementation 'com.squareup.retrofit2:adapter-rxjava3:latest.version' +``` + +Snapshots of the development version are available in [Sonatype's `snapshots` repository][snap]. + + + + [1]: https://github.com/ReactiveX/RxJava/tree/3.x + [2]: https://search.maven.org/remote_content?g=com.squareup.retrofit2&a=adapter-rxjava3&v=LATEST + [3]: http://search.maven.org/#search%7Cga%7C1%7Cg%3A%22com.squareup.retrofit2%22%20a%3A%22adapter-rxjava3%22 + [snap]: https://oss.sonatype.org/content/repositories/snapshots/ diff --git a/retrofit-adapters/rxjava3/build.gradle b/retrofit-adapters/rxjava3/build.gradle new file mode 100644 index 0000000000..530887e0d5 --- /dev/null +++ b/retrofit-adapters/rxjava3/build.gradle @@ -0,0 +1,20 @@ +apply plugin: 'java-library' +apply plugin: 'com.vanniktech.maven.publish' + +dependencies { + api project(':retrofit') + api deps.rxjava3 + api deps.reactiveStreams + compileOnly deps.findBugsAnnotations + + testImplementation deps.junit + testImplementation deps.assertj + testImplementation deps.guava + testImplementation deps.mockwebserver +} + +jar { + manifest { + attributes 'Automatic-Module-Name': 'retrofit2.adapter.rxjava3' + } +} diff --git a/retrofit-adapters/rxjava3/gradle.properties b/retrofit-adapters/rxjava3/gradle.properties new file mode 100644 index 0000000000..1e57328b3f --- /dev/null +++ b/retrofit-adapters/rxjava3/gradle.properties @@ -0,0 +1,2 @@ +POM_ARTIFACT_ID=adapter-rxjava3 +POM_NAME=Adapter: RxJava 3 diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/BodyObservable.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/BodyObservable.java new file mode 100644 index 0000000000..84140d4ea4 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/BodyObservable.java @@ -0,0 +1,89 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import retrofit2.Response; + +final class BodyObservable extends Observable { + private final Observable> upstream; + + BodyObservable(Observable> upstream) { + this.upstream = upstream; + } + + @Override + protected void subscribeActual(Observer observer) { + upstream.subscribe(new BodyObserver<>(observer)); + } + + private static class BodyObserver implements Observer> { + private final Observer observer; + private boolean terminated; + + BodyObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onSubscribe(Disposable disposable) { + observer.onSubscribe(disposable); + } + + @Override + public void onNext(Response response) { + if (response.isSuccessful()) { + observer.onNext(response.body()); + } else { + terminated = true; + Throwable t = new HttpException(response); + try { + observer.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + RxJavaPlugins.onError(new CompositeException(t, inner)); + } + } + } + + @Override + public void onComplete() { + if (!terminated) { + observer.onComplete(); + } + } + + @Override + public void onError(Throwable throwable) { + if (!terminated) { + observer.onError(throwable); + } else { + // This should never happen! onNext handles and forwards errors automatically. + Throwable broken = + new AssertionError( + "This should never happen! Report as a bug with the full stacktrace."); + //noinspection UnnecessaryInitCause Two-arg AssertionError constructor is 1.7+ only. + broken.initCause(throwable); + RxJavaPlugins.onError(broken); + } + } + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallEnqueueObservable.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallEnqueueObservable.java new file mode 100644 index 0000000000..1152187487 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallEnqueueObservable.java @@ -0,0 +1,106 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import retrofit2.Call; +import retrofit2.Callback; +import retrofit2.Response; + +final class CallEnqueueObservable extends Observable> { + private final Call originalCall; + + CallEnqueueObservable(Call originalCall) { + this.originalCall = originalCall; + } + + @Override + protected void subscribeActual(Observer> observer) { + // Since Call is a one-shot type, clone it for each new observer. + Call call = originalCall.clone(); + CallCallback callback = new CallCallback<>(call, observer); + observer.onSubscribe(callback); + if (!callback.isDisposed()) { + call.enqueue(callback); + } + } + + private static final class CallCallback implements Disposable, Callback { + private final Call call; + private final Observer> observer; + private volatile boolean disposed; + boolean terminated = false; + + CallCallback(Call call, Observer> observer) { + this.call = call; + this.observer = observer; + } + + @Override + public void onResponse(Call call, Response response) { + if (disposed) return; + + try { + observer.onNext(response); + + if (!disposed) { + terminated = true; + observer.onComplete(); + } + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + if (terminated) { + RxJavaPlugins.onError(t); + } else if (!disposed) { + try { + observer.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + RxJavaPlugins.onError(new CompositeException(t, inner)); + } + } + } + } + + @Override + public void onFailure(Call call, Throwable t) { + if (call.isCanceled()) return; + + try { + observer.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + RxJavaPlugins.onError(new CompositeException(t, inner)); + } + } + + @Override + public void dispose() { + disposed = true; + call.cancel(); + } + + @Override + public boolean isDisposed() { + return disposed; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallExecuteObservable.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallExecuteObservable.java new file mode 100644 index 0000000000..85455a6a7e --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/CallExecuteObservable.java @@ -0,0 +1,88 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import retrofit2.Call; +import retrofit2.Response; + +final class CallExecuteObservable extends Observable> { + private final Call originalCall; + + CallExecuteObservable(Call originalCall) { + this.originalCall = originalCall; + } + + @Override + protected void subscribeActual(Observer> observer) { + // Since Call is a one-shot type, clone it for each new observer. + Call call = originalCall.clone(); + CallDisposable disposable = new CallDisposable(call); + observer.onSubscribe(disposable); + if (disposable.isDisposed()) { + return; + } + + boolean terminated = false; + try { + Response response = call.execute(); + if (!disposable.isDisposed()) { + observer.onNext(response); + } + if (!disposable.isDisposed()) { + terminated = true; + observer.onComplete(); + } + } catch (Throwable t) { + Exceptions.throwIfFatal(t); + if (terminated) { + RxJavaPlugins.onError(t); + } else if (!disposable.isDisposed()) { + try { + observer.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + RxJavaPlugins.onError(new CompositeException(t, inner)); + } + } + } + } + + private static final class CallDisposable implements Disposable { + private final Call call; + private volatile boolean disposed; + + CallDisposable(Call call) { + this.call = call; + } + + @Override + public void dispose() { + disposed = true; + call.cancel(); + } + + @Override + public boolean isDisposed() { + return disposed; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/HttpException.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/HttpException.java new file mode 100644 index 0000000000..2e262e4336 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/HttpException.java @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import retrofit2.Response; + +/** @deprecated Use {@link retrofit2.HttpException}. */ +@Deprecated +public final class HttpException extends retrofit2.HttpException { + public HttpException(Response response) { + super(response); + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/Result.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/Result.java new file mode 100644 index 0000000000..a3af3d1465 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/Result.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import java.io.IOException; +import javax.annotation.Nullable; +import retrofit2.Response; + +/** The result of executing an HTTP request. */ +public final class Result { + @SuppressWarnings("ConstantConditions") // Guarding public API nullability. + public static Result error(Throwable error) { + if (error == null) throw new NullPointerException("error == null"); + return new Result<>(null, error); + } + + @SuppressWarnings("ConstantConditions") // Guarding public API nullability. + public static Result response(Response response) { + if (response == null) throw new NullPointerException("response == null"); + return new Result<>(response, null); + } + + private final @Nullable Response response; + private final @Nullable Throwable error; + + private Result(@Nullable Response response, @Nullable Throwable error) { + this.response = response; + this.error = error; + } + + /** + * The response received from executing an HTTP request. Only present when {@link #isError()} is + * false, null otherwise. + */ + public @Nullable Response response() { + return response; + } + + /** + * The error experienced while attempting to execute an HTTP request. Only present when {@link + * #isError()} is true, null otherwise. + * + *

If the error is an {@link IOException} then there was a problem with the transport to the + * remote server. Any other exception type indicates an unexpected failure and should be + * considered fatal (configuration error, programming error, etc.). + */ + public @Nullable Throwable error() { + return error; + } + + /** {@code true} if the request resulted in an error. See {@link #error()} for the cause. */ + public boolean isError() { + return error != null; + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/ResultObservable.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/ResultObservable.java new file mode 100644 index 0000000000..15c9295164 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/ResultObservable.java @@ -0,0 +1,76 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import retrofit2.Response; + +final class ResultObservable extends Observable> { + private final Observable> upstream; + + ResultObservable(Observable> upstream) { + this.upstream = upstream; + } + + @Override + protected void subscribeActual(Observer> observer) { + upstream.subscribe(new ResultObserver<>(observer)); + } + + private static class ResultObserver implements Observer> { + private final Observer> observer; + + ResultObserver(Observer> observer) { + this.observer = observer; + } + + @Override + public void onSubscribe(Disposable disposable) { + observer.onSubscribe(disposable); + } + + @Override + public void onNext(Response response) { + observer.onNext(Result.response(response)); + } + + @Override + public void onError(Throwable throwable) { + try { + observer.onNext(Result.error(throwable)); + } catch (Throwable t) { + try { + observer.onError(t); + } catch (Throwable inner) { + Exceptions.throwIfFatal(inner); + RxJavaPlugins.onError(new CompositeException(t, inner)); + } + return; + } + observer.onComplete(); + } + + @Override + public void onComplete() { + observer.onComplete(); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapter.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapter.java new file mode 100644 index 0000000000..f2e9a26f66 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapter.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.BackpressureStrategy; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.lang.reflect.Type; +import javax.annotation.Nullable; +import retrofit2.Call; +import retrofit2.CallAdapter; +import retrofit2.Response; + +final class RxJava3CallAdapter implements CallAdapter { + private final Type responseType; + private final @Nullable Scheduler scheduler; + private final boolean isAsync; + private final boolean isResult; + private final boolean isBody; + private final boolean isFlowable; + private final boolean isSingle; + private final boolean isMaybe; + private final boolean isCompletable; + + RxJava3CallAdapter( + Type responseType, + @Nullable Scheduler scheduler, + boolean isAsync, + boolean isResult, + boolean isBody, + boolean isFlowable, + boolean isSingle, + boolean isMaybe, + boolean isCompletable) { + this.responseType = responseType; + this.scheduler = scheduler; + this.isAsync = isAsync; + this.isResult = isResult; + this.isBody = isBody; + this.isFlowable = isFlowable; + this.isSingle = isSingle; + this.isMaybe = isMaybe; + this.isCompletable = isCompletable; + } + + @Override + public Type responseType() { + return responseType; + } + + @Override + public Object adapt(Call call) { + Observable> responseObservable = + isAsync ? new CallEnqueueObservable<>(call) : new CallExecuteObservable<>(call); + + Observable observable; + if (isResult) { + observable = new ResultObservable<>(responseObservable); + } else if (isBody) { + observable = new BodyObservable<>(responseObservable); + } else { + observable = responseObservable; + } + + if (scheduler != null) { + observable = observable.subscribeOn(scheduler); + } + + if (isFlowable) { + return observable.toFlowable(BackpressureStrategy.LATEST); + } + if (isSingle) { + return observable.singleOrError(); + } + if (isMaybe) { + return observable.singleElement(); + } + if (isCompletable) { + return observable.ignoreElements(); + } + return RxJavaPlugins.onAssembly(observable); + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactory.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactory.java new file mode 100644 index 0000000000..7a11315de9 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactory.java @@ -0,0 +1,151 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.core.Single; +import java.io.IOException; +import java.lang.annotation.Annotation; +import java.lang.reflect.ParameterizedType; +import java.lang.reflect.Type; +import javax.annotation.Nullable; +import retrofit2.CallAdapter; +import retrofit2.HttpException; +import retrofit2.Response; +import retrofit2.Retrofit; + +/** + * A {@linkplain CallAdapter.Factory call adapter} which uses RxJava 3 for creating observables. + * + *

Adding this class to {@link Retrofit} allows you to return an {@link Observable}, {@link + * Flowable}, {@link Single}, {@link Completable} or {@link Maybe} from service methods. + * + *


+ * interface MyService {
+ *   @GET("user/me")
+ *   Observable<User> getUser()
+ * }
+ * 
+ * + * There are three configurations supported for the {@code Observable}, {@code Flowable}, {@code + * Single}, {@link Completable} and {@code Maybe} type parameter: + * + *
    + *
  • Direct body (e.g., {@code Observable}) calls {@code onNext} with the deserialized + * body for 2XX responses and calls {@code onError} with {@link HttpException} for non-2XX + * responses and {@link IOException} for network errors. + *
  • Response wrapped body (e.g., {@code Observable>}) calls {@code onNext} with + * a {@link Response} object for all HTTP responses and calls {@code onError} with {@link + * IOException} for network errors + *
  • Result wrapped body (e.g., {@code Observable>}) calls {@code onNext} with a + * {@link Result} object for all HTTP responses and errors. + *
+ */ +public final class RxJava3CallAdapterFactory extends CallAdapter.Factory { + /** + * Returns an instance which creates synchronous observables that do not operate on any scheduler + * by default. + */ + public static RxJava3CallAdapterFactory create() { + return new RxJava3CallAdapterFactory(null, false); + } + + /** Returns an instance which creates asynchronous observables. */ + public static RxJava3CallAdapterFactory createAsync() { + return new RxJava3CallAdapterFactory(null, true); + } + + /** + * Returns an instance which creates synchronous observables that {@linkplain + * Observable#subscribeOn(Scheduler) subscribe on} {@code scheduler} by default. + */ + @SuppressWarnings("ConstantConditions") // Guarding public API nullability. + public static RxJava3CallAdapterFactory createWithScheduler(Scheduler scheduler) { + if (scheduler == null) throw new NullPointerException("scheduler == null"); + return new RxJava3CallAdapterFactory(scheduler, false); + } + + private final @Nullable Scheduler scheduler; + private final boolean isAsync; + + private RxJava3CallAdapterFactory(@Nullable Scheduler scheduler, boolean isAsync) { + this.scheduler = scheduler; + this.isAsync = isAsync; + } + + @Override + public @Nullable CallAdapter get( + Type returnType, Annotation[] annotations, Retrofit retrofit) { + Class rawType = getRawType(returnType); + + if (rawType == Completable.class) { + // Completable is not parameterized (which is what the rest of this method deals with) so it + // can only be created with a single configuration. + return new RxJava3CallAdapter( + Void.class, scheduler, isAsync, false, true, false, false, false, true); + } + + boolean isFlowable = rawType == Flowable.class; + boolean isSingle = rawType == Single.class; + boolean isMaybe = rawType == Maybe.class; + if (rawType != Observable.class && !isFlowable && !isSingle && !isMaybe) { + return null; + } + + boolean isResult = false; + boolean isBody = false; + Type responseType; + if (!(returnType instanceof ParameterizedType)) { + String name = + isFlowable ? "Flowable" : isSingle ? "Single" : isMaybe ? "Maybe" : "Observable"; + throw new IllegalStateException( + name + + " return type must be parameterized" + + " as " + + name + + " or " + + name + + ""); + } + + Type observableType = getParameterUpperBound(0, (ParameterizedType) returnType); + Class rawObservableType = getRawType(observableType); + if (rawObservableType == Response.class) { + if (!(observableType instanceof ParameterizedType)) { + throw new IllegalStateException( + "Response must be parameterized" + " as Response or Response"); + } + responseType = getParameterUpperBound(0, (ParameterizedType) observableType); + } else if (rawObservableType == Result.class) { + if (!(observableType instanceof ParameterizedType)) { + throw new IllegalStateException( + "Result must be parameterized" + " as Result or Result"); + } + responseType = getParameterUpperBound(0, (ParameterizedType) observableType); + isResult = true; + } else { + responseType = observableType; + isBody = true; + } + + return new RxJava3CallAdapter( + responseType, scheduler, isAsync, isResult, isBody, isFlowable, isSingle, isMaybe, false); + } +} diff --git a/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/package-info.java b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/package-info.java new file mode 100644 index 0000000000..109f28d49a --- /dev/null +++ b/retrofit-adapters/rxjava3/src/main/java/retrofit2/adapter/rxjava3/package-info.java @@ -0,0 +1,2 @@ +@retrofit2.internal.EverythingIsNonNull +package retrofit2.adapter.rxjava3; diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/AsyncTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/AsyncTest.java new file mode 100644 index 0000000000..da9da8ed56 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/AsyncTest.java @@ -0,0 +1,196 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.observers.TestObserver; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.Dispatcher; +import okhttp3.OkHttpClient; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.After; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.adapter.rxjava3.CompletableThrowingTest.ForwardingCompletableObserver; +import retrofit2.http.GET; + +public final class AsyncTest { + @Rule public final MockWebServer server = new MockWebServer(); + + interface Service { + @GET("/") + Completable completable(); + } + + private Service service; + private List uncaughtExceptions = new ArrayList<>(); + + @Before + public void setUp() { + ExecutorService executorService = + Executors.newCachedThreadPool( + r -> { + Thread thread = new Thread(r); + thread.setUncaughtExceptionHandler((t, e) -> uncaughtExceptions.add(e)); + return thread; + }); + + OkHttpClient client = + new OkHttpClient.Builder().dispatcher(new Dispatcher(executorService)).build(); + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .client(client) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createAsync()) + .build(); + service = retrofit.create(Service.class); + } + + @After + public void tearDown() { + assertTrue("Uncaught exceptions: " + uncaughtExceptions, uncaughtExceptions.isEmpty()); + } + + @Test + public void success() throws InterruptedException { + TestObserver observer = new TestObserver<>(); + service.completable().subscribe(observer); + assertFalse(observer.await(1, SECONDS)); + + server.enqueue(new MockResponse()); + observer.await(1, SECONDS); + observer.assertComplete(); + } + + @Test + public void failure() throws InterruptedException { + TestObserver observer = new TestObserver<>(); + service.completable().subscribe(observer); + assertFalse(observer.await(1, SECONDS)); + + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + observer.await(1, SECONDS); + observer.assertError(IOException.class); + } + + @Test + public void throwingInOnCompleteDeliveredToPlugin() throws InterruptedException { + server.enqueue(new MockResponse()); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference errorRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); // Don't swallow secondary errors! + } + latch.countDown(); + }); + + TestObserver observer = new TestObserver<>(); + final RuntimeException e = new RuntimeException(); + service + .completable() + .subscribe( + new ForwardingCompletableObserver(observer) { + @Override + public void onComplete() { + throw e; + } + }); + + latch.await(1, SECONDS); + assertThat(errorRef.get().getCause()).isSameAs(e); + } + + @Test + public void bodyThrowingInOnErrorDeliveredToPlugin() throws InterruptedException { + server.enqueue(new MockResponse().setResponseCode(404)); + + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference pluginRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!pluginRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); // Don't swallow secondary errors! + } + latch.countDown(); + }); + + TestObserver observer = new TestObserver<>(); + final RuntimeException e = new RuntimeException(); + final AtomicReference errorRef = new AtomicReference<>(); + service + .completable() + .subscribe( + new ForwardingCompletableObserver(observer) { + @Override + public void onError(Throwable throwable) { + errorRef.set(throwable); + throw e; + } + }); + + latch.await(1, SECONDS); + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) pluginRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void bodyThrowingFatalInOnErrorPropagates() throws InterruptedException { + server.enqueue(new MockResponse().setResponseCode(404)); + + final CountDownLatch latch = new CountDownLatch(1); + + TestObserver observer = new TestObserver<>(); + final Error e = new OutOfMemoryError("Not real"); + service + .completable() + .subscribe( + new ForwardingCompletableObserver(observer) { + @Override + public void onError(Throwable throwable) { + throw e; + } + }); + + latch.await(1, SECONDS); + + assertEquals(1, uncaughtExceptions.size()); + assertSame(e, uncaughtExceptions.remove(0)); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTest.java new file mode 100644 index 0000000000..aa7273729f --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTest.java @@ -0,0 +1,82 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.disposables.Disposable; +import java.util.List; +import okhttp3.Call; +import okhttp3.OkHttpClient; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class CancelDisposeTest { + @Rule public final MockWebServer server = new MockWebServer(); + + interface Service { + @GET("/") + Observable go(); + } + + private final OkHttpClient client = new OkHttpClient(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createAsync()) + .callFactory(client) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void disposeCancelsCall() { + Disposable disposable = service.go().subscribe(); + List calls = client.dispatcher().runningCalls(); + assertEquals(1, calls.size()); + disposable.dispose(); + assertTrue(calls.get(0).isCanceled()); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void disposeBeforeEnqueueDoesNotEnqueue() { + service.go().test(true); + List calls = client.dispatcher().runningCalls(); + assertEquals(0, calls.size()); + } + + @Test + public void cancelDoesNotDispose() { + Disposable disposable = service.go().subscribe(); + List calls = client.dispatcher().runningCalls(); + assertEquals(1, calls.size()); + calls.get(0).cancel(); + assertFalse(disposable.isDisposed()); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTestSync.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTestSync.java new file mode 100644 index 0000000000..e5056a2ab9 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CancelDisposeTestSync.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.junit.Assert.assertEquals; + +import io.reactivex.rxjava3.core.Observable; +import okhttp3.OkHttpClient; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class CancelDisposeTestSync { + @Rule public final MockWebServer server = new MockWebServer(); + + interface Service { + @GET("/") + Observable go(); + } + + private final OkHttpClient client = new OkHttpClient(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .callFactory(client) + .build(); + service = retrofit.create(Service.class); + } + + @SuppressWarnings("ResultOfMethodCallIgnored") + @Test + public void disposeBeforeExecuteDoesNotEnqueue() { + service.go().test(true); + assertEquals(0, server.getRequestCount()); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableTest.java new file mode 100644 index 0000000000..8a79629185 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableTest.java @@ -0,0 +1,97 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; + +import io.reactivex.rxjava3.core.Completable; +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class CompletableTest { + @Rule public final MockWebServer server = new MockWebServer(); + + @Rule + public final RecordingCompletableObserver.Rule observerRule = + new RecordingCompletableObserver.Rule(); + + interface Service { + @GET("/") + Completable completable(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void completableSuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingCompletableObserver observer = observerRule.create(); + service.completable().subscribe(observer); + observer.assertComplete(); + } + + @Test + public void completableSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingCompletableObserver observer = observerRule.create(); + service.completable().subscribe(observer); + // Required for backwards compatibility. + observer.assertError(HttpException.class, "HTTP 404 Client Error"); + } + + @Test + public void completableFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingCompletableObserver observer = observerRule.create(); + service.completable().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void subscribeTwice() { + server.enqueue(new MockResponse().setBody("Hi")); + server.enqueue(new MockResponse().setBody("Hey")); + + Completable observable = service.completable(); + + RecordingCompletableObserver observer1 = observerRule.create(); + observable.subscribe(observer1); + observer1.assertComplete(); + + RecordingCompletableObserver observer2 = observerRule.create(); + observable.subscribe(observer2); + observer2.assertComplete(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableThrowingTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableThrowingTest.java new file mode 100644 index 0000000000..73242b6321 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableThrowingTest.java @@ -0,0 +1,141 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class CompletableThrowingTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final TestRule resetRule = new RxJavaPluginsResetRule(); + + @Rule + public final RecordingCompletableObserver.Rule observerRule = + new RecordingCompletableObserver.Rule(); + + interface Service { + @GET("/") + Completable completable(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void throwingInOnCompleteDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference errorRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); // Don't swallow secondary errors! + } + }); + + RecordingCompletableObserver observer = observerRule.create(); + final RuntimeException e = new RuntimeException(); + service + .completable() + .subscribe( + new ForwardingCompletableObserver(observer) { + @Override + public void onComplete() { + throw e; + } + }); + + assertThat(errorRef.get().getCause()).isSameAs(e); + } + + @Test + public void bodyThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setResponseCode(404)); + + final AtomicReference pluginRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!pluginRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); // Don't swallow secondary errors! + } + }); + + RecordingCompletableObserver observer = observerRule.create(); + final RuntimeException e = new RuntimeException(); + final AtomicReference errorRef = new AtomicReference<>(); + service + .completable() + .subscribe( + new ForwardingCompletableObserver(observer) { + @Override + public void onError(Throwable throwable) { + errorRef.set(throwable); + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) pluginRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + abstract static class ForwardingCompletableObserver implements CompletableObserver { + private final CompletableObserver delegate; + + ForwardingCompletableObserver(CompletableObserver delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Disposable disposable) { + delegate.onSubscribe(disposable); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + + @Override + public void onError(Throwable throwable) { + delegate.onError(throwable); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableWithSchedulerTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableWithSchedulerTest.java new file mode 100644 index 0000000000..a86cd190fc --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/CompletableWithSchedulerTest.java @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.schedulers.TestScheduler; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class CompletableWithSchedulerTest { + @Rule public final MockWebServer server = new MockWebServer(); + + @Rule + public final RecordingCompletableObserver.Rule observerRule = + new RecordingCompletableObserver.Rule(); + + interface Service { + @GET("/") + Completable completable(); + } + + private final TestScheduler scheduler = new TestScheduler(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(scheduler)) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void completableUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingCompletableObserver observer = observerRule.create(); + service.completable().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertComplete(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableTest.java new file mode 100644 index 0000000000..9d5eceda1c --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableTest.java @@ -0,0 +1,222 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Flowable; +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class FlowableTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingSubscriber.Rule subscriberRule = new RecordingSubscriber.Rule(); + + interface Service { + @GET("/") + Flowable body(); + + @GET("/") + Flowable> response(); + + @GET("/") + Flowable> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodySuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSubscriber subscriber = subscriberRule.create(); + service.body().subscribe(subscriber); + subscriber.assertValue("Hi").assertComplete(); + } + + @Test + public void bodySuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingSubscriber subscriber = subscriberRule.create(); + service.body().subscribe(subscriber); + // Required for backwards compatibility. + subscriber.assertError(HttpException.class, "HTTP 404 Client Error"); + } + + @Test + public void bodyFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingSubscriber subscriber = subscriberRule.create(); + service.body().subscribe(subscriber); + subscriber.assertError(IOException.class); + } + + @Test + public void bodyRespectsBackpressure() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSubscriber subscriber = subscriberRule.createWithInitialRequest(0); + Flowable o = service.body(); + + o.subscribe(subscriber); + assertThat(server.getRequestCount()).isEqualTo(1); + subscriber.assertNoEvents(); + + subscriber.request(1); + subscriber.assertAnyValue().assertComplete(); + + subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications. + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test + public void responseSuccess200() { + server.enqueue(new MockResponse()); + + RecordingSubscriber> subscriber = subscriberRule.create(); + service.response().subscribe(subscriber); + assertThat(subscriber.takeValue().isSuccessful()).isTrue(); + subscriber.assertComplete(); + } + + @Test + public void responseSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingSubscriber> subscriber = subscriberRule.create(); + service.response().subscribe(subscriber); + assertThat(subscriber.takeValue().isSuccessful()).isFalse(); + subscriber.assertComplete(); + } + + @Test + public void responseFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingSubscriber> subscriber = subscriberRule.create(); + service.response().subscribe(subscriber); + subscriber.assertError(IOException.class); + } + + @Test + public void responseRespectsBackpressure() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSubscriber> subscriber = subscriberRule.createWithInitialRequest(0); + Flowable> o = service.response(); + + o.subscribe(subscriber); + assertThat(server.getRequestCount()).isEqualTo(1); + subscriber.assertNoEvents(); + + subscriber.request(1); + subscriber.assertAnyValue().assertComplete(); + + subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications. + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test + public void resultSuccess200() { + server.enqueue(new MockResponse()); + + RecordingSubscriber> subscriber = subscriberRule.create(); + service.result().subscribe(subscriber); + Result result = subscriber.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isTrue(); + subscriber.assertComplete(); + } + + @Test + public void resultSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingSubscriber> subscriber = subscriberRule.create(); + service.result().subscribe(subscriber); + Result result = subscriber.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isFalse(); + subscriber.assertComplete(); + } + + @Test + public void resultFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingSubscriber> subscriber = subscriberRule.create(); + service.result().subscribe(subscriber); + Result result = subscriber.takeValue(); + assertThat(result.isError()).isTrue(); + assertThat(result.error()).isInstanceOf(IOException.class); + subscriber.assertComplete(); + } + + @Test + public void resultRespectsBackpressure() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSubscriber> subscriber = subscriberRule.createWithInitialRequest(0); + Flowable> o = service.result(); + + o.subscribe(subscriber); + assertThat(server.getRequestCount()).isEqualTo(1); + subscriber.assertNoEvents(); + + subscriber.request(1); + subscriber.assertAnyValue().assertComplete(); + + subscriber.request(Long.MAX_VALUE); // Subsequent requests do not trigger HTTP or notifications. + assertThat(server.getRequestCount()).isEqualTo(1); + } + + @Test + public void subscribeTwice() { + server.enqueue(new MockResponse().setBody("Hi")); + server.enqueue(new MockResponse().setBody("Hey")); + + Flowable observable = service.body(); + + RecordingSubscriber subscriber1 = subscriberRule.create(); + observable.subscribe(subscriber1); + subscriber1.assertValue("Hi").assertComplete(); + + RecordingSubscriber subscriber2 = subscriberRule.create(); + observable.subscribe(subscriber2); + subscriber2.assertValue("Hey").assertComplete(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableThrowingTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableThrowingTest.java new file mode 100644 index 0000000000..0928115587 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableThrowingTest.java @@ -0,0 +1,337 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class FlowableThrowingTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final TestRule resetRule = new RxJavaPluginsResetRule(); + @Rule public final RecordingSubscriber.Rule subscriberRule = new RecordingSubscriber.Rule(); + + interface Service { + @GET("/") + Flowable body(); + + @GET("/") + Flowable> response(); + + @GET("/") + Flowable> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyThrowingInOnNextDeliveredToError() { + server.enqueue(new MockResponse()); + + RecordingSubscriber subscriber = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .body() + .safeSubscribe( + new ForwardingSubscriber(subscriber) { + @Override + public void onNext(String value) { + throw e; + } + }); + + subscriber.assertError(e); + } + + @Test + public void bodyThrowingInOnCompleteDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSubscriber subscriber = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingSubscriber(subscriber) { + @Override + public void onComplete() { + throw e; + } + }); + + subscriber.assertAnyValue(); + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void bodyThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setResponseCode(404)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSubscriber subscriber = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingSubscriber(subscriber) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void responseThrowingInOnNextDeliveredToError() { + server.enqueue(new MockResponse()); + + RecordingSubscriber> subscriber = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .response() + .safeSubscribe( + new ForwardingSubscriber>(subscriber) { + @Override + public void onNext(Response value) { + throw e; + } + }); + + subscriber.assertError(e); + } + + @Test + public void responseThrowingInOnCompleteDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSubscriber> subscriber = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingSubscriber>(subscriber) { + @Override + public void onComplete() { + throw e; + } + }); + + subscriber.assertAnyValue(); + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void responseThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSubscriber> subscriber = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingSubscriber>(subscriber) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void resultThrowingInOnNextDeliveredToError() { + server.enqueue(new MockResponse()); + + RecordingSubscriber> subscriber = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .result() + .safeSubscribe( + new ForwardingSubscriber>(subscriber) { + @Override + public void onNext(Result value) { + throw e; + } + }); + + subscriber.assertError(e); + } + + @Test + public void resultThrowingInOnCompletedDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSubscriber> subscriber = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingSubscriber>(subscriber) { + @Override + public void onComplete() { + throw e; + } + }); + + subscriber.assertAnyValue(); + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void resultThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSubscriber> subscriber = subscriberRule.create(); + final RuntimeException first = new RuntimeException(); + final RuntimeException second = new RuntimeException(); + service + .result() + .safeSubscribe( + new ForwardingSubscriber>(subscriber) { + @Override + public void onNext(Result value) { + // The only way to trigger onError for a result is if onNext throws. + throw first; + } + + @Override + public void onError(Throwable throwable) { + throw second; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(first, second); + } + + private abstract static class ForwardingSubscriber implements Subscriber { + private final Subscriber delegate; + + ForwardingSubscriber(Subscriber delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Subscription subscription) { + delegate.onSubscribe(subscription); + } + + @Override + public void onNext(T value) { + delegate.onNext(value); + } + + @Override + public void onError(Throwable throwable) { + delegate.onError(throwable); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableWithSchedulerTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableWithSchedulerTest.java new file mode 100644 index 0000000000..888b4cca97 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/FlowableWithSchedulerTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.TestScheduler; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class FlowableWithSchedulerTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingSubscriber.Rule subscriberRule = new RecordingSubscriber.Rule(); + + interface Service { + @GET("/") + Flowable body(); + + @GET("/") + Flowable> response(); + + @GET("/") + Flowable> result(); + } + + private final TestScheduler scheduler = new TestScheduler(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(scheduler)) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingSubscriber subscriber = subscriberRule.create(); + service.body().subscribe(subscriber); + subscriber.assertNoEvents(); + + scheduler.triggerActions(); + subscriber.assertAnyValue().assertComplete(); + } + + @Test + public void responseUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingSubscriber subscriber = subscriberRule.create(); + service.response().subscribe(subscriber); + subscriber.assertNoEvents(); + + scheduler.triggerActions(); + subscriber.assertAnyValue().assertComplete(); + } + + @Test + public void resultUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingSubscriber subscriber = subscriberRule.create(); + service.result().subscribe(subscriber); + subscriber.assertNoEvents(); + + scheduler.triggerActions(); + subscriber.assertAnyValue().assertComplete(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeTest.java new file mode 100644 index 0000000000..20036ccd64 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeTest.java @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Maybe; +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class MaybeTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingMaybeObserver.Rule observerRule = new RecordingMaybeObserver.Rule(); + + interface Service { + @GET("/") + Maybe body(); + + @GET("/") + Maybe> response(); + + @GET("/") + Maybe> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodySuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingMaybeObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertValue("Hi"); + } + + @Test + public void bodySuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingMaybeObserver observer = observerRule.create(); + service.body().subscribe(observer); + // Required for backwards compatibility. + observer.assertError(HttpException.class, "HTTP 404 Client Error"); + } + + @Test + public void bodyFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingMaybeObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void responseSuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingMaybeObserver> observer = observerRule.create(); + service.response().subscribe(observer); + Response response = observer.takeValue(); + assertThat(response.isSuccessful()).isTrue(); + } + + @Test + public void responseSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingMaybeObserver> observer = observerRule.create(); + service.response().subscribe(observer); + assertThat(observer.takeValue().isSuccessful()).isFalse(); + } + + @Test + public void responseFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingMaybeObserver> observer = observerRule.create(); + service.response().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void resultSuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingMaybeObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isTrue(); + } + + @Test + public void resultSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingMaybeObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isFalse(); + } + + @Test + public void resultFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingMaybeObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isTrue(); + assertThat(result.error()).isInstanceOf(IOException.class); + } + + @Test + public void subscribeTwice() { + server.enqueue(new MockResponse().setBody("Hi")); + server.enqueue(new MockResponse().setBody("Hey")); + + Maybe observable = service.body(); + + RecordingMaybeObserver observer1 = observerRule.create(); + observable.subscribe(observer1); + observer1.assertValue("Hi"); + + RecordingMaybeObserver observer2 = observerRule.create(); + observable.subscribe(observer2); + observer2.assertValue("Hey"); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeThrowingTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeThrowingTest.java new file mode 100644 index 0000000000..3c58909dc9 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeThrowingTest.java @@ -0,0 +1,279 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class MaybeThrowingTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final TestRule resetRule = new RxJavaPluginsResetRule(); + @Rule public final RecordingMaybeObserver.Rule subscriberRule = new RecordingMaybeObserver.Rule(); + + interface Service { + @GET("/") + Maybe body(); + + @GET("/") + Maybe> response(); + + @GET("/") + Maybe> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyThrowingInOnSuccessDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingMaybeObserver observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onSuccess(String value) { + throw e; + } + }); + + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void bodyThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setResponseCode(404)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingMaybeObserver observer = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void responseThrowingInOnSuccessDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingMaybeObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onSuccess(Response value) { + throw e; + } + }); + + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void responseThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingMaybeObserver> observer = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void resultThrowingInOnSuccessDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingMaybeObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onSuccess(Result value) { + throw e; + } + }); + + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Ignore("Single's contract is onNext|onError so we have no way of triggering this case") + @Test + public void resultThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingMaybeObserver> observer = subscriberRule.create(); + final RuntimeException first = new RuntimeException(); + final RuntimeException second = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onSuccess(Result value) { + // The only way to trigger onError for Result is if onSuccess throws. + throw first; + } + + @Override + public void onError(Throwable throwable) { + throw second; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(first, second); + } + + private abstract static class ForwardingObserver implements MaybeObserver { + private final MaybeObserver delegate; + + ForwardingObserver(MaybeObserver delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Disposable disposable) { + delegate.onSubscribe(disposable); + } + + @Override + public void onSuccess(T value) { + delegate.onSuccess(value); + } + + @Override + public void onError(Throwable throwable) { + delegate.onError(throwable); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeWithSchedulerTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeWithSchedulerTest.java new file mode 100644 index 0000000000..4782ad0a53 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/MaybeWithSchedulerTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.schedulers.TestScheduler; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class MaybeWithSchedulerTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingMaybeObserver.Rule observerRule = new RecordingMaybeObserver.Rule(); + + interface Service { + @GET("/") + Maybe body(); + + @GET("/") + Maybe> response(); + + @GET("/") + Maybe> result(); + } + + private final TestScheduler scheduler = new TestScheduler(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(scheduler)) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingMaybeObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue(); + } + + @Test + public void responseUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingMaybeObserver observer = observerRule.create(); + service.response().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue(); + } + + @Test + public void resultUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingMaybeObserver observer = observerRule.create(); + service.result().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableTest.java new file mode 100644 index 0000000000..888fe4d8bc --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableTest.java @@ -0,0 +1,180 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class ObservableTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingObserver.Rule observerRule = new RecordingObserver.Rule(); + + interface Service { + @GET("/") + Observable body(); + + @GET("/") + Observable> response(); + + @GET("/") + Observable> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodySuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertValue("Hi").assertComplete(); + } + + @Test + public void bodySuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingObserver observer = observerRule.create(); + service.body().subscribe(observer); + // Required for backwards compatibility. + observer.assertError(HttpException.class, "HTTP 404 Client Error"); + } + + @Test + public void bodyFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void responseSuccess200() { + server.enqueue(new MockResponse()); + + RecordingObserver> observer = observerRule.create(); + service.response().subscribe(observer); + assertThat(observer.takeValue().isSuccessful()).isTrue(); + observer.assertComplete(); + } + + @Test + public void responseSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingObserver> observer = observerRule.create(); + service.response().subscribe(observer); + assertThat(observer.takeValue().isSuccessful()).isFalse(); + observer.assertComplete(); + } + + @Test + public void responseFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingObserver> observer = observerRule.create(); + service.response().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void resultSuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isTrue(); + observer.assertComplete(); + } + + @Test + public void resultSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isFalse(); + observer.assertComplete(); + } + + @Test + public void resultFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isTrue(); + assertThat(result.error()).isInstanceOf(IOException.class); + observer.assertComplete(); + } + + @Test + public void observableAssembly() { + try { + final Observable justMe = Observable.just("me"); + RxJavaPlugins.setOnObservableAssembly(f -> justMe); + assertThat(service.body()).isEqualTo(justMe); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void subscribeTwice() { + server.enqueue(new MockResponse().setBody("Hi")); + server.enqueue(new MockResponse().setBody("Hey")); + + Observable observable = service.body(); + + RecordingObserver observer1 = observerRule.create(); + observable.subscribe(observer1); + observer1.assertValue("Hi").assertComplete(); + + RecordingObserver observer2 = observerRule.create(); + observable.subscribe(observer2); + observer2.assertValue("Hey").assertComplete(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableThrowingTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableThrowingTest.java new file mode 100644 index 0000000000..5348c1f455 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableThrowingTest.java @@ -0,0 +1,337 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class ObservableThrowingTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final TestRule resetRule = new RxJavaPluginsResetRule(); + @Rule public final RecordingObserver.Rule subscriberRule = new RecordingObserver.Rule(); + + interface Service { + @GET("/") + Observable body(); + + @GET("/") + Observable> response(); + + @GET("/") + Observable> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyThrowingInOnNextDeliveredToError() { + server.enqueue(new MockResponse()); + + RecordingObserver observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onNext(String value) { + throw e; + } + }); + + observer.assertError(e); + } + + @Test + public void bodyThrowingInOnCompleteDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingObserver observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onComplete() { + throw e; + } + }); + + observer.assertAnyValue(); + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void bodyThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setResponseCode(404)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingObserver observer = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void responseThrowingInOnNextDeliveredToError() { + server.enqueue(new MockResponse()); + + RecordingObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onNext(Response value) { + throw e; + } + }); + + observer.assertError(e); + } + + @Test + public void responseThrowingInOnCompleteDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onComplete() { + throw e; + } + }); + + observer.assertAnyValue(); + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void responseThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingObserver> observer = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void resultThrowingInOnNextDeliveredToError() { + server.enqueue(new MockResponse()); + + RecordingObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onNext(Result value) { + throw e; + } + }); + + observer.assertError(e); + } + + @Test + public void resultThrowingInOnCompletedDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onComplete() { + throw e; + } + }); + + observer.assertAnyValue(); + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void resultThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingObserver> observer = subscriberRule.create(); + final RuntimeException first = new RuntimeException(); + final RuntimeException second = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onNext(Result value) { + // The only way to trigger onError for a result is if onNext throws. + throw first; + } + + @Override + public void onError(Throwable throwable) { + throw second; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(first, second); + } + + private abstract static class ForwardingObserver implements Observer { + private final Observer delegate; + + ForwardingObserver(Observer delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Disposable disposable) { + delegate.onSubscribe(disposable); + } + + @Override + public void onNext(T value) { + delegate.onNext(value); + } + + @Override + public void onError(Throwable throwable) { + delegate.onError(throwable); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableWithSchedulerTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableWithSchedulerTest.java new file mode 100644 index 0000000000..b646671d32 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ObservableWithSchedulerTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.schedulers.TestScheduler; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class ObservableWithSchedulerTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingObserver.Rule observerRule = new RecordingObserver.Rule(); + + interface Service { + @GET("/") + Observable body(); + + @GET("/") + Observable> response(); + + @GET("/") + Observable> result(); + } + + private final TestScheduler scheduler = new TestScheduler(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(scheduler)) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue().assertComplete(); + } + + @Test + public void responseUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingObserver observer = observerRule.create(); + service.response().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue().assertComplete(); + } + + @Test + public void resultUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingObserver observer = observerRule.create(); + service.result().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue().assertComplete(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingCompletableObserver.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingCompletableObserver.java new file mode 100644 index 0000000000..f7a238eaf4 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingCompletableObserver.java @@ -0,0 +1,117 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.core.Notification; +import io.reactivex.rxjava3.disposables.Disposable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +/** A test {@link CompletableObserver} and JUnit rule which guarantees all events are asserted. */ +final class RecordingCompletableObserver implements CompletableObserver { + private final Deque> events = new ArrayDeque<>(); + + private RecordingCompletableObserver() {} + + @Override + public void onSubscribe(Disposable disposable) {} + + @Override + public void onComplete() { + events.add(Notification.createOnComplete()); + } + + @Override + public void onError(Throwable e) { + events.add(Notification.createOnError(e)); + } + + private Notification takeNotification() { + Notification notification = events.pollFirst(); + if (notification == null) { + throw new AssertionError("No event found!"); + } + return notification; + } + + public Throwable takeError() { + Notification notification = takeNotification(); + assertThat(notification.isOnError()) + .as("Expected onError event but was " + notification) + .isTrue(); + return notification.getError(); + } + + public void assertComplete() { + Notification notification = takeNotification(); + assertThat(notification.isOnComplete()) + .as("Expected onCompleted event but was " + notification) + .isTrue(); + assertNoEvents(); + } + + public void assertError(Throwable throwable) { + assertThat(takeError()).isEqualTo(throwable); + } + + public void assertError(Class errorClass) { + assertError(errorClass, null); + } + + public void assertError(Class errorClass, String message) { + Throwable throwable = takeError(); + assertThat(throwable).isInstanceOf(errorClass); + if (message != null) { + assertThat(throwable).hasMessage(message); + } + assertNoEvents(); + } + + public void assertNoEvents() { + assertThat(events).as("Unconsumed events found!").isEmpty(); + } + + public static final class Rule implements TestRule { + final List subscribers = new ArrayList<>(); + + public RecordingCompletableObserver create() { + RecordingCompletableObserver subscriber = new RecordingCompletableObserver(); + subscribers.add(subscriber); + return subscriber; + } + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + base.evaluate(); + for (RecordingCompletableObserver subscriber : subscribers) { + subscriber.assertNoEvents(); + } + } + }; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingMaybeObserver.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingMaybeObserver.java new file mode 100644 index 0000000000..dd00051b89 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingMaybeObserver.java @@ -0,0 +1,133 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.core.Notification; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +/** A test {@link Observer} and JUnit rule which guarantees all events are asserted. */ +final class RecordingMaybeObserver implements MaybeObserver { + private final Deque> events = new ArrayDeque<>(); + + private RecordingMaybeObserver() {} + + @Override + public void onSubscribe(Disposable disposable) {} + + @Override + public void onSuccess(T value) { + events.add(Notification.createOnNext(value)); + } + + @Override + public void onError(Throwable e) { + events.add(Notification.createOnError(e)); + } + + @Override + public void onComplete() { + events.add(Notification.createOnComplete()); + } + + private Notification takeNotification() { + Notification notification = events.pollFirst(); + if (notification == null) { + throw new AssertionError("No event found!"); + } + return notification; + } + + public T takeValue() { + Notification notification = takeNotification(); + assertThat(notification.isOnNext()) + .as("Expected onNext event but was " + notification) + .isTrue(); + return notification.getValue(); + } + + public Throwable takeError() { + Notification notification = takeNotification(); + assertThat(notification.isOnError()) + .as("Expected onError event but was " + notification) + .isTrue(); + return notification.getError(); + } + + public RecordingMaybeObserver assertAnyValue() { + takeValue(); + return this; + } + + public RecordingMaybeObserver assertValue(T value) { + assertThat(takeValue()).isEqualTo(value); + return this; + } + + public void assertError(Throwable throwable) { + assertThat(takeError()).isEqualTo(throwable); + } + + public void assertError(Class errorClass) { + assertError(errorClass, null); + } + + public void assertError(Class errorClass, String message) { + Throwable throwable = takeError(); + assertThat(throwable).isInstanceOf(errorClass); + if (message != null) { + assertThat(throwable).hasMessage(message); + } + assertNoEvents(); + } + + public void assertNoEvents() { + assertThat(events).as("Unconsumed events found!").isEmpty(); + } + + public static final class Rule implements TestRule { + final List> subscribers = new ArrayList<>(); + + public RecordingMaybeObserver create() { + RecordingMaybeObserver subscriber = new RecordingMaybeObserver<>(); + subscribers.add(subscriber); + return subscriber; + } + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + base.evaluate(); + for (RecordingMaybeObserver subscriber : subscribers) { + subscriber.assertNoEvents(); + } + } + }; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingObserver.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingObserver.java new file mode 100644 index 0000000000..c7c0bc5b87 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingObserver.java @@ -0,0 +1,140 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Notification; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +/** A test {@link Observer} and JUnit rule which guarantees all events are asserted. */ +final class RecordingObserver implements Observer { + private final Deque> events = new ArrayDeque<>(); + + private RecordingObserver() {} + + @Override + public void onSubscribe(Disposable disposable) {} + + @Override + public void onNext(T value) { + events.add(Notification.createOnNext(value)); + } + + @Override + public void onComplete() { + events.add(Notification.createOnComplete()); + } + + @Override + public void onError(Throwable e) { + events.add(Notification.createOnError(e)); + } + + private Notification takeNotification() { + Notification notification = events.pollFirst(); + if (notification == null) { + throw new AssertionError("No event found!"); + } + return notification; + } + + public T takeValue() { + Notification notification = takeNotification(); + assertThat(notification.isOnNext()) + .as("Expected onNext event but was " + notification) + .isTrue(); + return notification.getValue(); + } + + public Throwable takeError() { + Notification notification = takeNotification(); + assertThat(notification.isOnError()) + .as("Expected onError event but was " + notification) + .isTrue(); + return notification.getError(); + } + + public RecordingObserver assertAnyValue() { + takeValue(); + return this; + } + + public RecordingObserver assertValue(T value) { + assertThat(takeValue()).isEqualTo(value); + return this; + } + + public void assertComplete() { + Notification notification = takeNotification(); + assertThat(notification.isOnComplete()) + .as("Expected onCompleted event but was " + notification) + .isTrue(); + assertNoEvents(); + } + + public void assertError(Throwable throwable) { + assertThat(takeError()).isEqualTo(throwable); + } + + public void assertError(Class errorClass) { + assertError(errorClass, null); + } + + public void assertError(Class errorClass, String message) { + Throwable throwable = takeError(); + assertThat(throwable).isInstanceOf(errorClass); + if (message != null) { + assertThat(throwable).hasMessage(message); + } + assertNoEvents(); + } + + public void assertNoEvents() { + assertThat(events).as("Unconsumed events found!").isEmpty(); + } + + public static final class Rule implements TestRule { + final List> subscribers = new ArrayList<>(); + + public RecordingObserver create() { + RecordingObserver subscriber = new RecordingObserver<>(); + subscribers.add(subscriber); + return subscriber; + } + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + base.evaluate(); + for (RecordingObserver subscriber : subscribers) { + subscriber.assertNoEvents(); + } + } + }; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSingleObserver.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSingleObserver.java new file mode 100644 index 0000000000..18d3015a35 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSingleObserver.java @@ -0,0 +1,128 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Notification; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +/** A test {@link Observer} and JUnit rule which guarantees all events are asserted. */ +final class RecordingSingleObserver implements SingleObserver { + private final Deque> events = new ArrayDeque<>(); + + private RecordingSingleObserver() {} + + @Override + public void onSubscribe(Disposable disposable) {} + + @Override + public void onSuccess(T value) { + events.add(Notification.createOnNext(value)); + } + + @Override + public void onError(Throwable e) { + events.add(Notification.createOnError(e)); + } + + private Notification takeNotification() { + Notification notification = events.pollFirst(); + if (notification == null) { + throw new AssertionError("No event found!"); + } + return notification; + } + + public T takeValue() { + Notification notification = takeNotification(); + assertThat(notification.isOnNext()) + .as("Expected onNext event but was " + notification) + .isTrue(); + return notification.getValue(); + } + + public Throwable takeError() { + Notification notification = takeNotification(); + assertThat(notification.isOnError()) + .as("Expected onError event but was " + notification) + .isTrue(); + return notification.getError(); + } + + public RecordingSingleObserver assertAnyValue() { + takeValue(); + return this; + } + + public RecordingSingleObserver assertValue(T value) { + assertThat(takeValue()).isEqualTo(value); + return this; + } + + public void assertError(Throwable throwable) { + assertThat(takeError()).isEqualTo(throwable); + } + + public void assertError(Class errorClass) { + assertError(errorClass, null); + } + + public void assertError(Class errorClass, String message) { + Throwable throwable = takeError(); + assertThat(throwable).isInstanceOf(errorClass); + if (message != null) { + assertThat(throwable).hasMessage(message); + } + assertNoEvents(); + } + + public void assertNoEvents() { + assertThat(events).as("Unconsumed events found!").isEmpty(); + } + + public static final class Rule implements TestRule { + final List> subscribers = new ArrayList<>(); + + public RecordingSingleObserver create() { + RecordingSingleObserver subscriber = new RecordingSingleObserver<>(); + subscribers.add(subscriber); + return subscriber; + } + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + base.evaluate(); + for (RecordingSingleObserver subscriber : subscribers) { + subscriber.assertNoEvents(); + } + } + }; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSubscriber.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSubscriber.java new file mode 100644 index 0000000000..dd25590875 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RecordingSubscriber.java @@ -0,0 +1,161 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.FlowableSubscriber; +import io.reactivex.rxjava3.core.Notification; +import java.util.ArrayDeque; +import java.util.ArrayList; +import java.util.Deque; +import java.util.List; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +/** A test {@link Subscriber} and JUnit rule which guarantees all events are asserted. */ +final class RecordingSubscriber implements FlowableSubscriber { + private final long initialRequest; + private final Deque> events = new ArrayDeque<>(); + + private Subscription subscription; + + private RecordingSubscriber(long initialRequest) { + this.initialRequest = initialRequest; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + + subscription.request(initialRequest); + } + + @Override + public void onNext(T value) { + events.add(Notification.createOnNext(value)); + } + + @Override + public void onComplete() { + events.add(Notification.createOnComplete()); + } + + @Override + public void onError(Throwable e) { + events.add(Notification.createOnError(e)); + } + + private Notification takeNotification() { + Notification notification = events.pollFirst(); + if (notification == null) { + throw new AssertionError("No event found!"); + } + return notification; + } + + public T takeValue() { + Notification notification = takeNotification(); + assertThat(notification.isOnNext()) + .as("Expected onNext event but was " + notification) + .isTrue(); + return notification.getValue(); + } + + public Throwable takeError() { + Notification notification = takeNotification(); + assertThat(notification.isOnError()) + .as("Expected onError event but was " + notification) + .isTrue(); + return notification.getError(); + } + + public RecordingSubscriber assertAnyValue() { + takeValue(); + return this; + } + + public RecordingSubscriber assertValue(T value) { + assertThat(takeValue()).isEqualTo(value); + return this; + } + + public void assertComplete() { + Notification notification = takeNotification(); + assertThat(notification.isOnComplete()) + .as("Expected onCompleted event but was " + notification) + .isTrue(); + assertNoEvents(); + } + + public void assertError(Throwable throwable) { + assertThat(takeError()).isEqualTo(throwable); + } + + public void assertError(Class errorClass) { + assertError(errorClass, null); + } + + public void assertError(Class errorClass, String message) { + Throwable throwable = takeError(); + assertThat(throwable).isInstanceOf(errorClass); + if (message != null) { + assertThat(throwable).hasMessage(message); + } + assertNoEvents(); + } + + public void assertNoEvents() { + assertThat(events).as("Unconsumed events found!").isEmpty(); + } + + public void request(long amount) { + if (subscription == null) { + throw new IllegalStateException("onSubscribe has not been called yet. Did you subscribe()?"); + } + subscription.request(amount); + } + + public static final class Rule implements TestRule { + final List> subscribers = new ArrayList<>(); + + public RecordingSubscriber create() { + return createWithInitialRequest(Long.MAX_VALUE); + } + + public RecordingSubscriber createWithInitialRequest(long initialRequest) { + RecordingSubscriber subscriber = new RecordingSubscriber<>(initialRequest); + subscribers.add(subscriber); + return subscriber; + } + + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + base.evaluate(); + for (RecordingSubscriber subscriber : subscribers) { + subscriber.assertNoEvents(); + } + } + }; + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ResultTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ResultTest.java new file mode 100644 index 0000000000..a32ed5041a --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/ResultTest.java @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +import java.io.IOException; +import org.junit.Test; +import retrofit2.Response; + +public final class ResultTest { + @Test + public void response() { + Response response = Response.success("Hi"); + Result result = Result.response(response); + assertThat(result.isError()).isFalse(); + assertThat(result.error()).isNull(); + assertThat(result.response()).isSameAs(response); + } + + @Test + public void nullResponseThrows() { + try { + Result.response(null); + fail(); + } catch (NullPointerException e) { + assertThat(e).hasMessage("response == null"); + } + } + + @Test + public void error() { + Throwable error = new IOException(); + Result result = Result.error(error); + assertThat(result.isError()).isTrue(); + assertThat(result.error()).isSameAs(error); + assertThat(result.response()).isNull(); + } + + @Test + public void nullErrorThrows() { + try { + Result.error(null); + fail(); + } catch (NullPointerException e) { + assertThat(e).hasMessage("error == null"); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactoryTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactoryTest.java new file mode 100644 index 0000000000..6cfb0895f8 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJava3CallAdapterFactoryTest.java @@ -0,0 +1,282 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package retrofit2.adapter.rxjava3; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.Assert.fail; + +import com.google.common.reflect.TypeToken; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import java.util.List; +import org.junit.Before; +import org.junit.Test; +import retrofit2.CallAdapter; +import retrofit2.Response; +import retrofit2.Retrofit; + +public class RxJava3CallAdapterFactoryTest { + private static final Annotation[] NO_ANNOTATIONS = new Annotation[0]; + + private final CallAdapter.Factory factory = RxJava3CallAdapterFactory.create(); + private Retrofit retrofit; + + @Before + public void setUp() { + retrofit = + new Retrofit.Builder() + .baseUrl("http://localhost:1") + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(factory) + .build(); + } + + @Test + public void nullSchedulerThrows() { + try { + RxJava3CallAdapterFactory.createWithScheduler(null); + fail(); + } catch (NullPointerException e) { + assertThat(e).hasMessage("scheduler == null"); + } + } + + @Test + public void nonRxJavaTypeReturnsNull() { + CallAdapter adapter = factory.get(String.class, NO_ANNOTATIONS, retrofit); + assertThat(adapter).isNull(); + } + + @Test + public void responseTypes() { + Type oBodyClass = new TypeToken>() {}.getType(); + assertThat(factory.get(oBodyClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type sBodyClass = new TypeToken>() {}.getType(); + assertThat(factory.get(sBodyClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type mBodyClass = new TypeToken>() {}.getType(); + assertThat(factory.get(mBodyClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type fBodyClass = new TypeToken>() {}.getType(); + assertThat(factory.get(fBodyClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + + Type oBodyWildcard = new TypeToken>() {}.getType(); + assertThat(factory.get(oBodyWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type sBodyWildcard = new TypeToken>() {}.getType(); + assertThat(factory.get(sBodyWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type mBodyWildcard = new TypeToken>() {}.getType(); + assertThat(factory.get(mBodyWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type fBodyWildcard = new TypeToken>() {}.getType(); + assertThat(factory.get(fBodyWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + + Type oBodyGeneric = new TypeToken>>() {}.getType(); + assertThat(factory.get(oBodyGeneric, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(new TypeToken>() {}.getType()); + Type sBodyGeneric = new TypeToken>>() {}.getType(); + assertThat(factory.get(sBodyGeneric, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(new TypeToken>() {}.getType()); + Type mBodyGeneric = new TypeToken>>() {}.getType(); + assertThat(factory.get(mBodyGeneric, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(new TypeToken>() {}.getType()); + Type fBodyGeneric = new TypeToken>>() {}.getType(); + assertThat(factory.get(fBodyGeneric, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(new TypeToken>() {}.getType()); + + Type oResponseClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(oResponseClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type sResponseClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(sResponseClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type mResponseClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(mResponseClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type fResponseClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(fResponseClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + + Type oResponseWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(oResponseWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type sResponseWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(sResponseWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type mResponseWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(mResponseWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type fResponseWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(fResponseWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + + Type oResultClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(oResultClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type sResultClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(sResultClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type mResultClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(mResultClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type fResultClass = new TypeToken>>() {}.getType(); + assertThat(factory.get(fResultClass, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + + Type oResultWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(oResultWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type sResultWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(sResultWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type mResultWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(mResultWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + Type fResultWildcard = new TypeToken>>() {}.getType(); + assertThat(factory.get(fResultWildcard, NO_ANNOTATIONS, retrofit).responseType()) + .isEqualTo(String.class); + } + + @Test + public void rawBodyTypeThrows() { + Type observableType = new TypeToken() {}.getType(); + try { + factory.get(observableType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage( + "Observable return type must be parameterized as Observable or Observable"); + } + + Type singleType = new TypeToken() {}.getType(); + try { + factory.get(singleType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage( + "Single return type must be parameterized as Single or Single"); + } + + Type maybeType = new TypeToken() {}.getType(); + try { + factory.get(maybeType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage( + "Maybe return type must be parameterized as Maybe or Maybe"); + } + + Type flowableType = new TypeToken() {}.getType(); + try { + factory.get(flowableType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage( + "Flowable return type must be parameterized as Flowable or Flowable"); + } + } + + @Test + public void rawResponseTypeThrows() { + Type observableType = new TypeToken>() {}.getType(); + try { + factory.get(observableType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Response must be parameterized as Response or Response"); + } + + Type singleType = new TypeToken>() {}.getType(); + try { + factory.get(singleType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Response must be parameterized as Response or Response"); + } + + Type maybeType = new TypeToken>() {}.getType(); + try { + factory.get(maybeType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Response must be parameterized as Response or Response"); + } + + Type flowableType = new TypeToken>() {}.getType(); + try { + factory.get(flowableType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Response must be parameterized as Response or Response"); + } + } + + @Test + public void rawResultTypeThrows() { + Type observableType = new TypeToken>() {}.getType(); + try { + factory.get(observableType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Result must be parameterized as Result or Result"); + } + + Type singleType = new TypeToken>() {}.getType(); + try { + factory.get(singleType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Result must be parameterized as Result or Result"); + } + + Type maybeType = new TypeToken>() {}.getType(); + try { + factory.get(maybeType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Result must be parameterized as Result or Result"); + } + + Type flowableType = new TypeToken>() {}.getType(); + try { + factory.get(flowableType, NO_ANNOTATIONS, retrofit); + fail(); + } catch (IllegalStateException e) { + assertThat(e) + .hasMessage("Result must be parameterized as Result or Result"); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJavaPluginsResetRule.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJavaPluginsResetRule.java new file mode 100644 index 0000000000..a4690bf2de --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/RxJavaPluginsResetRule.java @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import org.junit.rules.TestRule; +import org.junit.runner.Description; +import org.junit.runners.model.Statement; + +final class RxJavaPluginsResetRule implements TestRule { + @Override + public Statement apply(final Statement base, Description description) { + return new Statement() { + @Override + public void evaluate() throws Throwable { + RxJavaPlugins.reset(); + try { + base.evaluate(); + } finally { + RxJavaPlugins.reset(); + } + } + }; + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleTest.java new file mode 100644 index 0000000000..b03f69014b --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleTest.java @@ -0,0 +1,164 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Single; +import java.io.IOException; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class SingleTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingSingleObserver.Rule observerRule = new RecordingSingleObserver.Rule(); + + interface Service { + @GET("/") + Single body(); + + @GET("/") + Single> response(); + + @GET("/") + Single> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodySuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSingleObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertValue("Hi"); + } + + @Test + public void bodySuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingSingleObserver observer = observerRule.create(); + service.body().subscribe(observer); + // Required for backwards compatibility. + observer.assertError(HttpException.class, "HTTP 404 Client Error"); + } + + @Test + public void bodyFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingSingleObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void responseSuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSingleObserver> observer = observerRule.create(); + service.response().subscribe(observer); + Response response = observer.takeValue(); + assertThat(response.isSuccessful()).isTrue(); + } + + @Test + public void responseSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingSingleObserver> observer = observerRule.create(); + service.response().subscribe(observer); + assertThat(observer.takeValue().isSuccessful()).isFalse(); + } + + @Test + public void responseFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingSingleObserver> observer = observerRule.create(); + service.response().subscribe(observer); + observer.assertError(IOException.class); + } + + @Test + public void resultSuccess200() { + server.enqueue(new MockResponse().setBody("Hi")); + + RecordingSingleObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isTrue(); + } + + @Test + public void resultSuccess404() { + server.enqueue(new MockResponse().setResponseCode(404)); + + RecordingSingleObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isFalse(); + assertThat(result.response().isSuccessful()).isFalse(); + } + + @Test + public void resultFailure() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + RecordingSingleObserver> observer = observerRule.create(); + service.result().subscribe(observer); + Result result = observer.takeValue(); + assertThat(result.isError()).isTrue(); + assertThat(result.error()).isInstanceOf(IOException.class); + } + + @Test + public void subscribeTwice() { + server.enqueue(new MockResponse().setBody("Hi")); + server.enqueue(new MockResponse().setBody("Hey")); + + Single observable = service.body(); + + RecordingSingleObserver observer1 = observerRule.create(); + observable.subscribe(observer1); + observer1.assertValue("Hi"); + + RecordingSingleObserver observer2 = observerRule.create(); + observable.subscribe(observer2); + observer2.assertValue("Hey"); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleThrowingTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleThrowingTest.java new file mode 100644 index 0000000000..25a92c3aaa --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleThrowingTest.java @@ -0,0 +1,276 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import static okhttp3.mockwebserver.SocketPolicy.DISCONNECT_AFTER_REQUEST; +import static org.assertj.core.api.Assertions.assertThat; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import io.reactivex.rxjava3.exceptions.CompositeException; +import io.reactivex.rxjava3.exceptions.Exceptions; +import io.reactivex.rxjava3.plugins.RxJavaPlugins; +import java.util.concurrent.atomic.AtomicReference; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TestRule; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class SingleThrowingTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final TestRule resetRule = new RxJavaPluginsResetRule(); + + @Rule + public final RecordingSingleObserver.Rule subscriberRule = new RecordingSingleObserver.Rule(); + + interface Service { + @GET("/") + Single body(); + + @GET("/") + Single> response(); + + @GET("/") + Single> result(); + } + + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.create()) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyThrowingInOnSuccessDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSingleObserver observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onSuccess(String value) { + throw e; + } + }); + + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void bodyThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setResponseCode(404)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSingleObserver observer = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .body() + .subscribe( + new ForwardingObserver(observer) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void responseThrowingInOnSuccessDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSingleObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onSuccess(Response value) { + throw e; + } + }); + + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Test + public void responseThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse().setSocketPolicy(DISCONNECT_AFTER_REQUEST)); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSingleObserver> observer = subscriberRule.create(); + final AtomicReference errorRef = new AtomicReference<>(); + final RuntimeException e = new RuntimeException(); + service + .response() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onError(Throwable throwable) { + if (!errorRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + throw e; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(errorRef.get(), e); + } + + @Test + public void resultThrowingInOnSuccessDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSingleObserver> observer = subscriberRule.create(); + final RuntimeException e = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onSuccess(Result value) { + throw e; + } + }); + + assertThat(throwableRef.get().getCause()).isSameAs(e); + } + + @Ignore("Single's contract is onNext|onError so we have no way of triggering this case") + @Test + public void resultThrowingInOnErrorDeliveredToPlugin() { + server.enqueue(new MockResponse()); + + final AtomicReference throwableRef = new AtomicReference<>(); + RxJavaPlugins.setErrorHandler( + throwable -> { + if (!throwableRef.compareAndSet(null, throwable)) { + throw Exceptions.propagate(throwable); + } + }); + + RecordingSingleObserver> observer = subscriberRule.create(); + final RuntimeException first = new RuntimeException(); + final RuntimeException second = new RuntimeException(); + service + .result() + .subscribe( + new ForwardingObserver>(observer) { + @Override + public void onSuccess(Result value) { + // The only way to trigger onError for Result is if onSuccess throws. + throw first; + } + + @Override + public void onError(Throwable throwable) { + throw second; + } + }); + + //noinspection ThrowableResultOfMethodCallIgnored + CompositeException composite = (CompositeException) throwableRef.get(); + assertThat(composite.getExceptions()).containsExactly(first, second); + } + + private abstract static class ForwardingObserver implements SingleObserver { + private final SingleObserver delegate; + + ForwardingObserver(SingleObserver delegate) { + this.delegate = delegate; + } + + @Override + public void onSubscribe(Disposable disposable) { + delegate.onSubscribe(disposable); + } + + @Override + public void onSuccess(T value) { + delegate.onSuccess(value); + } + + @Override + public void onError(Throwable throwable) { + delegate.onError(throwable); + } + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleWithSchedulerTest.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleWithSchedulerTest.java new file mode 100644 index 0000000000..a52bf169e6 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/SingleWithSchedulerTest.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.schedulers.TestScheduler; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import retrofit2.Response; +import retrofit2.Retrofit; +import retrofit2.http.GET; + +public final class SingleWithSchedulerTest { + @Rule public final MockWebServer server = new MockWebServer(); + @Rule public final RecordingSingleObserver.Rule observerRule = new RecordingSingleObserver.Rule(); + + interface Service { + @GET("/") + Single body(); + + @GET("/") + Single> response(); + + @GET("/") + Single> result(); + } + + private final TestScheduler scheduler = new TestScheduler(); + private Service service; + + @Before + public void setUp() { + Retrofit retrofit = + new Retrofit.Builder() + .baseUrl(server.url("/")) + .addConverterFactory(new StringConverterFactory()) + .addCallAdapterFactory(RxJava3CallAdapterFactory.createWithScheduler(scheduler)) + .build(); + service = retrofit.create(Service.class); + } + + @Test + public void bodyUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingSingleObserver observer = observerRule.create(); + service.body().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue(); + } + + @Test + public void responseUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingSingleObserver observer = observerRule.create(); + service.response().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue(); + } + + @Test + public void resultUsesScheduler() { + server.enqueue(new MockResponse()); + + RecordingSingleObserver observer = observerRule.create(); + service.result().subscribe(observer); + observer.assertNoEvents(); + + scheduler.triggerActions(); + observer.assertAnyValue(); + } +} diff --git a/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/StringConverterFactory.java b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/StringConverterFactory.java new file mode 100644 index 0000000000..7db520db54 --- /dev/null +++ b/retrofit-adapters/rxjava3/src/test/java/retrofit2/adapter/rxjava3/StringConverterFactory.java @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2020 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package retrofit2.adapter.rxjava3; + +import java.lang.annotation.Annotation; +import java.lang.reflect.Type; +import okhttp3.MediaType; +import okhttp3.RequestBody; +import okhttp3.ResponseBody; +import retrofit2.Converter; +import retrofit2.Retrofit; + +final class StringConverterFactory extends Converter.Factory { + @Override + public Converter responseBodyConverter( + Type type, Annotation[] annotations, Retrofit retrofit) { + return ResponseBody::string; + } + + @Override + public Converter requestBodyConverter( + Type type, + Annotation[] parameterAnnotations, + Annotation[] methodAnnotations, + Retrofit retrofit) { + return value -> RequestBody.create(MediaType.get("text/plain"), value); + } +} diff --git a/settings.gradle b/settings.gradle index ca235e2372..d7c495e7f5 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ include ':retrofit-adapters:guava' include ':retrofit-adapters:java8' include ':retrofit-adapters:rxjava' include ':retrofit-adapters:rxjava2' +include ':retrofit-adapters:rxjava3' include ':retrofit-adapters:scala' include ':retrofit-converters:gson'