Skip to content

Commit

Permalink
Refactor of the endpoint resolver to bring simplifications and more r…
Browse files Browse the repository at this point in the history
…eusability.
  • Loading branch information
vietj committed May 16, 2024
1 parent 54c5255 commit dc4a16c
Show file tree
Hide file tree
Showing 39 changed files with 750 additions and 693 deletions.
4 changes: 2 additions & 2 deletions src/main/java/examples/HTTPExamples.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.ProxyOptions;
import io.vertx.core.net.ProxyType;
import io.vertx.core.spi.loadbalancing.Endpoint;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.streams.Pipe;
import io.vertx.core.streams.ReadStream;

Expand Down Expand Up @@ -1457,7 +1457,7 @@ public static void customLoadBalancingPolicy(Vertx vertx) {
.build();
}

private static int indexOfEndpoint(List<? extends Endpoint<?>> endpoints) {
private static int indexOfEndpoint(List<? extends EndpointNode> endpoints) {
return 0;
}

Expand Down
1 change: 1 addition & 0 deletions src/main/java/io/vertx/core/Vertx.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetServer;
import io.vertx.core.net.NetServerOptions;
import io.vertx.core.net.endpoint.EndpointResolver;
import io.vertx.core.shareddata.SharedData;
import io.vertx.core.spi.VerticleFactory;
import io.vertx.core.spi.VertxMetricsFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
import io.netty.resolver.DefaultAddressResolverGroup;
import io.vertx.core.Future;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.spi.resolver.dns.AddressResolverProvider;
import io.vertx.core.spi.dns.AddressResolverProvider;

import java.net.InetSocketAddress;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,13 @@
import io.netty.resolver.*;
import io.netty.resolver.dns.*;
import io.netty.util.NetUtil;
import io.netty.util.concurrent.FastThreadLocal;
import io.vertx.core.*;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.dns.AddressResolverOptions;
import io.vertx.core.impl.HostnameResolver;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.spi.resolver.dns.AddressResolverProvider;
import io.vertx.core.spi.dns.AddressResolverProvider;

import java.io.File;
import java.io.IOException;
Expand Down
6 changes: 6 additions & 0 deletions src/main/java/io/vertx/core/dns/impl/SrvRecordImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -82,4 +82,10 @@ public String target() {
public int compareTo(SrvRecord o) {
return Integer.valueOf(priority()).compareTo(o.priority());
}

@Override
public String toString() {
return "SrvRecord[name=" + name + ",service=" + service + ",port=" + port + ",target=" + target
+ ",weight=" + weight + ",priority=" + priority + ",ttl=" + ttl + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.loadbalancing.LoadBalancer;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.impl.resolver.EndpointResolverImpl;
import io.vertx.core.spi.resolver.endpoint.EndpointResolver;
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
import io.vertx.core.net.endpoint.EndpointResolver;

import java.util.function.Function;

Expand All @@ -22,9 +22,8 @@ public final class HttpClientBuilderInternal implements HttpClientBuilder {
private PoolOptions poolOptions;
private Handler<HttpConnection> connectHandler;
private Function<HttpClientResponse, Future<RequestOptions>> redirectHandler;
private io.vertx.core.net.AddressResolver addressResolver;
private AddressResolver addressResolver;
private LoadBalancer loadBalancer = null;
private EndpointResolver<?> endpointResolver;

public HttpClientBuilderInternal(VertxInternal vertx) {
this.vertx = vertx;
Expand Down Expand Up @@ -55,7 +54,7 @@ public HttpClientBuilder withRedirectHandler(Function<HttpClientResponse, Future
}

@Override
public HttpClientBuilder withAddressResolver(io.vertx.core.net.AddressResolver addressResolver) {
public HttpClientBuilder withAddressResolver(AddressResolver addressResolver) {
this.addressResolver = addressResolver;
return this;
}
Expand All @@ -66,17 +65,12 @@ public HttpClientBuilder withLoadBalancer(LoadBalancer loadBalancer) {
return this;
}

public HttpClientBuilderInternal withEndpointResolver(EndpointResolver<?> resolver) {
endpointResolver = resolver;
return this;
}

private CloseFuture resolveCloseFuture() {
ContextInternal context = vertx.getContext();
return context != null ? context.closeFuture() : vertx.closeFuture();
}

private EndpointResolver<?> endpointResolver(HttpClientOptions co) {
private EndpointResolver endpointResolver(HttpClientOptions co) {
LoadBalancer _loadBalancer = loadBalancer;
AddressResolver _addressResolver = addressResolver;
if (_loadBalancer != null) {
Expand All @@ -88,11 +82,10 @@ private EndpointResolver<?> endpointResolver(HttpClientOptions co) {
_loadBalancer = LoadBalancer.ROUND_ROBIN;
}
}
EndpointResolver<?> resolver = endpointResolver;
if (endpointResolver == null && _addressResolver != null) {
resolver = new EndpointResolverImpl<>(_addressResolver.resolver(vertx), _loadBalancer, co.getKeepAliveTimeout() * 1000);
if (_addressResolver != null) {
return new EndpointResolverImpl<>(vertx, _addressResolver.endpointResolver(vertx), _loadBalancer, co.getKeepAliveTimeout() * 1000);
}
return resolver;
return null;
}

@Override
Expand Down
24 changes: 13 additions & 11 deletions src/main/java/io/vertx/core/http/impl/HttpClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,17 +15,17 @@
import io.vertx.core.Handler;
import io.vertx.core.MultiMap;
import io.vertx.core.Promise;
import io.vertx.core.net.endpoint.impl.EndpointResolverImpl;
import io.vertx.core.http.*;
import io.vertx.core.impl.*;
import io.vertx.core.net.*;
import io.vertx.core.net.impl.endpoint.EndpointManager;
import io.vertx.core.net.impl.endpoint.EndpointProvider;
import io.vertx.core.net.impl.pool.*;
import io.vertx.core.spi.resolver.endpoint.EndpointRequest;
import io.vertx.core.spi.resolver.endpoint.EndpointLookup;
import io.vertx.core.net.endpoint.Interaction;
import io.vertx.core.net.endpoint.EndpointNode;
import io.vertx.core.spi.metrics.ClientMetrics;
import io.vertx.core.spi.metrics.MetricsProvider;
import io.vertx.core.spi.resolver.endpoint.EndpointResolver;
import io.vertx.core.net.endpoint.EndpointResolver;

import java.lang.ref.WeakReference;
import java.net.URI;
Expand Down Expand Up @@ -100,8 +100,8 @@ public class HttpClientImpl extends HttpClientBase implements HttpClientInternal
};

private final PoolOptions poolOptions;
private final EndpointManager<EndpointKey, SharedClientHttpStreamEndpoint> httpCM;
private final EndpointResolver endpointResolver;
private final io.vertx.core.net.impl.endpoint.EndpointManager<EndpointKey, SharedClientHttpStreamEndpoint> httpCM;
private final EndpointResolverImpl<?, Address, ?> endpointResolver;
private volatile Function<HttpClientResponse, Future<RequestOptions>> redirectHandler = DEFAULT_HANDLER;
private long timerID;
private volatile Handler<HttpConnection> connectionHandler;
Expand All @@ -113,9 +113,9 @@ public HttpClientImpl(VertxInternal vertx,
PoolOptions poolOptions) {
super(vertx, options);

this.endpointResolver = endpointResolver;
this.endpointResolver = (EndpointResolverImpl) endpointResolver;
this.poolOptions = poolOptions;
httpCM = new EndpointManager<>();
httpCM = new io.vertx.core.net.impl.endpoint.EndpointManager<>();
if (poolOptions.getCleanerPeriod() > 0 && (options.getKeepAliveTimeout() > 0L || options.getHttp2KeepAliveTimeout() > 0L)) {
PoolChecker checker = new PoolChecker(this);
ContextInternal timerContext = vertx.createEventLoopContext();
Expand Down Expand Up @@ -382,8 +382,10 @@ private Future<HttpClientRequest> doRequest(
ContextInternal connCtx = ctx.isEventLoopContext() ? ctx : vertx.createEventLoopContext(ctx.nettyEventLoop(), ctx.workerPool(), ctx.classLoader());
Promise<HttpClientRequest> promise = ctx.promise();
Future<ConnectionObtainedResult> future;
if (endpointResolver != null && endpointResolver.accepts(server) != null) {
Future<EndpointLookup> fut = endpointResolver.lookupEndpoint(ctx, server, routingKey);
if (endpointResolver != null) {
Future<EndpointNode> fut = endpointResolver
.lookupEndpoint(ctx, server)
.map(endpoint -> endpoint.selectNode(routingKey));
future = fut.compose(lookup -> {
SocketAddress address = lookup.address();
ProxyOptions proxyOptions = computeProxyOptions(proxyConfig, address);
Expand All @@ -393,7 +395,7 @@ private Future<HttpClientRequest> doRequest(
if (fut2 == null) {
return null;
} else {
EndpointRequest endpointRequest = lookup.initiateRequest();
Interaction endpointRequest = lookup.initiateInteraction();
return fut2.andThen(ar -> {
if (ar.failed()) {
endpointRequest.reportFailure(ar.cause());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import io.vertx.core.http.HttpVersion;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.spi.resolver.endpoint.EndpointRequest;
import io.vertx.core.net.endpoint.Interaction;
import io.vertx.core.streams.WriteStream;

/**
Expand All @@ -32,9 +32,9 @@
class StatisticsGatheringHttpClientStream implements HttpClientStream {

private final HttpClientStream delegate;
private final EndpointRequest endpointRequest;
private final Interaction endpointRequest;

StatisticsGatheringHttpClientStream(HttpClientStream delegate, EndpointRequest endpointRequest) {
StatisticsGatheringHttpClientStream(HttpClientStream delegate, Interaction endpointRequest) {
this.delegate = delegate;
this.endpointRequest = endpointRequest;
}
Expand Down
31 changes: 16 additions & 15 deletions src/main/java/io/vertx/core/impl/HostnameResolver.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.net.Address;
import io.vertx.core.net.AddressResolver;
import io.vertx.core.net.SocketAddress;
import io.vertx.core.spi.resolver.address.EndpointListBuilder;
import io.vertx.core.spi.resolver.dns.AddressResolverProvider;
import io.vertx.core.spi.resolver.address.AddressResolver;
import io.vertx.core.spi.endpoint.EndpointBuilder;
import io.vertx.core.spi.dns.AddressResolverProvider;
import io.vertx.core.spi.endpoint.EndpointResolver;

import java.io.File;
import java.net.InetAddress;
Expand All @@ -39,7 +40,7 @@
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class HostnameResolver implements io.vertx.core.net.AddressResolver {
public class HostnameResolver implements AddressResolver {

private static final Logger log = LoggerFactory.getLogger(HostnameResolver.class);

Expand Down Expand Up @@ -85,7 +86,7 @@ public HostnameResolver(Vertx vertx, AddressResolverOptions options) {
}

@Override
public AddressResolver<?, ?, ?, ?> resolver(Vertx vertx) {
public EndpointResolver<?, ?, ?, ?> endpointResolver(Vertx vertx) {
return new Impl();
}

Expand Down Expand Up @@ -141,25 +142,25 @@ public static boolean parseRotateOptionFromResolvConf(String s) {
return matcher.find();
}

class Impl<L> implements AddressResolver<SocketAddress, SocketAddress, L, L> {
class Impl<L> implements EndpointResolver<SocketAddress, SocketAddress, L, L> {
@Override
public SocketAddress tryCast(Address address) {
return address instanceof SocketAddress ? (SocketAddress) address : null;
}

@Override
public SocketAddress addressOfEndpoint(SocketAddress endpoint) {
return endpoint;
public SocketAddress addressOf(SocketAddress server) {
return server;
}

@Override
public Future<L> resolve(SocketAddress address, EndpointListBuilder<L, SocketAddress> builder) {
public Future<L> resolve(SocketAddress address, EndpointBuilder<L, SocketAddress> builder) {
Promise<L> promise = Promise.promise();
resolveHostnameAll(address.host(), ar -> {
EndpointListBuilder<L, SocketAddress> builder2 = builder;
EndpointBuilder<L, SocketAddress> builder2 = builder;
if (ar.succeeded()) {
for (InetSocketAddress addr : ar.result()) {
builder2 = builder2.addEndpoint(SocketAddress.inetSocketAddress(address.port(), addr.getAddress().getHostAddress()));
builder2 = builder2.addNode(SocketAddress.inetSocketAddress(address.port(), addr.getAddress().getHostAddress()));
}
promise.complete(builder2.build());
} else {
Expand All @@ -170,18 +171,18 @@ public Future<L> resolve(SocketAddress address, EndpointListBuilder<L, SocketAdd
}

@Override
public L endpoints(L state) {
return state;
public L endpoint(L data) {
return data;
}

@Override
public boolean isValid(L state) {
public boolean isValid(L data) {
// NEED EXPIRATION
return true;
}

@Override
public void dispose(L state) {
public void dispose(L data) {
}

@Override
Expand Down

0 comments on commit dc4a16c

Please sign in to comment.