Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Re-enable DNS round robin #827

Merged
merged 4 commits into from Aug 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
9 changes: 8 additions & 1 deletion src/main/java/com/rabbitmq/client/Address.java
Expand Up @@ -16,6 +16,7 @@

package com.rabbitmq.client;

import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -161,6 +162,13 @@ public static Address parseAddress(String addressString) {
}
}

/**
* Construct an InetSocketAddress for this address with a specific port
*/
public InetSocketAddress toInetSocketAddress(int port) {
return new InetSocketAddress(getHost(), port);
}

/**
* Array-based factory method: takes an array of formatted address strings as construction parameter
* @param addresses array of strings of form "host[:port],..."
Expand Down Expand Up @@ -191,5 +199,4 @@ public static Address[] parseAddresses(String addresses) {
@Override public String toString() {
return _port == -1 ? _host : _host + ":" + _port;
}

}
6 changes: 5 additions & 1 deletion src/main/java/com/rabbitmq/client/ConnectionFactory.java
Expand Up @@ -1339,7 +1339,11 @@ public Connection newConnection(ExecutorService executor, String connectionName)
}

protected AddressResolver createAddressResolver(List<Address> addresses) {
return new ListAddressResolver(addresses);
if (addresses.size() > 1) {
return new ListAddressResolver(addresses);
} else {
return new DnsRecordIpAddressResolver(addresses.get(0), isSSL());
}
}

@Override public ConnectionFactory clone(){
Expand Down
Expand Up @@ -72,9 +72,9 @@ public List<Address> getAddresses() throws UnknownHostException {

InetAddress[] inetAddresses = resolveIpAddresses(hostName);

List<Address> addresses = new ArrayList<Address>();
List<Address> addresses = new ArrayList<>();
for (InetAddress inetAddress : inetAddresses) {
addresses.add(new Address(inetAddress.getHostAddress(), portNumber));
addresses.add(new ResolvedInetAddress(hostName, inetAddress, portNumber));
}
return addresses;
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/rabbitmq/client/ResolvedInetAddress.java
@@ -0,0 +1,18 @@
package com.rabbitmq.client;

import java.net.InetAddress;
import java.net.InetSocketAddress;

public class ResolvedInetAddress extends Address {
private final InetAddress inetAddress;

public ResolvedInetAddress(String originalHostname, InetAddress inetAddress, int port) {
super(originalHostname, port);
this.inetAddress = inetAddress;
}

@Override
public InetSocketAddress toInetSocketAddress(int port) {
return new InetSocketAddress(inetAddress, port);
}
}
Expand Up @@ -22,7 +22,6 @@

import javax.net.SocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -51,14 +50,13 @@ public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFact
}

public FrameHandler create(Address addr, String connectionName) throws IOException {
String hostName = addr.getHost();
int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl);
Socket socket = null;
try {
socket = createSocket(connectionName);
configurator.configure(socket);
socket.connect(new InetSocketAddress(hostName, portNumber),
connectionTimeout);

socket.connect(addr.toInetSocketAddress(portNumber), connectionTimeout);
return create(socket);
} catch (IOException ioe) {
quietTrySocketClose(socket);
Expand Down
Expand Up @@ -85,7 +85,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
}
}

SocketAddress address = new InetSocketAddress(addr.getHost(), portNumber);
SocketAddress address = addr.toInetSocketAddress(portNumber);
// No Sonar: the channel is closed in case of error and it cannot
// be closed here because it's part of the state of the connection
// to be returned.
Expand Down
Expand Up @@ -96,7 +96,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {
}

@Test
public void shouldNotUseDnsResolutionWhenOneAddressAndNoTls() throws Exception {
public void shouldUseDnsResolutionWhenOneAddressAndNoTls() throws Exception {
AMQConnection connection = mock(AMQConnection.class);
AtomicReference<AddressResolver> addressResolver = new AtomicReference<>();

Expand All @@ -123,11 +123,11 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {

doNothing().when(connection).start();
connectionFactory.newConnection();
assertThat(addressResolver.get()).isNotNull().isInstanceOf(ListAddressResolver.class);
assertThat(addressResolver.get()).isNotNull().isInstanceOf(DnsRecordIpAddressResolver.class);
}

@Test
public void shouldNotUseDnsResolutionWhenOneAddressAndTls() throws Exception {
public void shouldUseDnsResolutionWhenOneAddressAndTls() throws Exception {
AMQConnection connection = mock(AMQConnection.class);
AtomicReference<AddressResolver> addressResolver = new AtomicReference<>();

Expand Down Expand Up @@ -156,7 +156,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {
connectionFactory.useSslProtocol();
connectionFactory.newConnection();

assertThat(addressResolver.get()).isNotNull().isInstanceOf(ListAddressResolver.class);
assertThat(addressResolver.get()).isNotNull().isInstanceOf(DnsRecordIpAddressResolver.class);
}

@Test
Expand Down