From a6208f1625c63c662c6bd7b71eef9a401e4014dc Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Wed, 3 Aug 2022 19:09:50 +0100 Subject: [PATCH 1/4] fix: portforward connections In some situations it was observed that connections lingered in a CLOSE_WAIT state or even ESTABLISHED state. In others, an existing connection would be reused and would fail at the time of use. When the reader half is closed, signal it to the forwarder_loop task. When a websocket close message is received, signal it to the forwarder_loop task so it may shutdown all write half's. Close the non-taken streams when joining so we may terminate. Signed-off-by: Tiago Castro --- kube-client/src/api/portforward.rs | 58 +++++++++++++++++++++++++++--- 1 file changed, 53 insertions(+), 5 deletions(-) diff --git a/kube-client/src/api/portforward.rs b/kube-client/src/api/portforward.rs index 6dee12b4c..9a77d1474 100644 --- a/kube-client/src/api/portforward.rs +++ b/kube-client/src/api/portforward.rs @@ -60,6 +60,10 @@ pub enum Error { #[error("failed to complete the background task: {0}")] Spawn(#[source] tokio::task::JoinError), + + /// Failed to shutdown a pod writer channel. + #[error("failed to shutdown write to Pod channel: {0}")] + Shutdown(#[source] std::io::Error), } type ErrorReceiver = oneshot::Receiver; @@ -69,6 +73,8 @@ type ErrorSender = oneshot::Sender; enum Message { FromPod(u8, Bytes), ToPod(u8, Bytes), + FromPodClose, + ToPodClose(u8), } /// Manages port-forwarded streams. @@ -138,7 +144,8 @@ impl Portforwarder { } /// Waits for port forwarding task to complete. - pub async fn join(self) -> Result<(), Error> { + pub async fn join(mut self) -> Result<(), Error> { + self.ports.clear(); self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e))) } } @@ -192,6 +199,10 @@ async fn to_pod_loop( .map_err(Error::ForwardToPod)?; } } + sender + .send(Message::ToPodClose(ch)) + .await + .map_err(Error::ForwardToPod)?; Ok(()) } @@ -217,6 +228,12 @@ where .await .map_err(Error::ForwardFromPod)?; } + message if message.is_close() => { + sender + .send(Message::FromPodClose) + .await + .map_err(Error::ForwardFromPod)?; + } // REVIEW should we error on unexpected websocket message? _ => {} } @@ -240,6 +257,9 @@ where { // Keep track if the channel has received the initialization frame. let mut initialized = vec![false; 2 * ports.len()]; + let mut shutdown = vec![false; 2 * ports.len()]; + let mut closed_ports = 0; + let mut socket_shutdown = false; while let Some(msg) = receiver.next().await { match msg { Message::FromPod(ch, mut bytes) => { @@ -277,10 +297,12 @@ where sender.send(s).map_err(Error::ForwardErrorMessage)?; } } else { - writers[port_index] - .write_all(&bytes) - .await - .map_err(Error::WriteBytesFromPod)?; + if !shutdown[port_index] { + writers[port_index] + .write_all(&bytes) + .await + .map_err(Error::WriteBytesFromPod)?; + } } } @@ -293,6 +315,32 @@ where .await .map_err(Error::SendWebSocketMessage)?; } + Message::ToPodClose(ch) => { + let ch = ch as usize; + if ch >= initialized.len() { + return Err(Error::InvalidChannel(ch)); + } + let port_index = ch / 2; + if !shutdown[port_index] { + writers[port_index].shutdown().await.map_err(Error::Shutdown)?; + shutdown[port_index] = true; + + closed_ports += 1; + } + } + Message::FromPodClose => { + for writer in &mut writers { + writer.shutdown().await.map_err(Error::Shutdown)?; + } + } + } + + if closed_ports == ports.len() && !socket_shutdown { + ws_sink + .send(ws::Message::Close(None)) + .await + .map_err(Error::SendWebSocketMessage)?; + socket_shutdown = true; } } Ok(()) From 1dc2f7a6d6fdfa2fdab2b9e9db5598f2605e3c7e Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Fri, 26 Aug 2022 23:38:04 +0100 Subject: [PATCH 2/4] refactor: use ChannelState and drop errors Use a ChannelState with initialized and shutdown flags rather than 2 vecs. Drop the errors when joining the portforward task. Break on SocketClose to make it clearer. Signed-off-by: Tiago Castro --- kube-client/src/api/portforward.rs | 40 ++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/kube-client/src/api/portforward.rs b/kube-client/src/api/portforward.rs index 9a77d1474..f64726481 100644 --- a/kube-client/src/api/portforward.rs +++ b/kube-client/src/api/portforward.rs @@ -144,9 +144,17 @@ impl Portforwarder { } /// Waits for port forwarding task to complete. - pub async fn join(mut self) -> Result<(), Error> { - self.ports.clear(); - self.task.await.unwrap_or_else(|e| Err(Error::Spawn(e))) + pub async fn join(self) -> Result<(), Error> { + let Self { + mut ports, + mut errors, + task, + } = self; + // Start by terminating any streams that have not yet been taken + // since they would otherwise keep the connection open indefinitely + ports.clear(); + errors.clear(); + task.await.unwrap_or_else(|e| Err(Error::Spawn(e))) } } @@ -233,6 +241,7 @@ where .send(Message::FromPodClose) .await .map_err(Error::ForwardFromPod)?; + break; } // REVIEW should we error on unexpected websocket message? _ => {} @@ -255,22 +264,27 @@ async fn forwarder_loop( where S: AsyncRead + AsyncWrite + Unpin + Sized + Send + 'static, { - // Keep track if the channel has received the initialization frame. - let mut initialized = vec![false; 2 * ports.len()]; - let mut shutdown = vec![false; 2 * ports.len()]; + #[derive(Default, Clone)] + struct ChannelState { + // Keep track if the channel has received the initialization frame. + initialized: bool, + // Keep track if the channel has shutdown. + shutdown: bool, + } + let mut chan_state = vec![ChannelState::default(); 2 * ports.len()]; let mut closed_ports = 0; let mut socket_shutdown = false; while let Some(msg) = receiver.next().await { match msg { Message::FromPod(ch, mut bytes) => { let ch = ch as usize; - if ch >= initialized.len() { + if ch >= chan_state.len() { return Err(Error::InvalidChannel(ch)); } let port_index = ch / 2; // Initialization - if !initialized[ch] { + if !chan_state[ch].initialized { // The initial message must be 3 bytes including the channel prefix. if bytes.len() != 2 { return Err(Error::InvalidInitialFrameSize); @@ -284,7 +298,7 @@ where }); } - initialized[ch] = true; + chan_state[ch].initialized = true; continue; } @@ -297,7 +311,7 @@ where sender.send(s).map_err(Error::ForwardErrorMessage)?; } } else { - if !shutdown[port_index] { + if !chan_state[port_index].shutdown { writers[port_index] .write_all(&bytes) .await @@ -317,13 +331,13 @@ where } Message::ToPodClose(ch) => { let ch = ch as usize; - if ch >= initialized.len() { + if ch >= chan_state.len() { return Err(Error::InvalidChannel(ch)); } let port_index = ch / 2; - if !shutdown[port_index] { + if !chan_state[port_index].shutdown { writers[port_index].shutdown().await.map_err(Error::Shutdown)?; - shutdown[port_index] = true; + chan_state[port_index].shutdown = true; closed_ports += 1; } From 3be996bd14d5dd09d30498f6ac883b29e0fce6cd Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Mon, 5 Sep 2022 18:18:47 +0100 Subject: [PATCH 3/4] refactor: use channel index to reference port shutdown Make use of the channel index to reference the shutdown port index to make it consistent with the initialized check. This is because even channel indexes are for data and odd indexes are for errors. Signed-off-by: Tiago Castro --- kube-client/src/api/portforward.rs | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/kube-client/src/api/portforward.rs b/kube-client/src/api/portforward.rs index f64726481..28c599a03 100644 --- a/kube-client/src/api/portforward.rs +++ b/kube-client/src/api/portforward.rs @@ -310,13 +310,11 @@ where .map_err(Error::InvalidErrorMessage)?; sender.send(s).map_err(Error::ForwardErrorMessage)?; } - } else { - if !chan_state[port_index].shutdown { - writers[port_index] - .write_all(&bytes) - .await - .map_err(Error::WriteBytesFromPod)?; - } + } else if !chan_state[ch].shutdown { + writers[port_index] + .write_all(&bytes) + .await + .map_err(Error::WriteBytesFromPod)?; } } @@ -335,9 +333,9 @@ where return Err(Error::InvalidChannel(ch)); } let port_index = ch / 2; - if !chan_state[port_index].shutdown { + if !chan_state[ch].shutdown { writers[port_index].shutdown().await.map_err(Error::Shutdown)?; - chan_state[port_index].shutdown = true; + chan_state[ch].shutdown = true; closed_ports += 1; } From b6521a2cd2613bab852bbe0edfe8433a06de83e9 Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Tue, 6 Sep 2022 15:50:42 +0100 Subject: [PATCH 4/4] refactor: use channel ref to avoid repetitive indexing Use a single channel reference to avoid indexing into the vector everytime. Signed-off-by: Tiago Castro --- kube-client/src/api/portforward.rs | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/kube-client/src/api/portforward.rs b/kube-client/src/api/portforward.rs index 28c599a03..4a009b7be 100644 --- a/kube-client/src/api/portforward.rs +++ b/kube-client/src/api/portforward.rs @@ -278,13 +278,11 @@ where match msg { Message::FromPod(ch, mut bytes) => { let ch = ch as usize; - if ch >= chan_state.len() { - return Err(Error::InvalidChannel(ch)); - } + let channel = chan_state.get_mut(ch).ok_or(Error::InvalidChannel(ch))?; let port_index = ch / 2; // Initialization - if !chan_state[ch].initialized { + if !channel.initialized { // The initial message must be 3 bytes including the channel prefix. if bytes.len() != 2 { return Err(Error::InvalidInitialFrameSize); @@ -298,7 +296,7 @@ where }); } - chan_state[ch].initialized = true; + channel.initialized = true; continue; } @@ -310,7 +308,7 @@ where .map_err(Error::InvalidErrorMessage)?; sender.send(s).map_err(Error::ForwardErrorMessage)?; } - } else if !chan_state[ch].shutdown { + } else if !channel.shutdown { writers[port_index] .write_all(&bytes) .await @@ -329,13 +327,12 @@ where } Message::ToPodClose(ch) => { let ch = ch as usize; - if ch >= chan_state.len() { - return Err(Error::InvalidChannel(ch)); - } + let channel = chan_state.get_mut(ch).ok_or(Error::InvalidChannel(ch))?; let port_index = ch / 2; - if !chan_state[ch].shutdown { + + if !channel.shutdown { writers[port_index].shutdown().await.map_err(Error::Shutdown)?; - chan_state[ch].shutdown = true; + channel.shutdown = true; closed_ports += 1; }