Skip to content

Commit

Permalink
Add a network status listener to restart DataStore after the network … (
Browse files Browse the repository at this point in the history
#2148)

* Add a network status listener to restart DataStore after the network comes back online.

* Add Reachability monitor

* working pretty well

* cleanup

* update test

* fix: fix integration test and added logger to integration test (#2143)

* fix: Change order of updating state in local cache

* change order for updating status and add logger to integ tests

* change log level to debug

* Fix for when move to idle state is called twice (#2152)

* Update README.md (#2120)

remove dev-preview APIs note.

* Dengdan stress test (#2153)

* Initial commit

* Work in progress

* finish codes

* change build

* update build

* test excludeStressTest

* Revert "Merge branch 'main' into dengdan-stress-test"

This reverts commit b50840e, reversing
changes made to 3bacf1b.

* remove categories

* remove external changes

* remove external changes

* remove more changes

* Update copyright and refactor

* Update StorageStressTest.kt

* Update StorageStressTest.kt

* Update StorageStressTest.kt

* Update StorageStressTest.kt

* linting

* Update StorageStressTest.kt

* Delete StorageStressTest.kt

* Delete amplifyconfigurationupdated.json

* Delete amplifyconfigurationupdated.json

* Update DataStoreStressTest.kt

* force build

* force build

* force build

* fix typo

* Add a network status listener to restart DataStore after the network comes back online.

* Add Reachability monitor

* working pretty well

* cleanup

* update test

* force build

* force build

* force build

* fix typo

* reply to comments

* Add testImplementation lin eto compile tests correctly in intellij

* reply to comments

* make ReachabilityMonitor expose the observable

* Update datastore plugin to use the reachability monitor

* cleanup

* cleanup

* cleanup

* force tests

Co-authored-by: Michael Schneider <mikschn@amazon.com>
Co-authored-by: Saijad Dhuka <83975678+sdhuka@users.noreply.github.com>
Co-authored-by: gpanshu <91897496+gpanshu@users.noreply.github.com>
Co-authored-by: Divyesh Chitroda <div5yesh@gmail.com>
Co-authored-by: dengdan154 <85711456+dengdan154@users.noreply.github.com>
  • Loading branch information
6 people committed Dec 13, 2022
1 parent 4b21404 commit dfb5b55
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 6 deletions.
1 change: 1 addition & 0 deletions aws-datastore/build.gradle
Expand Up @@ -31,6 +31,7 @@ dependencies {

testImplementation project(path: ':testmodels')
testImplementation project(path: ':testutils')
testImplementation project(path: ':aws-datastore')
testImplementation dependency.jsonassert
testImplementation dependency.junit
testImplementation dependency.mockito
Expand Down
5 changes: 4 additions & 1 deletion aws-datastore/src/main/AndroidManifest.xml
Expand Up @@ -15,5 +15,8 @@
-->

<manifest package="com.amplifyframework.datastore"
xmlns:android="http://schemas.android.com/apk/res/android" />
xmlns:android="http://schemas.android.com/apk/res/android" >

<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
</manifest>

Expand Up @@ -47,6 +47,7 @@
import com.amplifyframework.datastore.storage.StorageItemChange;
import com.amplifyframework.datastore.storage.sqlite.SQLiteStorageAdapter;
import com.amplifyframework.datastore.syncengine.Orchestrator;
import com.amplifyframework.datastore.syncengine.ReachabilityMonitor;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.logging.Logger;

Expand Down Expand Up @@ -90,6 +91,8 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {

private final boolean isSyncRetryEnabled;

private final ReachabilityMonitor reachabilityMonitor;

private AWSDataStorePlugin(
@NonNull ModelProvider modelProvider,
@NonNull SchemaRegistry schemaRegistry,
Expand All @@ -100,6 +103,7 @@ private AWSDataStorePlugin(
this.authModeStrategy = AuthModeStrategyType.DEFAULT;
this.userProvidedConfiguration = userProvidedConfiguration;
this.isSyncRetryEnabled = userProvidedConfiguration != null && userProvidedConfiguration.getDoSyncRetry();
this.reachabilityMonitor = ReachabilityMonitor.Companion.create();
// Used to interrogate plugins, to understand if sync should be automatically turned on
this.orchestrator = new Orchestrator(
modelProvider,
Expand Down Expand Up @@ -130,6 +134,9 @@ private AWSDataStorePlugin(@NonNull Builder builder) throws DataStoreException {
SQLiteStorageAdapter.forModels(schemaRegistry, modelProvider) :
builder.storageAdapter;
this.categoryInitializationsPending = new CountDownLatch(1);
this.reachabilityMonitor = builder.reachabilityMonitor == null ?
ReachabilityMonitor.Companion.create() :
builder.reachabilityMonitor;

// Used to interrogate plugins, to understand if sync should be automatically turned on
this.orchestrator = new Orchestrator(
Expand Down Expand Up @@ -255,6 +262,33 @@ public void configure(
event -> InitializationStatus.SUCCEEDED.toString().equals(event.getName()),
event -> categoryInitializationsPending.countDown()
);

reachabilityMonitor.configure(context);
observeNetworkStatus();
}

/**
* Start the datastore when the network is available, and stop the datastore when it is not
* available.
*/
private void observeNetworkStatus() {
reachabilityMonitor.getObservable()
.doOnNext(networkIsAvailable -> {
if (networkIsAvailable) {
LOG.info("Network available, start datastore");
start(
(Action) () -> { },
((e) -> LOG.error("Error starting datastore plugin after network event: " + e))
);
} else {
LOG.info("Network lost, stop datastore");
stop(
(Action) () -> { },
((e) -> LOG.error("Error stopping datastore plugin after network event: " + e))
);
}
})
.subscribe();
}

@WorkerThread
Expand Down Expand Up @@ -657,6 +691,7 @@ public static final class Builder {
private ApiCategory apiCategory;
private AuthModeStrategyType authModeStrategy;
private LocalStorageAdapter storageAdapter;
private ReachabilityMonitor reachabilityMonitor;
private boolean isSyncRetryEnabled;

private Builder() {}
Expand Down Expand Up @@ -714,6 +749,17 @@ Builder storageAdapter(LocalStorageAdapter storageAdapter) {
return this;
}

/**
* Package-private method to allow for injection of a ReachabilityMonitor for testing purposes.
* @param reachabilityMonitor An instance that implements LocalStorageAdapter.
* @return Current builder instance, for fluent construction of plugin.
*/
@VisibleForTesting
Builder reachabilityMonitor(ReachabilityMonitor reachabilityMonitor) {
this.reachabilityMonitor = reachabilityMonitor;
return this;
}

/**
* Sets the authorization mode strategy which will be used by DataStore sync engine
* when interacting with the API plugin.
Expand Down
@@ -0,0 +1,130 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amplifyframework.datastore.syncengine

import android.content.Context
import android.net.ConnectivityManager
import android.net.ConnectivityManager.NetworkCallback
import android.net.Network
import androidx.annotation.VisibleForTesting
import com.amplifyframework.datastore.DataStoreException
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableEmitter
import io.reactivex.rxjava3.core.ObservableOnSubscribe
import java.util.concurrent.TimeUnit

/**
* The ReachabilityMonitor is responsible for watching the network status as provided by the OS.
* It returns an observable that publishes "true" when the network becomes available and "false" when
* the network is lost. It publishes the current status on subscription.
*
* ReachabilityMonitor does not try to monitor the DataStore websockets or the status of the AppSync service.
*
* The network changes are debounced with a 250 ms delay to allow some time for one network to connect after another
* network has disconnected (for example, wifi is lost, then cellular connects) to reduce thrashing.
*/
internal interface ReachabilityMonitor {
fun configure(context: Context)
@VisibleForTesting
fun configure(context: Context, connectivityProvider: ConnectivityProvider)

companion object {
fun create(): ReachabilityMonitor {
return ReachabilityMonitorImpl(ProdSchedulerProvider())
}

fun createForTesting(baseSchedulerProvider: SchedulerProvider): ReachabilityMonitor {
return ReachabilityMonitorImpl(baseSchedulerProvider)
}
}
fun getObservable(): Observable<Boolean>
}

private class ReachabilityMonitorImpl constructor(val schedulerProvider: SchedulerProvider) : ReachabilityMonitor {
private var emitter: ObservableOnSubscribe<Boolean>? = null

override fun configure(context: Context) {
return configure(context, DefaultConnectivityProvider())
}

override fun configure(context: Context, connectivityProvider: ConnectivityProvider) {
emitter = ObservableOnSubscribe { emitter ->
val callback = getCallback(emitter)
connectivityProvider.registerDefaultNetworkCallback(context, callback)
// Provide the current network status upon subscription.
emitter.onNext(connectivityProvider.hasActiveNetwork)
}
}

override fun getObservable(): Observable<Boolean> {
emitter?.let { emitter ->
return Observable.create(emitter)
.subscribeOn(schedulerProvider.io())
.debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation())
} ?: run {
throw DataStoreException(
"ReachabilityMonitor has not been configured.",
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
)
}
}

private fun getCallback(emitter: ObservableEmitter<Boolean>): NetworkCallback {
return object : NetworkCallback() {
override fun onAvailable(network: Network) {
emitter.onNext(true)
}
override fun onLost(network: Network) {
emitter.onNext(false)
}
}
}
}

/**
* This interface puts an abstraction layer over ConnectivityManager. Since ConnectivityManager
* is a concrete class created within context.getSystemService() it can't be overridden with a test
* implementation, so this interface works around that issue.
*/
internal interface ConnectivityProvider {
val hasActiveNetwork: Boolean
fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback)
}

private class DefaultConnectivityProvider : ConnectivityProvider {

private var connectivityManager: ConnectivityManager? = null

override val hasActiveNetwork: Boolean
get() = connectivityManager?.let { it.activeNetwork != null }
?: run {
throw DataStoreException(
"ReachabilityMonitor has not been configured.",
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
)
}

override fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback) {
connectivityManager = context.getSystemService(ConnectivityManager::class.java)
connectivityManager?.let { it.registerDefaultNetworkCallback(callback) }
?: run {
throw DataStoreException(
"ConnectivityManager not available",
"No recovery suggestion is available"
)
}
}
}
@@ -0,0 +1,34 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.datastore.syncengine

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers

/**
* This interface provides a way to give custom schedulers to RX observables for testing.
*/
interface SchedulerProvider {
fun io(): Scheduler
fun computation(): Scheduler
fun ui(): Scheduler
}

class ProdSchedulerProvider : SchedulerProvider {
override fun computation() = Schedulers.computation()
override fun ui() = AndroidSchedulers.mainThread()
override fun io() = Schedulers.io()
}
Expand Up @@ -192,7 +192,6 @@ public void startInApiMode() throws JSONException, AmplifyException {

dataStoreReadyObserver.await();
subscriptionsEstablishedObserver.await();
networkStatusObserver.await();

assertRemoteSubscriptionsStarted();
}
Expand Down
Expand Up @@ -127,10 +127,17 @@ public void conflictIsResolvedByRetryingLocalData() throws AmplifyException, JSO
Amplify.Hub.publish(HubChannel.DATASTORE, HubEvent.create(InitializationStatus.SUCCEEDED));

// Save person 1
synchronousDataStore.save(person1);
Person result1 = synchronousDataStore.get(Person.class, person1.getId());
assertTrue(latch.await(7, TimeUnit.SECONDS));
assertEquals(person1, result1);
Consumer<DataStoreItemChange<Person>> onSuccess = (Consumer) value -> {
try {
Person result1 = synchronousDataStore.get(Person.class, person1.getId());
assertTrue(latch.await(7, TimeUnit.SECONDS));
assertEquals(person1, result1);
} catch (Exception error) {
throw new RuntimeException(error);
}
};
Consumer<DataStoreException> onError = (Consumer) value -> fail("awsDataStorePlugin.save: onError");
awsDataStorePlugin.save(person1, onSuccess, onError);
}

@SuppressWarnings("unchecked")
Expand Down
@@ -0,0 +1,72 @@
package com.amplifyframework.datastore.syncengine

import android.content.Context
import android.net.ConnectivityManager
import android.net.Network
import com.amplifyframework.datastore.DataStoreException
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subscribers.TestSubscriber
import java.util.concurrent.TimeUnit
import org.junit.Test
import org.mockito.Mockito.mock

class ReachabilityMonitorTest {

// Test that calling getObservable() without calling configure() first throws a DataStoreException
@Test(expected = DataStoreException::class)
fun testReachabilityConfigThrowsException() {
ReachabilityMonitor.create().getObservable()
}

// Test that the debounce and the event publishing in ReachabilityMonitor works as expected.
// Events that occur within 250 ms of each other should be debounced so that only the last event
// of the sequence is published.
@Test
fun testReachabilityDebounce() {
var callback: ConnectivityManager.NetworkCallback? = null

val connectivityProvider = object : ConnectivityProvider {
override val hasActiveNetwork: Boolean
get() = run {
return true
}
override fun registerDefaultNetworkCallback(
context: Context,
callback2: ConnectivityManager.NetworkCallback
) {
callback = callback2
}
}

val mockContext = mock(Context::class.java)
// TestScheduler allows the virtual time to be advanced by exact amounts, to allow for repeatable tests
val testScheduler = TestScheduler()
val reachabilityMonitor = ReachabilityMonitor.createForTesting(TestSchedulerProvider(testScheduler))
reachabilityMonitor.configure(mockContext, connectivityProvider)

// TestSubscriber allows for assertions and awaits on the items it observes
val testSubscriber = TestSubscriber<Boolean>()
reachabilityMonitor.getObservable()
// TestSubscriber requires a Flowable
.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(testSubscriber)

val network = mock(Network::class.java)
// Should provide initial network state (true) upon subscription (after debounce)
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
callback!!.onAvailable(network)
callback!!.onAvailable(network)
callback!!.onLost(network)
// Should provide false after debounce
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
callback!!.onAvailable(network)
// Should provide true after debounce
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
callback!!.onAvailable(network)
// Should provide true after debounce
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)

testSubscriber.assertValues(true, false, true, true)
}
}
@@ -0,0 +1,16 @@
package com.amplifyframework.datastore.syncengine

import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.schedulers.TestScheduler

class TrampolineSchedulerProvider : SchedulerProvider {
override fun computation() = Schedulers.trampoline()
override fun ui() = Schedulers.trampoline()
override fun io() = Schedulers.trampoline()
}

class TestSchedulerProvider(private val scheduler: TestScheduler) : SchedulerProvider {
override fun computation() = scheduler
override fun ui() = scheduler
override fun io() = scheduler
}

0 comments on commit dfb5b55

Please sign in to comment.