From f49b4b5fe75418c3a23df030af5644b8f28b8b3b Mon Sep 17 00:00:00 2001 From: netikras Date: Sun, 11 Oct 2020 00:11:14 +0300 Subject: [PATCH 1/2] Exec.exec returning Future and hiding Process as an implementation detail --- .../kubernetes/client/custom/AsyncPump.java | 123 ++++++++++ .../io/kubernetes/client/custom/IOTrio.java | 81 +++++++ .../main/java/io/kubernetes/client/Exec.java | 215 +++++++++++++++++- 3 files changed, 411 insertions(+), 8 deletions(-) create mode 100644 kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java create mode 100644 kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java diff --git a/kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java b/kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java new file mode 100644 index 0000000000..eb0ee6154b --- /dev/null +++ b/kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java @@ -0,0 +1,123 @@ +/* +Copyright 2020 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client.custom; + +import java.io.InputStream; +import java.util.concurrent.*; + +public class AsyncPump implements Runnable { + + private Thread thread; + private ExecutorService executorService; + private byte[] buff; + private InputStream is; + private CompletableFuture promise; + private NoisyConsumer consumer; + private NoisyConsumer errorHandler; + private NoisyRunnable onFinish; + + public AsyncPump(NoisyConsumer consumer, NoisyConsumer errorHandler, NoisyRunnable onFinish) { + buff = new byte[1024]; + this.consumer = consumer != null ? consumer : NoisyConsumer.NOP; + this.errorHandler = errorHandler != null ? errorHandler : NoisyConsumer.NOP; + this.onFinish = onFinish != null ? onFinish : NoisyRunnable.NOP; + this.promise = new CompletableFuture<>(); + } + + public AsyncPump(NoisyConsumer consumer, NoisyRunnable onFinish) { + this(consumer, Exception::printStackTrace, onFinish); + } + + public AsyncPump(NoisyConsumer consumer) { + this(consumer, NoisyRunnable.NOP); + } + + public AsyncPump setExecutorService(ExecutorService executorService) { + this.executorService = executorService; + return this; + } + + public AsyncPump stop() { + if (thread != null) + thread.interrupt(); + return this; + } + + public AsyncPump startAsync(InputStream is) { + this.is = is; + + if (executorService != null) { + executorService.submit(this); + } else { + thread = new Thread(this); + thread.setName("Async pump"); + thread.start(); + } + return this; + } + + public boolean waitForExit() { + try { + promise.get(); + return true; + } catch (InterruptedException | ExecutionException e) { + return false; + } + } + + public boolean waitForExit(long timeoutMs) { + try { + promise.get(timeoutMs, TimeUnit.MILLISECONDS); + return true; + } catch (InterruptedException | ExecutionException | TimeoutException e) { + return false; + } + } + + public AsyncPump start(InputStream is) { + this.is = is; + return this; + } + + @Override + public void run() { + int nRead; + try { + while ((nRead = is.read(buff)) > -1) { + byte[] giveAway = new byte[nRead]; + System.arraycopy(buff, 0, giveAway, 0, nRead); + consumer.accept(giveAway); + } + onFinish.run(); + } catch (Exception e) { + try { + errorHandler.accept(e); + } catch (Exception exception) { + promise.completeExceptionally(exception); + } + } + promise.complete(null); + } + + public interface NoisyConsumer { + NoisyConsumer NOP = t -> { }; + + void accept(T t) throws Exception; + } + + public interface NoisyRunnable { + NoisyRunnable NOP = () -> { }; + + void run() throws Exception; + } +} diff --git a/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java b/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java new file mode 100644 index 0000000000..b43185de5a --- /dev/null +++ b/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java @@ -0,0 +1,81 @@ +/* +Copyright 2020 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client.custom; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiConsumer; + +/** + * A collection of all the 3 main pipes used in stdio: STDIN, STDOUT and STDERR. As working with the named pipes is usually carried out + * asynchronously, this collection also provides means to initiate and handle the close() event. A close() initiator calls the + * {@link #close(int, long)} method expressing its intent to close the communication channel. Handlers are notified of this intent and + * its up to the handlers to decide what's to be done next. Calling {@link #close(int, long)} does not close the streams or do anything + * else besides notifying the handlers. + */ +public class IOTrio { + private InputStream stdout; + private InputStream stderr; + private OutputStream stdin; + private final Collection> closeHandlers; + + public IOTrio() { + closeHandlers = new ArrayList<>(); + } + + public InputStream getStdout() { + return stdout; + } + + public void setStdout(InputStream stdout) { + this.stdout = stdout; + } + + public InputStream getStderr() { + return stderr; + } + + public void setStderr(InputStream stderr) { + this.stderr = stderr; + } + + public OutputStream getStdin() { + return stdin; + } + + public void setStdin(OutputStream stdin) { + this.stdin = stdin; + } + + /** + * Capture the CLOSE intent and handle it accordingly. + * + * @param handler the handler that's invoked when someone intends to close this communication. Multiple handlers can be registered + */ + public void onClose(BiConsumer handler) { + closeHandlers.add(handler); + } + + /** + * Express an intent to close this communication. This intent will be relayed to all the registered handlers and it's up to them what + * to do with it. + * @param code proposed exit code + * @param timeout time in milliseconds given to terminate this communication. Negative timeout means no timeout (i.e. wait for as long + * as it takes). 0 means "stop it now". + */ + public void close(int code, long timeout) { + closeHandlers.forEach(handler -> handler.accept(code, timeout)); + } +} \ No newline at end of file diff --git a/util/src/main/java/io/kubernetes/client/Exec.java b/util/src/main/java/io/kubernetes/client/Exec.java index 1f1c69e07d..bff27068e2 100644 --- a/util/src/main/java/io/kubernetes/client/Exec.java +++ b/util/src/main/java/io/kubernetes/client/Exec.java @@ -16,6 +16,8 @@ import com.google.common.io.CharStreams; import com.google.gson.reflect.TypeToken; +import io.kubernetes.client.custom.AsyncPump; +import io.kubernetes.client.custom.IOTrio; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.Configuration; @@ -26,19 +28,19 @@ import io.kubernetes.client.openapi.models.V1StatusDetails; import io.kubernetes.client.util.WebSocketStreamHandler; import io.kubernetes.client.util.WebSockets; -import java.io.IOException; -import java.io.InputStream; -import java.io.InputStreamReader; -import java.io.OutputStream; -import java.io.Reader; -import java.io.UnsupportedEncodingException; + +import java.io.*; import java.lang.reflect.Type; import java.net.URLEncoder; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.*; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; + import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +49,7 @@ public class Exec { private static final Logger log = LoggerFactory.getLogger(Exec.class); private ApiClient apiClient; + private long graceTimeoutMs = 5_000L; /** Simple Exec API constructor, uses default configuration */ public Exec() { @@ -191,6 +194,202 @@ public Process exec( .execute(); } + /** + * A convenience method. It simply executes a single command and returns its complete stdout, stderr and exit code as a result. Command + * execution is asynchronous and can be synchronized through the returned {@link Future}. Any error during the execution will fail the + * {@link Future}. + *
See {@link #exec(String, String, Consumer, BiConsumer, BiConsumer, Long, boolean, String...)} for more details. + * + * @return a {@link Future} promise which contains all the collected output and an exit code. In case an error occurs during execution + * the promise will be completed exceptionally. + * @throws IOException + */ + public Future execSimple(String namespace, String podName, long timeout, boolean tty, String... command) + throws IOException { + CompletableFuture promise = new CompletableFuture<>(); + ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); + ByteArrayOutputStream errBuffer = new ByteArrayOutputStream(); + AsyncPump outPump = new AsyncPump(outBuffer::write, Throwable::printStackTrace, null); + AsyncPump errPump = new AsyncPump(errBuffer::write, Throwable::printStackTrace, null); + exec(namespace, podName, + io -> { + outPump.startAsync(io.getStdout()); + errPump.startAsync(io.getStderr()); + }, + (code, io) -> { + SimpleExecResult result = new SimpleExecResult(); + result.setCode(code); + outPump.waitForExit(); + errPump.waitForExit(); + result.setOut(outBuffer.toByteArray()); + result.setError(errBuffer.toByteArray()); + promise.complete(result); + }, + (err, io) -> { + promise.completeExceptionally(err); + io.close(-1, 0L); + }, + timeout, tty, command); + + return promise; + } + + public static class SimpleExecResult { + private byte[] error; + private byte[] out; + private int code; + + public String getErrorAsString() { + if (hasError()) return new String(error); + return ""; + } + + public String getOutAsString() { + if (hasOut()) return new String(out); + return ""; + } + + public boolean hasError() { + return error != null && error.length > 0; + } + + public boolean hasOut() { + return out != null && out.length > 0; + } + + public byte[] getError() { + return error; + } + + public void setError(byte[] error) { + this.error = error; + } + + public byte[] getOut() { + return out; + } + + public void setOut(byte[] out) { + this.out = out; + } + + public int getCode() { + return code; + } + + public void setCode(int code) { + this.code = code; + } + } + + /** + * A convenience method. Executes a command remotely on a pod and monitors for events in that execution. The monitored events are: + *
- connection established (onOpen) + *
- connection closed (onClosed) + *
- execution error occurred (onError) + *
+ * This method also allows to specify a MAX timeout for the execution and returns a future in order to monitor the execution flow. + * + * @param namespace a namespace the target pod "lives" in + * @param podName a name of the pod to exec the command on + * @param onOpen a callback invoked upon the connection established event. + * @param onClosed a callback invoked upon the process termination. Return code might not always be there. + * @param onError a callback to handle k8s errors (NOT the command errors/stderr!) + * @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this many ms or less. If the timeout + * command is running longer than the allowed timeout, the command will be "asked" to terminate gracefully. If the + * command is still running after the grace period, the sigkill will be issued. If null is passed, the timeout will not + * be used and will wait for process to exit itself. + * @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order to make it possible to show + * on screen (tty) + * @param command a tokenized command to run on the pod + * @return a {@link Future} promise representing this execution. Unless something goes south, the promise will contain the process return + * exit code. If the timeoutMs is non-null and the timeout expires before the process exits, promise will contain + * {@link Integer#MAX_VALUE}. + * @throws IOException + */ + public Future exec(String namespace, String podName, Consumer onOpen, BiConsumer onClosed, + BiConsumer onError, Long timeoutMs, boolean tty, String... command) throws IOException { + CompletableFuture future = new CompletableFuture<>(); + IOTrio io = new IOTrio(); + String cmdStr = Arrays.toString(command); + try { + Process process = newExecutionBuilder(namespace, podName, command) + .setStdin(true) + .setStdout(true) + .setStderr(true) + .setTty(tty) + .execute(); + io.onClose((code, timeout) -> { + process.destroy(); + waitForProcessToExit(process, timeout, cmdStr, err -> onError.accept(err, io)); + // processWaitingThread will handle the rest + }); + io.setStdin(process.getOutputStream()); + io.setStdout(process.getInputStream()); + io.setStderr(process.getErrorStream()); + Thread processWaitingThread = new Thread(() -> { + Supplier returnCode = process::exitValue; + try { + log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr); + boolean beforeTimout = waitForProcessToExit(process, timeoutMs, cmdStr, err -> onError.accept(err, io)); + if (!beforeTimout) { + returnCode = () -> Integer.MAX_VALUE; + } + } catch (Exception e) { + onError.accept(e, io); + } + log.debug("process.onExit({}): {}", returnCode.get(), cmdStr); + future.complete(returnCode.get()); + onClosed.accept(returnCode.get(), io); + }); + processWaitingThread.setName("Process-Waiting-Thread-" + command[0] + "-" + podName); + processWaitingThread.start(); + onOpen.accept(io); + } catch (ApiException e) { + onError.accept(e, io); + future.completeExceptionally(e); + } + return future; + } + + private boolean waitForProcessToExit(Process process, Long timeoutMs, String cmdStr, Consumer onError) { + boolean beforeTimeout = true; + if (timeoutMs != null && timeoutMs >= 0) { + boolean exited = false; + try { + exited = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + onError.accept(e); + } + log.debug("Process closed={}: {}", exited, cmdStr); + if (!exited && process.isAlive()) { + beforeTimeout = false; + log.warn("Process timed out after {} ms. Shutting down: {}", timeoutMs, cmdStr); + process.destroy(); + try { + exited = process.waitFor(graceTimeoutMs, TimeUnit.MILLISECONDS); + } catch (Exception e) { + onError.accept(e); + } + if (!exited && process.isAlive()) { + log.warn("Process timed out after {} ms and remained running after grace period. Killing: {}", timeoutMs, cmdStr); + process.destroyForcibly(); + } + } + } else { + try { + process.waitFor(); + } catch (InterruptedException e) { + onError.accept(e); + } + } + return beforeTimeout; + } + + public void setGraceTimeoutMs(long graceTimeoutMs) { + this.graceTimeoutMs = graceTimeoutMs; + } + public final class ExecutionBuilder { private final String namespace; private final String name; From ee527eb13539f1f69beed5df19ec358561747d13 Mon Sep 17 00:00:00 2001 From: netikras Date: Sun, 22 Nov 2020 01:22:49 +0200 Subject: [PATCH 2/2] Exec.exec with callbacks. Tested --- .../kubernetes/client/custom/AsyncPump.java | 123 ------ .../io/kubernetes/client/custom/IOTrio.java | 27 +- .../main/java/io/kubernetes/client/Exec.java | 253 +++++-------- .../kubernetes/client/ExecCallbacksTest.java | 353 ++++++++++++++++++ 4 files changed, 460 insertions(+), 296 deletions(-) delete mode 100644 kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java create mode 100644 util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java diff --git a/kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java b/kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java deleted file mode 100644 index eb0ee6154b..0000000000 --- a/kubernetes/src/main/java/io/kubernetes/client/custom/AsyncPump.java +++ /dev/null @@ -1,123 +0,0 @@ -/* -Copyright 2020 The Kubernetes Authors. -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at -http://www.apache.org/licenses/LICENSE-2.0 -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ -package io.kubernetes.client.custom; - -import java.io.InputStream; -import java.util.concurrent.*; - -public class AsyncPump implements Runnable { - - private Thread thread; - private ExecutorService executorService; - private byte[] buff; - private InputStream is; - private CompletableFuture promise; - private NoisyConsumer consumer; - private NoisyConsumer errorHandler; - private NoisyRunnable onFinish; - - public AsyncPump(NoisyConsumer consumer, NoisyConsumer errorHandler, NoisyRunnable onFinish) { - buff = new byte[1024]; - this.consumer = consumer != null ? consumer : NoisyConsumer.NOP; - this.errorHandler = errorHandler != null ? errorHandler : NoisyConsumer.NOP; - this.onFinish = onFinish != null ? onFinish : NoisyRunnable.NOP; - this.promise = new CompletableFuture<>(); - } - - public AsyncPump(NoisyConsumer consumer, NoisyRunnable onFinish) { - this(consumer, Exception::printStackTrace, onFinish); - } - - public AsyncPump(NoisyConsumer consumer) { - this(consumer, NoisyRunnable.NOP); - } - - public AsyncPump setExecutorService(ExecutorService executorService) { - this.executorService = executorService; - return this; - } - - public AsyncPump stop() { - if (thread != null) - thread.interrupt(); - return this; - } - - public AsyncPump startAsync(InputStream is) { - this.is = is; - - if (executorService != null) { - executorService.submit(this); - } else { - thread = new Thread(this); - thread.setName("Async pump"); - thread.start(); - } - return this; - } - - public boolean waitForExit() { - try { - promise.get(); - return true; - } catch (InterruptedException | ExecutionException e) { - return false; - } - } - - public boolean waitForExit(long timeoutMs) { - try { - promise.get(timeoutMs, TimeUnit.MILLISECONDS); - return true; - } catch (InterruptedException | ExecutionException | TimeoutException e) { - return false; - } - } - - public AsyncPump start(InputStream is) { - this.is = is; - return this; - } - - @Override - public void run() { - int nRead; - try { - while ((nRead = is.read(buff)) > -1) { - byte[] giveAway = new byte[nRead]; - System.arraycopy(buff, 0, giveAway, 0, nRead); - consumer.accept(giveAway); - } - onFinish.run(); - } catch (Exception e) { - try { - errorHandler.accept(e); - } catch (Exception exception) { - promise.completeExceptionally(exception); - } - } - promise.complete(null); - } - - public interface NoisyConsumer { - NoisyConsumer NOP = t -> { }; - - void accept(T t) throws Exception; - } - - public interface NoisyRunnable { - NoisyRunnable NOP = () -> { }; - - void run() throws Exception; - } -} diff --git a/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java b/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java index b43185de5a..cc26503647 100644 --- a/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java +++ b/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java @@ -19,11 +19,12 @@ import java.util.function.BiConsumer; /** - * A collection of all the 3 main pipes used in stdio: STDIN, STDOUT and STDERR. As working with the named pipes is usually carried out - * asynchronously, this collection also provides means to initiate and handle the close() event. A close() initiator calls the - * {@link #close(int, long)} method expressing its intent to close the communication channel. Handlers are notified of this intent and - * its up to the handlers to decide what's to be done next. Calling {@link #close(int, long)} does not close the streams or do anything - * else besides notifying the handlers. + * A collection of all the 3 main pipes used in stdio: STDIN, STDOUT and STDERR. As working with the + * named pipes is usually carried out asynchronously, this collection also provides means to + * initiate and handle the close() event. A close() initiator calls the {@link #close(int, long)} + * method expressing its intent to close the communication channel. Handlers are notified of this + * intent and its up to the handlers to decide what's to be done next. Calling {@link #close(int, + * long)} does not close the streams or do anything else besides notifying the handlers. */ public class IOTrio { private InputStream stdout; @@ -62,20 +63,22 @@ public void setStdin(OutputStream stdin) { /** * Capture the CLOSE intent and handle it accordingly. * - * @param handler the handler that's invoked when someone intends to close this communication. Multiple handlers can be registered + * @param handler the handler that's invoked when someone intends to close this communication. + * Multiple handlers can be registered */ public void onClose(BiConsumer handler) { closeHandlers.add(handler); } /** - * Express an intent to close this communication. This intent will be relayed to all the registered handlers and it's up to them what - * to do with it. - * @param code proposed exit code - * @param timeout time in milliseconds given to terminate this communication. Negative timeout means no timeout (i.e. wait for as long - * as it takes). 0 means "stop it now". + * Express an intent to close this communication. This intent will be relayed to all the + * registered handlers and it's up to them what to do with it. + * + * @param code proposed exit code + * @param timeout time in milliseconds given to terminate this communication. Negative timeout + * means no timeout (i.e. wait for as long as it takes). 0 means "stop it now". */ public void close(int code, long timeout) { closeHandlers.forEach(handler -> handler.accept(code, timeout)); } -} \ No newline at end of file +} diff --git a/util/src/main/java/io/kubernetes/client/Exec.java b/util/src/main/java/io/kubernetes/client/Exec.java index bff27068e2..17cf21a18c 100644 --- a/util/src/main/java/io/kubernetes/client/Exec.java +++ b/util/src/main/java/io/kubernetes/client/Exec.java @@ -16,7 +16,6 @@ import com.google.common.io.CharStreams; import com.google.gson.reflect.TypeToken; -import io.kubernetes.client.custom.AsyncPump; import io.kubernetes.client.custom.IOTrio; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; @@ -28,19 +27,25 @@ import io.kubernetes.client.openapi.models.V1StatusDetails; import io.kubernetes.client.util.WebSocketStreamHandler; import io.kubernetes.client.util.WebSockets; - -import java.io.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.Reader; +import java.io.UnsupportedEncodingException; import java.lang.reflect.Type; import java.net.URLEncoder; import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.*; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.Consumer; import java.util.function.Supplier; - import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +54,6 @@ public class Exec { private static final Logger log = LoggerFactory.getLogger(Exec.class); private ApiClient apiClient; - private long graceTimeoutMs = 5_000L; /** Simple Exec API constructor, uses default configuration */ public Exec() { @@ -195,170 +199,110 @@ public Process exec( } /** - * A convenience method. It simply executes a single command and returns its complete stdout, stderr and exit code as a result. Command - * execution is asynchronous and can be synchronized through the returned {@link Future}. Any error during the execution will fail the - * {@link Future}. - *
See {@link #exec(String, String, Consumer, BiConsumer, BiConsumer, Long, boolean, String...)} for more details. - * - * @return a {@link Future} promise which contains all the collected output and an exit code. In case an error occurs during execution - * the promise will be completed exceptionally. - * @throws IOException - */ - public Future execSimple(String namespace, String podName, long timeout, boolean tty, String... command) - throws IOException { - CompletableFuture promise = new CompletableFuture<>(); - ByteArrayOutputStream outBuffer = new ByteArrayOutputStream(); - ByteArrayOutputStream errBuffer = new ByteArrayOutputStream(); - AsyncPump outPump = new AsyncPump(outBuffer::write, Throwable::printStackTrace, null); - AsyncPump errPump = new AsyncPump(errBuffer::write, Throwable::printStackTrace, null); - exec(namespace, podName, - io -> { - outPump.startAsync(io.getStdout()); - errPump.startAsync(io.getStderr()); - }, - (code, io) -> { - SimpleExecResult result = new SimpleExecResult(); - result.setCode(code); - outPump.waitForExit(); - errPump.waitForExit(); - result.setOut(outBuffer.toByteArray()); - result.setError(errBuffer.toByteArray()); - promise.complete(result); - }, - (err, io) -> { - promise.completeExceptionally(err); - io.close(-1, 0L); - }, - timeout, tty, command); - - return promise; - } - - public static class SimpleExecResult { - private byte[] error; - private byte[] out; - private int code; - - public String getErrorAsString() { - if (hasError()) return new String(error); - return ""; - } - - public String getOutAsString() { - if (hasOut()) return new String(out); - return ""; - } - - public boolean hasError() { - return error != null && error.length > 0; - } - - public boolean hasOut() { - return out != null && out.length > 0; - } - - public byte[] getError() { - return error; - } - - public void setError(byte[] error) { - this.error = error; - } - - public byte[] getOut() { - return out; - } - - public void setOut(byte[] out) { - this.out = out; - } - - public int getCode() { - return code; - } - - public void setCode(int code) { - this.code = code; - } - } - - /** - * A convenience method. Executes a command remotely on a pod and monitors for events in that execution. The monitored events are: - *
- connection established (onOpen) - *
- connection closed (onClosed) - *
- execution error occurred (onError) - *
- * This method also allows to specify a MAX timeout for the execution and returns a future in order to monitor the execution flow. + * A convenience method. Executes a command remotely on a pod and monitors for events in that + * execution. The monitored events are:
+ * - connection established (onOpen)
+ * - connection closed (onClosed)
+ * - execution error occurred (onError)
+ * This method also allows to specify a MAX timeout for the execution and returns a future in + * order to monitor the execution flow.
+ * onError and onClosed callbacks are invoked asynchronously, in a separate thread.
* * @param namespace a namespace the target pod "lives" in - * @param podName a name of the pod to exec the command on - * @param onOpen a callback invoked upon the connection established event. - * @param onClosed a callback invoked upon the process termination. Return code might not always be there. - * @param onError a callback to handle k8s errors (NOT the command errors/stderr!) - * @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this many ms or less. If the timeout - * command is running longer than the allowed timeout, the command will be "asked" to terminate gracefully. If the - * command is still running after the grace period, the sigkill will be issued. If null is passed, the timeout will not - * be used and will wait for process to exit itself. - * @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order to make it possible to show - * on screen (tty) - * @param command a tokenized command to run on the pod - * @return a {@link Future} promise representing this execution. Unless something goes south, the promise will contain the process return - * exit code. If the timeoutMs is non-null and the timeout expires before the process exits, promise will contain - * {@link Integer#MAX_VALUE}. + * @param podName a name of the pod to exec the command on + * @param onOpen a callback invoked upon the connection established event. + * @param onClosed a callback invoked upon the process termination. Return code might not always + * be there. N.B. this callback is invoked before the returned {@link Future} is completed. + * @param onError a callback to handle k8s errors (NOT the command errors/stderr!) + * @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this + * many ms or less. If the timeout command is running longer than the allowed timeout, the + * command will be "asked" to terminate gracefully. If the command is still running after the + * grace period, the sigkill will be issued. If null is passed, the timeout will not be used + * and will wait for process to exit itself. + * @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order + * to make it possible to show on screen (tty) + * @param command a tokenized command to run on the pod + * @return a {@link Future} promise representing this execution. Unless something goes south, the + * promise will contain the process return exit code. If the timeoutMs is non-null and the + * timeout expires before the process exits, promise will contain {@link Integer#MAX_VALUE}. * @throws IOException */ - public Future exec(String namespace, String podName, Consumer onOpen, BiConsumer onClosed, - BiConsumer onError, Long timeoutMs, boolean tty, String... command) throws IOException { + public Future exec( + String namespace, + String podName, + Consumer onOpen, + BiConsumer onClosed, + BiConsumer onError, + Long timeoutMs, + boolean tty, + String... command) + throws IOException { CompletableFuture future = new CompletableFuture<>(); IOTrio io = new IOTrio(); String cmdStr = Arrays.toString(command); + BiConsumer errHandler = + (err, errIO) -> { + if (onError != null) { + onError.accept(err, errIO); + } + }; try { - Process process = newExecutionBuilder(namespace, podName, command) - .setStdin(true) - .setStdout(true) - .setStderr(true) - .setTty(tty) - .execute(); - io.onClose((code, timeout) -> { - process.destroy(); - waitForProcessToExit(process, timeout, cmdStr, err -> onError.accept(err, io)); - // processWaitingThread will handle the rest - }); + Process process = exec(namespace, podName, command, null, true, tty); + + io.onClose( + (code, timeout) -> { + process.destroy(); + waitForProcessToExit(process, timeout, cmdStr, err -> errHandler.accept(err, io)); + // processWaitingThread will handle the rest + }); io.setStdin(process.getOutputStream()); io.setStdout(process.getInputStream()); io.setStderr(process.getErrorStream()); - Thread processWaitingThread = new Thread(() -> { - Supplier returnCode = process::exitValue; - try { - log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr); - boolean beforeTimout = waitForProcessToExit(process, timeoutMs, cmdStr, err -> onError.accept(err, io)); - if (!beforeTimout) { - returnCode = () -> Integer.MAX_VALUE; - } - } catch (Exception e) { - onError.accept(e, io); - } - log.debug("process.onExit({}): {}", returnCode.get(), cmdStr); - future.complete(returnCode.get()); - onClosed.accept(returnCode.get(), io); - }); - processWaitingThread.setName("Process-Waiting-Thread-" + command[0] + "-" + podName); - processWaitingThread.start(); - onOpen.accept(io); + runAsync( + "Process-Waiting-Thread-" + command[0] + "-" + podName, + () -> { + Supplier returnCode = process::exitValue; + try { + log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr); + boolean beforeTimout = + waitForProcessToExit( + process, timeoutMs, cmdStr, err -> errHandler.accept(err, io)); + if (!beforeTimout) { + returnCode = () -> Integer.MAX_VALUE; + } + } catch (Exception e) { + errHandler.accept(e, io); + } + log.debug("process.onExit({}): {}", returnCode.get(), cmdStr); + if (onClosed != null) { + onClosed.accept(returnCode.get(), io); + } + future.complete(returnCode.get()); + }); + if (onOpen != null) { + onOpen.accept(io); + } } catch (ApiException e) { - onError.accept(e, io); + errHandler.accept(e, io); future.completeExceptionally(e); } return future; } - private boolean waitForProcessToExit(Process process, Long timeoutMs, String cmdStr, Consumer onError) { + protected void runAsync(String taskName, Runnable task) { + Thread thread = new Thread(task); + thread.setName(taskName); + thread.start(); + } + + private boolean waitForProcessToExit( + Process process, Long timeoutMs, String cmdStr, Consumer onError) { boolean beforeTimeout = true; if (timeoutMs != null && timeoutMs >= 0) { boolean exited = false; try { exited = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS); - } catch (Exception e) { + } catch (InterruptedException e) { onError.accept(e); } log.debug("Process closed={}: {}", exited, cmdStr); @@ -366,15 +310,6 @@ private boolean waitForProcessToExit(Process process, Long timeoutMs, String cmd beforeTimeout = false; log.warn("Process timed out after {} ms. Shutting down: {}", timeoutMs, cmdStr); process.destroy(); - try { - exited = process.waitFor(graceTimeoutMs, TimeUnit.MILLISECONDS); - } catch (Exception e) { - onError.accept(e); - } - if (!exited && process.isAlive()) { - log.warn("Process timed out after {} ms and remained running after grace period. Killing: {}", timeoutMs, cmdStr); - process.destroyForcibly(); - } } } else { try { @@ -386,10 +321,6 @@ private boolean waitForProcessToExit(Process process, Long timeoutMs, String cmd return beforeTimeout; } - public void setGraceTimeoutMs(long graceTimeoutMs) { - this.graceTimeoutMs = graceTimeoutMs; - } - public final class ExecutionBuilder { private final String namespace; private final String name; diff --git a/util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java b/util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java new file mode 100644 index 0000000000..e3c5a16de0 --- /dev/null +++ b/util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java @@ -0,0 +1,353 @@ +/* +Copyright 2020 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client; + +import static org.junit.Assert.*; + +import io.kubernetes.client.custom.IOTrio; +import io.kubernetes.client.openapi.ApiException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import org.junit.Test; + +public class ExecCallbacksTest { + + @Test + public void testMaxTimeout() throws Exception { + Exec exec = getExec((future, io) -> Thread.sleep(30_000L)); + + long startTime = System.currentTimeMillis(); + Future promise = + exec.exec("ns-1", "pod1", null, null, null, 1_000L, true, "random-command"); + int exitCode = promise.get(10_000, TimeUnit.MILLISECONDS); + long delta = System.currentTimeMillis() - startTime; + + assertTrue(delta <= 10_000); + assertTrue(delta >= 1_000L); + assertEquals(Integer.MAX_VALUE, exitCode); + } + + @Test + public void stderrTest() throws Exception { + Exec exec = + getExec( + (future, io) -> { + io.getErrors().write("Some error".getBytes()); + future.complete(1); + }); + List errors = new ArrayList<>(); + Future promise = + exec.exec( + "ns-1", + "pod1", + io -> + quietly( + () -> { + byte[] buff = new byte[10]; + io.getStderr().read(buff); + errors.add(new String(buff)); + }), + null, + null, + 100_000L, + true, + "random-command"); + + assertEquals(Integer.valueOf(1), promise.get()); + assertEquals(1, errors.size()); + assertEquals("Some error", errors.get(0)); + } + + @Test + public void stdoutTest() throws Exception { + Exec exec = + getExec( + (future, io) -> { + io.getOutput().write("Some stream".getBytes()); + future.complete(0); + }); + + List output = new ArrayList<>(); + Future promise = + exec.exec( + "ns-1", + "pod1", + io -> + quietly( + () -> { + byte[] buff = new byte[11]; + io.getStdout().read(buff); + output.add(new String(buff)); + }), + null, + null, + 100_000L, + true, + "random-command"); + + assertEquals(Integer.valueOf(0), promise.get()); + assertEquals(1, output.size()); + assertEquals("Some stream", output.get(0)); + } + + @Test + public void onClosedTriggerTest() throws Exception { + Exec exec = getExec((future, io) -> future.complete(9)); + + List codes = new ArrayList<>(1); + Future promise = + exec.exec( + "ns-1", + "pod1", + null, + (code, io) -> codes.add(code), + null, + 100_000L, + true, + "random-command"); + + assertEquals(Integer.valueOf(9), promise.get()); + assertEquals(1, codes.size()); + assertEquals(Integer.valueOf(9), codes.get(0)); + } + + @Test + public void mockedExecutionWithMixedEventsTest() throws Exception { + Exec exec = + getExec( + (future, io) -> + quietly( + () -> { + io.getErrors() + .write( + ("bash: cannot set terminal process group (-1): Inappropriate ioctl for device\n" + + "bash: no job control in this shell\n") + .getBytes()); + read(io.getInput(), "uotime\n"); + write(io.getErrors(), "uotime: command not found\n"); + read(io.getInput(), "uptime\n"); + write( + io.getOutput(), + " 00:44:54 up 30 days, 1:15, 0 users, load average: 0,01, 0,04, 0,00\n"); + read(io.getInput(), "exit\n"); + write(io.getOutput(), "Bye!\n"); + future.complete(5); + })); + + List codes = new ArrayList<>(1); + AtomicBoolean callbackInvoked = new AtomicBoolean(false); + Future promise = + exec.exec( + "ns-1", + "pod1", + io -> + quietly( + () -> { + read( + io.getStderr(), + ("bash: cannot set terminal process group (-1): Inappropriate ioctl for device\n" + + "bash: no job control in this shell\n")); + write(io.getStdin(), "uotime\n"); + read(io.getStderr(), "uotime: command not found\n"); + write(io.getStdin(), "uptime\n"); + read( + io.getStdout(), + " 00:44:54 up 30 days, 1:15, 0 users, load average: 0,01, 0,04, 0,00\n"); + write(io.getStdin(), "exit\n"); + read(io.getStdout(), "Bye!\n"); + callbackInvoked.set(true); + }), + (code, io) -> codes.add(code), + null, + 100_000L, + true, + "ssh", + "pc", + "ssh", + "pi", + "bash", + "-i"); + + assertEquals(Integer.valueOf(5), promise.get()); + assertEquals(1, codes.size()); + assertEquals(Integer.valueOf(5), codes.get(0)); + assertTrue(callbackInvoked.get()); + } + + // helper functions + + private Exec getExec(NoisyBiConsumer, PipedIO> remoteProcess) { + return new MockedExecution((fut, io) -> quietly(() -> remoteProcess.accept(fut, io))); + } + + private void quietly(NoisyRunnable task) { + try { + task.run(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + private String read(InputStream is, String expectedText) { + String readText = read(is, expectedText.length()); + assertEquals(expectedText, readText); + return readText; + } + + private void write(OutputStream os, String text) { + quietly(() -> os.write(text.getBytes())); + } + + private String read(InputStream is, int count) { + try { + byte[] buff = new byte[count]; + is.read(buff); + return new String(buff); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + return null; + } + + private interface NoisyBiConsumer { + void accept(V1 value1, V2 value2) throws Exception; + } + + private interface NoisyRunnable { + void run() throws Exception; + } + + // mocked EXEC/Process and the IO pipe + + private static class MockedExecution extends Exec { + private final BiConsumer, PipedIO> remoteProcess; + private final IOTrio localIO; + + private MockedExecution(BiConsumer, PipedIO> remoteProcess) { + this.remoteProcess = remoteProcess; + localIO = new IOTrio(); + } + + @Override + public Process exec( + String namespace, + String name, + String[] command, + String container, + boolean stdin, + boolean tty) + throws ApiException, IOException { + CompletableFuture taskPromise = new CompletableFuture<>(); + PipedIO pipedIO = new PipedIO(localIO); + runAsync("mocked exec", () -> remoteProcess.accept(taskPromise, pipedIO)); + + return new Process() { + @Override + public OutputStream getOutputStream() { + return localIO.getStdin(); + } + + @Override + public InputStream getInputStream() { + return localIO.getStdout(); + } + + @Override + public InputStream getErrorStream() { + return localIO.getStderr(); + } + + @Override + public int waitFor() throws InterruptedException { + try { + return taskPromise.get(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public int exitValue() { + try { + return taskPromise.get(1, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new IllegalThreadStateException(); + } + } + + @Override + public void destroy() { + taskPromise.complete(1); + } + }; + } + } + + private static class PipedIO { + private final PipedOutputStream errors; + private final PipedOutputStream output; + private final PipedInputStream input; + private final IOTrio localIO; + + public PipedIO(IOTrio localIO) { + this.localIO = localIO; + try { + output = new PipedOutputStream(); + localIO.setStdout(new PipedInputStream(output)); + + errors = new PipedOutputStream(); + localIO.setStderr(new PipedInputStream(errors)); + + input = new PipedInputStream(); + localIO.setStdin(new PipedOutputStream(input)); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public IOTrio getLocalIO() { + return localIO; + } + + public OutputStream getErrors() { + return errors; + } + + public OutputStream getOutput() { + return output; + } + + public InputStream getInput() { + return input; + } + } +}