diff --git a/deny.toml b/deny.toml index 94f097d0d..bb69ab040 100644 --- a/deny.toml +++ b/deny.toml @@ -83,6 +83,26 @@ multiple-versions = "deny" name = "idna" version = "0.2" +# waiting on hyper-rustls and below to bump its chain +[[bans.skip]] +name = "windows_i686_msvc" +version = "0.36" +[[bans.skip]] +name = "windows_aarch64_msvc" +version = "0.36" +[[bans.skip]] +name = "windows-sys" +version = "0.36" +[[bans.skip]] +name = "windows_i686_gnu" +version = "0.36" +[[bans.skip]] +name = "windows_x86_64_msvc" +version = "0.36" +[[bans.skip]] +name = "windows_x86_64_gnu" +version = "0.36" + [[bans.skip]] # waiting for ahash/getrandom to bump wasi as we have two branches: # ahash -> getrandom -> wasi old diff --git a/examples/Cargo.toml b/examples/Cargo.toml index 679a5b2b4..8f7700039 100644 --- a/examples/Cargo.toml +++ b/examples/Cargo.toml @@ -53,6 +53,7 @@ backoff = "0.4.0" clap = { version = "4.0", default-features = false, features = ["std", "cargo", "derive"] } edit = "0.1.3" tokio-stream = { version = "0.1.9", features = ["net"] } +crossterm = {version = "0.25.0" } [[example]] name = "configmapgen_controller" @@ -206,3 +207,8 @@ path = "custom_client_trace.rs" [[example]] name = "secret_syncer" path = "secret_syncer.rs" + +[[example]] +name = "pod_shell_crossterm" +path = "pod_shell_crossterm.rs" +required-features = ["ws"] \ No newline at end of file diff --git a/examples/pod_shell_crossterm.rs b/examples/pod_shell_crossterm.rs new file mode 100644 index 000000000..e542af2df --- /dev/null +++ b/examples/pod_shell_crossterm.rs @@ -0,0 +1,131 @@ +use futures::{channel::mpsc::Sender, SinkExt, StreamExt}; +use k8s_openapi::api::core::v1::Pod; + +use kube::{ + api::{Api, AttachParams, AttachedProcess, DeleteParams, PostParams, ResourceExt, TerminalSize}, + runtime::wait::{await_condition, conditions::is_pod_running}, + Client, +}; +#[cfg(unix)] use tokio::signal; +use tokio::{io::AsyncWriteExt, select}; + +#[cfg(unix)] +// Send the new terminal size to channel when it change +async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + + // create a stream to catch SIGWINCH signal + let mut sig = signal::unix::signal(signal::unix::SignalKind::window_change())?; + loop { + if sig.recv().await == None { + return Ok(()); + } + + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + } +} + +#[cfg(windows)] +// We don't support window for terminal size change, we only send the initial size +async fn handle_terminal_size(mut channel: Sender) -> Result<(), anyhow::Error> { + let (width, height) = crossterm::terminal::size()?; + channel.send(TerminalSize { height, width }).await?; + let mut ctrl_c = tokio::signal::windows::ctrl_c()?; + ctrl_c.recv().await; + Ok(()) +} + + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let client = Client::try_default().await?; + + let pods: Api = Api::default_namespaced(client); + let p: Pod = serde_json::from_value(serde_json::json!({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { "name": "example" }, + "spec": { + "containers": [{ + "name": "example", + "image": "alpine", + // Do nothing + "command": ["tail", "-f", "/dev/null"], + }], + } + }))?; + // Create pod if don't exist + pods.create(&PostParams::default(), &p).await?; + + // Wait until the pod is running, otherwise we get 500 error. + let running = await_condition(pods.clone(), "example", is_pod_running()); + let _ = tokio::time::timeout(std::time::Duration::from_secs(15), running).await?; + + { + // Here we we put the terminal in 'raw' mode to directly get the input from the user and sending it to the server and getting the result from the server to display directly. + // We also watch for change in your terminal size and send it to the server so that application that use the size work properly. + crossterm::terminal::enable_raw_mode()?; + let mut attached: AttachedProcess = pods + .exec( + "example", + vec!["sh"], + &AttachParams::default().stdin(true).tty(true).stderr(false), + ) + .await?; + + let mut stdin = tokio_util::io::ReaderStream::new(tokio::io::stdin()); + let mut stdout = tokio::io::stdout(); + + let mut output = tokio_util::io::ReaderStream::new(attached.stdout().unwrap()); + let mut input = attached.stdin().unwrap(); + + let term_tx = attached.terminal_size().unwrap(); + + let mut handle_terminal_size_handle = tokio::spawn(handle_terminal_size(term_tx)); + + loop { + select! { + message = stdin.next() => { + match message { + Some(Ok(message)) => { + input.write(&message).await?; + } + _ => { + break; + }, + } + }, + message = output.next() => { + match message { + Some(Ok(message)) => { + stdout.write(&message).await?; + stdout.flush().await?; + }, + _ => { + break + }, + } + }, + result = &mut handle_terminal_size_handle => { + match result { + Ok(_) => println!("End of terminal size stream"), + Err(e) => println!("Error getting terminal size: {e:?}") + } + }, + }; + } + crossterm::terminal::disable_raw_mode()?; + } + + // Delete it + pods.delete("example", &DeleteParams::default()) + .await? + .map_left(|pdel| { + assert_eq!(pdel.name_any(), "example"); + }); + + println!(""); + Ok(()) +} diff --git a/kube-client/Cargo.toml b/kube-client/Cargo.toml index 9d4a2b066..86349ea44 100644 --- a/kube-client/Cargo.toml +++ b/kube-client/Cargo.toml @@ -37,7 +37,7 @@ rustdoc-args = ["--cfg", "docsrs"] [dependencies] base64 = { version = "0.13.0", optional = true } -chrono = { version = "0.4.19", optional = true, default-features = false } +chrono = { version = "0.4.23", optional = true, default-features = false } dirs = { package = "dirs-next", optional = true, version = "2.0.0" } serde = { version = "1.0.130", features = ["derive"] } serde_json = "1.0.68" diff --git a/kube-client/src/api/mod.rs b/kube-client/src/api/mod.rs index 593624ba9..b65e6855a 100644 --- a/kube-client/src/api/mod.rs +++ b/kube-client/src/api/mod.rs @@ -5,7 +5,7 @@ mod core_methods; #[cfg(feature = "ws")] mod remote_command; use std::fmt::Debug; -#[cfg(feature = "ws")] pub use remote_command::AttachedProcess; +#[cfg(feature = "ws")] pub use remote_command::{AttachedProcess, TerminalSize}; #[cfg(feature = "ws")] mod portforward; #[cfg(feature = "ws")] pub use portforward::Portforwarder; diff --git a/kube-client/src/api/remote_command.rs b/kube-client/src/api/remote_command.rs index 26b1be8c1..96b93ac47 100644 --- a/kube-client/src/api/remote_command.rs +++ b/kube-client/src/api/remote_command.rs @@ -3,22 +3,38 @@ use std::future::Future; use k8s_openapi::apimachinery::pkg::apis::meta::v1::Status; use futures::{ - channel::oneshot, - future::{ - select, - Either::{Left, Right}, - }, + channel::{mpsc, oneshot}, FutureExt, SinkExt, StreamExt, }; +use serde::{Deserialize, Serialize}; use thiserror::Error; -use tokio::io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}; -use tokio_tungstenite::{tungstenite as ws, WebSocketStream}; +use tokio::{ + io::{AsyncRead, AsyncWrite, AsyncWriteExt, DuplexStream}, + select, +}; +use tokio_tungstenite::{ + tungstenite::{self as ws}, + WebSocketStream, +}; use super::AttachParams; type StatusReceiver = oneshot::Receiver; type StatusSender = oneshot::Sender; +type TerminalSizeReceiver = mpsc::Receiver; +type TerminalSizeSender = mpsc::Sender; + +/// TerminalSize define the size of a terminal +#[derive(Debug, Serialize, Deserialize)] +#[cfg_attr(docsrs, doc(cfg(feature = "ws")))] +pub struct TerminalSize { + /// width of the terminal + pub width: u16, + /// height of the terminal + pub height: u16, +} + /// Errors from attaching to a pod. #[derive(Debug, Error)] pub enum Error { @@ -57,6 +73,18 @@ pub enum Error { /// Failed to send status object #[error("failed to send status object")] SendStatus, + + /// Fail to serialize Terminalsize object + #[error("failed to serialize TerminalSize object: {0}")] + SerializeTerminalSize(#[source] serde_json::Error), + + /// Fail to send terminal size message + #[error("failed to send terminal size message")] + SendTerminalSize(#[source] ws::Error), + + /// Failed to set terminal size, tty need to be true to resize the terminal + #[error("failed to set terminal size, tty need to be true to resize the terminal")] + TtyNeedToBeTrue, } const MAX_BUF_SIZE: usize = 1024; @@ -78,6 +106,7 @@ pub struct AttachedProcess { stdout_reader: Option, stderr_reader: Option, status_rx: Option, + terminal_resize_tx: Option, task: tokio::task::JoinHandle>, } @@ -102,6 +131,12 @@ impl AttachedProcess { (None, None) }; let (status_tx, status_rx) = oneshot::channel(); + let (terminal_resize_tx, terminal_resize_rx) = if ap.tty { + let (w, r) = mpsc::channel(10); + (Some(w), Some(r)) + } else { + (None, None) + }; let task = tokio::spawn(start_message_loop( stream, @@ -109,6 +144,7 @@ impl AttachedProcess { stdout_writer, stderr_writer, status_tx, + terminal_resize_rx, )); AttachedProcess { @@ -119,6 +155,7 @@ impl AttachedProcess { stdin_writer: Some(stdin_writer), stdout_reader, stderr_reader, + terminal_resize_tx, status_rx: Some(status_rx), } } @@ -179,14 +216,29 @@ impl AttachedProcess { pub fn take_status(&mut self) -> Option>> { self.status_rx.take().map(|recv| recv.map(|res| res.ok())) } + + /// Async writer to change the terminal size + /// ```ignore + /// let mut terminal_size_writer = attached.terminal_size().unwrap(); + /// terminal_size_writer.send(TerminalSize{ + /// height: 100, + /// width: 200, + /// }).await?; + /// ``` + /// Only available if [`AttachParams`](super::AttachParams) had `tty`. + pub fn terminal_size(&mut self) -> Option { + self.terminal_resize_tx.take() + } } +// theses values come from here: https://github.com/kubernetes/kubernetes/blob/master/pkg/kubelet/cri/streaming/remotecommand/websocket.go#L34 const STDIN_CHANNEL: u8 = 0; const STDOUT_CHANNEL: u8 = 1; const STDERR_CHANNEL: u8 = 2; // status channel receives `Status` object on exit. const STATUS_CHANNEL: u8 = 3; -// const RESIZE_CHANNEL: u8 = 4; +// resize channel is use to send TerminalSize object to change the size of the terminal +const RESIZE_CHANNEL: u8 = 4; async fn start_message_loop( stream: WebSocketStream, @@ -194,6 +246,7 @@ async fn start_message_loop( mut stdout: Option, mut stderr: Option, status_tx: StatusSender, + mut terminal_size_rx: Option, ) -> Result<(), Error> where S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, @@ -202,71 +255,80 @@ where let (mut server_send, raw_server_recv) = stream.split(); // Work with filtered messages to reduce noise. let mut server_recv = raw_server_recv.filter_map(filter_message).boxed(); - let mut server_msg = server_recv.next(); - let mut next_stdin = stdin_stream.next(); + let have_terminal_size_rx = terminal_size_rx.is_some(); loop { - match select(server_msg, next_stdin).await { - // from server - Left((Some(message), p_next_stdin)) => { - match message { - Ok(Message::Stdout(bin)) => { + let terminal_size_next = async { + match terminal_size_rx.as_mut() { + Some(tmp) => Some(tmp.next().await), + None => None, + } + }; + select! { + server_message = server_recv.next() => { + match server_message { + Some(Ok(Message::Stdout(bin))) => { if let Some(stdout) = stdout.as_mut() { stdout.write_all(&bin[1..]).await.map_err(Error::WriteStdout)?; } - } - - Ok(Message::Stderr(bin)) => { + }, + Some(Ok(Message::Stderr(bin))) => { if let Some(stderr) = stderr.as_mut() { stderr.write_all(&bin[1..]).await.map_err(Error::WriteStderr)?; } - } - - Ok(Message::Status(bin)) => { - let status = - serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; + }, + Some(Ok(Message::Status(bin))) => { + let status = serde_json::from_slice::(&bin[1..]).map_err(Error::DeserializeStatus)?; status_tx.send(status).map_err(|_| Error::SendStatus)?; - break; - } - - Err(err) => { + break + }, + Some(Err(err)) => { return Err(Error::ReceiveWebSocketMessage(err)); + }, + None => { + // Connection closed properly + break + }, + } + }, + stdin_message = stdin_stream.next() => { + match stdin_message { + Some(Ok(bytes)) => { + if !bytes.is_empty() { + let mut vec = Vec::with_capacity(bytes.len() + 1); + vec.push(STDIN_CHANNEL); + vec.extend_from_slice(&bytes[..]); + server_send + .send(ws::Message::binary(vec)) + .await + .map_err(Error::SendStdin)?; + } + }, + Some(Err(err)) => { + return Err(Error::ReadStdin(err)); + } + None => { + // Stdin closed (writer half dropped). + // Let the server know and disconnect. + server_send.close().await.map_err(Error::SendClose)?; + break; } } - server_msg = server_recv.next(); - next_stdin = p_next_stdin; - } - - Left((None, _)) => { - // Connection closed properly - break; - } - - // from stdin - Right((Some(Ok(bytes)), p_server_msg)) => { - if !bytes.is_empty() { - let mut vec = Vec::with_capacity(bytes.len() + 1); - vec.push(STDIN_CHANNEL); - vec.extend_from_slice(&bytes[..]); - server_send - .send(ws::Message::binary(vec)) - .await - .map_err(Error::SendStdin)?; + }, + Some(terminal_size_message) = terminal_size_next, if have_terminal_size_rx => { + match terminal_size_message { + Some(new_size) => { + let new_size = serde_json::to_vec(&new_size).map_err(Error::SerializeTerminalSize)?; + let mut vec = Vec::with_capacity(new_size.len() + 1); + vec.push(RESIZE_CHANNEL); + vec.extend_from_slice(&new_size[..]); + server_send.send(ws::Message::Binary(vec)).await.map_err(Error::SendTerminalSize)?; + }, + None => { + break + } } - server_msg = p_server_msg; - next_stdin = stdin_stream.next(); - } - - Right((Some(Err(err)), _)) => { - return Err(Error::ReadStdin(err)); - } - - Right((None, _)) => { - // Stdin closed (writer half dropped). - // Let the server know and disconnect. - server_send.close().await.map_err(Error::SendClose)?; - break; - } + }, } } diff --git a/kube-client/src/client/auth/mod.rs b/kube-client/src/client/auth/mod.rs index 36bd4bb6d..49337266a 100644 --- a/kube-client/src/client/auth/mod.rs +++ b/kube-client/src/client/auth/mod.rs @@ -324,6 +324,9 @@ fn token_from_provider(provider: &AuthProviderConfig) -> Result token_from_oidc_provider(provider), "gcp" => token_from_gcp_provider(provider), + "azure" => Err(Error::AuthExec( + "The azure auth plugin is not supported; use https://github.com/Azure/kubelogin instead".into(), + )), _ => Err(Error::AuthExec(format!( "Authentication with provider {:} not supported", provider.name diff --git a/kube-derive/tests/crd_schema_test.rs b/kube-derive/tests/crd_schema_test.rs index a7d48cb82..48c1e7b45 100644 --- a/kube-derive/tests/crd_schema_test.rs +++ b/kube-derive/tests/crd_schema_test.rs @@ -132,7 +132,7 @@ fn test_serialized_matches_expected() { nullable: None, nullable_skipped_with_default: None, nullable_with_default: None, - timestamp: DateTime::from_utc(NaiveDateTime::from_timestamp(0, 0), Utc), + timestamp: DateTime::from_utc(NaiveDateTime::from_timestamp_opt(0, 0).unwrap(), Utc), complex_enum: ComplexEnum::VariantOne { int: 23 }, untagged_enum_person: UntaggedEnumPerson::GenderAndAge(GenderAndAge { age: 42,