Skip to content

Commit

Permalink
feat: Implement 'attach to pod' functionality
Browse files Browse the repository at this point in the history
  • Loading branch information
kuzznya authored and manusa committed Aug 29, 2022
1 parent f3078b5 commit 216b40d
Show file tree
Hide file tree
Showing 6 changed files with 233 additions and 27 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#### New Features
* Fix #2271: Support periodic refresh of access tokens before they expire
* Fix #4333: Implement "attach to pod" functionality

#### _**Note**_: Breaking changes in the API
* Fix #4206: The Serialization utility class will throw an Exception, instead of returning null, if an untyped unmarshall method is used on something that lacks type information
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,21 @@

public interface Execable {

/**
* Execute a command in a container
*
* @param input the command to run
* @return container with stdin, stdout, stderr streams
* (if redirectingInput(), redirectingOutput(), redirectingError() were called respectively)
*/
ExecWatch exec(String... input);

/**
* Attach to the main process of a container
*
* @return container with stdin, stdout, stderr streams
* (if redirectingInput(), redirectingOutput(), redirectingError() were called respectively)
*/
ExecWatch attach();

}
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.io.Reader;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
Expand Down Expand Up @@ -271,39 +272,56 @@ public PodOperationsImpl inContainer(
public ExecWatch exec(String... command) {
String[] actualCommands = command.length >= 1 ? command : EMPTY_COMMAND;
try {
URL url = getURLWithCommandParams(actualCommands);
HttpClient clone = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
final ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(getContext(), this.context.getExecutor());
CompletableFuture<WebSocket> startedFuture = clone.newWebSocketBuilder()
.subprotocol("v4.channel.k8s.io")
.uri(url.toURI())
.buildAsync(execWebSocketListener);
startedFuture.whenComplete((w, t) -> {
if (t != null) {
execWebSocketListener.onError(w, t);
}
});
Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
return execWebSocketListener;
URL url = getExecURLWithCommandParams(actualCommands);
return setupConnectionToPod(url.toURI());
} catch (Throwable t) {
throw KubernetesClientException.launderThrowable(forOperationType("exec"), t);
}
}

URL getURLWithCommandParams(String[] commands) throws MalformedURLException {
String url = URLUtils.join(getResourceUrl().toString(), "exec");
@Override
public ExecWatch attach() {
try {
URL url = getAttachURL();
return setupConnectionToPod(url.toURI());
} catch (Throwable t) {
throw KubernetesClientException.launderThrowable(forOperationType("attach"), t);
}
}

private URL getExecURLWithCommandParams(String[] commands) throws MalformedURLException {
String url = URLUtils.join(getResourceUrl().toString(), "exec");
URLBuilder httpUrlBuilder = new URLBuilder(url);

for (String cmd : commands) {
httpUrlBuilder.addQueryParameter("command", cmd);
}

getContext().addQueryParameters(httpUrlBuilder);
return httpUrlBuilder.build();
}

private URL getAttachURL() throws MalformedURLException {
String url = URLUtils.join(getResourceUrl().toString(), "attach");
URLBuilder httpUrlBuilder = new URLBuilder(url);
getContext().addQueryParameters(httpUrlBuilder);
return httpUrlBuilder.build();
}

private ExecWebSocketListener setupConnectionToPod(URI uri) {
HttpClient clone = httpClient.newBuilder().readTimeout(0, TimeUnit.MILLISECONDS).build();
ExecWebSocketListener execWebSocketListener = new ExecWebSocketListener(getContext(), this.context.getExecutor());
CompletableFuture<WebSocket> startedFuture = clone.newWebSocketBuilder()
.subprotocol("v4.channel.k8s.io")
.uri(uri)
.buildAsync(execWebSocketListener);
startedFuture.whenComplete((w, t) -> {
if (t != null) {
execWebSocketListener.onError(w, t);
}
});
Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS);
return execWebSocketListener;
}

@Override
public PodOperationsImpl file(String file) {
return new PodOperationsImpl(getContext().withFile(file), context);
Expand Down
44 changes: 44 additions & 0 deletions kubernetes-itests/src/test/java/io/fabric8/kubernetes/PodIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,50 @@ void execInteractiveShell() throws Exception {
assertEquals(-1, watch.getError().read());
}

@Test
void attach() throws Exception {
client.pods().withName("pod-interactive").waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS);

CountDownLatch latch = new CountDownLatch(1);
ByteArrayOutputStream stdout = new ByteArrayOutputStream();
AtomicBoolean closed = new AtomicBoolean();
AtomicBoolean failed = new AtomicBoolean();

ExecWatch watch = client.pods().withName("pod-interactive")
.redirectingInput()
.redirectingOutput()
.redirectingError()
.withTTY()
.usingListener(new ExecListener() {
@Override
public void onFailure(Throwable t, Response failureResponse) {
failed.set(true);
latch.countDown();
}

@Override
public void onClose(int i, String s) {
closed.set(true);
latch.countDown();
}
})
.attach();

watch.getInput().write("whoami\n".getBytes(StandardCharsets.UTF_8));
watch.getInput().flush();

InputStreamPumper.pump(watch.getOutput(), stdout::write, Executors.newSingleThreadExecutor());

Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> stdout.toString().contains("root"));

watch.close();

assertTrue(latch.await(5, TimeUnit.SECONDS));
assertTrue(closed.get());
assertFalse(failed.get());
}

@Test
void readFile() throws IOException {
client.pods().withName("pod-standard").waitUntilReady(POD_READY_WAIT_IN_SECONDS, TimeUnit.SECONDS);
Expand Down
12 changes: 12 additions & 0 deletions kubernetes-itests/src/test/resources/pod-it.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,15 @@ spec:
- name: busybox
image: busybox
command: ["sleep", "36000"]
---
apiVersion: v1
kind: Pod
metadata:
name: pod-interactive
spec:
containers:
- name: busybox
image: busybox
command: ["timeout", "-s", "SIGKILL", "36000", "sh", "-i"]
stdin: true
tty: true
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import io.fabric8.kubernetes.client.server.mock.OutputStreamMessage;
import io.fabric8.kubernetes.client.utils.InputStreamPumper;
import io.fabric8.kubernetes.client.utils.Utils;
import io.fabric8.mockwebserver.internal.WebSocketMessage;
import okio.ByteString;
import org.awaitility.Awaitility;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -64,6 +67,7 @@
import java.nio.file.Path;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertEquals;
Expand Down Expand Up @@ -337,23 +341,135 @@ void testExec() throws InterruptedException {

final CountDownLatch execLatch = new CountDownLatch(1);
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ExecWatch watch = client.pods().withName("pod1").writingOutput(baos).usingListener(new ExecListener() {
ExecWatch watch = client.pods().withName("pod1")
.writingOutput(baos)
.usingListener(createCountDownLatchListener(execLatch))
.exec("ls");

execLatch.await(10, TimeUnit.MINUTES);
assertNotNull(watch);
assertEquals(expectedOutput, baos.toString());
watch.close();
}

@Test
void testAttachWithWritingOutput() throws InterruptedException, IOException {
// Given
String validInput = "input";
String expectedOutput = "output";

String invalidInput = "invalid";
String expectedError = "error";

String shutdownInput = "shutdown";

server.expect().withPath("/api/v1/namespaces/test/pods/pod1/attach?stdin=true&stdout=true&stderr=true")
.andUpgradeToWebSocket()
.open()
.expect("\u0000" + validInput) // \u0000 is the file descriptor for stdin
.andEmit(new WebSocketMessage(0L, "\u0001" + expectedOutput, false, true)) // \u0001 is the file descriptor for stdout
.always()
.expect("\u0000" + invalidInput)
.andEmit(new WebSocketMessage(0L, "\u0002" + expectedError, false, true)) // \u0002 is the file descriptor for stderr
.always()
.expect("\u0000" + shutdownInput)
.andEmit(new WebSocketMessage(0L, "\u0003shutdown", false, true))
.always()
.done()
.always();

ByteArrayOutputStream stdout = new ByteArrayOutputStream();
ByteArrayOutputStream stderr = new ByteArrayOutputStream();

CountDownLatch latch = new CountDownLatch(1);

// When
ExecWatch watch = client.pods().withName("pod1")
.redirectingInput()

.writingOutput(stdout)
.writingError(stderr)
.usingListener(createCountDownLatchListener(latch))
.attach();

watch.getInput().write(validInput.getBytes(StandardCharsets.UTF_8));
watch.getInput().flush();
watch.getInput().write(invalidInput.getBytes(StandardCharsets.UTF_8));
watch.getInput().flush();
watch.getInput().write(shutdownInput.getBytes(StandardCharsets.UTF_8));
watch.getInput().flush();

latch.await(1, TimeUnit.MINUTES);

// Then
assertEquals(expectedOutput, stdout.toString());
assertEquals(expectedError, stderr.toString());

watch.close();
}

@Test
void testAttachWithRedirectOutput() throws InterruptedException, IOException {
// Given
String validInput = "input";
String expectedOutput = "output";

String invalidInput = "invalid";
String expectedError = "error";

server.expect().withPath("/api/v1/namespaces/test/pods/pod1/attach?stdin=true&stdout=true&stderr=true")
.andUpgradeToWebSocket()
.open()
.expect("\u0000" + validInput) // \u0000 is the file descriptor for stdin
.andEmit(new WebSocketMessage(0L, "\u0001" + expectedOutput, false, true)) // \u0001 is the file descriptor for stdout
.always()
.expect("\u0000" + invalidInput)
.andEmit(new WebSocketMessage(0L, "\u0002" + expectedError, false, true)) // \u0002 is the file descriptor for stderr
.always()
.done()
.always();

ByteArrayOutputStream stdout = new ByteArrayOutputStream();
ByteArrayOutputStream stderr = new ByteArrayOutputStream();

CountDownLatch latch = new CountDownLatch(1);

// When
ExecWatch watch = client.pods().withName("pod1")
.redirectingInput()
.redirectingOutput()
.redirectingError()
.usingListener(createCountDownLatchListener(latch))
.attach();

watch.getInput().write(validInput.getBytes(StandardCharsets.UTF_8));
watch.getInput().flush();
watch.getInput().write(invalidInput.getBytes(StandardCharsets.UTF_8));
watch.getInput().flush();

InputStreamPumper.pump(watch.getOutput(), stdout::write, Executors.newSingleThreadExecutor());
InputStreamPumper.pump(watch.getError(), stderr::write, Executors.newSingleThreadExecutor());

// Then
Awaitility.await().atMost(30, TimeUnit.SECONDS)
.until(() -> stdout.toString().equals(expectedOutput) && stderr.toString().equals(expectedError));

watch.close();
latch.await(1, TimeUnit.MINUTES);
}

private ExecListener createCountDownLatchListener(CountDownLatch latch) {
return new ExecListener() {
@Override
public void onFailure(Throwable t, Response failureResponse) {
execLatch.countDown();
latch.countDown();
}

@Override
public void onClose(int code, String reason) {
execLatch.countDown();
latch.countDown();
}
}).exec("ls");

execLatch.await(10, TimeUnit.MINUTES);
assertNotNull(watch);
assertEquals(expectedOutput, baos.toString());
watch.close();
};
}

@Test
Expand Down

0 comments on commit 216b40d

Please sign in to comment.