diff --git a/examples/pod_portforward_bind.rs b/examples/pod_portforward_bind.rs index e950a14d9..55db4d9d7 100644 --- a/examples/pod_portforward_bind.rs +++ b/examples/pod_portforward_bind.rs @@ -91,9 +91,11 @@ async fn forward_connection( let mut upstream_conn = forwarder .take_stream(port) .context("port not found in forwarder")?; - tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await?; + let copy_result = tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await; drop(upstream_conn); - forwarder.join().await?; + forwarder.abort(); + forwarder.join().await.ok(); + copy_result?; info!("connection closed"); Ok(()) } diff --git a/kube-client/src/api/portforward.rs b/kube-client/src/api/portforward.rs index 6dee12b4c..07877d907 100644 --- a/kube-client/src/api/portforward.rs +++ b/kube-client/src/api/portforward.rs @@ -60,6 +60,10 @@ pub enum Error { #[error("failed to complete the background task: {0}")] Spawn(#[source] tokio::task::JoinError), + + /// Failed to shutdown a write to pod channel. + #[error("failed to shutdown write to Pod channel: {0}")] + Shutdown(#[source] std::io::Error), } type ErrorReceiver = oneshot::Receiver; @@ -69,6 +73,8 @@ type ErrorSender = oneshot::Sender; enum Message { FromPod(u8, Bytes), ToPod(u8, Bytes), + FromPodClose, + ToPodClose(u8), } /// Manages port-forwarded streams. @@ -192,6 +198,10 @@ async fn to_pod_loop( .map_err(Error::ForwardToPod)?; } } + sender + .send(Message::ToPodClose(ch)) + .await + .map_err(Error::ForwardToPod)?; Ok(()) } @@ -217,6 +227,12 @@ where .await .map_err(Error::ForwardFromPod)?; } + message if message.is_close() => { + sender + .send(Message::FromPodClose) + .await + .map_err(Error::ForwardFromPod)?; + } // REVIEW should we error on unexpected websocket message? _ => {} } @@ -293,6 +309,15 @@ where .await .map_err(Error::SendWebSocketMessage)?; } + Message::ToPodClose(ch) => { + let ch = ch as usize; + writers[ch].shutdown().await.map_err(Error::Shutdown)?; + } + Message::FromPodClose => { + for writer in &mut writers { + writer.shutdown().await.map_err(Error::Shutdown)?; + } + } } } Ok(())