Skip to content

Commit

Permalink
Merge pull request #827 from ogolberg/dns-round-robin
Browse files Browse the repository at this point in the history
Re-enable DNS round robin
  • Loading branch information
acogoluegnes committed Aug 11, 2022
2 parents be58ec5 + 4d28a1a commit 95e6c0b
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 13 deletions.
9 changes: 8 additions & 1 deletion src/main/java/com/rabbitmq/client/Address.java
Expand Up @@ -16,6 +16,7 @@

package com.rabbitmq.client;

import java.net.InetSocketAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -161,6 +162,13 @@ public static Address parseAddress(String addressString) {
}
}

/**
* Construct an InetSocketAddress for this address with a specific port
*/
public InetSocketAddress toInetSocketAddress(int port) {
return new InetSocketAddress(getHost(), port);
}

/**
* Array-based factory method: takes an array of formatted address strings as construction parameter
* @param addresses array of strings of form "host[:port],..."
Expand Down Expand Up @@ -191,5 +199,4 @@ public static Address[] parseAddresses(String addresses) {
@Override public String toString() {
return _port == -1 ? _host : _host + ":" + _port;
}

}
6 changes: 5 additions & 1 deletion src/main/java/com/rabbitmq/client/ConnectionFactory.java
Expand Up @@ -1339,7 +1339,11 @@ public Connection newConnection(ExecutorService executor, String connectionName)
}

protected AddressResolver createAddressResolver(List<Address> addresses) {
return new ListAddressResolver(addresses);
if (addresses.size() > 1) {
return new ListAddressResolver(addresses);
} else {
return new DnsRecordIpAddressResolver(addresses.get(0), isSSL());
}
}

@Override public ConnectionFactory clone(){
Expand Down
Expand Up @@ -72,9 +72,9 @@ public List<Address> getAddresses() throws UnknownHostException {

InetAddress[] inetAddresses = resolveIpAddresses(hostName);

List<Address> addresses = new ArrayList<Address>();
List<Address> addresses = new ArrayList<>();
for (InetAddress inetAddress : inetAddresses) {
addresses.add(new Address(inetAddress.getHostAddress(), portNumber));
addresses.add(new ResolvedInetAddress(hostName, inetAddress, portNumber));
}
return addresses;
}
Expand Down
18 changes: 18 additions & 0 deletions src/main/java/com/rabbitmq/client/ResolvedInetAddress.java
@@ -0,0 +1,18 @@
package com.rabbitmq.client;

import java.net.InetAddress;
import java.net.InetSocketAddress;

public class ResolvedInetAddress extends Address {
private final InetAddress inetAddress;

public ResolvedInetAddress(String originalHostname, InetAddress inetAddress, int port) {
super(originalHostname, port);
this.inetAddress = inetAddress;
}

@Override
public InetSocketAddress toInetSocketAddress(int port) {
return new InetSocketAddress(inetAddress, port);
}
}
Expand Up @@ -22,7 +22,6 @@

import javax.net.SocketFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ExecutorService;

Expand Down Expand Up @@ -51,14 +50,13 @@ public SocketFrameHandlerFactory(int connectionTimeout, SocketFactory socketFact
}

public FrameHandler create(Address addr, String connectionName) throws IOException {
String hostName = addr.getHost();
int portNumber = ConnectionFactory.portOrDefault(addr.getPort(), ssl);
Socket socket = null;
try {
socket = createSocket(connectionName);
configurator.configure(socket);
socket.connect(new InetSocketAddress(hostName, portNumber),
connectionTimeout);

socket.connect(addr.toInetSocketAddress(portNumber), connectionTimeout);
return create(socket);
} catch (IOException ioe) {
quietTrySocketClose(socket);
Expand Down
Expand Up @@ -85,7 +85,7 @@ public FrameHandler create(Address addr, String connectionName) throws IOExcepti
}
}

SocketAddress address = new InetSocketAddress(addr.getHost(), portNumber);
SocketAddress address = addr.toInetSocketAddress(portNumber);
// No Sonar: the channel is closed in case of error and it cannot
// be closed here because it's part of the state of the connection
// to be returned.
Expand Down
Expand Up @@ -96,7 +96,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {
}

@Test
public void shouldNotUseDnsResolutionWhenOneAddressAndNoTls() throws Exception {
public void shouldUseDnsResolutionWhenOneAddressAndNoTls() throws Exception {
AMQConnection connection = mock(AMQConnection.class);
AtomicReference<AddressResolver> addressResolver = new AtomicReference<>();

Expand All @@ -123,11 +123,11 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {

doNothing().when(connection).start();
connectionFactory.newConnection();
assertThat(addressResolver.get()).isNotNull().isInstanceOf(ListAddressResolver.class);
assertThat(addressResolver.get()).isNotNull().isInstanceOf(DnsRecordIpAddressResolver.class);
}

@Test
public void shouldNotUseDnsResolutionWhenOneAddressAndTls() throws Exception {
public void shouldUseDnsResolutionWhenOneAddressAndTls() throws Exception {
AMQConnection connection = mock(AMQConnection.class);
AtomicReference<AddressResolver> addressResolver = new AtomicReference<>();

Expand Down Expand Up @@ -156,7 +156,7 @@ protected synchronized FrameHandlerFactory createFrameHandlerFactory() {
connectionFactory.useSslProtocol();
connectionFactory.newConnection();

assertThat(addressResolver.get()).isNotNull().isInstanceOf(ListAddressResolver.class);
assertThat(addressResolver.get()).isNotNull().isInstanceOf(DnsRecordIpAddressResolver.class);
}

@Test
Expand Down

0 comments on commit 95e6c0b

Please sign in to comment.