Skip to content

Commit

Permalink
Move the LoadBalancer to the io.vertx.net.endpoint package
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed May 16, 2024
1 parent dc4a16c commit e09909b
Show file tree
Hide file tree
Showing 20 changed files with 65 additions and 71 deletions.
10 changes: 5 additions & 5 deletions src/main/asciidoc/http.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -1558,14 +1558,14 @@ The http client can be configured to perform client side load balancing instead

Vert.x provides out of the box several load balancing policies you can use

- {@link io.vertx.core.loadbalancing.LoadBalancer#ROUND_ROBIN Round-robin}
- {@link io.vertx.core.loadbalancing.LoadBalancer#LEAST_REQUESTS Least requests}
- {@link io.vertx.core.loadbalancing.LoadBalancer#POWER_OF_TWO_CHOICES Power of two choices}
- {@link io.vertx.core.loadbalancing.LoadBalancer#CONSISTENT_HASHING Consistent hashing}
- {@link io.vertx.core.net.endpoint.LoadBalancer#ROUND_ROBIN Round-robin}
- {@link io.vertx.core.net.endpoint.LoadBalancer#LEAST_REQUESTS Least requests}
- {@link io.vertx.core.net.endpoint.LoadBalancer#POWER_OF_TWO_CHOICES Power of two choices}
- {@link io.vertx.core.net.endpoint.LoadBalancer#CONSISTENT_HASHING Consistent hashing}

Most load balancing policies are pretty much self-explanatory.

Hash based routing can be achieved with the {@link io.vertx.core.loadbalancing.LoadBalancer#CONSISTENT_HASHING} policy.
Hash based routing can be achieved with the {@link io.vertx.core.net.endpoint.LoadBalancer#CONSISTENT_HASHING} policy.

[source,$lang]
----
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/examples/HTTPExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import io.vertx.core.file.OpenOptions;
import io.vertx.core.http.*;
import io.vertx.core.json.JsonObject;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/vertx/core/http/HttpClientBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.AddressResolver;

import java.util.function.Function;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import io.vertx.core.impl.CloseFuture;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
import io.vertx.core.net.endpoint.EndpointResolver;
Expand Down
7 changes: 4 additions & 3 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@
import io.vertx.core.http.*;
import io.vertx.core.impl.*;
import io.vertx.core.net.*;
import io.vertx.core.net.endpoint.impl.EndpointResolverInternal;
import io.vertx.core.net.impl.endpoint.EndpointProvider;
import io.vertx.core.net.impl.pool.*;
import io.vertx.core.net.endpoint.Interaction;
import io.vertx.core.net.endpoint.EndpointInteraction;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
Expand Down Expand Up @@ -101,7 +102,7 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal

private final PoolOptions poolOptions;
private final io.vertx.core.net.impl.endpoint.EndpointManager<EndpointKey, SharedClientHttpStreamEndpoint> httpCM;
private final EndpointResolverImpl<?, Address, ?> endpointResolver;
private final EndpointResolverInternal<Address> endpointResolver;
private volatile Function<HttpClientResponse, Future<RequestOptions>> redirectHandler = DEFAULT_HANDLER;
private long timerID;
private volatile Handler<HttpConnection> connectionHandler;
Expand Down Expand Up @@ -395,7 +396,7 @@ private Future<HttpClientRequest> doRequest(
if (fut2 == null) {
return null;
} else {
Interaction endpointRequest = lookup.initiateInteraction();
EndpointInteraction endpointRequest = lookup.newInteraction();
return fut2.andThen(ar -> {
if (ar.failed()) {
endpointRequest.reportFailure(ar.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.endpoint.Interaction;
import io.vertx.core.net.endpoint.EndpointInteraction;
import io.vertx.core.streams.WriteStream;

/**
Expand All @@ -32,9 +32,9 @@
class StatisticsGatheringHttpClientStream implements HttpClientStream {

private final HttpClientStream delegate;
private final Interaction endpointRequest;
private final EndpointInteraction endpointRequest;

StatisticsGatheringHttpClientStream(HttpClientStream delegate, Interaction endpointRequest) {
StatisticsGatheringHttpClientStream(HttpClientStream delegate, EndpointInteraction endpointRequest) {
this.delegate = delegate;
this.endpointRequest = endpointRequest;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
package io.vertx.core.net.endpoint;

/**
* Carries request latencies.
* Carries request/response latencies.
*/
public class InteractionMetric {
public class DefaultInteractionMetric {
long requestBegin;
long requestEnd;
long responseBegin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
/**
* Default interaction metrics.
*/
public class DefaultInteractionMetrics implements InteractionMetrics<InteractionMetric> {
public class DefaultInteractionMetrics implements InteractionMetrics<DefaultInteractionMetric> {

private final LongAdder numberOfInflightRequests = new LongAdder();
private final LongAdder numberOfRequests = new LongAdder();
Expand All @@ -25,14 +25,14 @@ public class DefaultInteractionMetrics implements InteractionMetrics<Interaction
private final AtomicLong maxResponseTime = new AtomicLong(0);

@Override
public InteractionMetric initiateRequest() {
public DefaultInteractionMetric initiateRequest() {
numberOfInflightRequests.increment();
numberOfRequests.increment();
return new InteractionMetric();
return new DefaultInteractionMetric();
}

@Override
public void reportFailure(InteractionMetric metric, Throwable failure) {
public void reportFailure(DefaultInteractionMetric metric, Throwable failure) {
if (metric.failure == null) {
metric.failure = failure;
numberOfInflightRequests.decrement();
Expand All @@ -41,30 +41,30 @@ public void reportFailure(InteractionMetric metric, Throwable failure) {
}

@Override
public void reportRequestBegin(InteractionMetric metric) {
public void reportRequestBegin(DefaultInteractionMetric metric) {
metric.requestBegin = System.currentTimeMillis();
}

@Override
public void reportRequestEnd(InteractionMetric metric) {
public void reportRequestEnd(DefaultInteractionMetric metric) {
metric.requestEnd = System.currentTimeMillis();
}

@Override
public void reportResponseBegin(InteractionMetric metric) {
public void reportResponseBegin(DefaultInteractionMetric metric) {
metric.responseBegin = System.currentTimeMillis();
}

@Override
public void reportResponseEnd(InteractionMetric metric) {
public void reportResponseEnd(DefaultInteractionMetric metric) {
metric.responseEnd = System.currentTimeMillis();
if (metric.failure == null) {
reportRequestMetric(metric);
numberOfInflightRequests.decrement();
}
}

private void reportRequestMetric(InteractionMetric metric) {
private void reportRequestMetric(DefaultInteractionMetric metric) {
long responseTime = metric.responseEnd - metric.requestBegin;
while (true) {
long val = minResponseTime.get();
Expand Down
13 changes: 11 additions & 2 deletions src/main/java/io/vertx/core/net/endpoint/Endpoint.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,18 @@ public interface Endpoint {
/**
* Select a node.
*
* @param routingKey the optional routing key
* @return the selected server
*/
EndpointNode selectNode(String routingKey);
default EndpointNode selectNode() {
return selectNode(null);
}

/**
* Select a node, using a routing {@code key}
*
* @param key the routing key
* @return the selected server
*/
EndpointNode selectNode(String key);

}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface Interaction {
public interface EndpointInteraction {

/**
* Report a failure.
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/io/vertx/core/net/endpoint/EndpointNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,10 @@ public interface EndpointNode {
*
* @return the request
*/
Interaction initiateInteraction();
EndpointInteraction newInteraction();

// Should be private somehow
InteractionMetrics<?> metrics();

Object unwrap();

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,4 @@ public interface EndpointResolver<A extends Address> {
*/
Future<Endpoint> resolveEndpoint(A address);

/**
* Check expired endpoints, this method is called by the client periodically to give the opportunity to trigger eviction
* or refreshes.
*/
void checkExpired();

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,10 @@
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.loadbalancing;
package io.vertx.core.net.endpoint;

import io.vertx.core.loadbalancing.impl.ConsistentHashingSelector;
import io.vertx.core.loadbalancing.impl.NoMetricsLoadBalancer;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.net.endpoint.DefaultInteractionMetrics;
import io.vertx.core.net.endpoint.InteractionMetrics;
import io.vertx.core.net.endpoint.EndpointSelector;
import io.vertx.core.net.endpoint.impl.ConsistentHashingSelector;
import io.vertx.core.net.endpoint.impl.NoMetricsLoadBalancer;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.loadbalancing.impl;
package io.vertx.core.net.endpoint.impl;

import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.net.endpoint.EndpointSelector;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,11 @@

import io.vertx.core.Future;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.net.endpoint.Interaction;
import io.vertx.core.net.endpoint.EndpointInteraction;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.net.endpoint.InteractionMetrics;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.Address;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.impl.endpoint.Endpoint;
Expand All @@ -38,7 +38,7 @@
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class EndpointResolverImpl<S, A extends Address, E> implements io.vertx.core.net.endpoint.EndpointResolver<A> {
public class EndpointResolverImpl<S, A extends Address, E> implements EndpointResolverInternal<A> {

private final VertxInternal vertx;
private final LoadBalancer loadBalancer;
Expand Down Expand Up @@ -104,11 +104,11 @@ private EndpointNode selectEndpoint(S state, String routingKey) {
}
return null;
}
public EndpointNode selectNode(String routingKey) {
public EndpointNode selectNode(String key) {
if (!endpointResolver.isValid(state)) {
throw new IllegalStateException("Cannot resolve address " + address );
}
EndpointNode endpoint = selectEndpoint(state, routingKey);
EndpointNode endpoint = selectEndpoint(state, key);
if (endpoint == null) {
throw new IllegalStateException("No results for " + address );
}
Expand Down Expand Up @@ -245,11 +245,11 @@ public SocketAddress address() {
return endpointResolver.addressOf(endpoint);
}
@Override
public Interaction initiateInteraction() {
public EndpointInteraction newInteraction() {
lastAccessed.set(System.currentTimeMillis());
InteractionMetrics metrics = this.metrics;
Object metric = metrics.initiateRequest();
return new Interaction() {
return new EndpointInteraction() {
@Override
public void reportRequestBegin() {
metrics.reportRequestBegin(metric);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.loadbalancing.impl;
package io.vertx.core.net.endpoint.impl;

import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.endpoint.InteractionMetrics;

@FunctionalInterface
Expand Down
8 changes: 4 additions & 4 deletions src/test/java/io/vertx/core/http/ResolvingHttpClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import io.vertx.core.http.impl.CleanableHttpClient;
import io.vertx.core.http.impl.HttpClientImpl;
import io.vertx.core.http.impl.HttpClientInternal;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.endpoint.LoadBalancer;
import io.vertx.core.net.*;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.spi.endpoint.EndpointBuilder;
Expand Down Expand Up @@ -332,7 +332,7 @@ public void testStatistics() throws Exception {
.andThen(onSuccess(resp -> assertEquals(200, resp.statusCode())))
.compose(HttpClientResponse::body)
));
FakeLoadBalancer.FakeInteractionMetrics<?> endpoint = (FakeLoadBalancer.FakeInteractionMetrics<?>) ((EndpointNode) lb.endpoints().get(0)).metrics();
FakeLoadBalancer.FakeLoadBalancerMetrics<?> endpoint = (FakeLoadBalancer.FakeLoadBalancerMetrics<?>) ((EndpointNode) lb.endpoints().get(0)).metrics();
FakeLoadBalancer.FakeMetric metric = endpoint.metrics2().get(0);
assertTrue(metric.requestEnd() - metric.requestBegin() >= 0);
assertTrue(metric.responseBegin() - metric.requestEnd() > 500);
Expand Down Expand Up @@ -367,7 +367,7 @@ public void testStatisticsReportingFailure0() throws Exception {
} catch (Throwable e) {
assertTrue(e.getMessage().contains("timeout"));
}
FakeLoadBalancer.FakeInteractionMetrics<?> endpoint = (FakeLoadBalancer.FakeInteractionMetrics) lb.endpoints().get(0).metrics();
FakeLoadBalancer.FakeLoadBalancerMetrics<?> endpoint = (FakeLoadBalancer.FakeLoadBalancerMetrics) lb.endpoints().get(0).metrics();
assertWaitUntil(() -> endpoint.metrics2().size() == 6);
FakeLoadBalancer.FakeMetric metric = endpoint.metrics2().get(5);
assertNotNull(metric.failure);
Expand Down Expand Up @@ -466,7 +466,7 @@ private FakeLoadBalancer.FakeMetric testStatisticsReportingFailure(BiConsumer<Bo
} catch (RuntimeException e) {
assertTrue(e.getMessage().contains("Connection was closed"));
}
FakeLoadBalancer.FakeInteractionMetrics<?> endpoint = (FakeLoadBalancer.FakeInteractionMetrics) (lb.endpoints().get(0).metrics());
FakeLoadBalancer.FakeLoadBalancerMetrics<?> endpoint = (FakeLoadBalancer.FakeLoadBalancerMetrics) (lb.endpoints().get(0).metrics());
assertWaitUntil(() -> endpoint.metrics2().size() == 11);
for (int i = 0;i < 10;i++) {
FakeLoadBalancer.FakeMetric metric = endpoint.metrics2().get(i);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
package io.vertx.core.loadbalancing;

import io.vertx.core.net.SocketAddress;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.net.endpoint.Interaction;
import io.vertx.core.net.endpoint.InteractionMetrics;
import io.vertx.core.net.endpoint.EndpointSelector;
import io.vertx.core.net.endpoint.*;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand Down Expand Up @@ -61,7 +58,7 @@ public InteractionMetrics<?> metrics() {
return metrics;
}
@Override
public Interaction initiateInteraction() {
public EndpointInteraction newInteraction() {
return null;
}
};
Expand Down

0 comments on commit e09909b

Please sign in to comment.