From 90a6e781321b39760519ef55924b6beeabc90cfc Mon Sep 17 00:00:00 2001 From: Tiago Castro Date: Wed, 3 Aug 2022 19:09:50 +0100 Subject: [PATCH] fix: portforward connections In some situations it was observed that connections lingered in a CLOSE_WAIT state. In others, an existing connection would be reused and would fail at the time of use. When the reader half is closed, shutdown the writer half. When a message is received indicating a closed socket, shutdown all write half's. Change the example to shutdown the portforward, as otherwise it might never stop. Signed-off-by: Tiago Castro --- examples/pod_portforward_bind.rs | 6 ++++-- kube-client/src/api/portforward.rs | 25 +++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/examples/pod_portforward_bind.rs b/examples/pod_portforward_bind.rs index e950a14d9..55db4d9d7 100644 --- a/examples/pod_portforward_bind.rs +++ b/examples/pod_portforward_bind.rs @@ -91,9 +91,11 @@ async fn forward_connection( let mut upstream_conn = forwarder .take_stream(port) .context("port not found in forwarder")?; - tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await?; + let copy_result = tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await; drop(upstream_conn); - forwarder.join().await?; + forwarder.abort(); + forwarder.join().await.ok(); + copy_result?; info!("connection closed"); Ok(()) } diff --git a/kube-client/src/api/portforward.rs b/kube-client/src/api/portforward.rs index 6dee12b4c..07877d907 100644 --- a/kube-client/src/api/portforward.rs +++ b/kube-client/src/api/portforward.rs @@ -60,6 +60,10 @@ pub enum Error { #[error("failed to complete the background task: {0}")] Spawn(#[source] tokio::task::JoinError), + + /// Failed to shutdown a write to pod channel. + #[error("failed to shutdown write to Pod channel: {0}")] + Shutdown(#[source] std::io::Error), } type ErrorReceiver = oneshot::Receiver; @@ -69,6 +73,8 @@ type ErrorSender = oneshot::Sender; enum Message { FromPod(u8, Bytes), ToPod(u8, Bytes), + FromPodClose, + ToPodClose(u8), } /// Manages port-forwarded streams. @@ -192,6 +198,10 @@ async fn to_pod_loop( .map_err(Error::ForwardToPod)?; } } + sender + .send(Message::ToPodClose(ch)) + .await + .map_err(Error::ForwardToPod)?; Ok(()) } @@ -217,6 +227,12 @@ where .await .map_err(Error::ForwardFromPod)?; } + message if message.is_close() => { + sender + .send(Message::FromPodClose) + .await + .map_err(Error::ForwardFromPod)?; + } // REVIEW should we error on unexpected websocket message? _ => {} } @@ -293,6 +309,15 @@ where .await .map_err(Error::SendWebSocketMessage)?; } + Message::ToPodClose(ch) => { + let ch = ch as usize; + writers[ch].shutdown().await.map_err(Error::Shutdown)?; + } + Message::FromPodClose => { + for writer in &mut writers { + writer.shutdown().await.map_err(Error::Shutdown)?; + } + } } } Ok(())