Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct retries to another mongos if one is available #1367

Merged
merged 23 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e95ca0b
Direct retries to another mongos if one is available
stIncMale Apr 14, 2024
d1c1ed2
Implement specification prose tests
stIncMale Apr 17, 2024
0875e5d
Fix a typo
stIncMale Apr 17, 2024
6fe3db1
Expect `MongoServerException` instead of just `RuntimeException`
stIncMale Apr 18, 2024
7cba88b
Fix `ServerDeprioritization.onAttemptFailure`
stIncMale Apr 30, 2024
da6d111
Merge branch 'master' into JAVA-4254
stIncMale Apr 30, 2024
e4ffab4
Replace BiFunction with BinaryOperator
stIncMale Apr 30, 2024
29ebd76
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
stIncMale May 2, 2024
6574e24
Trivial code improvements
stIncMale May 2, 2024
171d67e
Update the docs of the internal API for retries to reflect support of…
stIncMale May 3, 2024
4d883c1
Merge branch 'master' into JAVA-4254
stIncMale May 3, 2024
d25010d
Refactor the way `BaseCluster.selectServer` deals with the race condi…
stIncMale May 6, 2024
cc8021e
Refactor the server selection logic that is implemented not as a `Ser…
stIncMale May 7, 2024
b3430bd
Implement `DeprioritizingSelector` strictly according to the spec req…
stIncMale May 7, 2024
16df106
Merge branch 'master' into JAVA-4254
stIncMale May 7, 2024
04880c7
Do minor touches
stIncMale May 7, 2024
c1e9e4e
Update the documentation of `ClusterSettings.getServerSelector`
stIncMale May 7, 2024
246353f
Address review concerns
stIncMale May 8, 2024
69b78ff
Implement the proposed code simplification
stIncMale May 8, 2024
cbb1938
Link from `BaseClusterTest` back to `BaseClusterSpecification`
stIncMale May 8, 2024
05f6679
Simplify `MinimumOperationCountServerSelector.select`
stIncMale May 23, 2024
d55c44f
Extract `raceConditionPreFiltering` into a separate method and remove…
stIncMale May 24, 2024
7debec0
Merge branch 'master' into JAVA-4254
stIncMale May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 8 additions & 5 deletions driver-core/src/main/com/mongodb/connection/ClusterSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -468,16 +468,18 @@ public String getRequiredReplicaSetName() {
*
* <p>The server selector augments the normal server selection rules applied by the driver when determining
* which server to send an operation to. At the point that it's called by the driver, the
* {@link com.mongodb.connection.ClusterDescription} which is passed to it contains a list of
* {@link com.mongodb.connection.ServerDescription} instances which satisfy either the configured {@link com.mongodb.ReadPreference}
* for any read operation or ones that can take writes (e.g. a standalone, mongos, or replica set primary).
* {@link ClusterDescription} which is passed to it {@linkplain ClusterDescription#getServerDescriptions() contains} a list of
* {@link ServerDescription} instances which satisfy either the configured {@link com.mongodb.ReadPreference}
* for any read operation or ones that can take writes (e.g. a standalone, mongos, or replica set primary),
* barring those corresponding to servers that the driver considers unavailable or potentially problematic.
* </p>
* <p>The server selector can then filter the {@code ServerDescription} list using whatever criteria that is required by the
* application.</p>
* <p>After this selector executes, two additional selectors are applied by the driver:</p>
* <p>After this selector executes, three additional selectors are applied by the driver:</p>
* <ul>
* <li>select from within the latency window</li>
* <li>select a random server from those remaining</li>
* <li>select at most two random servers from those remaining</li>
* <li>select the one with fewer outstanding concurrent operations</li>
* </ul>
* <p>To skip the latency window selector, an application can:</p>
* <ul>
Expand All @@ -486,6 +488,7 @@ public String getRequiredReplicaSetName() {
* </ul>
*
* @return the server selector, which may be null
* @see Builder#serverSelector(ServerSelector)
*/
@Nullable
public ServerSelector getServerSelector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,32 +78,33 @@ public RetryState() {
* which is usually synchronous code.
*
* @param attemptException The exception produced by the most recent attempt.
* It is passed to the {@code retryPredicate} and to the {@code exceptionTransformer}.
* @param exceptionTransformer A function that chooses which exception to preserve as a prospective failed result of the associated
* retryable activity and may also transform or mutate the exceptions.
* The choice is between
* It is passed to the {@code retryPredicate} and to the {@code onAttemptFailureOperator}.
* @param onAttemptFailureOperator The action that is called once per failed attempt before (in the happens-before order) the
* {@code retryPredicate}, regardless of whether the {@code retryPredicate} is called.
* This action is allowed to have side effects.
* <p>
* It also has to choose which exception to preserve as a prospective failed result of the associated retryable activity.
* The {@code onAttemptFailureOperator} may mutate its arguments, choose from the arguments, or return a different exception,
* but it must return a {@code @}{@link NonNull} value.
* The choice is between</p>
* <ul>
* <li>the previously chosen exception or {@code null} if none has been chosen
* (the first argument of the {@code exceptionTransformer})</li>
* <li>and the exception from the most recent attempt (the second argument of the {@code exceptionTransformer}).</li>
* (the first argument of the {@code onAttemptFailureOperator})</li>
* <li>and the exception from the most recent attempt (the second argument of the {@code onAttemptFailureOperator}).</li>
* </ul>
* The {@code exceptionTransformer} may either choose from its arguments, or return a different exception, a.k.a. transform,
* but it must return a {@code @}{@link NonNull} value.
* The {@code exceptionTransformer} is called once before (in the happens-before order) the {@code retryPredicate},
* regardless of whether the {@code retryPredicate} is called. The result of the {@code exceptionTransformer} does not affect
* what exception is passed to the {@code retryPredicate}.
* The result of the {@code onAttemptFailureOperator} does not affect the exception passed to the {@code retryPredicate}.
* @param retryPredicate {@code true} iff another attempt needs to be made. The {@code retryPredicate} is called not more than once
* per attempt and only if all the following is true:
* <ul>
* <li>{@code exceptionTransformer} completed normally;</li>
* <li>{@code onAttemptFailureOperator} completed normally;</li>
* <li>the most recent attempt is not the {@linkplain #isLastAttempt() last} one.</li>
* </ul>
* The {@code retryPredicate} accepts this {@link RetryState} and the exception from the most recent attempt,
* and may mutate the exception. The {@linkplain RetryState} advances to represent the state of a new attempt
* after (in the happens-before order) testing the {@code retryPredicate}, and only if the predicate completes normally.
* @throws RuntimeException Iff any of the following is true:
* <ul>
* <li>the {@code exceptionTransformer} completed abruptly;</li>
* <li>the {@code onAttemptFailureOperator} completed abruptly;</li>
* <li>the most recent attempt is the {@linkplain #isLastAttempt() last} one;</li>
* <li>the {@code retryPredicate} completed abruptly;</li>
* <li>the {@code retryPredicate} is {@code false}.</li>
Expand All @@ -112,10 +113,10 @@ public RetryState() {
* i.e., the caller must not do any more attempts.
* @see #advanceOrThrow(Throwable, BinaryOperator, BiPredicate)
*/
void advanceOrThrow(final RuntimeException attemptException, final BinaryOperator<Throwable> exceptionTransformer,
void advanceOrThrow(final RuntimeException attemptException, final BinaryOperator<Throwable> onAttemptFailureOperator,
final BiPredicate<RetryState, Throwable> retryPredicate) throws RuntimeException {
try {
doAdvanceOrThrow(attemptException, exceptionTransformer, retryPredicate, true);
doAdvanceOrThrow(attemptException, onAttemptFailureOperator, retryPredicate, true);
} catch (RuntimeException | Error unchecked) {
throw unchecked;
} catch (Throwable checked) {
Expand All @@ -129,18 +130,19 @@ void advanceOrThrow(final RuntimeException attemptException, final BinaryOperato
*
* @see #advanceOrThrow(RuntimeException, BinaryOperator, BiPredicate)
*/
void advanceOrThrow(final Throwable attemptException, final BinaryOperator<Throwable> exceptionTransformer,
void advanceOrThrow(final Throwable attemptException, final BinaryOperator<Throwable> onAttemptFailureOperator,
final BiPredicate<RetryState, Throwable> retryPredicate) throws Throwable {
doAdvanceOrThrow(attemptException, exceptionTransformer, retryPredicate, false);
doAdvanceOrThrow(attemptException, onAttemptFailureOperator, retryPredicate, false);
}

/**
* @param onlyRuntimeExceptions {@code true} iff the method must expect {@link #exception} and {@code attemptException} to be
* {@link RuntimeException}s and must not explicitly handle other {@link Throwable} types, of which only {@link Error} is possible
* as {@link RetryState} does not have any source of {@link Exception}s.
* @param onAttemptFailureOperator See {@link #advanceOrThrow(RuntimeException, BinaryOperator, BiPredicate)}.
*/
private void doAdvanceOrThrow(final Throwable attemptException,
final BinaryOperator<Throwable> exceptionTransformer,
final BinaryOperator<Throwable> onAttemptFailureOperator,
final BiPredicate<RetryState, Throwable> retryPredicate,
final boolean onlyRuntimeExceptions) throws Throwable {
assertTrue(attempt() < attempts);
Expand All @@ -149,7 +151,7 @@ private void doAdvanceOrThrow(final Throwable attemptException,
assertTrue(isRuntime(attemptException));
}
assertTrue(!isFirstAttempt() || exception == null);
Throwable newlyChosenException = transformException(exception, attemptException, onlyRuntimeExceptions, exceptionTransformer);
Throwable newlyChosenException = callOnAttemptFailureOperator(exception, attemptException, onlyRuntimeExceptions, onAttemptFailureOperator);
if (isLastAttempt()) {
exception = newlyChosenException;
throw exception;
Expand All @@ -167,27 +169,31 @@ private void doAdvanceOrThrow(final Throwable attemptException,

/**
* @param onlyRuntimeExceptions See {@link #doAdvanceOrThrow(Throwable, BinaryOperator, BiPredicate, boolean)}.
* @param onAttemptFailureOperator See {@link #advanceOrThrow(RuntimeException, BinaryOperator, BiPredicate)}.
*/
private static Throwable transformException(@Nullable final Throwable previouslyChosenException, final Throwable attemptException,
final boolean onlyRuntimeExceptions, final BinaryOperator<Throwable> exceptionTransformer) {
private static Throwable callOnAttemptFailureOperator(
@Nullable final Throwable previouslyChosenException,
final Throwable attemptException,
final boolean onlyRuntimeExceptions,
final BinaryOperator<Throwable> onAttemptFailureOperator) {
if (onlyRuntimeExceptions && previouslyChosenException != null) {
assertTrue(isRuntime(previouslyChosenException));
}
Throwable result;
try {
result = assertNotNull(exceptionTransformer.apply(previouslyChosenException, attemptException));
result = assertNotNull(onAttemptFailureOperator.apply(previouslyChosenException, attemptException));
if (onlyRuntimeExceptions) {
assertTrue(isRuntime(result));
}
} catch (Throwable exceptionTransformerException) {
if (onlyRuntimeExceptions && !isRuntime(exceptionTransformerException)) {
throw exceptionTransformerException;
} catch (Throwable onAttemptFailureOperatorException) {
if (onlyRuntimeExceptions && !isRuntime(onAttemptFailureOperatorException)) {
throw onAttemptFailureOperatorException;
}
if (previouslyChosenException != null) {
exceptionTransformerException.addSuppressed(previouslyChosenException);
onAttemptFailureOperatorException.addSuppressed(previouslyChosenException);
}
exceptionTransformerException.addSuppressed(attemptException);
throw exceptionTransformerException;
onAttemptFailureOperatorException.addSuppressed(attemptException);
throw onAttemptFailureOperatorException;
}
return result;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,31 +41,34 @@
public final class RetryingAsyncCallbackSupplier<R> implements AsyncCallbackSupplier<R> {
private final RetryState state;
private final BiPredicate<RetryState, Throwable> retryPredicate;
private final BinaryOperator<Throwable> failedResultTransformer;
private final BinaryOperator<Throwable> onAttemptFailureOperator;
private final AsyncCallbackSupplier<R> asyncFunction;

/**
* @param state The {@link RetryState} to be deemed as initial for the purpose of the new {@link RetryingAsyncCallbackSupplier}.
* @param failedResultTransformer A function that chooses which failed result of the {@code asyncFunction} to preserve as a prospective
* failed result of this {@link RetryingAsyncCallbackSupplier} and may also transform or mutate the exceptions.
* The choice is between
* @param onAttemptFailureOperator The action that is called once per failed attempt before (in the happens-before order) the
* {@code retryPredicate}, regardless of whether the {@code retryPredicate} is called.
* This action is allowed to have side effects.
* <p>
* It also has to choose which exception to preserve as a prospective failed result of this {@link RetryingAsyncCallbackSupplier}.
* The {@code onAttemptFailureOperator} may mutate its arguments, choose from the arguments, or return a different exception,
* but it must return a {@code @}{@link NonNull} value.
* The choice is between</p>
* <ul>
* <li>the previously chosen failed result or {@code null} if none has been chosen
* (the first argument of the {@code failedResultTransformer})</li>
* <li>and the failed result from the most recent attempt (the second argument of the {@code failedResultTransformer}).</li>
* (the first argument of the {@code onAttemptFailureOperator})</li>
* <li>and the failed result from the most recent attempt (the second argument of the {@code onAttemptFailureOperator}).</li>
* </ul>
* The {@code failedResultTransformer} may either choose from its arguments, or return a different exception, a.k.a. transform,
* but it must return a {@code @}{@link NonNull} value.
* If it completes abruptly, then the {@code asyncFunction} cannot be retried and the exception thrown by
* the {@code failedResultTransformer} is used as a failed result of this {@link RetryingAsyncCallbackSupplier}.
* The {@code failedResultTransformer} is called before (in the happens-before order) the {@code retryPredicate}.
* The result of the {@code failedResultTransformer} does not affect what exception is passed to the {@code retryPredicate}.
* The result of the {@code onAttemptFailureOperator} does not affect the exception passed to the {@code retryPredicate}.
* <p>
* If {@code onAttemptFailureOperator} completes abruptly, then the {@code asyncFunction} cannot be retried and the exception thrown by
* the {@code onAttemptFailureOperator} is used as a failed result of this {@link RetryingAsyncCallbackSupplier}.</p>
* @param retryPredicate {@code true} iff another attempt needs to be made. If it completes abruptly,
* then the {@code asyncFunction} cannot be retried and the exception thrown by the {@code retryPredicate}
* is used as a failed result of this {@link RetryingAsyncCallbackSupplier}. The {@code retryPredicate} is called not more than once
* per attempt and only if all the following is true:
* <ul>
* <li>{@code failedResultTransformer} completed normally;</li>
* <li>{@code onAttemptFailureOperator} completed normally;</li>
* <li>the most recent attempt is not the {@linkplain RetryState#isLastAttempt() last} one.</li>
* </ul>
* The {@code retryPredicate} accepts this {@link RetryState} and the exception from the most recent attempt,
Expand All @@ -75,12 +78,12 @@ public final class RetryingAsyncCallbackSupplier<R> implements AsyncCallbackSupp
*/
public RetryingAsyncCallbackSupplier(
final RetryState state,
final BinaryOperator<Throwable> failedResultTransformer,
final BinaryOperator<Throwable> onAttemptFailureOperator,
final BiPredicate<RetryState, Throwable> retryPredicate,
final AsyncCallbackSupplier<R> asyncFunction) {
this.state = state;
this.retryPredicate = retryPredicate;
this.failedResultTransformer = failedResultTransformer;
this.onAttemptFailureOperator = onAttemptFailureOperator;
this.asyncFunction = asyncFunction;
}

Expand Down Expand Up @@ -113,7 +116,7 @@ private class RetryingCallback implements SingleResultCallback<R> {
public void onResult(@Nullable final R result, @Nullable final Throwable t) {
if (t != null) {
try {
state.advanceOrThrow(t, failedResultTransformer, retryPredicate);
state.advanceOrThrow(t, onAttemptFailureOperator, retryPredicate);
} catch (Throwable failedResult) {
wrapped.onResult(null, failedResult);
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,26 +37,26 @@
public final class RetryingSyncSupplier<R> implements Supplier<R> {
private final RetryState state;
private final BiPredicate<RetryState, Throwable> retryPredicate;
private final BinaryOperator<Throwable> failedResultTransformer;
private final BinaryOperator<Throwable> onAttemptFailureOperator;
private final Supplier<R> syncFunction;

/**
* See {@link RetryingAsyncCallbackSupplier#RetryingAsyncCallbackSupplier(RetryState, BinaryOperator, BiPredicate, AsyncCallbackSupplier)}
* for the documentation of the parameters.
*
* @param failedResultTransformer Even though the {@code failedResultTransformer} accepts {@link Throwable},
* @param onAttemptFailureOperator Even though the {@code onAttemptFailureOperator} accepts {@link Throwable},
* only {@link RuntimeException}s are passed to it.
* @param retryPredicate Even though the {@code retryPredicate} accepts {@link Throwable},
* only {@link RuntimeException}s are passed to it.
*/
public RetryingSyncSupplier(
final RetryState state,
final BinaryOperator<Throwable> failedResultTransformer,
final BinaryOperator<Throwable> onAttemptFailureOperator,
final BiPredicate<RetryState, Throwable> retryPredicate,
final Supplier<R> syncFunction) {
this.state = state;
this.retryPredicate = retryPredicate;
this.failedResultTransformer = failedResultTransformer;
this.onAttemptFailureOperator = onAttemptFailureOperator;
this.syncFunction = syncFunction;
}

Expand All @@ -66,10 +66,10 @@ public R get() {
try {
return syncFunction.get();
} catch (RuntimeException attemptException) {
state.advanceOrThrow(attemptException, failedResultTransformer, retryPredicate);
state.advanceOrThrow(attemptException, onAttemptFailureOperator, retryPredicate);
} catch (Exception attemptException) {
// wrap potential sneaky / Kotlin exceptions
state.advanceOrThrow(new RuntimeException(attemptException), failedResultTransformer, retryPredicate);
state.advanceOrThrow(new RuntimeException(attemptException), onAttemptFailureOperator, retryPredicate);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,11 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand Down Expand Up @@ -122,14 +124,13 @@ public void close() {
}

@Override
public ClusterableServer getServer(final ServerAddress serverAddress) {
public ServersSnapshot getServersSnapshot() {
isTrue("is open", !isClosed());

ServerTuple serverTuple = addressToServerTupleMap.get(serverAddress);
if (serverTuple == null) {
return null;
}
return serverTuple.server;
Map<ServerAddress, ServerTuple> nonAtomicSnapshot = new HashMap<>(addressToServerTupleMap);
return serverAddress -> {
ServerTuple serverTuple = nonAtomicSnapshot.get(serverAddress);
return serverTuple == null ? null : serverTuple.server;
};
}

void onChange(final Collection<ServerAddress> newHosts) {
Expand Down