Skip to content

Commit

Permalink
Switch to Reactor 2020.0.0 snapshots
Browse files Browse the repository at this point in the history
A switch to RSocket 1.0.1 snapshots is also required to pick up a
for froward compatibility with Reactor Netty 1.0.

See gh-25085
  • Loading branch information
rstoyanchev committed May 29, 2020
1 parent 42ff01b commit 6d6269f
Show file tree
Hide file tree
Showing 13 changed files with 84 additions and 45 deletions.
6 changes: 4 additions & 2 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ configure(allprojects) { project ->
imports {
mavenBom "com.fasterxml.jackson:jackson-bom:2.11.0"
mavenBom "io.netty:netty-bom:4.1.50.Final"
mavenBom "io.projectreactor:reactor-bom:Dysprosium-SR7"
mavenBom "io.rsocket:rsocket-bom:1.0.0"
mavenBom "io.projectreactor:reactor-bom:2020.0.0-SNAPSHOT"
mavenBom "io.rsocket:rsocket-bom:1.0.1-SNAPSHOT"
mavenBom "org.eclipse.jetty:jetty-bom:9.4.29.v20200521"
mavenBom "org.jetbrains.kotlin:kotlin-bom:1.3.72"
mavenBom "org.jetbrains.kotlinx:kotlinx-coroutines-bom:1.3.5"
Expand Down Expand Up @@ -281,6 +281,8 @@ configure(allprojects) { project ->
repositories {
mavenCentral()
maven { url "https://repo.spring.io/libs-spring-framework-build" }
maven { url "https://repo.spring.io/snapshot" } // Reactor
maven { url "https://oss.jfrog.org/artifactory/oss-snapshot-local" } // RSocket
}
}
configurations.all {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
Expand All @@ -34,7 +33,6 @@
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import reactor.core.publisher.DirectProcessor;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.core.scheduler.Scheduler;
Expand All @@ -46,6 +44,7 @@
import reactor.netty.resources.ConnectionProvider;
import reactor.netty.resources.LoopResources;
import reactor.netty.tcp.TcpClient;
import reactor.util.retry.Retry;

import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
Expand Down Expand Up @@ -103,14 +102,13 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
* @param codec for encoding and decoding the input/output byte streams
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
@SuppressWarnings("deprecation")
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
Assert.notNull(host, "host is required");
Assert.notNull(codec, "ReactorNettyCodec is required");

this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
this.codec = codec;

this.tcpClient = TcpClient.create(this.poolResources)
Expand All @@ -129,13 +127,12 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
* @since 5.1.3
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
*/
@SuppressWarnings("deprecation")
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
Assert.notNull(codec, "ReactorNettyCodec is required");

this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
this.loopResources = LoopResources.create("tcp-client-loop");
this.poolResources = ConnectionProvider.fixed("tcp-client-pool", 10000);
this.poolResources = ConnectionProvider.create("tcp-client-pool", 10000);
this.codec = codec;

this.tcpClient = clientConfigurer.apply(TcpClient
Expand Down Expand Up @@ -199,7 +196,6 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
}

@Override
@SuppressWarnings("deprecation")
public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, ReconnectStrategy strategy) {
Assert.notNull(handler, "TcpConnectionHandler is required");
Assert.notNull(strategy, "ReconnectStrategy is required");
Expand All @@ -218,8 +214,12 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
.doOnError(updateConnectMono(connectMono))
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
.flatMap(Connection::onDispose) // post-connect issues
.retryWhen(reconnectFunction(strategy))
.repeatWhen(reconnectFunction(strategy))
.retryWhen(Retry.from(signals -> signals
.map(retrySignal -> (int) retrySignal.totalRetriesInARow())
.flatMap(attempt -> reconnect(attempt, strategy))))
.repeatWhen(flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> reconnect(attempt, strategy)))
.subscribe();

return new MonoToListenableFutureAdapter<>(connectMono);
Expand All @@ -244,12 +244,9 @@ private <T> Consumer<T> updateConnectMono(MonoProcessor<Void> connectMono) {
};
}

private <T> Function<Flux<T>, Publisher<?>> reconnectFunction(ReconnectStrategy reconnectStrategy) {
return flux -> flux
.scan(1, (count, element) -> count++)
.flatMap(attempt -> Optional.ofNullable(reconnectStrategy.getTimeToNextAttempt(attempt))
.map(time -> Mono.delay(Duration.ofMillis(time), this.scheduler))
.orElse(Mono.empty()));
private Publisher<? extends Long> reconnect(Integer attempt, ReconnectStrategy reconnectStrategy) {
Long time = reconnectStrategy.getTimeToNextAttempt(attempt);
return (time != null ? Mono.delay(Duration.ofMillis(time), this.scheduler) : Mono.empty());
}

@Override
Expand Down Expand Up @@ -342,7 +339,7 @@ private static class StompMessageDecoder<P> extends ByteToMessageDecoder {

private final ReactorNettyCodec<P> codec;

public StompMessageDecoder(ReactorNettyCodec<P> codec) {
StompMessageDecoder(ReactorNettyCodec<P> codec) {
this.codec = codec;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -64,13 +64,11 @@ public ListenableFuture<Void> send(Message<P> message) {
}

@Override
@SuppressWarnings("deprecation")
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
this.inbound.withConnection(conn -> conn.onReadIdle(inactivityDuration, runnable));
}

@Override
@SuppressWarnings("deprecation")
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
this.inbound.withConnection(conn -> conn.onWriteIdle(inactivityDuration, runnable));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.BDDMockito.given;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
Expand All @@ -75,7 +74,7 @@ public class DefaultRSocketRequesterBuilderTests {
@BeforeEach
public void setup() {
this.transport = mock(ClientTransport.class);
given(this.transport.connect(anyInt())).willReturn(Mono.just(this.connection));
given(this.transport.connect()).willReturn(Mono.just(this.connection));
}


Expand Down Expand Up @@ -106,7 +105,7 @@ public void rsocketConnectorConfigurer() {

// RSocketStrategies and RSocketConnector configurers should have been called

verify(this.transport).connect(anyInt());
verify(this.transport).connect();
verify(strategiesConfigurer).accept(any(RSocketStrategies.Builder.class));
verify(factoryConfigurer).configure(any(io.rsocket.RSocketFactory.ClientRSocketFactory.class));
assertThat(this.connectorConfigurer.connector()).isNotNull();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private static HttpClient initHttpClient(ReactorResourceFactory resourceFactory)
LoopResources resources = resourceFactory.getLoopResources();
Assert.notNull(provider, "No ConnectionProvider: is ReactorResourceFactory not initialized yet?");
Assert.notNull(resources, "No LoopResources: is ReactorResourceFactory not initialized yet?");
return HttpClient.create(provider).tcpConfiguration(tcpClient -> tcpClient.runOn(resources));
return HttpClient.create(provider).runOn(resources);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,7 @@ public class ReactorResourceFactory implements InitializingBean, DisposableBean
@Nullable
private Consumer<HttpResources> globalResourcesConsumer;

@SuppressWarnings("deprecation")
private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.fixed("webflux", 500);
private Supplier<ConnectionProvider> connectionProviderSupplier = () -> ConnectionProvider.create("webflux", 500);

@Nullable
private ConnectionProvider connectionProvider;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.web.testfixture.http.server.reactive.bootstrap;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;

import reactor.netty.DisposableServer;
Expand All @@ -38,8 +39,7 @@ public class ReactorHttpServer extends AbstractHttpServer {
protected void initServer() {
this.reactorHandler = createHttpHandlerAdapter();
this.reactorServer = reactor.netty.http.server.HttpServer.create()
.tcpConfiguration(server -> server.host(getHost()))
.port(getPort());
.host(getHost()).port(getPort());
}

private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
Expand All @@ -49,7 +49,7 @@ private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
@Override
protected void startInternal() {
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
setPort(server.address().getPort());
setPort(((InetSocketAddress) server.address()).getPort());
this.serverRef.set(server);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2002-2018 the original author or authors.
* Copyright 2002-2020 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@

package org.springframework.web.testfixture.http.server.reactive.bootstrap;

import java.net.InetSocketAddress;
import java.util.concurrent.atomic.AtomicReference;

import io.netty.handler.ssl.SslContextBuilder;
Expand Down Expand Up @@ -57,7 +58,7 @@ private ReactorHttpHandlerAdapter createHttpHandlerAdapter() {
@Override
protected void startInternal() {
DisposableServer server = this.reactorServer.handle(this.reactorHandler).bind().block();
setPort(server.address().getPort());
setPort(((InetSocketAddress) server.address()).getPort());
this.serverRef.set(server);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,8 @@ public WebClient.Builder exchangeStrategies(ExchangeStrategies strategies) {
return this;
}

@SuppressWarnings("deprecation")
@Override
@SuppressWarnings("deprecation")
public WebClient.Builder exchangeStrategies(Consumer<ExchangeStrategies.Builder> configurer) {
if (this.strategiesConfigurers == null) {
this.strategiesConfigurers = new ArrayList<>(4);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ public CssLinkResourceTransformer() {
}


@SuppressWarnings("deprecation")
@Override
@SuppressWarnings("deprecation")
public Mono<Resource> transform(ServerWebExchange exchange, Resource inputResource,
ResourceTransformerChain transformerChain) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@
package org.springframework.web.reactive.socket.client;

import java.net.URI;
import java.util.function.Supplier;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import reactor.core.publisher.Mono;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.WebsocketClientSpec;
import reactor.netty.http.websocket.WebsocketInbound;

import org.springframework.core.io.buffer.NettyDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import org.springframework.web.reactive.socket.HandshakeInfo;
Expand All @@ -47,9 +50,13 @@ public class ReactorNettyWebSocketClient implements WebSocketClient {

private final HttpClient httpClient;

private int maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;
private final Supplier<WebsocketClientSpec.Builder> specBuilderSupplier;

private boolean handlePing;
@Nullable
private Integer maxFramePayloadLength = NettyWebSocketSessionSupport.DEFAULT_FRAME_MAX_SIZE;

@Nullable
private Boolean handlePing;


/**
Expand All @@ -60,12 +67,25 @@ public ReactorNettyWebSocketClient() {
}

/**
* Constructor that accepts an existing {@link HttpClient} builder.
* Constructor that accepts an existing {@link HttpClient}.
* @since 5.1
*/
public ReactorNettyWebSocketClient(HttpClient httpClient) {
this(httpClient, WebsocketClientSpec.builder());
}

/**
* Constructor with an {@link HttpClient} and a supplier for the
* {@link WebsocketClientSpec.Builder} to use.
* @since 5.3
*/
public ReactorNettyWebSocketClient(
HttpClient httpClient, Supplier<WebsocketClientSpec.Builder> builderSupplier) {

Assert.notNull(httpClient, "HttpClient is required");
Assert.notNull(builderSupplier, "WebsocketClientSpec.Builder is required");
this.httpClient = httpClient;
this.specBuilderSupplier = builderSupplier;
}


Expand All @@ -76,6 +96,31 @@ public HttpClient getHttpClient() {
return this.httpClient;
}

/**
* Build an instance of {@code WebsocketClientSpec} that reflects the current
* configuration. This can be used to check the configured parameters except
* for sub-protocols which depend on the {@link WebSocketHandler} that is used
* for a given upgrade.
* @since 5.3
*/
public WebsocketClientSpec getWebsocketClientSpec() {
return buildSpec(null);
}

private WebsocketClientSpec buildSpec(@Nullable String protocols) {
WebsocketClientSpec.Builder builder = this.specBuilderSupplier.get();
if (StringUtils.hasText(protocols)) {
builder.protocols(protocols);
}
if (this.maxFramePayloadLength != null) {
builder.maxFramePayloadLength(this.maxFramePayloadLength);
}
if (this.handlePing != null) {
builder.handlePing(this.handlePing);
}
return builder.build();
}

/**
* Configure the maximum allowable frame payload length. Setting this value
* to your application's requirement may reduce denial of service attacks
Expand All @@ -96,7 +141,7 @@ public void setMaxFramePayloadLength(int maxFramePayloadLength) {
* @since 5.2
*/
public int getMaxFramePayloadLength() {
return this.maxFramePayloadLength;
return getWebsocketClientSpec().maxFramePayloadLength();
}

/**
Expand All @@ -119,7 +164,7 @@ public void setHandlePing(boolean handlePing) {
* @since 5.2.4
*/
public boolean getHandlePing() {
return this.handlePing;
return getWebsocketClientSpec().handlePing();
}

@Override
Expand All @@ -128,12 +173,11 @@ public Mono<Void> execute(URI url, WebSocketHandler handler) {
}

@Override
@SuppressWarnings("deprecation")
public Mono<Void> execute(URI url, HttpHeaders requestHeaders, WebSocketHandler handler) {
String protocols = StringUtils.collectionToCommaDelimitedString(handler.getSubProtocols());
return getHttpClient()
.headers(nettyHeaders -> setNettyHeaders(requestHeaders, nettyHeaders))
.websocket(protocols, getMaxFramePayloadLength(), this.handlePing)
.websocket(buildSpec(protocols))
.uri(url.toString())
.handle((inbound, outbound) -> {
HttpHeaders responseHeaders = toHttpHeaders(inbound);
Expand Down

0 comments on commit 6d6269f

Please sign in to comment.