From 5852779c0a06556e40659745bd1852ae8aec8983 Mon Sep 17 00:00:00 2001 From: Armand Picard Date: Thu, 11 Aug 2022 18:03:50 +0200 Subject: [PATCH 01/10] Add support for terminal size when executing command inside a container Signed-off-by: armandpicard --- examples/Cargo.toml | 5 + examples/pod_exec_terminal_size.rs | 134 ++++++++++++++++++++ kube-client/src/api/mod.rs | 2 +- kube-client/src/api/remote_command.rs | 171 +++++++++++++++++--------- 4 files changed, 255 insertions(+), 57 deletions(-) create mode 100644 examples/pod_exec_terminal_size.rs diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 069489761..7f3b4d633 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -53,6 +53,7 @@ backoff = "0.4.0" clap = { version = "3.1.9", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } +crossterm = "0.24.0" [[example]] name = "configmapgen_controller" @@ -202,3 +203,7 @@ path = "custom_client_trace.rs" [[example]] name = "secret_syncer" path = "secret_syncer.rs" + +[[example]] +name = "pod_exec_terminal_size" +path = "pod_exec_terminal_size.rs" diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_exec_terminal_size.rs new file mode 100644 index 000000000..9f269dbe8 --- /dev/null +++ b/examples/pod_exec_terminal_size.rs @@ -0,0 +1,134 @@ +use futures::{channel::mpsc::Sender, SinkExt, StreamExt, TryStreamExt}; +use k8s_openapi::api::core::v1::Pod; +use tracing::*; + +use kube::{ + api::{ + Api, AttachParams, AttachedProcess, DeleteParams, ListParams, PostParams, ResourceExt, TerminalSize, + WatchEvent, + }, + Client, +}; +use tokio::{io::AsyncWriteExt, select}; + +// check terminal size every 1 second and send it to channel if different +async fn handle_terminal_size(mut channel: Sender) { + let (mut width, mut height) = crossterm::terminal::size().unwrap(); + channel + .send(TerminalSize { height, width }) + .await + .expect("fail to write new size to channel"); + + loop { + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; + let (n_width, n_height) = crossterm::terminal::size().unwrap(); + if n_height != height || n_width != width { + height = n_height; + width = n_width; + channel + .send(TerminalSize { height, width }) + .await + .expect("fail to write new size to channel"); + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + tracing_subscriber::fmt::init(); + let client = Client::try_default().await?; + + let pods: Api = Api::default_namespaced(client); + let p: Pod = serde_json::from_value(serde_json::json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { "name": "example" }, + "spec": { + "containers": [{ + "name": "example", + "image": "alpine", + // Do nothing + "command": ["tail", "-f", "/dev/null"], + }], + } + }))?; + // Create pod if don't exist + pods.create(&PostParams::default(), &p).await?; + + // Wait until the pod is running, otherwise we get 500 error. + let lp = ListParams::default().fields("metadata.name=example").timeout(10); + let mut stream = pods.watch(&lp, "0").await?.boxed(); + while let Some(status) = stream.try_next().await? { + match status { + WatchEvent::Added(o) => { + info!("Added {}", o.name_any()); + } + WatchEvent::Modified(o) => { + let s = o.status.as_ref().expect("status exists on pod"); + if s.phase.clone().unwrap_or_default() == "Running" { + info!("Ready to attach to {}", o.name_any()); + break; + } + } + _ => {} + } + } + + { + crossterm::terminal::enable_raw_mode()?; + let mut attached: AttachedProcess = pods + .exec( + "example", + vec!["sh"], + &AttachParams::default().stdin(true).tty(true).stderr(false), + ) + .await?; + + let mut stdin = tokio_util::io::ReaderStream::new(tokio::io::stdin()); + let mut stdout = tokio::io::stdout(); + + let mut output = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); + let mut input = attached.stdin().unwrap(); + + let term_tx = attached.terminal_size().unwrap(); + + tokio::spawn(handle_terminal_size(term_tx)); + + loop { + select! { + message = stdin.next() => { + match message { + Some(Ok(message)) => { + input.write(&message).await?; + } + _ => { + break; + }, + } + }, + message = output.next() => { + match message { + Some(Ok(message)) => { + stdout.write(&message).await?; + stdout.flush().await?; + }, + _ => { + break + }, + } + }, + }; + } + crossterm::terminal::disable_raw_mode()?; + } + + // Delete it + pods.delete("example", &DeleteParams::default()) + .await? + .map_left(|pdel| { + assert_eq!(pdel.name_any(), "example"); + }); + + println!(""); + Ok(()) +} diff --git a/kube-client/src/api/mod.rs b/kube-client/src/api/mod.rs index 7872e78b2..fd005c44b 100644 --- a/kube-client/src/api/mod.rs +++ b/kube-client/src/api/mod.rs @@ -5,7 +5,7 @@ mod core_methods; #[cfg(feature = "ws")] mod remote_command; use std::fmt::Debug; -#[cfg(feature = "ws")] pub use remote_command::AttachedProcess; +#[cfg(feature = "ws")] pub use remote_command::{AttachedProcess, TerminalSize}; #[cfg(feature = "ws")] mod portforward; #[cfg(feature = "ws")] pub use portforward::Portforwarder; diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index 26b1be8c1..b2ab85ca7 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -3,22 +3,45 @@ use std::future::Future; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; use futures::{ - channel::oneshot, + channel::{ + mpsc::{self, Sender}, + oneshot, + }, future::{ select, Either::{Left, Right}, }, FutureExt, SinkExt, StreamExt, }; +use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}; -use tokio_tungstenite::{tungstenite as ws, WebSocketStream}; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}, + select, +}; +use tokio_tungstenite::{ + tungstenite::{self as ws, util::NonBlockingError}, + WebSocketStream, +}; use super::AttachParams; type StatusReceiver = oneshot::Receiver; type StatusSender = oneshot::Sender; +type TerminalSizeReceiver = mpsc::Receiver; +type TerminalSizeSender = mpsc::Sender; + +/// TerminalSize define the size of a terminal +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(docsrs, doc(cfg(feature = "ws")))] +pub struct TerminalSize { + /// width of the terminal + pub width: u16, + /// height of the terminal + pub height: u16, +} + /// Errors from attaching to a pod. #[derive(Debug, Error)] pub enum Error { @@ -57,6 +80,18 @@ pub enum Error { /// Failed to send status object #[error("failed to send status object")] SendStatus, + + /// Fail to serialize Terminalsize object + #[error("failed to serialize TerminalSize object: {0}")] + SerializeTerminalSize(#[source] serde_json::Error), + + /// Fail to send terminal size message + #[error("failed to send terminal size message")] + SendTerminalSize(#[source] ws::Error), + + /// Failed to set terminal size, tty need to be true to resize the terminal + #[error("failed to set terminal size, tty need to be true to resize the terminal")] + TtyNeedToBeTrue, } const MAX_BUF_SIZE: usize = 1024; @@ -78,6 +113,7 @@ pub struct AttachedProcess { stdout_reader: Option, stderr_reader: Option, status_rx: Option, + terminal_resize_tx: Option, task: tokio::task::JoinHandle>, } @@ -102,6 +138,12 @@ impl AttachedProcess { (None, None) }; let (status_tx, status_rx) = oneshot::channel(); + let (terminal_resize_tx, terminal_resize_rx) = if ap.tty { + let (w, r) = mpsc::channel(10); + (Some(w), Some(r)) + } else { + (None, None) + }; let task = tokio::spawn(start_message_loop( stream, @@ -109,6 +151,7 @@ impl AttachedProcess { stdout_writer, stderr_writer, status_tx, + terminal_resize_rx, )); AttachedProcess { @@ -119,6 +162,7 @@ impl AttachedProcess { stdin_writer: Some(stdin_writer), stdout_reader, stderr_reader, + terminal_resize_tx, status_rx: Some(status_rx), } } @@ -179,6 +223,19 @@ impl AttachedProcess { pub fn take_status(&mut self) -> Option>> { self.status_rx.take().map(|recv| recv.map(|res| res.ok())) } + + /// Async writer to change the terminal size + /// ```ignore + /// let mut terminal_size_writer = attached.terminal_size().unwrap(); + /// terminal_size_writer.send(TerminalSize{ + /// height: 100, + /// width: 200, + /// }).await?; + /// ``` + /// Only available if [`AttachParams`](super::AttachParams) had `tty`. + pub fn terminal_size(&mut self) -> Option { + self.terminal_resize_tx.take() + } } const STDIN_CHANNEL: u8 = 0; @@ -186,7 +243,8 @@ const STDOUT_CHANNEL: u8 = 1; const STDERR_CHANNEL: u8 = 2; // status channel receives `Status` object on exit. const STATUS_CHANNEL: u8 = 3; -// const RESIZE_CHANNEL: u8 = 4; +// resize channel is use to send TerminalSize object to change the size of the terminal +const RESIZE_CHANNEL: u8 = 4; async fn start_message_loop( stream: WebSocketStream, @@ -194,6 +252,7 @@ async fn start_message_loop( mut stdout: Option, mut stderr: Option, status_tx: StatusSender, + mut terminal_size_rx: Option, ) -> Result<(), Error> where S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, @@ -202,71 +261,71 @@ where let (mut server_send, raw_server_recv) = stream.split(); // Work with filtered messages to reduce noise. let mut server_recv = raw_server_recv.filter_map(filter_message).boxed(); - let mut server_msg = server_recv.next(); - let mut next_stdin = stdin_stream.next(); loop { - match select(server_msg, next_stdin).await { - // from server - Left((Some(message), p_next_stdin)) => { - match message { - Ok(Message::Stdout(bin)) => { + select! { + server_message = server_recv.next() => { + match server_message { + Some(Ok(Message::Stdout(bin))) => { if let Some(stdout) = stdout.as_mut() { stdout.write_all(&bin[1..]).await.map_err(Error::WriteStdout)?; } - } - - Ok(Message::Stderr(bin)) => { + }, + Some(Ok(Message::Stderr(bin))) => { if let Some(stderr) = stderr.as_mut() { stderr.write_all(&bin[1..]).await.map_err(Error::WriteStderr)?; } - } - - Ok(Message::Status(bin)) => { - let status = - serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; + }, + Some(Ok(Message::Status(bin))) => { + let status = serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; status_tx.send(status).map_err(|_| Error::SendStatus)?; - break; - } - - Err(err) => { + break + }, + Some(Err(err)) => { return Err(Error::ReceiveWebSocketMessage(err)); + }, + None => { + break + }, + } + }, + stdin_message = stdin_stream.next() => { + match stdin_message { + Some(Ok(bytes)) => { + if !bytes.is_empty() { + let mut vec = Vec::with_capacity(bytes.len() + 1); + vec.push(STDIN_CHANNEL); + vec.extend_from_slice(&bytes[..]); + server_send + .send(ws::Message::binary(vec)) + .await + .map_err(Error::SendStdin)?; + } + }, + Some(Err(err)) => { + return Err(Error::ReadStdin(err)); + } + None => { + // Stdin closed (writer half dropped). + // Let the server know and disconnect. + server_send.close().await.map_err(Error::SendClose)?; + break; } } - server_msg = server_recv.next(); - next_stdin = p_next_stdin; - } - - Left((None, _)) => { - // Connection closed properly - break; - } - - // from stdin - Right((Some(Ok(bytes)), p_server_msg)) => { - if !bytes.is_empty() { - let mut vec = Vec::with_capacity(bytes.len() + 1); - vec.push(STDIN_CHANNEL); - vec.extend_from_slice(&bytes[..]); - server_send - .send(ws::Message::binary(vec)) - .await - .map_err(Error::SendStdin)?; + }, + terminal_size_message = terminal_size_rx.as_mut().unwrap().next(), if terminal_size_rx.is_some() => { + match terminal_size_message { + Some(new_size) => { + let mut vec = vec![RESIZE_CHANNEL]; + let mut serde_vec = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?; + vec.append(&mut serde_vec); + server_send.send(ws::Message::Binary(vec)).await.map_err(Error::SendTerminalSize)?; + }, + None => { + break + } } - server_msg = p_server_msg; - next_stdin = stdin_stream.next(); - } - - Right((Some(Err(err)), _)) => { - return Err(Error::ReadStdin(err)); - } - - Right((None, _)) => { - // Stdin closed (writer half dropped). - // Let the server know and disconnect. - server_send.close().await.map_err(Error::SendClose)?; - break; - } + }, } } From a0f42ab17595bc86cd56f3663bc1e01f4ab49edb Mon Sep 17 00:00:00 2001 From: armandpicard Date: Fri, 12 Aug 2022 21:56:15 +0200 Subject: [PATCH 02/10] remove unused import + fix example Signed-off-by: armandpicard --- examples/pod_exec_terminal_size.rs | 53 ++++++++++----------------- kube-client/src/api/remote_command.rs | 9 ++--- 2 files changed, 22 insertions(+), 40 deletions(-) diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_exec_terminal_size.rs index 9f269dbe8..4427a147e 100644 --- a/examples/pod_exec_terminal_size.rs +++ b/examples/pod_exec_terminal_size.rs @@ -1,35 +1,35 @@ -use futures::{channel::mpsc::Sender, SinkExt, StreamExt, TryStreamExt}; +use futures::{channel::mpsc::Sender, SinkExt, StreamExt}; use k8s_openapi::api::core::v1::Pod; -use tracing::*; use kube::{ api::{ - Api, AttachParams, AttachedProcess, DeleteParams, ListParams, PostParams, ResourceExt, TerminalSize, - WatchEvent, + Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, TerminalSize, }, - Client, + Client, runtime::wait::{await_condition, conditions::is_pod_running}, }; use tokio::{io::AsyncWriteExt, select}; -// check terminal size every 1 second and send it to channel if different +// send the new terminal size to channel when it change async fn handle_terminal_size(mut channel: Sender) { - let (mut width, mut height) = crossterm::terminal::size().unwrap(); + let (width, height) = crossterm::terminal::size().unwrap(); channel .send(TerminalSize { height, width }) .await .expect("fail to write new size to channel"); + let mut stream = tokio::signal::unix::signal( + tokio::signal::unix::SignalKind::window_change() + ).expect("fail to create signal handler for SIGWINCH"); + loop { - tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; - let (n_width, n_height) = crossterm::terminal::size().unwrap(); - if n_height != height || n_width != width { - height = n_height; - width = n_width; - channel - .send(TerminalSize { height, width }) - .await - .expect("fail to write new size to channel"); - } + // wait for a change + stream.recv().await; + let (width, height) = crossterm::terminal::size().unwrap(); + channel + .send(TerminalSize { height, width }) + .await + .expect("fail to write new size to channel"); + } } @@ -56,23 +56,8 @@ async fn main() -> anyhow::Result<()> { pods.create(&PostParams::default(), &p).await?; // Wait until the pod is running, otherwise we get 500 error. - let lp = ListParams::default().fields("metadata.name=example").timeout(10); - let mut stream = pods.watch(&lp, "0").await?.boxed(); - while let Some(status) = stream.try_next().await? { - match status { - WatchEvent::Added(o) => { - info!("Added {}", o.name_any()); - } - WatchEvent::Modified(o) => { - let s = o.status.as_ref().expect("status exists on pod"); - if s.phase.clone().unwrap_or_default() == "Running" { - info!("Ready to attach to {}", o.name_any()); - break; - } - } - _ => {} - } - } + let running = await_condition(pods.clone(), "example", is_pod_running()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; { crossterm::terminal::enable_raw_mode()?; diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index b2ab85ca7..bd02a144c 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -4,13 +4,9 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; use futures::{ channel::{ - mpsc::{self, Sender}, + mpsc, oneshot, }, - future::{ - select, - Either::{Left, Right}, - }, FutureExt, SinkExt, StreamExt, }; use serde::{Deserialize, Serialize}; @@ -20,7 +16,7 @@ use tokio::{ select, }; use tokio_tungstenite::{ - tungstenite::{self as ws, util::NonBlockingError}, + tungstenite::{self as ws}, WebSocketStream, }; @@ -285,6 +281,7 @@ where return Err(Error::ReceiveWebSocketMessage(err)); }, None => { + // Connection closed properly break }, } From 28b61feb0b4bf66d03bfdfbc82c2014c64a4e4f8 Mon Sep 17 00:00:00 2001 From: armandpicard Date: Sat, 13 Aug 2022 00:40:37 +0200 Subject: [PATCH 03/10] Use crossterm::event::EventStream to get terminal change in pod_exec_terminal size example Signed-off-by: armandpicard --- examples/Cargo.toml | 2 +- examples/pod_exec_terminal_size.rs | 45 +++++++++++++++++++----------- 2 files changed, 30 insertions(+), 17 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 7f3b4d633..abebcdad2 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -53,7 +53,7 @@ backoff = "0.4.0" clap = { version = "3.1.9", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } -crossterm = "0.24.0" +crossterm = {version = "0.25.0", features = ["event-stream"]} [[example]] name = "configmapgen_controller" diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_exec_terminal_size.rs index 4427a147e..3833db4e1 100644 --- a/examples/pod_exec_terminal_size.rs +++ b/examples/pod_exec_terminal_size.rs @@ -8,29 +8,36 @@ use kube::{ Client, runtime::wait::{await_condition, conditions::is_pod_running}, }; use tokio::{io::AsyncWriteExt, select}; +use crossterm::event::{EventStream, Event}; // send the new terminal size to channel when it change -async fn handle_terminal_size(mut channel: Sender) { - let (width, height) = crossterm::terminal::size().unwrap(); +async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { + // get event stream to get resize event + let mut reader = EventStream::new(); + + let (width, height) = crossterm::terminal::size()?; channel .send(TerminalSize { height, width }) - .await - .expect("fail to write new size to channel"); - - let mut stream = tokio::signal::unix::signal( - tokio::signal::unix::SignalKind::window_change() - ).expect("fail to create signal handler for SIGWINCH"); + .await?; loop { // wait for a change - stream.recv().await; - let (width, height) = crossterm::terminal::size().unwrap(); - channel - .send(TerminalSize { height, width }) - .await - .expect("fail to write new size to channel"); - + let maybe_event = reader.next().await; + match maybe_event { + Some(Ok(Event::Resize(width, height))) => { + channel + .send(TerminalSize { height, width }) + .await? + }, + // we don't care about other events type + Some(Ok(_)) => {}, + Some(Err(err)) => { + return Err(err.into()); + } + None => break, + } } + Ok(()) } #[tokio::main] @@ -77,7 +84,7 @@ async fn main() -> anyhow::Result<()> { let term_tx = attached.terminal_size().unwrap(); - tokio::spawn(handle_terminal_size(term_tx)); + let mut handle_terminal_size_handle = tokio::spawn(handle_terminal_size(term_tx)); loop { select! { @@ -102,6 +109,12 @@ async fn main() -> anyhow::Result<()> { }, } }, + result = &mut handle_terminal_size_handle => { + match result { + Ok(_) => println!("End of terminal size stream"), + Err(e) => println!("Error getting terminal size: {e:?}") + } + }, }; } crossterm::terminal::disable_raw_mode()?; From e173782ce7c147c748835f87da949df95e88552e Mon Sep 17 00:00:00 2001 From: armandpicard Date: Sat, 13 Aug 2022 14:14:26 +0200 Subject: [PATCH 04/10] Fix error when terminal_resize_tx is None Signed-off-by: armandpicard --- kube-client/src/api/remote_command.rs | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index bd02a144c..82a1a49fb 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -257,8 +257,15 @@ where let (mut server_send, raw_server_recv) = stream.split(); // Work with filtered messages to reduce noise. let mut server_recv = raw_server_recv.filter_map(filter_message).boxed(); + let have_terminal_size_rx = terminal_size_rx.is_some(); loop { + let terminal_size_next = async { + match terminal_size_rx.as_mut() { + Some(tmp) => Some(tmp.next().await), + None => None, + } + }; select! { server_message = server_recv.next() => { match server_message { @@ -310,7 +317,7 @@ where } } }, - terminal_size_message = terminal_size_rx.as_mut().unwrap().next(), if terminal_size_rx.is_some() => { + Some(terminal_size_message) = terminal_size_next, if have_terminal_size_rx => { match terminal_size_message { Some(new_size) => { let mut vec = vec![RESIZE_CHANNEL]; From 1c5fdafe58ac4528117977f1c2ac8a1b4aa67167 Mon Sep 17 00:00:00 2001 From: armandpicard Date: Tue, 1 Nov 2022 20:14:15 +0100 Subject: [PATCH 05/10] Add some comments Signed-off-by: armandpicard --- examples/pod_exec_terminal_size.rs | 2 ++ kube-client/src/api/remote_command.rs | 1 + 2 files changed, 3 insertions(+) diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_exec_terminal_size.rs index 3833db4e1..97d22a0cd 100644 --- a/examples/pod_exec_terminal_size.rs +++ b/examples/pod_exec_terminal_size.rs @@ -67,6 +67,8 @@ async fn main() -> anyhow::Result<()> { let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; { + // Here we we put the teerminal in 'raw' mode to directly get the input from the user and sending it to the server and getting the result from the server to disploy directly. + // We also watch for change in your terminal size and send it to the server so that application that use the size work properly. crossterm::terminal::enable_raw_mode()?; let mut attached: AttachedProcess = pods .exec( diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index 82a1a49fb..599fa113a 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -234,6 +234,7 @@ impl AttachedProcess { } } +// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34 const STDIN_CHANNEL: u8 = 0; const STDOUT_CHANNEL: u8 = 1; const STDERR_CHANNEL: u8 = 2; From d0ce48a62fd91b677eb0040c78e0f578afbaf71b Mon Sep 17 00:00:00 2001 From: armand picard <33733022+armandpicard@users.noreply.github.com> Date: Tue, 1 Nov 2022 22:39:51 +0100 Subject: [PATCH 06/10] Fix typos Co-authored-by: kazk Signed-off-by: armand picard <33733022+armandpicard@users.noreply.github.com> --- examples/pod_exec_terminal_size.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_exec_terminal_size.rs index 97d22a0cd..24a90b889 100644 --- a/examples/pod_exec_terminal_size.rs +++ b/examples/pod_exec_terminal_size.rs @@ -67,7 +67,7 @@ async fn main() -> anyhow::Result<()> { let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; { - // Here we we put the teerminal in 'raw' mode to directly get the input from the user and sending it to the server and getting the result from the server to disploy directly. + // Here we we put the terminal in 'raw' mode to directly get the input from the user and sending it to the server and getting the result from the server to display directly. // We also watch for change in your terminal size and send it to the server so that application that use the size work properly. crossterm::terminal::enable_raw_mode()?; let mut attached: AttachedProcess = pods From 8a984345f2ec010f5aa7a88e89b14d918a98cb5b Mon Sep 17 00:00:00 2001 From: armandpicard Date: Wed, 2 Nov 2022 23:20:30 +0100 Subject: [PATCH 07/10] Fix: uniformize message send Signed-off-by: armandpicard --- kube-client/src/api/remote_command.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index 599fa113a..1b86095cf 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -321,9 +321,10 @@ where Some(terminal_size_message) = terminal_size_next, if have_terminal_size_rx => { match terminal_size_message { Some(new_size) => { - let mut vec = vec![RESIZE_CHANNEL]; - let mut serde_vec = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?; - vec.append(&mut serde_vec); + let new_size = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?; + let mut vec = Vec::with_capacity(new_size.len() + 1); + vec.push(RESIZE_CHANNEL); + vec.extend_from_slice(&new_size[..]); server_send.send(ws::Message::Binary(vec)).await.map_err(Error::SendTerminalSize)?; }, None => { From 32a6769298f19472bb2a609575d5a462f4abd305 Mon Sep 17 00:00:00 2001 From: armandpicard Date: Tue, 8 Nov 2022 19:40:59 +0100 Subject: [PATCH 08/10] Remove crossterm event stream Signed-off-by: armandpicard --- examples/pod_exec_terminal_size.rs | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_exec_terminal_size.rs index 24a90b889..8f6af319a 100644 --- a/examples/pod_exec_terminal_size.rs +++ b/examples/pod_exec_terminal_size.rs @@ -7,42 +7,33 @@ use kube::{ }, Client, runtime::wait::{await_condition, conditions::is_pod_running}, }; -use tokio::{io::AsyncWriteExt, select}; +use tokio::{io::AsyncWriteExt, select, signal}; use crossterm::event::{EventStream, Event}; // send the new terminal size to channel when it change async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { - // get event stream to get resize event - let mut reader = EventStream::new(); - let (width, height) = crossterm::terminal::size()?; channel .send(TerminalSize { height, width }) .await?; + // create a stream to catch SIGWINCH signal + let mut sig = signal::unix::signal(signal::unix::SignalKind::window_change())?; loop { - // wait for a change - let maybe_event = reader.next().await; - match maybe_event { - Some(Ok(Event::Resize(width, height))) => { - channel - .send(TerminalSize { height, width }) - .await? - }, - // we don't care about other events type - Some(Ok(_)) => {}, - Some(Err(err)) => { - return Err(err.into()); - } - None => break, + if sig.recv().await == None { + return Ok(()) } + + let (width, height) = crossterm::terminal::size()?; + channel + .send(TerminalSize { height, width }) + .await?; } Ok(()) } #[tokio::main] async fn main() -> anyhow::Result<()> { - tracing_subscriber::fmt::init(); let client = Client::try_default().await?; let pods: Api = Api::default_namespaced(client); From 80651acf7c03b9cfc5eb43f2cb132da0cca349f6 Mon Sep 17 00:00:00 2001 From: armandpicard Date: Sat, 12 Nov 2022 16:21:29 +0100 Subject: [PATCH 09/10] Rename example pod_shell_crossterm Signed-off-by: armandpicard --- examples/Cargo.toml | 4 ++-- .../{pod_exec_terminal_size.rs => pod_shell_crossterm.rs} | 0 2 files changed, 2 insertions(+), 2 deletions(-) rename examples/{pod_exec_terminal_size.rs => pod_shell_crossterm.rs} (100%) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index abebcdad2..1c7dab8f6 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -205,5 +205,5 @@ name = "secret_syncer" path = "secret_syncer.rs" [[example]] -name = "pod_exec_terminal_size" -path = "pod_exec_terminal_size.rs" +name = "pod_shell_crossterm" +path = "pod_shell_crossterm.rs" diff --git a/examples/pod_exec_terminal_size.rs b/examples/pod_shell_crossterm.rs similarity index 100% rename from examples/pod_exec_terminal_size.rs rename to examples/pod_shell_crossterm.rs From ea0d2ecded277764f3a07e5ef7e707edd9177e60 Mon Sep 17 00:00:00 2001 From: armandpicard Date: Sat, 12 Nov 2022 23:24:45 +0100 Subject: [PATCH 10/10] Make example run on window but without handling terminal size change + run fmt Signed-off-by: armandpicard --- examples/Cargo.toml | 3 ++- examples/pod_shell_crossterm.rs | 35 ++++++++++++++++----------- kube-client/src/api/remote_command.rs | 5 +--- 3 files changed, 24 insertions(+), 19 deletions(-) diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 90bd472f0..8f7700039 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -53,7 +53,7 @@ backoff = "0.4.0" clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } -crossterm = {version = "0.25.0", features = ["event-stream"]} +crossterm = {version = "0.25.0" } [[example]] name = "configmapgen_controller" @@ -211,3 +211,4 @@ path = "secret_syncer.rs" [[example]] name = "pod_shell_crossterm" path = "pod_shell_crossterm.rs" +required-features = ["ws"] \ No newline at end of file diff --git a/examples/pod_shell_crossterm.rs b/examples/pod_shell_crossterm.rs index 8f6af319a..3cfc27acf 100644 --- a/examples/pod_shell_crossterm.rs +++ b/examples/pod_shell_crossterm.rs @@ -1,37 +1,44 @@ use futures::{channel::mpsc::Sender, SinkExt, StreamExt}; use k8s_openapi::api::core::v1::Pod; +#[cfg(unix)] use crossterm::event::Event; use kube::{ - api::{ - Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, TerminalSize, - }, - Client, runtime::wait::{await_condition, conditions::is_pod_running}, + api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, TerminalSize}, + runtime::wait::{await_condition, conditions::is_pod_running}, + Client, }; -use tokio::{io::AsyncWriteExt, select, signal}; -use crossterm::event::{EventStream, Event}; +#[cfg(unix)] use tokio::signal; +use tokio::{io::AsyncWriteExt, select}; -// send the new terminal size to channel when it change +#[cfg(unix)] +// Send the new terminal size to channel when it change async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { let (width, height) = crossterm::terminal::size()?; - channel - .send(TerminalSize { height, width }) - .await?; + channel.send(TerminalSize { height, width }).await?; // create a stream to catch SIGWINCH signal let mut sig = signal::unix::signal(signal::unix::SignalKind::window_change())?; loop { if sig.recv().await == None { - return Ok(()) + return Ok(()); } let (width, height) = crossterm::terminal::size()?; - channel - .send(TerminalSize { height, width }) - .await?; + channel.send(TerminalSize { height, width }).await?; } +} + +#[cfg(windows)] +// We don't support window for terminal size change, we only send the initial size +async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + let mut ctrl_c = tokio::signal::windows::ctrl_c()?; + ctrl_c.recv().await; Ok(()) } + #[tokio::main] async fn main() -> anyhow::Result<()> { let client = Client::try_default().await?; diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index 1b86095cf..96b93ac47 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -3,10 +3,7 @@ use std::future::Future; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; use futures::{ - channel::{ - mpsc, - oneshot, - }, + channel::{mpsc, oneshot}, FutureExt, SinkExt, StreamExt, }; use serde::{Deserialize, Serialize};