Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(feat) Add a network status listener to restart DataStore after the network comes online #2148

Merged
merged 53 commits into from Dec 13, 2022
Merged
Show file tree
Hide file tree
Changes from 51 commits
Commits
Show all changes
53 commits
Select commit Hold shift + click to select a range
2960fa9
Add a network status listener to restart DataStore after the network …
mikschn-aws Dec 1, 2022
507ac0d
Add Reachability monitor
mikschn-aws Dec 2, 2022
2874238
working pretty well
mikschn-aws Dec 5, 2022
22ff720
cleanup
mikschn-aws Dec 5, 2022
099c2f1
update test
mikschn-aws Dec 5, 2022
d57b50a
fix: fix integration test and added logger to integration test (#2143)
sdhuka Dec 1, 2022
e0a9f16
Fix for when move to idle state is called twice (#2152)
gpanshu Dec 2, 2022
0a45371
Update README.md (#2120)
div5yesh Dec 2, 2022
0a2e0f9
Dengdan stress test (#2153)
dengdan154 Dec 3, 2022
7d17ae2
Merge branch 'main' into mikschn/restart-network
mikepschneider Dec 5, 2022
c35ef39
force build
mikschn-aws Dec 5, 2022
f94d0df
force build
mikschn-aws Dec 5, 2022
19a5475
force build
mikschn-aws Dec 5, 2022
5840f86
fix typo
mikschn-aws Dec 5, 2022
fe760fd
Add a network status listener to restart DataStore after the network …
mikschn-aws Dec 1, 2022
c62cab1
Add Reachability monitor
mikschn-aws Dec 2, 2022
9d9d2be
working pretty well
mikschn-aws Dec 5, 2022
a25b9a4
cleanup
mikschn-aws Dec 5, 2022
1b02840
update test
mikschn-aws Dec 5, 2022
8aa659d
force build
mikschn-aws Dec 5, 2022
b04e1af
force build
mikschn-aws Dec 5, 2022
d921cf1
force build
mikschn-aws Dec 5, 2022
aed7746
fix typo
mikschn-aws Dec 5, 2022
2f87711
reply to comments
mikschn-aws Dec 6, 2022
503f2de
Add testImplementation lin eto compile tests correctly in intellij
mikschn-aws Dec 8, 2022
3b5b01f
reply to comments
mikschn-aws Dec 8, 2022
9f5fa8a
make ReachabilityMonitor expose the observable
mikschn-aws Dec 8, 2022
1d7768e
merge from main
mikschn-aws Dec 8, 2022
99403cf
Update datastore plugin to use the reachability monitor
mikschn-aws Dec 8, 2022
dd069b0
cleanup
mikschn-aws Dec 9, 2022
83e1df2
cleanup
mikschn-aws Dec 9, 2022
5441395
cleanup
mikschn-aws Dec 9, 2022
9f2d67f
force tests
mikschn-aws Dec 9, 2022
5318ce7
Merge branch 'main' into mikschn/restart-network
mikepschneider Dec 9, 2022
d46f578
cleanup
mikschn-aws Dec 9, 2022
98891ae
cleanup
mikschn-aws Dec 9, 2022
373daf3
force tests
mikschn-aws Dec 9, 2022
6ef37f0
force tests
mikschn-aws Dec 9, 2022
23be191
force tests
mikschn-aws Dec 9, 2022
eb621b5
force tests
mikschn-aws Dec 9, 2022
736a8fe
force tests
mikschn-aws Dec 10, 2022
cf8cfe8
force tests
mikschn-aws Dec 11, 2022
353878b
Merge branch 'main' into mikschn/restart-network
mikepschneider Dec 12, 2022
7eb4304
add NETWORK_STATUS messages back to make integration tests pass
mikschn-aws Dec 12, 2022
fbee0b7
fix typo
mikschn-aws Dec 12, 2022
52c0afa
ignore testIdentifyUserWithUserAttributes, see https://github.com/aws…
mikschn-aws Dec 12, 2022
a3dc0c2
cleanup imports
mikschn-aws Dec 12, 2022
201cb2a
force tests
mikschn-aws Dec 12, 2022
905561b
force tests
mikschn-aws Dec 12, 2022
0b30c7a
remove comments
mikschn-aws Dec 12, 2022
be87fed
cleanup
mikschn-aws Dec 12, 2022
526e72a
remove use of synchronous datastore in a test
mikschn-aws Dec 13, 2022
610cf87
force tests
mikschn-aws Dec 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions aws-datastore/build.gradle
Expand Up @@ -31,6 +31,7 @@ dependencies {

testImplementation project(path: ':testmodels')
testImplementation project(path: ':testutils')
testImplementation project(path: ':aws-datastore')
testImplementation dependency.jsonassert
testImplementation dependency.junit
testImplementation dependency.mockito
Expand Down
5 changes: 4 additions & 1 deletion aws-datastore/src/main/AndroidManifest.xml
Expand Up @@ -15,5 +15,8 @@
-->

<manifest package="com.amplifyframework.datastore"
xmlns:android="http://schemas.android.com/apk/res/android" />
xmlns:android="http://schemas.android.com/apk/res/android" >

<uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
</manifest>

Expand Up @@ -47,6 +47,7 @@
import com.amplifyframework.datastore.storage.StorageItemChange;
import com.amplifyframework.datastore.storage.sqlite.SQLiteStorageAdapter;
import com.amplifyframework.datastore.syncengine.Orchestrator;
import com.amplifyframework.datastore.syncengine.ReachabilityMonitor;
import com.amplifyframework.hub.HubChannel;
import com.amplifyframework.logging.Logger;

Expand Down Expand Up @@ -90,6 +91,8 @@ public final class AWSDataStorePlugin extends DataStorePlugin<Void> {

private final boolean isSyncRetryEnabled;

private final ReachabilityMonitor reachabilityMonitor;

private AWSDataStorePlugin(
@NonNull ModelProvider modelProvider,
@NonNull SchemaRegistry schemaRegistry,
Expand All @@ -100,6 +103,7 @@ private AWSDataStorePlugin(
this.authModeStrategy = AuthModeStrategyType.DEFAULT;
this.userProvidedConfiguration = userProvidedConfiguration;
this.isSyncRetryEnabled = userProvidedConfiguration != null && userProvidedConfiguration.getDoSyncRetry();
this.reachabilityMonitor = ReachabilityMonitor.Companion.create();
// Used to interrogate plugins, to understand if sync should be automatically turned on
this.orchestrator = new Orchestrator(
modelProvider,
Expand Down Expand Up @@ -130,6 +134,9 @@ private AWSDataStorePlugin(@NonNull Builder builder) throws DataStoreException {
SQLiteStorageAdapter.forModels(schemaRegistry, modelProvider) :
builder.storageAdapter;
this.categoryInitializationsPending = new CountDownLatch(1);
this.reachabilityMonitor = builder.reachabilityMonitor == null ?
ReachabilityMonitor.Companion.create() :
builder.reachabilityMonitor;

// Used to interrogate plugins, to understand if sync should be automatically turned on
this.orchestrator = new Orchestrator(
Expand Down Expand Up @@ -255,6 +262,33 @@ public void configure(
event -> InitializationStatus.SUCCEEDED.toString().equals(event.getName()),
event -> categoryInitializationsPending.countDown()
);

reachabilityMonitor.configure(context);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is configure needed, or can this just be done in the init of the ReachabilityMonitor? Side effect right now of forgetting configure is a crash, and I don't see too much of a reason not just to allow class to configure itself since it holds on to the current network status to pass to a newly attached observer.

observeNetworkStatus();
}

/**
* Start the datastore when the network is available, and stop the datastore when it is not
* available.
*/
private void observeNetworkStatus() {
reachabilityMonitor.getObservable()
.doOnNext(networkIsAvailable -> {
if (networkIsAvailable) {
LOG.info("Network available, start datastore");
start(
(Action) () -> { },
((e) -> LOG.error("Error starting datastore plugin after network event: " + e))
);
} else {
LOG.info("Network lost, stop datastore");
stop(
(Action) () -> { },
((e) -> LOG.error("Error stopping datastore plugin after network event: " + e))
);
}
})
.subscribe();
}

@WorkerThread
Expand Down Expand Up @@ -657,6 +691,7 @@ public static final class Builder {
private ApiCategory apiCategory;
private AuthModeStrategyType authModeStrategy;
private LocalStorageAdapter storageAdapter;
private ReachabilityMonitor reachabilityMonitor;
private boolean isSyncRetryEnabled;

private Builder() {}
Expand Down Expand Up @@ -714,6 +749,17 @@ Builder storageAdapter(LocalStorageAdapter storageAdapter) {
return this;
}

/**
* Package-private method to allow for injection of a ReachabilityMonitor for testing purposes.
* @param reachabilityMonitor An instance that implements LocalStorageAdapter.
* @return Current builder instance, for fluent construction of plugin.
*/
@VisibleForTesting
Builder reachabilityMonitor(ReachabilityMonitor reachabilityMonitor) {
this.reachabilityMonitor = reachabilityMonitor;
return this;
}

/**
* Sets the authorization mode strategy which will be used by DataStore sync engine
* when interacting with the API plugin.
Expand Down
@@ -0,0 +1,130 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package com.amplifyframework.datastore.syncengine

import android.content.Context
import android.net.ConnectivityManager
import android.net.ConnectivityManager.NetworkCallback
import android.net.Network
import androidx.annotation.VisibleForTesting
import com.amplifyframework.datastore.DataStoreException
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.ObservableEmitter
import io.reactivex.rxjava3.core.ObservableOnSubscribe
import java.util.concurrent.TimeUnit

/**
* The ReachabilityMonitor is responsible for watching the network status as provided by the OS.
* It returns an observable that publishes "true" when the network becomes available and "false" when
* the network is lost. It publishes the current status on subscription.
*
* ReachabilityMonitor does not try to monitor the DataStore websockets or the status of the AppSync service.
*
* The network changes are debounced with a 250 ms delay to allow some time for one network to connect after another
* network has disconnected (for example, wifi is lost, then cellular connects) to reduce thrashing.
*/
internal interface ReachabilityMonitor {
fun configure(context: Context)
@VisibleForTesting
fun configure(context: Context, connectivityProvider: ConnectivityProvider)

companion object {
fun create(): ReachabilityMonitor {
return ReachabilityMonitorImpl(ProdSchedulerProvider())
}

fun createForTesting(baseSchedulerProvider: SchedulerProvider): ReachabilityMonitor {
return ReachabilityMonitorImpl(baseSchedulerProvider)
}
}
fun getObservable(): Observable<Boolean>
}

private class ReachabilityMonitorImpl constructor(val schedulerProvider: SchedulerProvider) : ReachabilityMonitor {
private var emitter: ObservableOnSubscribe<Boolean>? = null

override fun configure(context: Context) {
return configure(context, DefaultConnectivityProvider())
}

override fun configure(context: Context, connectivityProvider: ConnectivityProvider) {
emitter = ObservableOnSubscribe { emitter ->
val callback = getCallback(emitter)
connectivityProvider.registerDefaultNetworkCallback(context, callback)
// Provide the current network status upon subscription.
emitter.onNext(connectivityProvider.hasActiveNetwork)
}
}

override fun getObservable(): Observable<Boolean> {
emitter?.let { emitter ->
return Observable.create(emitter)
.subscribeOn(schedulerProvider.io())
.debounce(250, TimeUnit.MILLISECONDS, schedulerProvider.computation())
} ?: run {
throw DataStoreException(
"ReachabilityMonitor has not been configured.",
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
)
}
}

private fun getCallback(emitter: ObservableEmitter<Boolean>): NetworkCallback {
return object : NetworkCallback() {
override fun onAvailable(network: Network) {
emitter.onNext(true)
}
override fun onLost(network: Network) {
emitter.onNext(false)
}
}
}
}

/**
* This interface puts an abstraction layer over ConnectivityManager. Since ConnectivityManager
* is a concrete class created within context.getSystemService() it can't be overridden with a test
* implementation, so this interface works around that issue.
*/
internal interface ConnectivityProvider {
val hasActiveNetwork: Boolean
fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback)
}

private class DefaultConnectivityProvider : ConnectivityProvider {

private var connectivityManager: ConnectivityManager? = null

override val hasActiveNetwork: Boolean
get() = connectivityManager?.let { it.activeNetwork != null }
?: run {
throw DataStoreException(
"ReachabilityMonitor has not been configured.",
"Call ReachabilityMonitor.configure() before calling ReachabilityMonitor.getObservable()"
)
}

override fun registerDefaultNetworkCallback(context: Context, callback: NetworkCallback) {
connectivityManager = context.getSystemService(ConnectivityManager::class.java)
connectivityManager?.let { it.registerDefaultNetworkCallback(callback) }
?: run {
throw DataStoreException(
"ConnectivityManager not available",
"No recovery suggestion is available"
)
}
}
}
@@ -0,0 +1,34 @@
/*
* Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package com.amplifyframework.datastore.syncengine

import io.reactivex.rxjava3.android.schedulers.AndroidSchedulers
import io.reactivex.rxjava3.core.Scheduler
import io.reactivex.rxjava3.schedulers.Schedulers

/**
* This interface provides a way to give custom schedulers to RX observables for testing.
*/
interface SchedulerProvider {
fun io(): Scheduler
fun computation(): Scheduler
fun ui(): Scheduler
}

class ProdSchedulerProvider : SchedulerProvider {
override fun computation() = Schedulers.computation()
override fun ui() = AndroidSchedulers.mainThread()
override fun io() = Schedulers.io()
}
Expand Up @@ -192,7 +192,6 @@ public void startInApiMode() throws JSONException, AmplifyException {

dataStoreReadyObserver.await();
subscriptionsEstablishedObserver.await();
networkStatusObserver.await();

assertRemoteSubscriptionsStarted();
}
Expand Down
@@ -0,0 +1,72 @@
package com.amplifyframework.datastore.syncengine

import android.content.Context
import android.net.ConnectivityManager
import android.net.Network
import com.amplifyframework.datastore.DataStoreException
import io.reactivex.rxjava3.core.BackpressureStrategy
import io.reactivex.rxjava3.schedulers.TestScheduler
import io.reactivex.rxjava3.subscribers.TestSubscriber
import java.util.concurrent.TimeUnit
import org.junit.Test
import org.mockito.Mockito.mock

class ReachabilityMonitorTest {

// Test that calling getObservable() without calling configure() first throws a DataStoreException
@Test(expected = DataStoreException::class)
fun testReachabilityConfigThrowsException() {
ReachabilityMonitor.create().getObservable()
}

// Test that the debounce and the event publishing in ReachabilityMonitor works as expected.
// Events that occur within 250 ms of each other should be debounced so that only the last event
// of the sequence is published.
@Test
fun testReachabilityDebounce() {
var callback: ConnectivityManager.NetworkCallback? = null

val connectivityProvider = object : ConnectivityProvider {
override val hasActiveNetwork: Boolean
get() = run {
return true
}
override fun registerDefaultNetworkCallback(
context: Context,
callback2: ConnectivityManager.NetworkCallback
) {
callback = callback2
}
}

val mockContext = mock(Context::class.java)
// TestScheduler allows the virtual time to be advanced by exact amounts, to allow for repeatable tests
val testScheduler = TestScheduler()
val reachabilityMonitor = ReachabilityMonitor.createForTesting(TestSchedulerProvider(testScheduler))
reachabilityMonitor.configure(mockContext, connectivityProvider)

// TestSubscriber allows for assertions and awaits on the items it observes
val testSubscriber = TestSubscriber<Boolean>()
reachabilityMonitor.getObservable()
// TestSubscriber requires a Flowable
.toFlowable(BackpressureStrategy.BUFFER)
.subscribe(testSubscriber)

val network = mock(Network::class.java)
// Should provide initial network state (true) upon subscription (after debounce)
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
callback!!.onAvailable(network)
callback!!.onAvailable(network)
callback!!.onLost(network)
// Should provide false after debounce
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
callback!!.onAvailable(network)
// Should provide true after debounce
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)
callback!!.onAvailable(network)
// Should provide true after debounce
testScheduler.advanceTimeBy(251, TimeUnit.MILLISECONDS)

testSubscriber.assertValues(true, false, true, true)
}
}
@@ -0,0 +1,16 @@
package com.amplifyframework.datastore.syncengine

import io.reactivex.rxjava3.schedulers.Schedulers
import io.reactivex.rxjava3.schedulers.TestScheduler

class TrampolineSchedulerProvider : SchedulerProvider {
override fun computation() = Schedulers.trampoline()
override fun ui() = Schedulers.trampoline()
override fun io() = Schedulers.trampoline()
}

class TestSchedulerProvider(private val scheduler: TestScheduler) : SchedulerProvider {
override fun computation() = scheduler
override fun ui() = scheduler
override fun io() = scheduler
}