Skip to content

Commit

Permalink
spring-projectsGH-1419: Add RestTemplateNodeLocator
Browse files Browse the repository at this point in the history
- also remove hard dependency on `spring-webflux` from `spring-rabbit-junit`.
  • Loading branch information
garyrussell committed Oct 11, 2022
1 parent c143c5b commit d8171b4
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 25 deletions.
7 changes: 4 additions & 3 deletions build.gradle
Expand Up @@ -44,6 +44,7 @@ ext {
assertkVersion = '0.24'
awaitilityVersion = '4.2.0'
commonsCompressVersion = '1.20'
commonsHttpClientVersion = '5.1.3'
commonsPoolVersion = '2.11.1'
googleJsr305Version = '3.0.2'
hamcrestVersion = '2.2'
Expand Down Expand Up @@ -387,6 +388,7 @@ project('spring-rabbit') {
api 'org.springframework:spring-messaging'
api 'org.springframework:spring-tx'
optionalApi 'org.springframework:spring-webflux'
optionalApi "org.apache.httpcomponents.client5:httpclient5:$commonsHttpClientVersion"
optionalApi 'io.projectreactor:reactor-core'
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
Expand Down Expand Up @@ -473,6 +475,7 @@ project('spring-rabbit-stream') {
testRuntimeOnly "com.github.luben:zstd-jni:$zstdJniVersion"
testImplementation "org.testcontainers:rabbitmq:1.17.3"
testImplementation "org.apache.logging.log4j:log4j-slf4j-impl:$log4jVersion"
testImplementation 'org.springframework:spring-webflux'
}

}
Expand All @@ -488,14 +491,12 @@ project('spring-rabbit-junit') {
exclude group: 'org.hamcrest', module: 'hamcrest-core'
}
api "com.rabbitmq:amqp-client:$rabbitmqVersion"
api 'org.springframework:spring-webflux'
api 'org.springframework:spring-web'
api 'org.junit.jupiter:junit-jupiter-api'
api "org.assertj:assertj-core:$assertjVersion"
optionalApi "ch.qos.logback:logback-classic:$logbackVersion"
optionalApi 'org.apache.logging.log4j:log4j-core'
compileOnly 'org.apiguardian:apiguardian-api:1.0.0'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-core'
testRuntimeOnly 'com.fasterxml.jackson.core:jackson-databind'
}

}
Expand Down
Expand Up @@ -17,11 +17,16 @@
package org.springframework.amqp.rabbit.junit;

import java.io.IOException;
import java.net.Authenticator;
import java.net.PasswordAuthentication;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -33,12 +38,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.util.Base64Utils;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;

import com.rabbitmq.client.Channel;
Expand Down Expand Up @@ -388,22 +389,35 @@ private Channel createQueues(Connection connection) throws IOException, URISynta
}

private boolean alivenessTest() throws URISyntaxException {
WebClient client = WebClient.builder()
.filter(ExchangeFilterFunctions.basicAuthentication(this.adminUser, this.adminPassword))
HttpClient client = HttpClient.newBuilder()
.authenticator(new Authenticator() {

@Override
protected PasswordAuthentication getPasswordAuthentication() {
return new PasswordAuthentication(getAdminUser(), getAdminPassword().toCharArray());
}

})
.build();
URI uri = new URI(getAdminUri())
.resolve("/api/aliveness-test/" + UriUtils.encodePathSegment("/", StandardCharsets.UTF_8));
HashMap<String, String> result = client.get()
HttpRequest request = HttpRequest.newBuilder()
.GET()
.uri(uri)
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(new ParameterizedTypeReference<HashMap<String, String>>() {
})
.block(Duration.ofSeconds(10)); // NOSONAR magic#
if (result != null) {
return result.get("status").equals("ok");
.build();
HttpResponse<String> response;
try {
response = client.send(request, BodyHandlers.ofString());
}
catch (IOException ex) {
LOGGER.error("Exception checking admin aliveness", ex);
return false;
}
catch (InterruptedException ex) {
Thread.currentThread().interrupt();
return false;
}
return false;
return response.body().contentEquals("{\"status\":\"ok\"}");
}

public static boolean fatal() {
Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.springframework.core.log.LogAccessor;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
* A {@link RoutingConnectionFactory} that determines the node on which a queue is located and
Expand All @@ -54,6 +55,13 @@
*/
public class LocalizedQueueConnectionFactory implements ConnectionFactory, RoutingConnectionFactory, DisposableBean {

private static final boolean USING_WEBFLUX;

static {
USING_WEBFLUX = ClassUtils.isPresent("org.springframework.web.reactive.function.client.WebClient",
LocalizedQueueConnectionFactory.class.getClassLoader());
}

private final Log logger = LogFactory.getLog(getClass());

private final Map<String, ConnectionFactory> nodeFactories = new HashMap<String, ConnectionFactory>();
Expand Down Expand Up @@ -82,7 +90,7 @@ public class LocalizedQueueConnectionFactory implements ConnectionFactory, Routi

private final String trustStorePassPhrase;

private NodeLocator<?> nodeLocator = new WebFluxNodeLocator();
private NodeLocator<?> nodeLocator;

/**
* @param defaultConnectionFactory the fallback connection factory to use if the queue
Expand Down Expand Up @@ -190,6 +198,12 @@ private LocalizedQueueConnectionFactory(ConnectionFactory defaultConnectionFacto
this.trustStore = trustStore;
this.keyStorePassPhrase = keyStorePassPhrase;
this.trustStorePassPhrase = trustStorePassPhrase;
if (USING_WEBFLUX) {
this.nodeLocator = new WebFluxNodeLocator();
}
else {
this.nodeLocator = new RestTemplateNodeLocator();
}
}

private static Map<String, String> nodesAddressesToMap(String[] nodes, String[] addresses) {
Expand All @@ -206,6 +220,7 @@ private static Map<String, String> nodesAddressesToMap(String[] nodes, String[]
* @since 2.4.8
*/
public void setNodeLocator(NodeLocator<?> nodeLocator) {
Assert.notNull(nodeLocator, "'nodeLocator' cannot be null");
this.nodeLocator = nodeLocator;
}

Expand Down Expand Up @@ -378,7 +393,7 @@ default ConnectionFactory locate(String[] adminUris, Map<String, String> nodeToA
try {
String uri = new URI(adminUri)
.resolve("/api/queues/").toString();
HashMap<String, Object> queueInfo = restCall(client, uri, vhost, queue);
Map<String, Object> queueInfo = restCall(client, uri, vhost, queue);
if (queueInfo != null) {
String node = (String) queueInfo.get("node");
if (node != null) {
Expand Down Expand Up @@ -429,7 +444,8 @@ default void close(T client) {
* @return the map of queue properties.
* @throws URISyntaxException if the syntax is bad.
*/
HashMap<String, Object> restCall(T client, String baseUri, String vhost, String queue)
@Nullable
Map<String, Object> restCall(T client, String baseUri, String vhost, String queue)
throws URISyntaxException;

}
Expand Down
@@ -0,0 +1,41 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.connection;

import org.springframework.web.client.RestTemplate;

/**
* Holder for a {@link RestTemplate} and credentials.
*
* @author Gary Russell
* @since 2.4.8
*
*/
class RestTemplateHolder {

final String userName; // NOSONAR

final String password; // NOSONAR

RestTemplate template; // NOSONAR

RestTemplateHolder(String userName, String password) {
this.userName = userName;
this.password = password;
}

}
@@ -0,0 +1,97 @@
/*
* Copyright 2022 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.springframework.amqp.rabbit.connection;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hc.client5.http.auth.AuthCache;
import org.apache.hc.client5.http.impl.auth.BasicAuthCache;
import org.apache.hc.client5.http.impl.auth.BasicScheme;
import org.apache.hc.client5.http.protocol.HttpClientContext;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.http.protocol.BasicHttpContext;
import org.apache.hc.core5.http.protocol.HttpContext;

import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator;
import org.springframework.http.HttpMethod;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.http.client.HttpComponentsClientHttpRequestFactory;
import org.springframework.http.client.support.BasicAuthenticationInterceptor;
import org.springframework.lang.Nullable;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriUtils;

/**
* A {@link NodeLocator} using the {@link RestTemplate}.
*
* @author Gary Russell
* @since 3.0
*
*/
public class RestTemplateNodeLocator implements NodeLocator<RestTemplateHolder> {

@Override
public RestTemplateHolder createClient(String userName, String password) {
return new RestTemplateHolder(userName, password);
}

@SuppressWarnings({ "unchecked", "rawtypes" })
@Override
@Nullable
public Map<String, Object> restCall(RestTemplateHolder client, String baseUri, String vhost, String queue)
throws URISyntaxException {

if (client.template == null) {
URI uri = new URI(baseUri);
HttpHost host = new HttpHost(uri.getHost(), uri.getPort());
client.template = new RestTemplate(new HttpComponentsClientHttpRequestFactory() {

@Override
@Nullable
protected HttpContext createHttpContext(HttpMethod httpMethod, URI uri) {
AuthCache cache = new BasicAuthCache();
BasicScheme scheme = new BasicScheme();
cache.put(host, scheme);
BasicHttpContext context = new BasicHttpContext();
context.setAttribute(HttpClientContext.AUTH_CACHE, cache);
return context;
}

});
client.template.getInterceptors().add(new BasicAuthenticationInterceptor(client.userName, client.password));
}
URI uri = new URI(baseUri)
.resolve("/api/queues/" + UriUtils.encodePathSegment(vhost, StandardCharsets.UTF_8) + "/" + queue);
ResponseEntity<Map> response = client.template.exchange(uri, HttpMethod.GET, null, Map.class);
return response.getStatusCode().equals(HttpStatus.OK) ? response.getBody() : null;
}

@Override
public void close(RestTemplateHolder client) {
try {
client.template.close();
}
catch (IOException e) {
}
}

}
Expand Up @@ -21,10 +21,12 @@
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.springframework.amqp.rabbit.connection.LocalizedQueueConnectionFactory.NodeLocator;
import org.springframework.core.ParameterizedTypeReference;
import org.springframework.http.MediaType;
import org.springframework.lang.Nullable;
import org.springframework.web.reactive.function.client.ExchangeFilterFunctions;
import org.springframework.web.reactive.function.client.WebClient;
import org.springframework.web.util.UriUtils;
Expand All @@ -39,7 +41,8 @@
public class WebFluxNodeLocator implements NodeLocator<WebClient> {

@Override
public HashMap<String, Object> restCall(WebClient client, String baseUri, String vhost, String queue)
@Nullable
public Map<String, Object> restCall(WebClient client, String baseUri, String vhost, String queue)
throws URISyntaxException {

URI uri = new URI(baseUri)
Expand Down
Expand Up @@ -83,6 +83,9 @@ void findLocal() {
ConnectionFactory cf = lqcf.getTargetConnectionFactory("[local]");
RabbitAdmin admin = new RabbitAdmin(cf);
assertThat(admin.getQueueProperties("local")).isNotNull();
lqcf.setNodeLocator(new RestTemplateNodeLocator());
ConnectionFactory cf2 = lqcf.getTargetConnectionFactory("[local]");
assertThat(cf2).isSameAs(cf);
lqcf.destroy();
}

Expand Down
8 changes: 6 additions & 2 deletions src/reference/asciidoc/amqp.adoc
Expand Up @@ -796,7 +796,9 @@ Notice that the first three parameters are arrays of `addresses`, `adminUris`, a
These are positional in that, when a container attempts to connect to a queue, it uses the admin API to determine which node is the lead for the queue and connects to the address in the same array position as that node.

IMPORTANT: Starting with version 3.0, the RabbitMQ `http-client` is no longer used to access the Rest API.
Instead, by default, the `WebClient` from Spring Webflux is used; which is not added to the class path by default.
Instead, by default, the `WebClient` from Spring Webflux is used if `spring-webflux` is on the class path; otherwise a `RestTemplate` is used.

To add `WebFlux` to the class path:

.Maven
====
Expand All @@ -821,7 +823,7 @@ You can also use other REST technology by implementing `LocalizedQueueConnection
====
[source, java]
----
lqcf.setNodeLocator(new DefaultNodeLocator<MyClient>() {
lqcf.setNodeLocator(new NodeLocator<MyClient>() {
@Override
public MyClient createClient(String userName, String password) {
Expand All @@ -837,6 +839,8 @@ lqcf.setNodeLocator(new DefaultNodeLocator<MyClient>() {
----
====

The framework provides the `WebFluxNodeLocator` and `RestTemplateNodeLocator`, with the default as discussed above.

[[cf-pub-conf-ret]]
===== Publisher Confirms and Returns

Expand Down

0 comments on commit d8171b4

Please sign in to comment.