Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add exponential backoff #541

Open
wants to merge 8 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,11 @@ THE SOFTWARE.
</properties>

<dependencies>
<dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally we don't like to add dependencies to Remoting, but this one is small, so it's probably okay.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Though do we really need a third-party dependency for exponential backoff? Remoting already retries connection, and typically you need to tune backoff behavior a bit for the caller anyway. E.g. https://github.com/jenkinsci/apache-httpcomponents-client-4-api-plugin/blob/4e7d9a7bae61f816f95f6fc1fb144e6ed40895f1/src/main/java/io/jenkins/plugins/httpclient/RobustHTTPClient.java#L159-L216

<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-retry</artifactId>
<version>1.7.1</version>
</dependency>
<dependency>
<groupId>args4j</groupId>
<artifactId>args4j</artifactId>
Expand Down
192 changes: 130 additions & 62 deletions src/main/java/hudson/remoting/Engine.java
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
/*
* The MIT License
*
*
* Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi
*
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
Expand All @@ -31,12 +31,9 @@
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.Socket;
import java.net.URI;
import java.net.URL;
import java.net.*;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't combine imports. They should be one per line.

Also, it would be nice if the reformatting were separated. At least in a separate commit. Maybe a separate PR.

import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
import java.nio.file.Path;
import java.security.AccessController;
import java.security.KeyManagementException;
Expand All @@ -62,6 +59,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.stream.Collectors;
Expand All @@ -70,13 +68,11 @@
import javax.net.ssl.SSLSocketFactory;
import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.CloseReason;
import javax.websocket.ContainerProvider;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.HandshakeResponse;
import javax.websocket.Session;
import javax.websocket.*;

import io.github.resilience4j.core.IntervalFunction;
import io.github.resilience4j.retry.Retry;
import io.github.resilience4j.retry.RetryConfig;
import net.jcip.annotations.NotThreadSafe;
import org.jenkinsci.remoting.engine.Jnlp4ConnectionState;
import org.jenkinsci.remoting.engine.JnlpAgentEndpoint;
Expand Down Expand Up @@ -180,7 +176,7 @@ public Thread newThread(@NonNull final Runnable r) {
* @since 2.62.1
*/
private boolean keepAlive = true;

@CheckForNull
private JarCache jarCache = null;

Expand All @@ -192,14 +188,14 @@ public Thread newThread(@NonNull final Runnable r) {
*/
@CheckForNull
private Path agentLog;

/**
* Specified location of the property file with JUL settings.
* @since 3.8
*/
@CheckForNull
private Path loggingConfigFilePath = null;

/**
* Specifies a default working directory of the remoting instance.
* If specified, this directory will be used to store logs, JAR cache, etc.
Expand Down Expand Up @@ -236,6 +232,8 @@ public Thread newThread(@NonNull final Runnable r) {
private final String instanceIdentity;
private final Set<String> protocols;

private Integer retryAttempts;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason not to use primitives?


public Engine(EngineListener listener, List<URL> hudsonUrls, String secretKey, String agentName) {
this(listener, hudsonUrls, secretKey, agentName, null, null, null);
}
Expand Down Expand Up @@ -280,7 +278,7 @@ private static URL ensureTrailingSlash(URL u) {
public synchronized void startEngine() throws IOException {
startEngine(false);
}

/**
* Starts engine.
* @param dryRun If {@code true}, do not actually start the engine.
Expand All @@ -289,27 +287,27 @@ public synchronized void startEngine() throws IOException {
/*package*/ void startEngine(boolean dryRun) throws IOException {
LOGGER.log(Level.INFO, "Using Remoting version: {0}", Launcher.VERSION);
@CheckForNull File jarCacheDirectory = null;

// Prepare the working directory if required
if (workDir != null) {
final WorkDirManager workDirManager = WorkDirManager.getInstance();
if (jarCache != null) {
// Somebody has already specificed Jar Cache, hence we do not need it in the workspace.
workDirManager.disable(WorkDirManager.DirType.JAR_CACHE_DIR);
}

if (loggingConfigFilePath != null) {
workDirManager.setLoggingConfig(loggingConfigFilePath.toFile());
}

final Path path = workDirManager.initializeWorkDir(workDir.toFile(), internalDir, failIfWorkDirIsMissing);
jarCacheDirectory = workDirManager.getLocation(WorkDirManager.DirType.JAR_CACHE_DIR);
workDirManager.setupLogging(path, agentLog);
} else if (jarCache == null) {
LOGGER.log(Level.WARNING, "No Working Directory. Using the legacy JAR Cache location: {0}", JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION);
jarCacheDirectory = JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION;
}

if (jarCache == null){
if (jarCacheDirectory == null) {
// Should never happen in the current code
Expand All @@ -324,7 +322,7 @@ public synchronized void startEngine() throws IOException {
} else {
LOGGER.log(Level.INFO, "Using custom JAR Cache: {0}", jarCache);
}

// Start the engine thread
if (!dryRun) {
this.start();
Expand All @@ -340,7 +338,7 @@ public synchronized void startEngine() throws IOException {
public void setJarCache(@NonNull JarCache jarCache) {
this.jarCache = jarCache;
}

/**
* Sets path to the property file with JUL settings.
* @param filePath JAR Cache to be used
Expand Down Expand Up @@ -660,8 +658,32 @@ public void closeRead() throws IOException {
}
hudsonUrl = candidateUrls.get(0);
String wsUrl = hudsonUrl.toString().replaceFirst("^http", "ws");
ContainerProvider.getWebSocketContainer().connectToServer(new AgentEndpoint(),
ClientEndpointConfig.Builder.create().configurator(headerHandler).build(), URI.create(wsUrl + "wsagents/"));
ClientEndpointConfig endpointConfig = ClientEndpointConfig.Builder.create()
.configurator(headerHandler).build();
URI wsAgentsUri = URI.create(wsUrl + "wsagents/");

Supplier<Boolean> wsConnectSupplier = () -> {
AgentEndpoint endpoint = new AgentEndpoint();

try {
ContainerProvider.getWebSocketContainer()
.connectToServer(endpoint, endpointConfig, wsAgentsUri);
} catch (UnresolvedAddressException x) {
LOGGER.log(Level.WARNING, "Error connect to WS server", x);
return Boolean.TRUE;
} catch (DeploymentException | IOException x) {
throw new RuntimeException(x);
}

return Boolean.FALSE;
};

Boolean retryResult = exponentialRetry(retryAttempts, wsConnectSupplier);

if (retryResult) {
throw new IllegalStateException("Can't connect to WebSocket instance");
}

while (ch.get() == null) {
Thread.sleep(100);
}
Expand Down Expand Up @@ -697,6 +719,26 @@ public void closeRead() throws IOException {
}
}

/**
* Tries to perform supplier function with exponential retry
*
* @param attempts Retry attempts
* @param supplier Supplier function
* @return
*/
public static Boolean exponentialRetry(Integer attempts, Supplier<Boolean> supplier) {
IntervalFunction fn = IntervalFunction.ofExponentialBackoff();
RetryConfig rc = RetryConfig.custom()
.maxAttempts(attempts)
.intervalFunction(fn)
.retryOnResult(result -> (Boolean) result)
.build();

Retry retry = Retry.of("retry", rc);

return retry.executeSupplier(supplier);
}

private void reconnect() {
try {
events.status("Performing onReconnect operation.");
Expand Down Expand Up @@ -735,33 +777,42 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) {
}

events.status("Locating server among " + candidateUrls);
final JnlpAgentEndpoint endpoint;
try {
endpoint = resolver.resolve();
} catch (Exception e) {
if (Boolean.getBoolean(Engine.class.getName() + ".nonFatalJnlpAgentEndpointResolutionExceptions")) {
events.status("Could not resolve JNLP agent endpoint", e);
} else {
events.error(e);
AtomicReference<JnlpAgentEndpoint> endpointRef = new AtomicReference<>();

Supplier<Boolean> endpointSupplier = () -> {
try {
JnlpAgentEndpoint endpoint = resolver.resolve();

endpointRef.set(endpoint);
} catch (Exception x) {
LOGGER.log(Level.WARNING, "Can't resolve JNLP endpoint", x);
return Boolean.TRUE;
}
return;
}
if (endpoint == null) {

return Boolean.FALSE;
};

Boolean retryResult = exponentialRetry(retryAttempts, endpointSupplier);

JnlpAgentEndpoint jnlpAgentEndpoint = endpointRef.get();

if (retryResult || jnlpAgentEndpoint == null) {
events.status("Could not resolve server among " + candidateUrls);
return;
}
hudsonUrl = endpoint.getServiceUrl();

hudsonUrl = jnlpAgentEndpoint.getServiceUrl();

events.status(String.format("Agent discovery successful%n"
+ " Agent address: %s%n"
+ " Agent port: %d%n"
+ " Identity: %s",
endpoint.getHost(),
endpoint.getPort(),
KeyUtils.fingerprint(endpoint.getPublicKey()))
jnlpAgentEndpoint.getHost(),
jnlpAgentEndpoint.getPort(),
KeyUtils.fingerprint(jnlpAgentEndpoint.getPublicKey()))
);
PublicKeyMatchingX509ExtendedTrustManager delegate = new PublicKeyMatchingX509ExtendedTrustManager();
RSAPublicKey publicKey = endpoint.getPublicKey();
RSAPublicKey publicKey = jnlpAgentEndpoint.getPublicKey();
if (publicKey != null) {
// This is so that JNLP4-connect will only connect if the public key matches
// if the public key is not published then JNLP4-connect will refuse to connect
Expand All @@ -770,7 +821,7 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) {
agentTrustManager.setDelegate(delegate);

events.status("Handshaking");
Socket jnlpSocket = connectTcp(endpoint);
Socket jnlpSocket = connectTcp(jnlpAgentEndpoint);
Channel channel = null;

try {
Expand All @@ -782,16 +833,16 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) {
continue;
}
if (jnlpSocket == null) {
jnlpSocket = connectTcp(endpoint);
jnlpSocket = connectTcp(jnlpAgentEndpoint);
}
if (!endpoint.isProtocolSupported(protocol.getName())) {
if (!jnlpAgentEndpoint.isProtocolSupported(protocol.getName())) {
events.status("Server reports protocol " + protocol.getName() + " not supported, skipping");
continue;
}
triedAtLeastOneProtocol = true;
events.status("Trying protocol: " + protocol.getName());
try {
channel = protocol.connect(jnlpSocket, headers, new EngineJnlpConnectionStateListener(endpoint.getPublicKey(), headers)).get();
channel = protocol.connect(jnlpSocket, headers, new EngineJnlpConnectionStateListener(jnlpAgentEndpoint.getPublicKey(), headers)).get();
} catch (IOException ioe) {
events.status("Protocol " + protocol.getName() + " failed to establish channel", ioe);
} catch (RuntimeException e) {
Expand Down Expand Up @@ -879,24 +930,37 @@ private void onConnectionRejected(String greeting) throws InterruptedException {
* @param endpoint Connection endpoint
* @throws IOException Connection failure or invalid parameter specification
*/
private Socket connectTcp(@NonNull JnlpAgentEndpoint endpoint) throws IOException, InterruptedException {

private Socket connectTcp(@NonNull JnlpAgentEndpoint endpoint) {
String msg = "Connecting to " + endpoint.getHost() + ':' + endpoint.getPort();
events.status(msg);
int retry = 1;
while(true) {

AtomicReference<Socket> socketRef = new AtomicReference<>();

Supplier<Boolean> tcpSupplier = () -> {
try {
final Socket s = endpoint.open(SOCKET_TIMEOUT); // default is 30 mins. See PingThread for the ping interval
s.setKeepAlive(keepAlive);
return s;
} catch (IOException e) {
if(retry++>10) {
throw e;
}
TimeUnit.SECONDS.sleep(10);
events.status(msg+" (retrying:"+retry+")",e);
Socket socket = endpoint.open(SOCKET_TIMEOUT);
socket.setKeepAlive(keepAlive);

socketRef.set(socket);
} catch (IOException x) {
LOGGER.log(Level.WARNING, "Can't open TCP connection", x);
return Boolean.TRUE;
}

return Boolean.FALSE;
};

Boolean retryResult = exponentialRetry(retryAttempts, tcpSupplier);

if (retryResult) {
throw new IllegalStateException("TCP socket is not initialized");
}

if (socketRef.get() != null) {
return socketRef.get();
}

throw new IllegalStateException("TCP socket is not initialized");
}

/**
Expand Down Expand Up @@ -1028,7 +1092,7 @@ private SSLSocketFactory getSSLSocketFactory()
}
return sslSocketFactory;
}

/**
* Socket read timeout.
* A {@link SocketInputStream#read()} call associated with underlying Socket will block for only this amount of time
Expand Down Expand Up @@ -1059,6 +1123,10 @@ public String getProtocolName() {
return this.protocolName;
}

public void setRetryAttempts(Integer retryAttempts) {
this.retryAttempts = retryAttempts;
}

private class EngineJnlpConnectionStateListener extends JnlpConnectionStateListener {

private final RSAPublicKey publicKey;
Expand Down