Skip to content

Commit

Permalink
Ensure DNS errors bubble up as errors in reactive streams
Browse files Browse the repository at this point in the history
Currently DNS lookups happen synchronously in the reactive streams
driver.  This isn't good, but with this change at least DNS errors
will be properly reported.

JAVA-3852
  • Loading branch information
jyemin committed Dec 3, 2020
1 parent cf79fa2 commit 0e556f1
Showing 1 changed file with 27 additions and 28 deletions.
Expand Up @@ -145,37 +145,36 @@ public void openAsync(final SingleResultCallback<Void> callback) {
isTrue("Open already called", stream == null, callback);
try {
stream = streamFactory.create(serverId.getAddress());
} catch (Throwable t) {
callback.onResult(null, t);
return;
}
stream.openAsync(new AsyncCompletionHandler<Void>() {
@Override
public void completed(final Void aVoid) {
connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback<ConnectionDescription>() {
@Override
public void onResult(final ConnectionDescription result, final Throwable t) {
if (t != null) {
close();
callback.onResult(null, t);
} else {
description = result;
opened.set(true);
sendCompressor = findSendCompressor(description);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
stream.openAsync(new AsyncCompletionHandler<Void>() {
@Override
public void completed(final Void aVoid) {
connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback<ConnectionDescription>() {
@Override
public void onResult(final ConnectionDescription result, final Throwable t) {
if (t != null) {
close();
callback.onResult(null, t);
} else {
description = result;
opened.set(true);
sendCompressor = findSendCompressor(description);
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress()));
}
callback.onResult(null, null);
}
callback.onResult(null, null);
}
}
});
}
});
}

@Override
public void failed(final Throwable t) {
callback.onResult(null, t);
}
});
@Override
public void failed(final Throwable t) {
callback.onResult(null, t);
}
});
} catch (Throwable t) {
callback.onResult(null, t);
}
}

private Map<Byte, Compressor> createCompressorMap(final List<MongoCompressor> compressorList) {
Expand Down

0 comments on commit 0e556f1

Please sign in to comment.