Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Direct retries to another mongos if one is available #1367

Merged
merged 23 commits into from
May 28, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
e95ca0b
Direct retries to another mongos if one is available
stIncMale Apr 14, 2024
d1c1ed2
Implement specification prose tests
stIncMale Apr 17, 2024
0875e5d
Fix a typo
stIncMale Apr 17, 2024
6fe3db1
Expect `MongoServerException` instead of just `RuntimeException`
stIncMale Apr 18, 2024
7cba88b
Fix `ServerDeprioritization.onAttemptFailure`
stIncMale Apr 30, 2024
da6d111
Merge branch 'master' into JAVA-4254
stIncMale Apr 30, 2024
e4ffab4
Replace BiFunction with BinaryOperator
stIncMale Apr 30, 2024
29ebd76
Update driver-core/src/main/com/mongodb/internal/connection/Operation…
stIncMale May 2, 2024
6574e24
Trivial code improvements
stIncMale May 2, 2024
171d67e
Update the docs of the internal API for retries to reflect support of…
stIncMale May 3, 2024
4d883c1
Merge branch 'master' into JAVA-4254
stIncMale May 3, 2024
d25010d
Refactor the way `BaseCluster.selectServer` deals with the race condi…
stIncMale May 6, 2024
cc8021e
Refactor the server selection logic that is implemented not as a `Ser…
stIncMale May 7, 2024
b3430bd
Implement `DeprioritizingSelector` strictly according to the spec req…
stIncMale May 7, 2024
16df106
Merge branch 'master' into JAVA-4254
stIncMale May 7, 2024
04880c7
Do minor touches
stIncMale May 7, 2024
c1e9e4e
Update the documentation of `ClusterSettings.getServerSelector`
stIncMale May 7, 2024
246353f
Address review concerns
stIncMale May 8, 2024
69b78ff
Implement the proposed code simplification
stIncMale May 8, 2024
cbb1938
Link from `BaseClusterTest` back to `BaseClusterSpecification`
stIncMale May 8, 2024
05f6679
Simplify `MinimumOperationCountServerSelector.select`
stIncMale May 23, 2024
d55c44f
Extract `raceConditionPreFiltering` into a separate method and remove…
stIncMale May 24, 2024
7debec0
Merge branch 'master' into JAVA-4254
stIncMale May 24, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.mongodb.event.ClusterOpeningEvent;
import com.mongodb.internal.VisibleForTesting;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.connection.OperationContext.ServerDeprioritization;
import com.mongodb.internal.diagnostics.logging.Logger;
import com.mongodb.internal.diagnostics.logging.Loggers;
import com.mongodb.internal.logging.LogMessage;
Expand Down Expand Up @@ -122,8 +123,9 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
CountDownLatch currentPhase = phase.get();
ClusterDescription curDescription = description;
logServerSelectionStarted(clusterId, operationContext, serverSelector, curDescription);
ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector);
ServerTuple serverTuple = selectServer(compositeServerSelector, curDescription);
ServerDeprioritization serverDeprioritization = operationContext.getServerDeprioritization();
ServerSelector completeServerSelector = getCompleteServerSelector(serverSelector, serverDeprioritization);
ServerTuple serverTuple = selectServer(completeServerSelector, curDescription);

boolean selectionWaitingLogged = false;

Expand All @@ -137,8 +139,10 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
}

if (serverTuple != null) {
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
logServerSelectionSucceeded(
clusterId, operationContext, serverTuple.getServerDescription().getAddress(), serverSelector, curDescription);
clusterId, operationContext, serverAddress, serverSelector, curDescription);
serverDeprioritization.updateCandidate(serverAddress, curDescription.getType());
return serverTuple;
}

Expand All @@ -163,7 +167,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera

currentPhase = phase.get();
curDescription = description;
serverTuple = selectServer(compositeServerSelector, curDescription);
serverTuple = selectServer(completeServerSelector, curDescription);
}

} catch (InterruptedException e) {
Expand All @@ -180,8 +184,9 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati
ClusterDescription currentDescription = description;

logServerSelectionStarted(clusterId, operationContext, serverSelector, currentDescription);
ServerSelectionRequest request = new ServerSelectionRequest(operationContext, serverSelector, getCompositeServerSelector(serverSelector),
getMaxWaitTimeNanos(), callback);
ServerSelectionRequest request = new ServerSelectionRequest(operationContext, serverSelector,
getCompleteServerSelector(serverSelector, operationContext.getServerDeprioritization()),
getMaxWaitTimeNanos(), callback);

if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) {
notifyWaitQueueHandler(request);
Expand Down Expand Up @@ -276,10 +281,12 @@ private boolean handleServerSelectionRequest(final ServerSelectionRequest reques
return true;
}

ServerTuple serverTuple = selectServer(request.compositeSelector, description);
ServerTuple serverTuple = selectServer(request.completeSelector, description);
if (serverTuple != null) {
logServerSelectionSucceeded(clusterId, request.operationContext, serverTuple.getServerDescription().getAddress(),
ServerAddress serverAddress = serverTuple.getServerDescription().getAddress();
logServerSelectionSucceeded(clusterId, request.operationContext, serverAddress,
request.originalSelector, description);
request.operationContext.getServerDeprioritization().updateCandidate(serverAddress, description.getType());
request.onResult(serverTuple, null);
return true;
}
Expand Down Expand Up @@ -343,14 +350,13 @@ private static List<ServerTuple> atMostNRandom(final ArrayList<ServerDescription
return result;
}

private ServerSelector getCompositeServerSelector(final ServerSelector serverSelector) {
private ServerSelector getCompleteServerSelector(final ServerSelector serverSelector, final ServerDeprioritization serverDeprioritization) {
katcharov marked this conversation as resolved.
Show resolved Hide resolved
ServerSelector latencyMinimizingServerSelector =
new LatencyMinimizingServerSelector(settings.getLocalThreshold(MILLISECONDS), MILLISECONDS);
if (settings.getServerSelector() == null) {
return new CompositeServerSelector(asList(serverSelector, latencyMinimizingServerSelector));
} else {
return new CompositeServerSelector(asList(serverSelector, settings.getServerSelector(), latencyMinimizingServerSelector));
}
CompositeServerSelector compositeSelector = settings.getServerSelector() == null
? new CompositeServerSelector(asList(serverSelector, latencyMinimizingServerSelector))
: new CompositeServerSelector(asList(serverSelector, settings.getServerSelector(), latencyMinimizingServerSelector));
return serverDeprioritization.apply(compositeSelector);
katcharov marked this conversation as resolved.
Show resolved Hide resolved
}

protected ClusterableServer createServer(final ServerAddress serverAddress) {
Expand Down Expand Up @@ -399,21 +405,21 @@ private MongoException createAndLogTimeoutException(
private static final class ServerSelectionRequest {
private final OperationContext operationContext;
private final ServerSelector originalSelector;
private final ServerSelector compositeSelector;
private final ServerSelector completeSelector;
@Nullable
private final Long maxWaitTimeNanos;
private final SingleResultCallback<ServerTuple> callback;
private final long startTimeNanos = System.nanoTime();
private CountDownLatch phase;

ServerSelectionRequest(final OperationContext operationContext,
final ServerSelector serverSelector, final ServerSelector compositeSelector,
final ServerSelector serverSelector, final ServerSelector completeSelector,
@Nullable
final Long maxWaitTimeNanos,
final SingleResultCallback<ServerTuple> callback) {
this.operationContext = operationContext;
this.originalSelector = serverSelector;
this.compositeSelector = compositeSelector;
this.completeSelector = completeSelector;
this.maxWaitTimeNanos = maxWaitTimeNanos;
this.callback = callback;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,17 @@
*/
package com.mongodb.internal.connection;

import com.mongodb.MongoConnectionPoolClearedException;
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterType;
import com.mongodb.connection.ServerDescription;
import com.mongodb.lang.Nullable;
import com.mongodb.selector.ServerSelector;

import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;

/**
Expand All @@ -23,12 +34,87 @@
public class OperationContext {
private static final AtomicLong NEXT_ID = new AtomicLong(0);
private final long id;
private final ServerDeprioritization serverDeprioritization;

public OperationContext() {
id = NEXT_ID.incrementAndGet();
serverDeprioritization = new ServerDeprioritization();
}

public long getId() {
return id;
}

/**
* @return The same {@link ServerDeprioritization} if called on the same {@link OperationContext}.
*/
public ServerDeprioritization getServerDeprioritization() {
return serverDeprioritization;
}

public static final class ServerDeprioritization {
@Nullable
private ServerAddress candidate;
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
private final Set<ServerAddress> deprioritized;

private ServerDeprioritization() {
candidate = null;
deprioritized = new HashSet<>();
}

ServerSelector apply(final ServerSelector selector) {
return new DeprioritizingSelector(selector);
katcharov marked this conversation as resolved.
Show resolved Hide resolved
}

void updateCandidate(final ServerAddress serverAddress, final ClusterType clusterType) {
candidate = isEnabled(clusterType) ? serverAddress : null;
katcharov marked this conversation as resolved.
Show resolved Hide resolved
katcharov marked this conversation as resolved.
Show resolved Hide resolved
}

public void onAttemptFailure(final Throwable failure) {
if (candidate != null && !(failure instanceof MongoConnectionPoolClearedException)) {
deprioritized.add(candidate);
vbabanin marked this conversation as resolved.
Show resolved Hide resolved
}
candidate = null;
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
}

private static boolean isEnabled(final ClusterType clusterType) {
return clusterType == ClusterType.SHARDED;
}

/**
* {@link ServerSelector} requires thread safety, but that is only because a user may specify
* {@link com.mongodb.connection.ClusterSettings.Builder#serverSelector(ServerSelector)},
* which indeed may be used concurrently. {@link DeprioritizingSelector} does not need to be thread-safe.
*/
private final class DeprioritizingSelector implements ServerSelector {
private final ServerSelector wrapped;

private DeprioritizingSelector(final ServerSelector wrapped) {
this.wrapped = wrapped;
}

@Override
public List<ServerDescription> select(final ClusterDescription clusterDescription) {
if (isEnabled(clusterDescription.getType())) {
katcharov marked this conversation as resolved.
Show resolved Hide resolved
List<ServerDescription> filteredServerDescriptions = ClusterDescriptionHelper.getServersByPredicate(
katcharov marked this conversation as resolved.
Show resolved Hide resolved
clusterDescription, serverDescription -> !deprioritized.contains(serverDescription.getAddress()));
ClusterDescription filteredClusterDescription = new ClusterDescription(
clusterDescription.getConnectionMode(),
clusterDescription.getType(),
clusterDescription.getSrvResolutionException(),
filteredServerDescriptions,
katcharov marked this conversation as resolved.
Show resolved Hide resolved
clusterDescription.getClusterSettings(),
clusterDescription.getServerSettings());
List<ServerDescription> result = wrapped.select(filteredClusterDescription);
if (result.isEmpty()) {
// fall back to selecting from all servers ignoring the deprioritized ones
result = wrapped.select(clusterDescription);
}
return result;
} else {
return wrapped.select(clusterDescription);
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@

import java.util.Collections;
import java.util.List;
import java.util.function.BinaryOperator;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
Expand Down Expand Up @@ -285,7 +286,11 @@ static <D, T> void createReadCommandAndExecuteAsync(

static <R> AsyncCallbackSupplier<R> decorateReadWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncReadFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException,
BinaryOperator<Throwable> onAttemptFailure =
(@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) ->
CommandOperationHelper.onRetryableReadAttemptFailure(
operationContext, previouslyChosenException, mostRecentAttemptException);
return new RetryingAsyncCallbackSupplier<>(retryState, onAttemptFailure,
katcharov marked this conversation as resolved.
Show resolved Hide resolved
CommandOperationHelper::shouldAttemptToRetryRead, callback -> {
logRetryExecute(retryState, operationContext);
asyncReadFunction.get(callback);
Expand All @@ -294,7 +299,11 @@ static <R> AsyncCallbackSupplier<R> decorateReadWithRetriesAsync(final RetryStat

static <R> AsyncCallbackSupplier<R> decorateWriteWithRetriesAsync(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> asyncWriteFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
BinaryOperator<Throwable> onAttemptFailure =
(@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) ->
CommandOperationHelper.onRetryableWriteAttemptFailure(
operationContext, previouslyChosenException, mostRecentAttemptException);
return new RetryingAsyncCallbackSupplier<>(retryState, onAttemptFailure,
CommandOperationHelper::shouldAttemptToRetryWrite, callback -> {
logRetryExecute(retryState, operationContext);
asyncWriteFunction.get(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,15 @@ interface CommandCreator {
BsonDocument create(ServerDescription serverDescription, ConnectionDescription connectionDescription);
}

/**
 * Handles a failed attempt of a retryable read: first reports the failure to the
 * {@code ServerDeprioritization} of {@code operationContext}, then picks the exception to keep.
 *
 * @param operationContext The context of the operation whose attempt failed.
 * @param previouslyChosenException The exception kept after earlier attempts, or {@code null} if there is none.
 * @param mostRecentAttemptException The exception the most recent attempt failed with.
 * @return The result of {@code chooseRetryableReadException} for the two exceptions.
 */
static Throwable onRetryableReadAttemptFailure(
        final OperationContext operationContext,
        @Nullable final Throwable previouslyChosenException,
        final Throwable mostRecentAttemptException) {
    // the failure must be reported before choosing, exactly as callers of this method rely on
    operationContext.getServerDeprioritization().onAttemptFailure(mostRecentAttemptException);
    final Throwable chosenException = chooseRetryableReadException(previouslyChosenException, mostRecentAttemptException);
    return chosenException;
}

static Throwable chooseRetryableReadException(
private static Throwable chooseRetryableReadException(
@Nullable final Throwable previouslyChosenException, final Throwable mostRecentAttemptException) {
assertFalse(mostRecentAttemptException instanceof ResourceSupplierInternalException);
if (previouslyChosenException == null
Expand All @@ -64,7 +71,15 @@ static Throwable chooseRetryableReadException(
}
}

static Throwable chooseRetryableWriteException(
/**
 * Handles a failed attempt of a retryable write: first reports the failure to the
 * {@code ServerDeprioritization} of {@code operationContext}, then picks the exception to keep.
 *
 * @param operationContext The context of the operation whose attempt failed.
 * @param previouslyChosenException The exception kept after earlier attempts, or {@code null} if there is none.
 * @param mostRecentAttemptException The exception the most recent attempt failed with.
 * @return The result of {@code chooseRetryableWriteException} for the two exceptions.
 */
static Throwable onRetryableWriteAttemptFailure(
        final OperationContext operationContext,
        @Nullable final Throwable previouslyChosenException,
        final Throwable mostRecentAttemptException) {
    // the failure must be reported before choosing, exactly as callers of this method rely on
    operationContext.getServerDeprioritization().onAttemptFailure(mostRecentAttemptException);
    final Throwable chosenException = chooseRetryableWriteException(previouslyChosenException, mostRecentAttemptException);
    return chosenException;
}

private static Throwable chooseRetryableWriteException(
@Nullable final Throwable previouslyChosenException, final Throwable mostRecentAttemptException) {
if (previouslyChosenException == null) {
if (mostRecentAttemptException instanceof ResourceSupplierInternalException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -140,7 +141,11 @@ public Boolean getRetryWrites() {

private <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext,
final Supplier<R> writeFunction) {
return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
BinaryOperator<Throwable> onAttemptFailure =
(@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) ->
CommandOperationHelper.onRetryableWriteAttemptFailure(
operationContext, previouslyChosenException, mostRecentAttemptException);
return new RetryingSyncSupplier<>(retryState, onAttemptFailure,
this::shouldAttemptToRetryWrite, () -> {
logRetryExecute(retryState, operationContext);
return writeFunction.get();
Expand All @@ -149,7 +154,11 @@ private <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState, fi

private <R> AsyncCallbackSupplier<R> decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext,
final AsyncCallbackSupplier<R> writeFunction) {
return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
BinaryOperator<Throwable> onAttemptFailure =
(@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) ->
CommandOperationHelper.onRetryableWriteAttemptFailure(
operationContext, previouslyChosenException, mostRecentAttemptException);
return new RetryingAsyncCallbackSupplier<>(retryState, onAttemptFailure,
this::shouldAttemptToRetryWrite, callback -> {
logRetryExecute(retryState, operationContext);
writeFunction.get(callback);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.bson.codecs.Decoder;

import java.util.function.BiFunction;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;

Expand Down Expand Up @@ -274,7 +275,11 @@ static <D, T> T createReadCommandAndExecute(

static <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState,
final OperationContext operationContext, final Supplier<R> writeFunction) {
return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException,
BinaryOperator<Throwable> onAttemptFailure =
(@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) ->
CommandOperationHelper.onRetryableWriteAttemptFailure(
operationContext, previouslyChosenException, mostRecentAttemptException);
return new RetryingSyncSupplier<>(retryState, onAttemptFailure,
CommandOperationHelper::shouldAttemptToRetryWrite, () -> {
logRetryExecute(retryState, operationContext);
return writeFunction.get();
Expand All @@ -283,7 +288,11 @@ static <R> Supplier<R> decorateWriteWithRetries(final RetryState retryState,

static <R> Supplier<R> decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext,
final Supplier<R> readFunction) {
return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException,
BinaryOperator<Throwable> onAttemptFailure =
(@Nullable Throwable previouslyChosenException, Throwable mostRecentAttemptException) ->
CommandOperationHelper.onRetryableReadAttemptFailure(
operationContext, previouslyChosenException, mostRecentAttemptException);
return new RetryingSyncSupplier<>(retryState, onAttemptFailure,
CommandOperationHelper::shouldAttemptToRetryRead, () -> {
logRetryExecute(retryState, operationContext);
return readFunction.get();
Expand Down