Skip to content

Commit

Permalink
Remove SocketUtils usage
Browse files Browse the repository at this point in the history
* Remove usage of non-stable `org.springframework.util.SocketUtils`
* Replace it with `0` for those tests where it is possible to select OS port
* Remove the mentioning of the `SocketUtils` from the `testing.adoc`
* Use `TransportConstants.DEFAULT_STOMP_PORT` for `StompServerIntegrationTests`.
We may disable this test in the future for CI if it is not going to be stable
* Introduce `Supplier<String> connectUrl` variants for `ZeroMqMessageHandler`
to let it defer connection evaluation until subscription to the socket `Mono`
in the `ZeroMqMessageHandler`.
* Move connection logic in the `ZeroMqMessageHandler` to `Lifecycle.start()`

Related to spring-projects/spring-framework#28054

**Cherry-pick to `5.5.x`**

# Conflicts:
#	spring-integration-stomp/src/test/java/org/springframework/integration/stomp/client/StompServerIntegrationTests.java
  • Loading branch information
artembilan committed Feb 16, 2022
1 parent f8c22dd commit 9b27fbe
Show file tree
Hide file tree
Showing 12 changed files with 187 additions and 148 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2020 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,7 +29,6 @@
import org.springframework.integration.ip.util.SocketTestUtils;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.SocketUtils;

/**
* @author Artem Bilan
Expand Down Expand Up @@ -76,7 +75,7 @@ private NetworkInterface checkMulticast() throws Exception {
}
try {
MulticastSocket socket = new MulticastSocket();
socket.joinGroup(new InetSocketAddress(this.group, SocketUtils.findAvailableUdpPort()), nic);
socket.joinGroup(new InetSocketAddress(this.group, 0), nic);
socket.close();
}
catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2021 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -50,7 +50,6 @@
import org.springframework.integration.test.rule.Log4j2LevelAdjuster;
import org.springframework.messaging.Message;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.util.SocketUtils;

/**
*
Expand Down Expand Up @@ -284,8 +283,7 @@ public void testMulticastReceiver() throws Exception {
}

}
DatagramSocket datagramSocket =
new DatagramSocket(SocketUtils.findAvailableUdpPort(), inetAddress);
DatagramSocket datagramSocket = new DatagramSocket(0, inetAddress);
datagramSocket.send(packet);
datagramSocket.close();

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2015-2021 the original author or authors.
* Copyright 2015-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -54,7 +54,6 @@
import org.springframework.messaging.simp.stomp.StompCommand;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.util.SocketUtils;

/**
* @author Artem Bilan
Expand All @@ -70,17 +69,16 @@ public class StompServerIntegrationTests {

@BeforeAll
public static void setup() throws Exception {
int port = SocketUtils.findAvailableTcpPort(61613);
activeMQBroker = new BrokerService();
activeMQBroker.addConnector("stomp://127.0.0.1:" + port);
activeMQBroker.addConnector("stomp://127.0.0.1:" + BrokerService.DEFAULT_PORT);
activeMQBroker.setPersistent(false);
activeMQBroker.setUseJmx(false);
activeMQBroker.setEnableStatistics(false);
activeMQBroker.getSystemUsage().getMemoryUsage().setLimit(1024 * 1024 * 5);
activeMQBroker.getSystemUsage().getTempUsage().setLimit(1024 * 1024 * 5);
activeMQBroker.start();

stompClient = new ReactorNettyTcpStompClient("127.0.0.1", port);
stompClient = new ReactorNettyTcpStompClient("127.0.0.1", Integer.parseInt(BrokerService.DEFAULT_PORT));
stompClient.setMessageConverter(new PassThruMessageConverter());
ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler();
taskScheduler.afterPropertiesSet();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,7 @@
http://www.springframework.org/schema/integration https://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/beans https://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="socketUtils" class="org.springframework.util.SocketUtils" />

<int-syslog:inbound-channel-adapter id="foo" port="#{socketUtils.findAvailableUdpPort(1514)}" />
<int-syslog:inbound-channel-adapter id="foo" port="0" />

<int-syslog:inbound-channel-adapter id="foobar" channel="foo" port="1514" auto-startup="false" />

Expand Down Expand Up @@ -42,7 +40,7 @@
<bean id="converter"
class="org.springframework.integration.syslog.config.SyslogReceivingChannelAdapterParserTests$PassThruConverter" />

<int-syslog:inbound-channel-adapter id="bar" protocol="tcp" port="#{socketUtils.findAvailableTcpPort(1514)}" />
<int-syslog:inbound-channel-adapter id="bar" protocol="tcp" port="0" />

<int:channel id="bar">
<int:queue/>
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,17 +17,18 @@
package org.springframework.integration.syslog.config;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;

import java.lang.reflect.Method;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

import javax.net.SocketFactory;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.jupiter.api.Test;

import org.springframework.beans.factory.BeanCreationException;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -42,19 +43,21 @@
import org.springframework.integration.test.util.TestUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.util.ReflectionUtils;

/**
* @author Gary Russell
* @author Artem Bilan
*
* @since 3.0
*
*/
@ContextConfiguration
@RunWith(SpringJUnit4ClassRunner.class)
@SpringJUnitConfig
public class SyslogReceivingChannelAdapterParserTests {

@Autowired @Qualifier("foo.adapter")
@Autowired
@Qualifier("foo.adapter")
private UdpSyslogReceivingChannelAdapter adapter1;

@Autowired
Expand All @@ -63,7 +66,8 @@ public class SyslogReceivingChannelAdapterParserTests {
@Autowired
private PollableChannel foo;

@Autowired @Qualifier("explicitUdp.adapter")
@Autowired
@Qualifier("explicitUdp.adapter")
private UdpSyslogReceivingChannelAdapter explicitUdpAdapter;

@Autowired
Expand All @@ -81,7 +85,8 @@ public class SyslogReceivingChannelAdapterParserTests {
@Autowired
private RFC5424MessageConverter rfc5424;

@Autowired @Qualifier("bar.adapter")
@Autowired
@Qualifier("bar.adapter")
private TcpSyslogReceivingChannelAdapter adapter2;

@Autowired
Expand All @@ -95,8 +100,10 @@ public class SyslogReceivingChannelAdapterParserTests {

@Test
public void testSimplestUdp() throws Exception {
int port = TestUtils.getPropertyValue(adapter1, "udpAdapter.port", Integer.class);
byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes("UTF-8");
Method getPort = ReflectionUtils.findMethod(UdpSyslogReceivingChannelAdapter.class, "getPort");
ReflectionUtils.makeAccessible(getPort);
int port = (int) ReflectionUtils.invokeMethod(getPort, this.adapter1);
byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE".getBytes(StandardCharsets.UTF_8);
DatagramPacket packet = new DatagramPacket(buf, buf.length, new InetSocketAddress("localhost", port));
DatagramSocket socket = new DatagramSocket();
Thread.sleep(1000);
Expand All @@ -108,13 +115,13 @@ public void testSimplestUdp() throws Exception {
}

@Test
public void testExplicitChannelUdp() throws Exception {
public void testExplicitChannelUdp() {
assertThat(TestUtils.getPropertyValue(foobar, "udpAdapter.port")).isEqualTo(1514);
assertThat(TestUtils.getPropertyValue(foobar, "outputChannel")).isSameAs(foo);
}

@Test
public void testExplicitUdp() throws Exception {
public void testExplicitUdp() {
assertThat(TestUtils.getPropertyValue(explicitUdpAdapter, "outputChannel")).isSameAs(explicitUdp);
}

Expand All @@ -133,9 +140,9 @@ public void testFullBoatUdp() {
public void testSimplestTcp() throws Exception {
AbstractServerConnectionFactory connectionFactory = TestUtils.getPropertyValue(adapter2, "connectionFactory",
AbstractServerConnectionFactory.class);
int port = connectionFactory.getPort();
waitListening(connectionFactory, 10000L);
byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE\n".getBytes("UTF-8");
byte[] buf = "<157>JUL 26 22:08:35 WEBERN TESTING[70729]: TEST SYSLOG MESSAGE\n".getBytes(StandardCharsets.UTF_8);
int port = connectionFactory.getPort();
Socket socket = SocketFactory.getDefault().createSocket("localhost", port);
Thread.sleep(1000);
socket.getOutputStream().write(buf);
Expand All @@ -159,57 +166,40 @@ public void testFullBoatTcp() {

@Test
public void testPortOnUdpChild() {
try {
new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail1-context.xml", this.getClass())
.close();
fail("Expected exception");
}
catch (BeanDefinitionParsingException e) {
assertThat(e.getMessage().startsWith(
"Configuration problem: When child element 'udp-attributes' is present, 'port' must be defined " +
"there"))
.isTrue();
}
assertThatExceptionOfType(BeanDefinitionParsingException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail1-context.xml",
getClass()))
.withMessageStartingWith(
"Configuration problem: " +
"When child element 'udp-attributes' is present, 'port' must be defined there");
}

@Test
public void testPortWithTCPFactory() {
try {
new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail2-context.xml", this.getClass())
.close();
fail("Expected exception");
}
catch (BeanCreationException e) {
assertThat(e.getCause().getMessage()).isEqualTo("Cannot specify both 'port' and 'connectionFactory'");
}
assertThatExceptionOfType(BeanCreationException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail2-context.xml",
getClass()))
.withStackTraceContaining("Cannot specify both 'port' and 'connectionFactory'");
}

@Test
public void testUdpChildWithTcp() {
try {
new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail3-context.xml", this.getClass())
.close();
fail("Expected exception");
}
catch (BeanCreationException e) {
e.printStackTrace();

assertThat(e.getCause().getMessage())
.isEqualTo("Cannot specify 'udp-attributes' when the protocol is 'tcp'");
}
assertThatExceptionOfType(BeanCreationException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext(getClass().getSimpleName() + "-fail3-context.xml",
getClass()))
.withStackTraceContaining("Cannot specify 'udp-attributes' when the protocol is 'tcp'");
}

@Test
public void testUDPWithTCPFactory() {
try {
new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail4-context.xml", this.getClass())
.close();
fail("Expected exception");
}
catch (BeanCreationException e) {
assertThat(e.getCause().getMessage())
.isEqualTo("Cannot specify 'connection-factory' unless the protocol is 'tcp'");
}
assertThatExceptionOfType(BeanCreationException.class)
.isThrownBy(() ->
new ClassPathXmlApplicationContext(this.getClass().getSimpleName() + "-fail4-context.xml",
getClass()))
.withStackTraceContaining("Cannot specify 'connection-factory' unless the protocol is 'tcp'");
}

public static class PassThruConverter implements MessageConverter {
Expand All @@ -229,7 +219,7 @@ public Message<?> fromSyslog(Message<?> syslog) {
* @throws IllegalStateException
*/
private void waitListening(AbstractServerConnectionFactory serverConnectionFactory, Long delay)
throws IllegalStateException {
throws IllegalStateException {
if (delay == null) {
delay = 100L;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020-2021 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -212,7 +212,7 @@ public int getBackendPort() {

/**
* Return the address an {@code inproc} control socket is bound or null if this proxy has not been started yet.
* @return the the address for control socket or null
* @return the address for control socket or null
*/
@Nullable
public String getControlAddress() {
Expand All @@ -222,7 +222,7 @@ public String getControlAddress() {
/**
* Return the address an {@code inproc} capture socket is bound or null if this proxy has not been started yet
* or {@link #captureAddress} is false.
* @return the the address for capture socket or null
* @return the address for capture socket or null
*/
@Nullable
public String getCaptureAddress() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 the original author or authors.
* Copyright 2020-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,8 @@

package org.springframework.integration.zeromq.dsl;

import java.util.function.Supplier;

import org.zeromq.SocketType;
import org.zeromq.ZContext;

Expand Down Expand Up @@ -53,6 +55,18 @@ public static ZeroMqChannelSpec pubSubZeroMqChannel(ZContext context) {
* @return the spec.
*/
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, String connectUrl) {
return outboundChannelAdapter(context, () -> connectUrl);
}

/**
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext}
* and connection URL supplier.
* @param context the {@link ZContext} to use.
* @param connectUrl the supplier for URL to connect a ZeroMq socket to.
* @return the spec.
* @since 5.5.9
*/
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, Supplier<String> connectUrl) {
return new ZeroMqMessageHandlerSpec(context, connectUrl);
}

Expand All @@ -70,6 +84,21 @@ public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context,
return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType);
}

/**
* Create an instance of {@link ZeroMqMessageHandlerSpec} for the provided {@link ZContext},
* connection URL supplier and {@link SocketType}.
* @param context the {@link ZContext} to use.
* @param connectUrl the supplier for URL to connect a ZeroMq socket to.
* @param socketType the {@link SocketType} for ZeroMq socket.
* @return the spec.
* @since 5.5.9
*/
public static ZeroMqMessageHandlerSpec outboundChannelAdapter(ZContext context, Supplier<String> connectUrl,
SocketType socketType) {

return new ZeroMqMessageHandlerSpec(context, connectUrl, socketType);
}

/**
* Create an instance of {@link ZeroMqMessageProducerSpec} for the provided {@link ZContext}.
* @param context the {@link ZContext} to use.
Expand Down

0 comments on commit 9b27fbe

Please sign in to comment.