Skip to content

Commit

Permalink
fix: portforward connections
Browse files Browse the repository at this point in the history
In some situations it was observed that connections lingered in a CLOSE_WAIT state.
In others, an existing connection would be reused and would fail at the time of use.

When the reader half is closed, shutdown the writer half.
When a message is received indicating a closed socket, shutdown all write half's.

Change the example to shutdown the portforward, as otherwise it might never stop.

Signed-off-by: Tiago Castro <tiagolobocastro@gmail.com>
  • Loading branch information
tiagolobocastro committed Aug 3, 2022
1 parent 79f1d7a commit 90a6e78
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 2 deletions.
6 changes: 4 additions & 2 deletions examples/pod_portforward_bind.rs
Expand Up @@ -91,9 +91,11 @@ async fn forward_connection(
let mut upstream_conn = forwarder
.take_stream(port)
.context("port not found in forwarder")?;
tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await?;
let copy_result = tokio::io::copy_bidirectional(&mut client_conn, &mut upstream_conn).await;
drop(upstream_conn);
forwarder.join().await?;
forwarder.abort();
forwarder.join().await.ok();
copy_result?;
info!("connection closed");
Ok(())
}
25 changes: 25 additions & 0 deletions kube-client/src/api/portforward.rs
Expand Up @@ -60,6 +60,10 @@ pub enum Error {

#[error("failed to complete the background task: {0}")]
Spawn(#[source] tokio::task::JoinError),

/// Failed to shutdown a write to pod channel.
#[error("failed to shutdown write to Pod channel: {0}")]
Shutdown(#[source] std::io::Error),
}

type ErrorReceiver = oneshot::Receiver<String>;
Expand All @@ -69,6 +73,8 @@ type ErrorSender = oneshot::Sender<String>;
enum Message {
FromPod(u8, Bytes),
ToPod(u8, Bytes),
FromPodClose,
ToPodClose(u8),
}

/// Manages port-forwarded streams.
Expand Down Expand Up @@ -192,6 +198,10 @@ async fn to_pod_loop(
.map_err(Error::ForwardToPod)?;
}
}
sender
.send(Message::ToPodClose(ch))
.await
.map_err(Error::ForwardToPod)?;
Ok(())
}

Expand All @@ -217,6 +227,12 @@ where
.await
.map_err(Error::ForwardFromPod)?;
}
message if message.is_close() => {
sender
.send(Message::FromPodClose)
.await
.map_err(Error::ForwardFromPod)?;
}
// REVIEW should we error on unexpected websocket message?
_ => {}
}
Expand Down Expand Up @@ -293,6 +309,15 @@ where
.await
.map_err(Error::SendWebSocketMessage)?;
}
Message::ToPodClose(ch) => {
let ch = ch as usize;
writers[ch].shutdown().await.map_err(Error::Shutdown)?;
}
Message::FromPodClose => {
for writer in &mut writers {
writer.shutdown().await.map_err(Error::Shutdown)?;
}
}
}
}
Ok(())
Expand Down

0 comments on commit 90a6e78

Please sign in to comment.