diff --git a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java index 44ca3bc0815..1798f565900 100644 --- a/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java +++ b/dubbo-common/src/main/java/com/alibaba/dubbo/common/utils/NetUtils.java @@ -21,8 +21,11 @@ import com.alibaba.dubbo.common.logger.LoggerFactory; import java.io.IOException; +import java.net.Inet4Address; +import java.net.Inet6Address; import java.net.InetAddress; import java.net.InetSocketAddress; +import java.net.MulticastSocket; import java.net.NetworkInterface; import java.net.ServerSocket; import java.net.UnknownHostException; @@ -284,4 +287,34 @@ public static String toURL(String protocol, String host, int port, String path) return sb.toString(); } + public static void joinMulticastGroup(MulticastSocket multicastSocket, InetAddress multicastAddress) throws IOException { + setInterface(multicastSocket, multicastAddress instanceof Inet6Address); + multicastSocket.setLoopbackMode(false); + multicastSocket.joinGroup(multicastAddress); + } + + public static void setInterface(MulticastSocket multicastSocket, boolean preferIpv6) throws IOException { + boolean interfaceSet = false; + Enumeration interfaces = NetworkInterface.getNetworkInterfaces(); + while (interfaces.hasMoreElements()) { + NetworkInterface i = (NetworkInterface) interfaces.nextElement(); + Enumeration addresses = i.getInetAddresses(); + while (addresses.hasMoreElements()) { + InetAddress address = (InetAddress) addresses.nextElement(); + if (preferIpv6 && address instanceof Inet6Address) { + multicastSocket.setInterface(address); + interfaceSet = true; + break; + } else if (!preferIpv6 && address instanceof Inet4Address) { + multicastSocket.setInterface(address); + interfaceSet = true; + break; + } + } + if (interfaceSet) { + break; + } + } + } + } \ No newline at end of file diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java index d3928397b3b..73ff3b8f87c 100644 --- a/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java +++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/com/alibaba/dubbo/registry/multicast/MulticastRegistry.java @@ -31,6 +31,7 @@ import java.io.IOException; import java.net.DatagramPacket; +import java.net.Inet4Address; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.MulticastSocket; @@ -58,11 +59,11 @@ public class MulticastRegistry extends FailbackRegistry { private static final int DEFAULT_MULTICAST_PORT = 1234; - private final InetAddress mutilcastAddress; + private final InetAddress multicastAddress; - private final MulticastSocket mutilcastSocket; + private final MulticastSocket multicastSocket; - private final int mutilcastPort; + private final int multicastPort; private final ConcurrentMap> received = new ConcurrentHashMap>(); @@ -79,23 +80,21 @@ public MulticastRegistry(URL url) { if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } - if (!isMulticastAddress(url.getHost())) { - throw new IllegalArgumentException("Invalid multicast address " + url.getHost() + ", scope: 224.0.0.0 - 239.255.255.255"); - } try { - mutilcastAddress = InetAddress.getByName(url.getHost()); - mutilcastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort(); - mutilcastSocket = new MulticastSocket(mutilcastPort); - mutilcastSocket.setLoopbackMode(false); - mutilcastSocket.joinGroup(mutilcastAddress); + multicastAddress = InetAddress.getByName(url.getHost()); + checkMulticastAddress(multicastAddress); + + multicastPort = url.getPort() <= 0 ? DEFAULT_MULTICAST_PORT : url.getPort(); + multicastSocket = new MulticastSocket(multicastPort); + NetUtils.joinMulticastGroup(multicastSocket, multicastAddress); Thread thread = new Thread(new Runnable() { @Override public void run() { byte[] buf = new byte[2048]; DatagramPacket recv = new DatagramPacket(buf, buf.length); - while (!mutilcastSocket.isClosed()) { + while (!multicastSocket.isClosed()) { try { - mutilcastSocket.receive(recv); + multicastSocket.receive(recv); String msg = new String(recv.getData()).trim(); int i = msg.indexOf('\n'); if (i > 0) { @@ -104,7 +103,7 @@ public void run() { MulticastRegistry.this.receive(msg, (InetSocketAddress) recv.getSocketAddress()); Arrays.fill(buf, (byte) 0); } catch (Throwable e) { - if (!mutilcastSocket.isClosed()) { + if (!multicastSocket.isClosed()) { logger.error(e.getMessage(), e); } } @@ -133,6 +132,19 @@ public void run() { } } + private void checkMulticastAddress(InetAddress multicastAddress) { + if (!multicastAddress.isMulticastAddress()) { + String message = "Invalid multicast address " + multicastAddress; + if (!(multicastAddress instanceof Inet4Address)) { + throw new IllegalArgumentException(message + ", " + + "ipv4 multicast address scope: 224.0.0.0 - 239.255.255.255."); + } else { + throw new IllegalArgumentException(message + ", " + "ipv6 multicast address must start with ff, " + + "for example: ff01::1"); + } + } + } + private static boolean isMulticastAddress(String ip) { int i = ip.indexOf('.'); if (i > 0) { @@ -233,12 +245,12 @@ private void receive(String msg, InetSocketAddress remoteAddress) { private void broadcast(String msg) { if (logger.isInfoEnabled()) { - logger.info("Send broadcast message: " + msg + " to " + mutilcastAddress + ":" + mutilcastPort); + logger.info("Send broadcast message: " + msg + " to " + multicastAddress + ":" + multicastPort); } try { byte[] data = (msg + "\n").getBytes(); - DatagramPacket hi = new DatagramPacket(data, data.length, mutilcastAddress, mutilcastPort); - mutilcastSocket.send(hi); + DatagramPacket hi = new DatagramPacket(data, data.length, multicastAddress, multicastPort); + multicastSocket.send(hi); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } @@ -246,12 +258,12 @@ private void broadcast(String msg) { private void unicast(String msg, String host) { if (logger.isInfoEnabled()) { - logger.info("Send unicast message: " + msg + " to " + host + ":" + mutilcastPort); + logger.info("Send unicast message: " + msg + " to " + host + ":" + multicastPort); } try { byte[] data = (msg + "\n").getBytes(); - DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), mutilcastPort); - mutilcastSocket.send(hi); + DatagramPacket hi = new DatagramPacket(data, data.length, InetAddress.getByName(host), multicastPort); + multicastSocket.send(hi); } catch (Exception e) { throw new IllegalStateException(e.getMessage(), e); } @@ -293,7 +305,7 @@ protected void doUnsubscribe(URL url, NotifyListener listener) { @Override public boolean isAvailable() { try { - return mutilcastSocket != null; + return multicastSocket != null; } catch (Throwable t) { return false; } @@ -310,8 +322,8 @@ public void destroy() { logger.warn(t.getMessage(), t); } try { - mutilcastSocket.leaveGroup(mutilcastAddress); - mutilcastSocket.close(); + multicastSocket.leaveGroup(multicastAddress); + multicastSocket.close(); } catch (Throwable t) { logger.warn(t.getMessage(), t); } @@ -434,7 +446,7 @@ public List lookup(URL url) { } public MulticastSocket getMutilcastSocket() { - return mutilcastSocket; + return multicastSocket; } public Map> getReceived() { diff --git a/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java b/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java index 3870d54099f..0c32d5d1c76 100644 --- a/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java +++ b/dubbo-registry/dubbo-registry-multicast/src/test/java/com/alibaba/dubbo/registry/multicast/MulticastRegistryTest.java @@ -23,6 +23,7 @@ import org.junit.Before; import org.junit.Test; +import java.net.InetAddress; import java.net.MulticastSocket; import java.util.List; import java.util.Map; @@ -30,9 +31,9 @@ import java.util.concurrent.atomic.AtomicReference; import static org.hamcrest.CoreMatchers.is; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertEquals; public class MulticastRegistryTest { @@ -52,7 +53,7 @@ public void setUp() throws Exception { registry.register(serviceUrl); } - @Test(expected = IllegalArgumentException.class) + @Test(expected = IllegalStateException.class) public void testUrlError() { URL errorUrl = URL.valueOf("multicast://mullticast/"); new MulticastRegistry(errorUrl); @@ -124,4 +125,43 @@ public void testDefaultPort() { } } + @Test + public void testMulticastAddress() { + InetAddress multicastAddress = null; + MulticastSocket multicastSocket = null; + try { + // ipv4 multicast address + multicastAddress = InetAddress.getByName("224.55.66.77"); + multicastSocket = new MulticastSocket(2345); + multicastSocket.setLoopbackMode(false); + NetUtils.setInterface(multicastSocket, false); + multicastSocket.joinGroup(multicastAddress); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail(e.getMessage()); + } finally { + if (multicastSocket != null) { + multicastSocket.close(); + } + } + + // multicast ipv6 address, + try { + multicastAddress = InetAddress.getByName("ff01::1"); + + multicastSocket = new MulticastSocket(); + multicastSocket.setLoopbackMode(false); + NetUtils.setInterface(multicastSocket, true); + multicastSocket.joinGroup(multicastAddress); + } catch (Throwable t) { + t.printStackTrace(); + } finally { + if (multicastSocket != null) { + multicastSocket.close(); + } + } + + } + + }