Skip to content

Commit

Permalink
spring-projectsGH-1419: Remove RabbitMQ http-client Usage
Browse files Browse the repository at this point in the history
Resolves spring-projects#1419

Use Spring WebFlux instead, while allowing the user to choose some other technology
in the `LocalizedQueueConnectionFactory`..
  • Loading branch information
garyrussell committed Oct 10, 2022
1 parent 639eb16 commit 29dc74f
Show file tree
Hide file tree
Showing 15 changed files with 467 additions and 391 deletions.
18 changes: 8 additions & 10 deletions build.gradle
Expand Up @@ -44,7 +44,7 @@ ext {
assertkVersion = '0.24'
awaitilityVersion = '4.2.0'
commonsCompressVersion = '1.20'
commonsHttpClientVersion = '4.5.13'
commonsHttpClientVersion = '5.1.3'
commonsPoolVersion = '2.11.1'
googleJsr305Version = '3.0.2'
hamcrestVersion = '2.2'
Expand All @@ -62,7 +62,6 @@ ext {
mockitoVersion = '4.8.0'
rabbitmqStreamVersion = '0.8.0'
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.16.0'
rabbitmqHttpClientVersion = '3.12.1'
reactorVersion = '2022.0.0-SNAPSHOT'
snappyVersion = '1.1.8.4'
springDataVersion = '2022.0.0-SNAPSHOT'
Expand Down Expand Up @@ -384,11 +383,12 @@ project('spring-rabbit') {

api project(':spring-amqp')
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"
optionalApi 'org.springframework:spring-aop'
api 'org.springframework:spring-context'
api 'org.springframework:spring-messaging'
api 'org.springframework:spring-tx'
optionalApi 'org.springframework:spring-web'
optionalApi 'org.springframework:spring-webflux'
optionalApi 'io.projectreactor:reactor-core'
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
Expand All @@ -410,7 +410,7 @@ project('spring-rabbit') {
testImplementation 'io.micrometer:micrometer-tracing-test'
testImplementation 'io.micrometer:micrometer-tracing-integration-test'
testRuntimeOnly 'org.springframework:spring-web'
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
testRuntimeOnly "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion"
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
Expand Down Expand Up @@ -465,14 +465,13 @@ project('spring-rabbit-stream') {

api project(':spring-rabbit')
api "com.rabbitmq:stream-client:$rabbitmqStreamVersion"
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"

testApi project(':spring-rabbit-junit')
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin'
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
testRuntimeOnly "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion"
testRuntimeOnly "org.apache.commons:commons-compress:$commonsCompressVersion"
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
Expand All @@ -494,16 +493,15 @@ project('spring-rabbit-junit') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") {
exclude group: 'org.springframework', module: 'spring-web'
}
api 'org.springframework:spring-web'
api 'org.springframework:spring-webflux'
api 'org.junit.jupiter:junit-jupiter-api'
api "org.assertj:assertj-core:$assertjVersion"
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'

testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
}

}
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,11 @@
package org.springframework.amqp.rabbit.junit;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -30,13 +33,17 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.util.Base64Utils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.http.client.Client;

/**
* A class that can be used to prevent integration tests from failing if the Rabbit broker application is
Expand Down Expand Up @@ -372,15 +379,33 @@ private Channel createQueues(Connection connection) throws IOException, URISynta
}
}
if (this.management) {
Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword);
if (!client.alivenessTest("/")) {
if (!alivenessTest()) {
throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; "
+ "management not available");
}
}
return channel;
}

private boolean alivenessTest() throws URISyntaxException {
WebClient client = WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication(this.adminUser, this.adminPassword))
.build();
URI uri = new URI(getAdminUri())
.resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8));
HashMap<String, String> result = client.get()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<HashMap<String, String>>() {
})
.block(Duration.ofSeconds(10)); // NOSONAR magic#
if (result != null) {
return result.get("status").equals("ok");
}
return false;
}

public static boolean fatal() {
String serversRequired = System.getenv(BROKER_REQUIRED);
if (Boolean.parseBoolean(serversRequired)) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,7 @@
* @since 2.0.2
*
*/
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true)
public class RabbitAvailableTests {

@Test
Expand Down
Expand Up @@ -18,14 +18,18 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import org.springframework.amqp.core.Queue;
Expand All @@ -42,6 +46,8 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
Expand All @@ -50,9 +56,10 @@
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;

import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.QueueInfo;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
Expand Down Expand Up @@ -99,12 +106,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException {
assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue();
}

@SuppressWarnings("unchecked")
@Test
@Disabled("Temporary until SF uses Micrometer snaps")
void queueOverAmqp() throws Exception {
Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api");
QueueInfo queue = client.getQueue("/", "stream.created.over.amqp");
assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream");
WebClient client = WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest"))
.build();
Map<String, Object> queue = queueInfo("stream.created.over.amqp");
assertThat(((Map<String, Object>) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream");
}

private Map<String, Object> queueInfo(String queueName) throws URISyntaxException {
WebClient client = createClient("guest", "guest");
URI uri = queueUri(queueName);
return client.get()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
})
.block(Duration.ofSeconds(10));
}

private URI queueUri(String queue) throws URISyntaxException {
URI uri = new URI("http://localhost:" + managementPort() + "/api")
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue);
return uri;
}

private WebClient createClient(String adminUser, String adminPassword) {
return WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword))
.build();
}

@Configuration(proxyBeanMethods = false)
Expand Down
@@ -0,0 +1,63 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.connection;

import java.net.URI;
import java.time.Duration;
import java.util.HashMap;

import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;

/**
* Default {@link NodeLocator} using the Spring WebFlux {@link WebClient}.
*
* @author Gary Russell
* @since 2.4.8
*
*/
public class DefaultNodeLocator implements NodeLocator {

@Override
public HashMap<String, Object> restCall(String username, String password, URI uri) {
WebClient client = createClient(username, password);
HashMap<String, Object> queueInfo = client.get()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<HashMap<String, Object>>() {
})
.block(Duration.ofSeconds(10)); // NOSONAR magic#
return queueInfo;
}

/**
* Create a client instance.
* @param username the username
* @param password the password.
* @return The client.
*/
protected WebClient createClient(String username, String password) {
return WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication(username, password))
.build();
}

}

0 comments on commit 29dc74f

Please sign in to comment.