Skip to content

Commit

Permalink
update to support new Transport interface
Browse files Browse the repository at this point in the history
  • Loading branch information
melekes committed Jul 12, 2022
1 parent e8207f9 commit d96b499
Show file tree
Hide file tree
Showing 5 changed files with 418 additions and 311 deletions.
8 changes: 4 additions & 4 deletions transports/webrtc/Cargo.toml
Expand Up @@ -18,8 +18,8 @@ futures-lite = "1"
futures-timer = "3"
hex = "0.4"
if-watch = "0.2"
libp2p-core = { version = "0.33.0", path = "../../core", default-features = false }
libp2p-noise = { version = "0.36.0", path = "../../transports/noise" }
libp2p-core = { version = "0.34.0", path = "../../core", default-features = false }
libp2p-noise = { version = "0.37.0", path = "../../transports/noise" }
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
stun = "0.4"
Expand All @@ -35,8 +35,8 @@ webrtc-util = { version = "0.5.3", default-features = false, features = ["conn",
[dev-dependencies]
anyhow = "1.0"
env_logger = "0.9"
libp2p-request-response = { version = "0.18.0", path = "../../protocols/request-response" }
libp2p-swarm = { version = "0.36.0", path = "../../swarm" }
libp2p-request-response = { version = "0.20.0", path = "../../protocols/request-response" }
libp2p-swarm = { version = "0.38.0", path = "../../swarm" }
rand = "0.8"
rand_core = "0.5"
rcgen = "0.9"
107 changes: 53 additions & 54 deletions transports/webrtc/src/connection.rs
Expand Up @@ -33,7 +33,6 @@ use webrtc::peer_connection::RTCPeerConnection;
use webrtc_data::data_channel::DataChannel as DetachedDataChannel;

use std::io;
use std::pin::Pin;
use std::sync::{Arc, Mutex as StdMutex};
use std::task::{Context, Poll};

Expand Down Expand Up @@ -226,68 +225,68 @@ impl<'a> StreamMuxer for Connection {
/// abruptly interrupt the execution.
fn destroy_outbound(&self, _s: Self::OutboundSubstream) {}

fn read_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &mut [u8],
) -> Poll<Result<usize, Self::Error>> {
Pin::new(s).poll_read(cx, buf)
}

fn write_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
buf: &[u8],
) -> Poll<Result<usize, Self::Error>> {
Pin::new(s).poll_write(cx, buf)
}

fn flush_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
trace!("Flushing substream {}", s.stream_identifier());
Pin::new(s).poll_flush(cx)
}

fn shutdown_substream(
&self,
cx: &mut Context<'_>,
s: &mut Self::Substream,
) -> Poll<Result<(), Self::Error>> {
trace!("Closing substream {}", s.stream_identifier());
Pin::new(s).poll_close(cx)
}

fn destroy_substream(&self, s: Self::Substream) {
trace!("Destroying substream {}", s.stream_identifier());
let mut data_channels_inner = self.data_channels_inner.lock().unwrap();
data_channels_inner.map.remove(&s.stream_identifier());
}
// fn read_substream(
// &self,
// cx: &mut Context<'_>,
// s: &mut Self::Substream,
// buf: &mut [u8],
// ) -> Poll<Result<usize, Self::Error>> {
// Pin::new(s).poll_read(cx, buf)
// }

// fn write_substream(
// &self,
// cx: &mut Context<'_>,
// s: &mut Self::Substream,
// buf: &[u8],
// ) -> Poll<Result<usize, Self::Error>> {
// Pin::new(s).poll_write(cx, buf)
// }

// fn flush_substream(
// &self,
// cx: &mut Context<'_>,
// s: &mut Self::Substream,
// ) -> Poll<Result<(), Self::Error>> {
// trace!("Flushing substream {}", s.stream_identifier());
// Pin::new(s).poll_flush(cx)
// }

// fn shutdown_substream(
// &self,
// cx: &mut Context<'_>,
// s: &mut Self::Substream,
// ) -> Poll<Result<(), Self::Error>> {
// trace!("Closing substream {}", s.stream_identifier());
// Pin::new(s).poll_close(cx)
// }

// fn destroy_substream(&self, s: Self::Substream) {
// trace!("Destroying substream {}", s.stream_identifier());
// let mut data_channels_inner = self.data_channels_inner.lock().unwrap();
// data_channels_inner.map.remove(&s.stream_identifier());
// }

fn poll_close(&self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
debug!("Closing connection");

let mut data_channels_inner = self.data_channels_inner.lock().unwrap();

// First, flush all the buffered data.
for (_, ch) in &mut data_channels_inner.map {
match ready!(self.flush_substream(cx, ch)) {
Ok(_) => continue,
Err(e) => return Poll::Ready(Err(e)),
}
}
// for (_, ch) in &mut data_channels_inner.map {
// match ready!(self.flush_substream(cx, ch)) {
// Ok(_) => continue,
// Err(e) => return Poll::Ready(Err(e)),
// }
// }

// Second, shutdown all the substreams.
for (_, ch) in &mut data_channels_inner.map {
match ready!(self.shutdown_substream(cx, ch)) {
Ok(_) => continue,
Err(e) => return Poll::Ready(Err(e)),
}
}
// for (_, ch) in &mut data_channels_inner.map {
// match ready!(self.shutdown_substream(cx, ch)) {
// Ok(_) => continue,
// Err(e) => return Poll::Ready(Err(e)),
// }
// }

// Third, close `incoming_data_channels_rx`
data_channels_inner.incoming_data_channels_rx.close();
Expand Down
3 changes: 3 additions & 0 deletions transports/webrtc/src/error.rs
Expand Up @@ -42,6 +42,9 @@ pub enum Error {
got: PeerId,
},

#[error("no active listeners")]
NoListeners,

#[error("internal error: {0} (see debug logs)")]
InternalError(String),
}

0 comments on commit d96b499

Please sign in to comment.