Skip to content

Commit

Permalink
rebasing and accounting for the portforwarder usage of the common pool
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed May 18, 2022
1 parent 0a59d5b commit c43628a
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -54,9 +55,11 @@ public class PortForwarderWebsocket implements PortForwarder {
private static final Logger LOG = LoggerFactory.getLogger(PortForwarderWebsocket.class);

private final HttpClient client;
private final Executor executor;

public PortForwarderWebsocket(HttpClient client) {
public PortForwarderWebsocket(HttpClient client, Executor executor) {
this.client = client;
this.executor = executor;
}

@Override
Expand Down Expand Up @@ -180,7 +183,7 @@ public PortForward forward(URL resourceBaseUrl, int port, final ReadableByteChan
private int messagesRead = 0;

private final ExecutorService pumperService = Executors.newSingleThreadExecutor();
private final SerialExecutor serialExecutor = new SerialExecutor(Utils.getCommonExecutorSerive());
private final SerialExecutor serialExecutor = new SerialExecutor(executor);

@Override
public void onOpen(final WebSocket webSocket) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Loggable withLogWaitTimeout(Integer logWaitTimeout) {
@Override
public PortForward portForward(int port, ReadableByteChannel in, WritableByteChannel out) {
try {
return new PortForwarderWebsocket(httpClient).forward(getResourceUrl(), port, in, out);
return new PortForwarderWebsocket(httpClient, this.context.getExecutor()).forward(getResourceUrl(), port, in, out);
} catch (Throwable t) {
throw KubernetesClientException.launderThrowable(t);
}
Expand All @@ -192,7 +192,7 @@ public PortForward portForward(int port, ReadableByteChannel in, WritableByteCha
@Override
public LocalPortForward portForward(int port) {
try {
return new PortForwarderWebsocket(httpClient).forward(getResourceUrl(), port);
return new PortForwarderWebsocket(httpClient, this.context.getExecutor()).forward(getResourceUrl(), port);
} catch (Throwable t) {
throw KubernetesClientException.launderThrowable(t);
}
Expand All @@ -201,7 +201,7 @@ public LocalPortForward portForward(int port) {
@Override
public LocalPortForward portForward(int port, int localPort) {
try {
return new PortForwarderWebsocket(httpClient).forward(getResourceUrl(), port, localPort);
return new PortForwarderWebsocket(httpClient, this.context.getExecutor()).forward(getResourceUrl(), port, localPort);
} catch (Throwable t) {
throw KubernetesClientException.launderThrowable(t);
}
Expand All @@ -210,7 +210,8 @@ public LocalPortForward portForward(int port, int localPort) {
@Override
public LocalPortForward portForward(int port, InetAddress localInetAddress, int localPort) {
try {
return new PortForwarderWebsocket(httpClient).forward(getResourceUrl(), port, localInetAddress, localPort);
return new PortForwarderWebsocket(httpClient, this.context.getExecutor()).forward(getResourceUrl(), port,
localInetAddress, localPort);
} catch (MalformedURLException ex) {
throw KubernetesClientException.launderThrowable(ex);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ForkJoinPool;

import static org.assertj.core.api.AssertionsForClassTypes.assertThat;

Expand All @@ -33,7 +34,7 @@ class PortForwarderWebsocketTest {

@BeforeEach
void initPortForwarderWebsocket() {
this.portForwarderWebsocket = new PortForwarderWebsocket(mockHttpClient);
this.portForwarderWebsocket = new PortForwarderWebsocket(mockHttpClient, ForkJoinPool.commonPool());
}

@Test
Expand All @@ -52,7 +53,7 @@ void testCreateNewInetSocketAddressWithNullLocalhost() {
@Test
void testCreateNewInetSocketAddress() throws UnknownHostException {
// Given
InetAddress inetAddress = InetAddress.getByAddress(new byte[] {10, 19, 21, 23});
InetAddress inetAddress = InetAddress.getByAddress(new byte[] { 10, 19, 21, 23 });
int port = 8080;

// When
Expand Down

0 comments on commit c43628a

Please sign in to comment.