diff --git a/aws-datastore/build.gradle b/aws-datastore/build.gradle
index 2ee8e9c673..88bd870920 100644
--- a/aws-datastore/build.gradle
+++ b/aws-datastore/build.gradle
@@ -31,6 +31,7 @@ dependencies {
testImplementation project(path: ':testmodels')
testImplementation project(path: ':testutils')
+ testImplementation project(path: ':aws-datastore')
testImplementation dependency.jsonassert
testImplementation dependency.junit
testImplementation dependency.mockito
diff --git a/aws-datastore/src/main/AndroidManifest.xml b/aws-datastore/src/main/AndroidManifest.xml
index 79fede2b4d..e844fd32e2 100644
--- a/aws-datastore/src/main/AndroidManifest.xml
+++ b/aws-datastore/src/main/AndroidManifest.xml
@@ -15,5 +15,8 @@
-->
+ xmlns:android="http://schemas.android.com/apk/res/android" >
+
+
+
diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java
index 7475d0784d..a1088820ca 100644
--- a/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java
+++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/AWSDataStorePlugin.java
@@ -47,6 +47,7 @@
import com.amplifyframework.datastore.storage.StorageItemChange;
import com.amplifyframework.datastore.storage.sqlite.SQLiteStorageAdapter;
import com.amplifyframework.datastore.syncengine.Orchestrator;
+import com.amplifyframework.datastore.syncengine.ReachabilityMonitor;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.logging.Logger;
@@ -90,6 +91,8 @@ public final class AWSDataStorePlugin extends DataStorePlugin {
private final boolean isSyncRetryEnabled;
+ private final ReachabilityMonitor reachabilityMonitor;
+
private AWSDataStorePlugin(
@NonNull ModelProvider modelProvider,
@NonNull SchemaRegistry schemaRegistry,
@@ -100,6 +103,7 @@ private AWSDataStorePlugin(
this.authModeStrategy = AuthModeStrategyType.DEFAULT;
this.userProvidedConfiguration = userProvidedConfiguration;
this.isSyncRetryEnabled = userProvidedConfiguration != null && userProvidedConfiguration.getDoSyncRetry();
+ this.reachabilityMonitor = ReachabilityMonitor.Companion.create();
// Used to interrogate plugins, to understand if sync should be automatically turned on
this.orchestrator = new Orchestrator(
modelProvider,
@@ -130,6 +134,9 @@ private AWSDataStorePlugin(@NonNull Builder builder) throws DataStoreException {
SQLiteStorageAdapter.forModels(schemaRegistry, modelProvider) :
builder.storageAdapter;
this.categoryInitializationsPending = new CountDownLatch(1);
+ this.reachabilityMonitor = builder.reachabilityMonitor == null ?
+ ReachabilityMonitor.Companion.create() :
+ builder.reachabilityMonitor;
// Used to interrogate plugins, to understand if sync should be automatically turned on
this.orchestrator = new Orchestrator(
@@ -255,6 +262,33 @@ public void configure(
event -> InitializationStatus.SUCCEEDED.toString().equals(event.getName()),
event -> categoryInitializationsPending.countDown()
);
+
+ reachabilityMonitor.configure(context);
+ observeNetworkStatus();
+ }
+
+ /**
+ * Start the datastore when the network is available, and stop the datastore when it is not
+ * available.
+ */
+ private void observeNetworkStatus() {
+ reachabilityMonitor.getObservable()
+ .doOnNext(networkIsAvailable -> {
+ if (networkIsAvailable) {
+ LOG.info("Network available, start datastore");
+ start(
+ (Action) () -> { },
+ ((e) -> LOG.error("Error starting datastore plugin after network event: " + e))
+ );
+ } else {
+ LOG.info("Network lost, stop datastore");
+ stop(
+ (Action) () -> { },
+ ((e) -> LOG.error("Error stopping datastore plugin after network event: " + e))
+ );
+ }
+ })
+ .subscribe();
}
@WorkerThread
@@ -657,6 +691,7 @@ public static final class Builder {
private ApiCategory apiCategory;
private AuthModeStrategyType authModeStrategy;
private LocalStorageAdapter storageAdapter;
+ private ReachabilityMonitor reachabilityMonitor;
private boolean isSyncRetryEnabled;
private Builder() {}
@@ -714,6 +749,17 @@ Builder storageAdapter(LocalStorageAdapter storageAdapter) {
return this;
}
+ /**
+ * Package-private method to allow for injection of a ReachabilityMonitor for testing purposes.
+ * @param reachabilityMonitor An instance that implements LocalStorageAdapter.
+ * @return Current builder instance, for fluent construction of plugin.
+ */
+ @VisibleForTesting
+ Builder reachabilityMonitor(ReachabilityMonitor reachabilityMonitor) {
+ this.reachabilityMonitor = reachabilityMonitor;
+ return this;
+ }
+
/**
* Sets the authorization mode strategy which will be used by DataStore sync engine
* when interacting with the API plugin.
diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitor.kt b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitor.kt
new file mode 100644
index 0000000000..b4d8a590a4
--- /dev/null
+++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitor.kt
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package com.amplifyframework.datastore.syncengine
+
+import android.content.Context
+import android.net.ConnectivityManager
+import android.net.ConnectivityManager.NetworkCallback
+import android.net.Network
+import androidx.annotation.VisibleForTesting
+import com.amplifyframework.datastore.DataStoreException
+import io.reactivex.rxjava3.core.Observable
+import io.reactivex.rxjava3.core.ObservableEmitter
+import io.reactivex.rxjava3.core.ObservableOnSubscribe
+import java.util.concurrent.TimeUnit
+
+/**
+ * The ReachabilityMonitor is responsible for watching the network status as provided by the OS.
+ * It returns an observable that publishes "true" when the network becomes available and "false" when
+ * the network is lost. It publishes the current status on subscription.
+ *
+ * ReachabilityMonitor does not try to monitor the DataStore websockets or the status of the AppSync service.
+ *
+ * The network changes are debounced with a 250 ms delay to allow some time for one network to connect after another
+ * network has disconnected (for example, wifi is lost, then cellular connects) to reduce thrashing.
+ */
+internal interface ReachabilityMonitor {
+ fun configure(context: Context)
+ @VisibleForTesting
+ fun configure(context: Context, connectivityProvider: ConnectivityProvider)
+
+ companion object {
+ fun create(): ReachabilityMonitor {
+ return ReachabilityMonitorImpl(ProdSchedulerProvider())
+ }
+
+ fun createForTesting(baseSchedulerProvider: SchedulerProvider): ReachabilityMonitor {
+ return ReachabilityMonitorImpl(baseSchedulerProvider)
+ }
+ }
+ fun getObservable(): Observable
+}
+
+private class ReachabilityMonitorImpl constructor(val schedulerProvider: SchedulerProvider) : ReachabilityMonitor {
+ private var emitter: ObservableOnSubscribe? = null
+
+ override fun configure(context: Context) {
+ return configure(context, DefaultConnectivityProvider())
+ }
+
+ override fun configure(context: Context, connectivityProvider: ConnectivityProvider) {
+ emitter = ObservableOnSubscribe { emitter ->
+ val callback = getCallback(emitter)
+ connectivityProvider.registerDefaultNetworkCallback(context, callback)
+ // Provide the current network status upon subscription.
+ emitter.onNext(connectivityProvider.hasActiveNetwork)
+ }
+ }
+
+ override fun getObservable(): Observable {
+ emitter?.let { emitter ->
+ return Observable.create(emitter)
+ .subscribeOn(schedulerProvider.io())
+ .debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation())
+ } ?: run {
+ throw DataStoreException(
+ "ReachabilityMonitor has not been configured.",
+ "Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
+ )
+ }
+ }
+
+ private fun getCallback(emitter: ObservableEmitter): NetworkCallback {
+ return object : NetworkCallback() {
+ override fun onAvailable(network: Network) {
+ emitter.onNext(true)
+ }
+ override fun onLost(network: Network) {
+ emitter.onNext(false)
+ }
+ }
+ }
+}
+
+/**
+ * This interface puts an abstraction layer over ConnectivityManager. Since ConnectivityManager
+ * is a concrete class created within context.getSystemService() it can't be overridden with a test
+ * implementation, so this interface works around that issue.
+ */
+internal interface ConnectivityProvider {
+ val hasActiveNetwork: Boolean
+ fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback)
+}
+
+private class DefaultConnectivityProvider : ConnectivityProvider {
+
+ private var connectivityManager: ConnectivityManager? = null
+
+ override val hasActiveNetwork: Boolean
+ get() = connectivityManager?.let { it.activeNetwork != null }
+ ?: run {
+ throw DataStoreException(
+ "ReachabilityMonitor has not been configured.",
+ "Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
+ )
+ }
+
+ override fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback) {
+ connectivityManager = context.getSystemService(ConnectivityManager::class.java)
+ connectivityManager?.let { it.registerDefaultNetworkCallback(callback) }
+ ?: run {
+ throw DataStoreException(
+ "ConnectivityManager not available",
+ "No recovery suggestion is available"
+ )
+ }
+ }
+}
diff --git a/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SchedulerProvider.kt b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SchedulerProvider.kt
new file mode 100644
index 0000000000..76150847aa
--- /dev/null
+++ b/aws-datastore/src/main/java/com/amplifyframework/datastore/syncengine/SchedulerProvider.kt
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ * http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+package com.amplifyframework.datastore.syncengine
+
+import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
+import io.reactivex.rxjava3.core.Scheduler
+import io.reactivex.rxjava3.schedulers.Schedulers
+
+/**
+ * This interface provides a way to give custom schedulers to RX observables for testing.
+ */
+interface SchedulerProvider {
+ fun io(): Scheduler
+ fun computation(): Scheduler
+ fun ui(): Scheduler
+}
+
+class ProdSchedulerProvider : SchedulerProvider {
+ override fun computation() = Schedulers.computation()
+ override fun ui() = AndroidSchedulers.mainThread()
+ override fun io() = Schedulers.io()
+}
diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/AWSDataStorePluginTest.java b/aws-datastore/src/test/java/com/amplifyframework/datastore/AWSDataStorePluginTest.java
index 41c82cad48..011d95b335 100644
--- a/aws-datastore/src/test/java/com/amplifyframework/datastore/AWSDataStorePluginTest.java
+++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/AWSDataStorePluginTest.java
@@ -192,7 +192,6 @@ public void startInApiMode() throws JSONException, AmplifyException {
dataStoreReadyObserver.await();
subscriptionsEstablishedObserver.await();
- networkStatusObserver.await();
assertRemoteSubscriptionsStarted();
}
diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/ConflictResolverIntegrationTest.java b/aws-datastore/src/test/java/com/amplifyframework/datastore/ConflictResolverIntegrationTest.java
index 037aa400e5..5058110c8c 100644
--- a/aws-datastore/src/test/java/com/amplifyframework/datastore/ConflictResolverIntegrationTest.java
+++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/ConflictResolverIntegrationTest.java
@@ -127,10 +127,17 @@ public void conflictIsResolvedByRetryingLocalData() throws AmplifyException, JSO
Amplify.Hub.publish(HubChannel.DATASTORE, HubEvent.create(InitializationStatus.SUCCEEDED));
// Save person 1
- synchronousDataStore.save(person1);
- Person result1 = synchronousDataStore.get(Person.class, person1.getId());
- assertTrue(latch.await(7, TimeUnit.SECONDS));
- assertEquals(person1, result1);
+ Consumer> onSuccess = (Consumer) value -> {
+ try {
+ Person result1 = synchronousDataStore.get(Person.class, person1.getId());
+ assertTrue(latch.await(7, TimeUnit.SECONDS));
+ assertEquals(person1, result1);
+ } catch (Exception error) {
+ throw new RuntimeException(error);
+ }
+ };
+ Consumer onError = (Consumer) value -> fail("awsDataStorePlugin.save: onError");
+ awsDataStorePlugin.save(person1, onSuccess, onError);
}
@SuppressWarnings("unchecked")
diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitorTest.kt b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitorTest.kt
new file mode 100644
index 0000000000..6bccd55700
--- /dev/null
+++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/ReachabilityMonitorTest.kt
@@ -0,0 +1,72 @@
+package com.amplifyframework.datastore.syncengine
+
+import android.content.Context
+import android.net.ConnectivityManager
+import android.net.Network
+import com.amplifyframework.datastore.DataStoreException
+import io.reactivex.rxjava3.core.BackpressureStrategy
+import io.reactivex.rxjava3.schedulers.TestScheduler
+import io.reactivex.rxjava3.subscribers.TestSubscriber
+import java.util.concurrent.TimeUnit
+import org.junit.Test
+import org.mockito.Mockito.mock
+
+class ReachabilityMonitorTest {
+
+ // Test that calling getObservable() without calling configure() first throws a DataStoreException
+ @Test(expected = DataStoreException::class)
+ fun testReachabilityConfigThrowsException() {
+ ReachabilityMonitor.create().getObservable()
+ }
+
+ // Test that the debounce and the event publishing in ReachabilityMonitor works as expected.
+ // Events that occur within 250 ms of each other should be debounced so that only the last event
+ // of the sequence is published.
+ @Test
+ fun testReachabilityDebounce() {
+ var callback: ConnectivityManager.NetworkCallback? = null
+
+ val connectivityProvider = object : ConnectivityProvider {
+ override val hasActiveNetwork: Boolean
+ get() = run {
+ return true
+ }
+ override fun registerDefaultNetworkCallback(
+ context: Context,
+ callback2: ConnectivityManager.NetworkCallback
+ ) {
+ callback = callback2
+ }
+ }
+
+ val mockContext = mock(Context::class.java)
+ // TestScheduler allows the virtual time to be advanced by exact amounts, to allow for repeatable tests
+ val testScheduler = TestScheduler()
+ val reachabilityMonitor = ReachabilityMonitor.createForTesting(TestSchedulerProvider(testScheduler))
+ reachabilityMonitor.configure(mockContext, connectivityProvider)
+
+ // TestSubscriber allows for assertions and awaits on the items it observes
+ val testSubscriber = TestSubscriber()
+ reachabilityMonitor.getObservable()
+ // TestSubscriber requires a Flowable
+ .toFlowable(BackpressureStrategy.BUFFER)
+ .subscribe(testSubscriber)
+
+ val network = mock(Network::class.java)
+ // Should provide initial network state (true) upon subscription (after debounce)
+ testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
+ callback!!.onAvailable(network)
+ callback!!.onAvailable(network)
+ callback!!.onLost(network)
+ // Should provide false after debounce
+ testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
+ callback!!.onAvailable(network)
+ // Should provide true after debounce
+ testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
+ callback!!.onAvailable(network)
+ // Should provide true after debounce
+ testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
+
+ testSubscriber.assertValues(true, false, true, true)
+ }
+}
diff --git a/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/TestSchedulerProvider.kt b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/TestSchedulerProvider.kt
new file mode 100644
index 0000000000..4b4c386981
--- /dev/null
+++ b/aws-datastore/src/test/java/com/amplifyframework/datastore/syncengine/TestSchedulerProvider.kt
@@ -0,0 +1,16 @@
+package com.amplifyframework.datastore.syncengine
+
+import io.reactivex.rxjava3.schedulers.Schedulers
+import io.reactivex.rxjava3.schedulers.TestScheduler
+
+class TrampolineSchedulerProvider : SchedulerProvider {
+ override fun computation() = Schedulers.trampoline()
+ override fun ui() = Schedulers.trampoline()
+ override fun io() = Schedulers.trampoline()
+}
+
+class TestSchedulerProvider(private val scheduler: TestScheduler) : SchedulerProvider {
+ override fun computation() = scheduler
+ override fun ui() = scheduler
+ override fun io() = scheduler
+}