diff --git a/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java b/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java new file mode 100644 index 0000000000..cc26503647 --- /dev/null +++ b/kubernetes/src/main/java/io/kubernetes/client/custom/IOTrio.java @@ -0,0 +1,84 @@ +/* +Copyright 2020 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client.custom; + +import java.io.InputStream; +import java.io.OutputStream; +import java.util.ArrayList; +import java.util.Collection; +import java.util.function.BiConsumer; + +/** + * A collection of all the 3 main pipes used in stdio: STDIN, STDOUT and STDERR. As working with the + * named pipes is usually carried out asynchronously, this collection also provides means to + * initiate and handle the close() event. A close() initiator calls the {@link #close(int, long)} + * method expressing its intent to close the communication channel. Handlers are notified of this + * intent and its up to the handlers to decide what's to be done next. Calling {@link #close(int, + * long)} does not close the streams or do anything else besides notifying the handlers. + */ +public class IOTrio { + private InputStream stdout; + private InputStream stderr; + private OutputStream stdin; + private final Collection> closeHandlers; + + public IOTrio() { + closeHandlers = new ArrayList<>(); + } + + public InputStream getStdout() { + return stdout; + } + + public void setStdout(InputStream stdout) { + this.stdout = stdout; + } + + public InputStream getStderr() { + return stderr; + } + + public void setStderr(InputStream stderr) { + this.stderr = stderr; + } + + public OutputStream getStdin() { + return stdin; + } + + public void setStdin(OutputStream stdin) { + this.stdin = stdin; + } + + /** + * Capture the CLOSE intent and handle it accordingly. + * + * @param handler the handler that's invoked when someone intends to close this communication. + * Multiple handlers can be registered + */ + public void onClose(BiConsumer handler) { + closeHandlers.add(handler); + } + + /** + * Express an intent to close this communication. This intent will be relayed to all the + * registered handlers and it's up to them what to do with it. + * + * @param code proposed exit code + * @param timeout time in milliseconds given to terminate this communication. Negative timeout + * means no timeout (i.e. wait for as long as it takes). 0 means "stop it now". + */ + public void close(int code, long timeout) { + closeHandlers.forEach(handler -> handler.accept(code, timeout)); + } +} diff --git a/util/src/main/java/io/kubernetes/client/Exec.java b/util/src/main/java/io/kubernetes/client/Exec.java index 1f1c69e07d..17cf21a18c 100644 --- a/util/src/main/java/io/kubernetes/client/Exec.java +++ b/util/src/main/java/io/kubernetes/client/Exec.java @@ -16,6 +16,7 @@ import com.google.common.io.CharStreams; import com.google.gson.reflect.TypeToken; +import io.kubernetes.client.custom.IOTrio; import io.kubernetes.client.openapi.ApiClient; import io.kubernetes.client.openapi.ApiException; import io.kubernetes.client.openapi.Configuration; @@ -34,11 +35,17 @@ import java.io.UnsupportedEncodingException; import java.lang.reflect.Type; import java.net.URLEncoder; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Supplier; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -191,6 +198,129 @@ public Process exec( .execute(); } + /** + * A convenience method. Executes a command remotely on a pod and monitors for events in that + * execution. The monitored events are:
+ * - connection established (onOpen)
+ * - connection closed (onClosed)
+ * - execution error occurred (onError)
+ * This method also allows to specify a MAX timeout for the execution and returns a future in + * order to monitor the execution flow.
+ * onError and onClosed callbacks are invoked asynchronously, in a separate thread.
+ * + * @param namespace a namespace the target pod "lives" in + * @param podName a name of the pod to exec the command on + * @param onOpen a callback invoked upon the connection established event. + * @param onClosed a callback invoked upon the process termination. Return code might not always + * be there. N.B. this callback is invoked before the returned {@link Future} is completed. + * @param onError a callback to handle k8s errors (NOT the command errors/stderr!) + * @param timeoutMs timeout in milliseconds for the execution. I.e. the execution will take this + * many ms or less. If the timeout command is running longer than the allowed timeout, the + * command will be "asked" to terminate gracefully. If the command is still running after the + * grace period, the sigkill will be issued. If null is passed, the timeout will not be used + * and will wait for process to exit itself. + * @param tty whether you need tty to pipe the data. TTY mode will trim some binary data in order + * to make it possible to show on screen (tty) + * @param command a tokenized command to run on the pod + * @return a {@link Future} promise representing this execution. Unless something goes south, the + * promise will contain the process return exit code. If the timeoutMs is non-null and the + * timeout expires before the process exits, promise will contain {@link Integer#MAX_VALUE}. + * @throws IOException + */ + public Future exec( + String namespace, + String podName, + Consumer onOpen, + BiConsumer onClosed, + BiConsumer onError, + Long timeoutMs, + boolean tty, + String... command) + throws IOException { + CompletableFuture future = new CompletableFuture<>(); + IOTrio io = new IOTrio(); + String cmdStr = Arrays.toString(command); + BiConsumer errHandler = + (err, errIO) -> { + if (onError != null) { + onError.accept(err, errIO); + } + }; + try { + Process process = exec(namespace, podName, command, null, true, tty); + + io.onClose( + (code, timeout) -> { + process.destroy(); + waitForProcessToExit(process, timeout, cmdStr, err -> errHandler.accept(err, io)); + // processWaitingThread will handle the rest + }); + io.setStdin(process.getOutputStream()); + io.setStdout(process.getInputStream()); + io.setStderr(process.getErrorStream()); + runAsync( + "Process-Waiting-Thread-" + command[0] + "-" + podName, + () -> { + Supplier returnCode = process::exitValue; + try { + log.debug("Waiting for process to close in {} ms: {}", timeoutMs, cmdStr); + boolean beforeTimout = + waitForProcessToExit( + process, timeoutMs, cmdStr, err -> errHandler.accept(err, io)); + if (!beforeTimout) { + returnCode = () -> Integer.MAX_VALUE; + } + } catch (Exception e) { + errHandler.accept(e, io); + } + log.debug("process.onExit({}): {}", returnCode.get(), cmdStr); + if (onClosed != null) { + onClosed.accept(returnCode.get(), io); + } + future.complete(returnCode.get()); + }); + if (onOpen != null) { + onOpen.accept(io); + } + } catch (ApiException e) { + errHandler.accept(e, io); + future.completeExceptionally(e); + } + return future; + } + + protected void runAsync(String taskName, Runnable task) { + Thread thread = new Thread(task); + thread.setName(taskName); + thread.start(); + } + + private boolean waitForProcessToExit( + Process process, Long timeoutMs, String cmdStr, Consumer onError) { + boolean beforeTimeout = true; + if (timeoutMs != null && timeoutMs >= 0) { + boolean exited = false; + try { + exited = process.waitFor(timeoutMs, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + onError.accept(e); + } + log.debug("Process closed={}: {}", exited, cmdStr); + if (!exited && process.isAlive()) { + beforeTimeout = false; + log.warn("Process timed out after {} ms. Shutting down: {}", timeoutMs, cmdStr); + process.destroy(); + } + } else { + try { + process.waitFor(); + } catch (InterruptedException e) { + onError.accept(e); + } + } + return beforeTimeout; + } + public final class ExecutionBuilder { private final String namespace; private final String name; diff --git a/util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java b/util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java new file mode 100644 index 0000000000..e3c5a16de0 --- /dev/null +++ b/util/src/test/java/io/kubernetes/client/ExecCallbacksTest.java @@ -0,0 +1,353 @@ +/* +Copyright 2020 The Kubernetes Authors. +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at +http://www.apache.org/licenses/LICENSE-2.0 +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package io.kubernetes.client; + +import static org.junit.Assert.*; + +import io.kubernetes.client.custom.IOTrio; +import io.kubernetes.client.openapi.ApiException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PipedInputStream; +import java.io.PipedOutputStream; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BiConsumer; +import org.junit.Test; + +public class ExecCallbacksTest { + + @Test + public void testMaxTimeout() throws Exception { + Exec exec = getExec((future, io) -> Thread.sleep(30_000L)); + + long startTime = System.currentTimeMillis(); + Future promise = + exec.exec("ns-1", "pod1", null, null, null, 1_000L, true, "random-command"); + int exitCode = promise.get(10_000, TimeUnit.MILLISECONDS); + long delta = System.currentTimeMillis() - startTime; + + assertTrue(delta <= 10_000); + assertTrue(delta >= 1_000L); + assertEquals(Integer.MAX_VALUE, exitCode); + } + + @Test + public void stderrTest() throws Exception { + Exec exec = + getExec( + (future, io) -> { + io.getErrors().write("Some error".getBytes()); + future.complete(1); + }); + List errors = new ArrayList<>(); + Future promise = + exec.exec( + "ns-1", + "pod1", + io -> + quietly( + () -> { + byte[] buff = new byte[10]; + io.getStderr().read(buff); + errors.add(new String(buff)); + }), + null, + null, + 100_000L, + true, + "random-command"); + + assertEquals(Integer.valueOf(1), promise.get()); + assertEquals(1, errors.size()); + assertEquals("Some error", errors.get(0)); + } + + @Test + public void stdoutTest() throws Exception { + Exec exec = + getExec( + (future, io) -> { + io.getOutput().write("Some stream".getBytes()); + future.complete(0); + }); + + List output = new ArrayList<>(); + Future promise = + exec.exec( + "ns-1", + "pod1", + io -> + quietly( + () -> { + byte[] buff = new byte[11]; + io.getStdout().read(buff); + output.add(new String(buff)); + }), + null, + null, + 100_000L, + true, + "random-command"); + + assertEquals(Integer.valueOf(0), promise.get()); + assertEquals(1, output.size()); + assertEquals("Some stream", output.get(0)); + } + + @Test + public void onClosedTriggerTest() throws Exception { + Exec exec = getExec((future, io) -> future.complete(9)); + + List codes = new ArrayList<>(1); + Future promise = + exec.exec( + "ns-1", + "pod1", + null, + (code, io) -> codes.add(code), + null, + 100_000L, + true, + "random-command"); + + assertEquals(Integer.valueOf(9), promise.get()); + assertEquals(1, codes.size()); + assertEquals(Integer.valueOf(9), codes.get(0)); + } + + @Test + public void mockedExecutionWithMixedEventsTest() throws Exception { + Exec exec = + getExec( + (future, io) -> + quietly( + () -> { + io.getErrors() + .write( + ("bash: cannot set terminal process group (-1): Inappropriate ioctl for device\n" + + "bash: no job control in this shell\n") + .getBytes()); + read(io.getInput(), "uotime\n"); + write(io.getErrors(), "uotime: command not found\n"); + read(io.getInput(), "uptime\n"); + write( + io.getOutput(), + " 00:44:54 up 30 days, 1:15, 0 users, load average: 0,01, 0,04, 0,00\n"); + read(io.getInput(), "exit\n"); + write(io.getOutput(), "Bye!\n"); + future.complete(5); + })); + + List codes = new ArrayList<>(1); + AtomicBoolean callbackInvoked = new AtomicBoolean(false); + Future promise = + exec.exec( + "ns-1", + "pod1", + io -> + quietly( + () -> { + read( + io.getStderr(), + ("bash: cannot set terminal process group (-1): Inappropriate ioctl for device\n" + + "bash: no job control in this shell\n")); + write(io.getStdin(), "uotime\n"); + read(io.getStderr(), "uotime: command not found\n"); + write(io.getStdin(), "uptime\n"); + read( + io.getStdout(), + " 00:44:54 up 30 days, 1:15, 0 users, load average: 0,01, 0,04, 0,00\n"); + write(io.getStdin(), "exit\n"); + read(io.getStdout(), "Bye!\n"); + callbackInvoked.set(true); + }), + (code, io) -> codes.add(code), + null, + 100_000L, + true, + "ssh", + "pc", + "ssh", + "pi", + "bash", + "-i"); + + assertEquals(Integer.valueOf(5), promise.get()); + assertEquals(1, codes.size()); + assertEquals(Integer.valueOf(5), codes.get(0)); + assertTrue(callbackInvoked.get()); + } + + // helper functions + + private Exec getExec(NoisyBiConsumer, PipedIO> remoteProcess) { + return new MockedExecution((fut, io) -> quietly(() -> remoteProcess.accept(fut, io))); + } + + private void quietly(NoisyRunnable task) { + try { + task.run(); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } + } + + private String read(InputStream is, String expectedText) { + String readText = read(is, expectedText.length()); + assertEquals(expectedText, readText); + return readText; + } + + private void write(OutputStream os, String text) { + quietly(() -> os.write(text.getBytes())); + } + + private String read(InputStream is, int count) { + try { + byte[] buff = new byte[count]; + is.read(buff); + return new String(buff); + } catch (IOException e) { + e.printStackTrace(); + fail(); + } + return null; + } + + private interface NoisyBiConsumer { + void accept(V1 value1, V2 value2) throws Exception; + } + + private interface NoisyRunnable { + void run() throws Exception; + } + + // mocked EXEC/Process and the IO pipe + + private static class MockedExecution extends Exec { + private final BiConsumer, PipedIO> remoteProcess; + private final IOTrio localIO; + + private MockedExecution(BiConsumer, PipedIO> remoteProcess) { + this.remoteProcess = remoteProcess; + localIO = new IOTrio(); + } + + @Override + public Process exec( + String namespace, + String name, + String[] command, + String container, + boolean stdin, + boolean tty) + throws ApiException, IOException { + CompletableFuture taskPromise = new CompletableFuture<>(); + PipedIO pipedIO = new PipedIO(localIO); + runAsync("mocked exec", () -> remoteProcess.accept(taskPromise, pipedIO)); + + return new Process() { + @Override + public OutputStream getOutputStream() { + return localIO.getStdin(); + } + + @Override + public InputStream getInputStream() { + return localIO.getStdout(); + } + + @Override + public InputStream getErrorStream() { + return localIO.getStderr(); + } + + @Override + public int waitFor() throws InterruptedException { + try { + return taskPromise.get(); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } + } + + @Override + public int exitValue() { + try { + return taskPromise.get(1, TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } catch (ExecutionException e) { + throw new RuntimeException(e); + } catch (TimeoutException e) { + throw new IllegalThreadStateException(); + } + } + + @Override + public void destroy() { + taskPromise.complete(1); + } + }; + } + } + + private static class PipedIO { + private final PipedOutputStream errors; + private final PipedOutputStream output; + private final PipedInputStream input; + private final IOTrio localIO; + + public PipedIO(IOTrio localIO) { + this.localIO = localIO; + try { + output = new PipedOutputStream(); + localIO.setStdout(new PipedInputStream(output)); + + errors = new PipedOutputStream(); + localIO.setStderr(new PipedInputStream(errors)); + + input = new PipedInputStream(); + localIO.setStdin(new PipedOutputStream(input)); + + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + public IOTrio getLocalIO() { + return localIO; + } + + public OutputStream getErrors() { + return errors; + } + + public OutputStream getOutput() { + return output; + } + + public InputStream getInput() { + return input; + } + } +}