From 0e556f1db850d47bd890d144f3a48064af554b5c Mon Sep 17 00:00:00 2001 From: jyemin Date: Thu, 3 Dec 2020 08:43:51 -0500 Subject: [PATCH] Ensure DNS errors bubble up as errors in reactive streams Currently DNS lookups happen synchronously in the reactive streams driver. This isn't good, but with this change at least DNS errors will be properly reported. JAVA-3852 --- .../connection/InternalStreamConnection.java | 55 +++++++++---------- 1 file changed, 27 insertions(+), 28 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 63d6e05cc22..b1faa79fd6d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -145,37 +145,36 @@ public void openAsync(final SingleResultCallback callback) { isTrue("Open already called", stream == null, callback); try { stream = streamFactory.create(serverId.getAddress()); - } catch (Throwable t) { - callback.onResult(null, t); - return; - } - stream.openAsync(new AsyncCompletionHandler() { - @Override - public void completed(final Void aVoid) { - connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback() { - @Override - public void onResult(final ConnectionDescription result, final Throwable t) { - if (t != null) { - close(); - callback.onResult(null, t); - } else { - description = result; - opened.set(true); - sendCompressor = findSendCompressor(description); - if (LOGGER.isInfoEnabled()) { - LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress())); + stream.openAsync(new AsyncCompletionHandler() { + @Override + public void completed(final Void aVoid) { + connectionInitializer.initializeAsync(InternalStreamConnection.this, new SingleResultCallback() { + @Override + public void onResult(final ConnectionDescription result, final Throwable t) { + if (t != null) { + close(); + callback.onResult(null, t); + } else { + description = result; + opened.set(true); + sendCompressor = findSendCompressor(description); + if (LOGGER.isInfoEnabled()) { + LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress())); + } + callback.onResult(null, null); } - callback.onResult(null, null); } - } - }); - } + }); + } - @Override - public void failed(final Throwable t) { - callback.onResult(null, t); - } - }); + @Override + public void failed(final Throwable t) { + callback.onResult(null, t); + } + }); + } catch (Throwable t) { + callback.onResult(null, t); + } } private Map createCompressorMap(final List compressorList) {