Skip to content

Commit

Permalink
spring-projectsGH-1419: Remove RabbitMQ http-client Usage
Browse files Browse the repository at this point in the history
Resolves spring-projects#1419

Use Spring WebFlux instead, while allowing the user to choose some other technology
in the `LocalizedQueueConnectionFactory`.

* Rename DefaultNodeLocator; add generics.
* Remove unnecessary dependencies.

spring-projectsGH-1419: Add RestTemplateNodeLocator

- also remove hard dependency on `spring-webflux` from `spring-rabbit-junit`.

Fix Javadoc.

Use RestTemplate for aliveness test; JVM HttpClient not available in Java 8.

Restore spring-rabbit-junit jackson dependency.
  • Loading branch information
garyrussell committed Oct 11, 2022
1 parent 7ea8fc7 commit a927143
Show file tree
Hide file tree
Showing 20 changed files with 680 additions and 381 deletions.
18 changes: 11 additions & 7 deletions build.gradle
Expand Up @@ -60,7 +60,7 @@ ext {
rabbitmqStreamVersion = '0.4.0'
rabbitmqVersion = project.hasProperty('rabbitmqVersion') ? project.rabbitmqVersion : '5.13.1'
rabbitmqHttpClientVersion = '3.12.1'
reactorVersion = '2020.0.20'
reactorVersion = '2020.0.24'
snappyVersion = '1.1.8.4'
springDataCommonsVersion = '2.6.7'
springVersion = project.hasProperty('springVersion') ? project.springVersion : '5.3.23'
Expand Down Expand Up @@ -366,7 +366,10 @@ project('spring-rabbit') {
api 'org.springframework:spring-context'
api 'org.springframework:spring-messaging'
api 'org.springframework:spring-tx'
optionalApi 'org.springframework:spring-webflux'
optionalApi "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
optionalApi 'io.projectreactor:reactor-core'
optionalApi 'io.projectreactor.netty:reactor-netty-http'
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
optionalApi "io.micrometer:micrometer-core:$micrometerVersion"
Expand All @@ -380,10 +383,10 @@ project('spring-rabbit') {
testApi project(':spring-rabbit-junit')
testImplementation("com.willowtreeapps.assertk:assertk-jvm:$assertkVersion")
testImplementation "org.hibernate.validator:hibernate-validator:$hibernateValidationVersion"
testRuntimeOnly 'org.springframework:spring-web'
testRuntimeOnly 'org.springframework:spring-webflux'
testRuntimeOnly "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
testImplementation 'com.fasterxml.jackson.core:jackson-databind'
testRuntimeOnly 'com.fasterxml.jackson.dataformat:jackson-dataformat-xml'
testRuntimeOnly 'com.fasterxml.jackson.module:jackson-module-kotlin'
testRuntimeOnly ("junit:junit:$junit4Version") {
Expand All @@ -407,7 +410,6 @@ project('spring-rabbit-stream') {

api project(':spring-rabbit')
api "com.rabbitmq:stream-client:$rabbitmqStreamVersion"
optionalApi "com.rabbitmq:http-client:$rabbitmqHttpClientVersion"

testApi project(':spring-rabbit-junit')
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
Expand All @@ -419,8 +421,11 @@ project('spring-rabbit-stream') {
testRuntimeOnly "org.xerial.snappy:snappy-java:$snappyVersion"
testRuntimeOnly "org.lz4:lz4-java:$lz4Version"
testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion"
testRuntimeOnly 'io.projectreactor.netty:reactor-netty-http'
testImplementation "org.testcontainers:rabbitmq:1.15.3"
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
testImplementation 'org.springframework:spring-webflux'
testImplementation 'io.projectreactor.netty:reactor-netty-http'
}

}
Expand All @@ -436,12 +441,11 @@ project('spring-rabbit-junit') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
api ("com.rabbitmq:http-client:$rabbitmqHttpClientVersion") {
exclude group: 'org.springframework', module: 'spring-web'
}
api 'org.springframework:spring-web'
api 'org.junit.jupiter:junit-jupiter-api'
api "org.assertj:assertj-core:$assertjVersion"
api "org.apache.httpcomponents:httpclient:$commonsHttpClientVersion"
api 'com.fasterxml.jackson.core:jackson-databind'
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -27,7 +27,6 @@
import org.springframework.amqp.core.MessageProperties;
import org.springframework.beans.DirectFieldAccessor;
import org.springframework.core.ConfigurableObjectInputStream;
import org.springframework.core.NestedIOException;
import org.springframework.core.serializer.DefaultDeserializer;
import org.springframework.core.serializer.DefaultSerializer;
import org.springframework.core.serializer.Deserializer;
Expand Down Expand Up @@ -182,7 +181,7 @@ protected Class<?> resolveClass(ObjectStreamClass classDesc)
return objectInputStream.readObject();
}
catch (ClassNotFoundException ex) {
throw new NestedIOException("Failed to deserialize object type", ex);
throw new IOException("Failed to deserialize object type", ex);
}
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2006-2019 the original author or authors.
* Copyright 2006-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -26,7 +26,6 @@
import java.util.Set;

import org.springframework.core.ConfigurableObjectInputStream;
import org.springframework.core.NestedIOException;
import org.springframework.util.ObjectUtils;
import org.springframework.util.PatternMatchUtils;

Expand Down Expand Up @@ -126,7 +125,7 @@ protected Class<?> resolveClass(ObjectStreamClass classDesc)
return objectInputStream.readObject();
}
catch (ClassNotFoundException ex) {
throw new NestedIOException("Failed to deserialize object type", ex);
throw new IOException("Failed to deserialize object type", ex);
}
}

Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2019 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -23,6 +23,7 @@

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
Expand All @@ -33,7 +34,6 @@
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.utils.test.TestUtils;
import org.springframework.core.NestedIOException;
import org.springframework.core.serializer.DefaultDeserializer;
import org.springframework.core.serializer.Deserializer;

Expand Down Expand Up @@ -178,7 +178,7 @@ public void messageConversionExceptionForClassNotFound() throws Exception {
body[10] = 'z';
assertThatThrownBy(() -> converter.fromMessage(message))
.isExactlyInstanceOf(MessageConversionException.class)
.hasCauseExactlyInstanceOf(NestedIOException.class);
.hasCauseExactlyInstanceOf(IOException.class);
}

}
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2020 the original author or authors.
* Copyright 2002-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -17,8 +17,10 @@
package org.springframework.amqp.rabbit.junit;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -29,14 +31,28 @@

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.http.HttpHost;
import org.apache.http.client.AuthCache;
import org.apache.http.client.protocol.HttpClientContext;
import org.apache.http.impl.auth.BasicScheme;
import org.apache.http.impl.client.BasicAuthCache;
import org.apache.http.protocol.BasicHttpContext;
import org.apache.http.protocol.HttpContext;

import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.client.support.BasicAuthenticationInterceptor;
import org.springframework.lang.Nullable;
import org.springframework.util.Base64Utils;
import org.springframework.util.StringUtils;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriUtils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.http.client.Client;

/**
* A class that can be used to prevent integration tests from failing if the Rabbit broker application is
Expand Down Expand Up @@ -372,15 +388,39 @@ private Channel createQueues(Connection connection) throws IOException, URISynta
}
}
if (this.management) {
Client client = new Client(getAdminUri(), this.adminUser, this.adminPassword);
if (!client.alivenessTest("/")) {
if (!alivenessTest()) {
throw new BrokerNotAliveException("Aliveness test failed for localhost:15672 guest/quest; "
+ "management not available");
}
}
return channel;
}

private boolean alivenessTest() throws URISyntaxException {
URI uri = new URI(getAdminUri())
.resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8));
HttpHost host = new HttpHost(uri.getHost(), uri.getPort());
RestTemplate template = new RestTemplate(new HttpComponentsClientHttpRequestFactory() {

@Override
@Nullable
protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) {
AuthCache cache = new BasicAuthCache();
BasicScheme scheme = new BasicScheme();
cache.put(host, scheme);
BasicHttpContext context = new BasicHttpContext();
context.setAttribute(HttpClientContext.AUTH_CACHE, cache);
return context;
}

});
template.getInterceptors().add(new BasicAuthenticationInterceptor(this.adminUser, this.adminPassword));
ResponseEntity<String> response = template.exchange(uri, HttpMethod.GET, null, String.class);
return response.getStatusCode().equals(HttpStatus.OK)
? response.getBody().equals("{\"status\":\"ok\"}")
: false;
}

public static boolean fatal() {
String serversRequired = System.getenv(BROKER_REQUIRED);
if (Boolean.parseBoolean(serversRequired)) {
Expand Down
@@ -1,5 +1,5 @@
/*
* Copyright 2017-2019 the original author or authors.
* Copyright 2017-2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -30,7 +30,7 @@
* @since 2.0.2
*
*/
@RabbitAvailable(queues = "rabbitAvailableTests.queue")
@RabbitAvailable(queues = "rabbitAvailableTests.queue", management = true)
public class RabbitAvailableTests {

@Test
Expand Down
Expand Up @@ -18,8 +18,13 @@

import static org.assertj.core.api.Assertions.assertThat;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
Expand All @@ -41,16 +46,19 @@
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.DependsOn;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.rabbit.stream.config.StreamRabbitListenerContainerFactory;
import org.springframework.rabbit.stream.producer.RabbitStreamTemplate;
import org.springframework.rabbit.stream.retry.StreamRetryOperationsInterceptorFactoryBean;
import org.springframework.rabbit.stream.support.StreamMessageProperties;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;

import com.rabbitmq.http.client.Client;
import com.rabbitmq.http.client.domain.QueueInfo;
import com.rabbitmq.stream.Address;
import com.rabbitmq.stream.Environment;
import com.rabbitmq.stream.Message;
Expand Down Expand Up @@ -97,11 +105,38 @@ void nativeMsg(@Autowired RabbitTemplate template) throws InterruptedException {
assertThat(this.config.latch4.await(10, TimeUnit.SECONDS)).isTrue();
}

@SuppressWarnings("unchecked")
@Test
void queueOverAmqp() throws Exception {
Client client = new Client("http://guest:guest@localhost:" + managementPort() + "/api");
QueueInfo queue = client.getQueue("/", "stream.created.over.amqp");
assertThat(queue.getArguments().get("x-queue-type")).isEqualTo("stream");
WebClient client = WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication("guest", "guest"))
.build();
Map<String, Object> queue = queueInfo("stream.created.over.amqp");
assertThat(((Map<String, Object>) queue.get("arguments")).get("x-queue-type")).isEqualTo("stream");
}

private Map<String, Object> queueInfo(String queueName) throws URISyntaxException {
WebClient client = createClient("guest", "guest");
URI uri = queueUri(queueName);
return client.get()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<Map<String, Object>>() {
})
.block(Duration.ofSeconds(10));
}

private URI queueUri(String queue) throws URISyntaxException {
URI uri = new URI("http://localhost:" + managementPort() + "/api")
.resolve("/api/queues/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8) + "/" + queue);
return uri;
}

private WebClient createClient(String adminUser, String adminPassword) {
return WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication(adminUser, adminPassword))
.build();
}

@Configuration(proxyBeanMethods = false)
Expand Down

0 comments on commit a927143

Please sign in to comment.