diff --git a/transports/tcp/src/lib.rs b/transports/tcp/src/lib.rs index be2f235deaf..3a9c6c31fce 100644 --- a/transports/tcp/src/lib.rs +++ b/transports/tcp/src/lib.rs @@ -46,7 +46,6 @@ use futures::{ future::{self, BoxFuture, Ready}, prelude::*, ready, - stream::FusedStream, }; use futures_timer::Delay; use libp2p_core::{ @@ -306,7 +305,7 @@ impl GenTcpConfig { /// let listen_addr1: Multiaddr = "/ip4/127.0.0.1/tcp/9001".parse().unwrap(); /// let listen_addr2: Multiaddr = "/ip4/127.0.0.1/tcp/9002".parse().unwrap(); /// - /// let mut tcp1 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)); + /// let mut tcp1 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)).boxed(); /// tcp1.listen_on(ListenerId::new(1), listen_addr1.clone()).expect("listener"); /// match tcp1.select_next_some().await { /// TransportEvent::NewAddress { listen_addr, .. } => { @@ -317,7 +316,7 @@ impl GenTcpConfig { /// _ => {} /// } /// - /// let mut tcp2 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)); + /// let mut tcp2 = TcpTransport::new(GenTcpConfig::new().port_reuse(true)).boxed(); /// tcp2.listen_on(ListenerId::new(1), listen_addr2).expect("listener"); /// match tcp2.select_next_some().await { /// TransportEvent::NewAddress { listen_addr, .. } => { @@ -551,31 +550,6 @@ where } } -impl Stream for GenTcpTransport -where - T: Provider + Send + 'static, - T::Listener: Unpin, - T::IfWatcher: Unpin, - T::Stream: Unpin, -{ - type Item = TransportEvent<::ListenerUpgrade, ::Error>; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Transport::poll(self, cx).map(Some) - } -} -impl FusedStream for GenTcpTransport -where - T: Provider + Send + 'static, - T::Listener: Unpin, - T::IfWatcher: Unpin, - T::Stream: Unpin, -{ - fn is_terminated(&self) -> bool { - false - } -} - #[derive(Debug)] pub enum TcpTransportEvent { /// The transport is listening on a new additional [`Multiaddr`]. @@ -937,7 +911,7 @@ mod tests { env_logger::try_init().ok(); async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { - let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()); + let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()).boxed(); tcp.listen_on(ListenerId::new(1), addr).unwrap(); loop { match tcp.select_next_some().await { @@ -1006,7 +980,7 @@ mod tests { env_logger::try_init().ok(); async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { - let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()); + let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()).boxed(); tcp.listen_on(ListenerId::new(1), addr).unwrap(); loop { @@ -1075,7 +1049,7 @@ mod tests { env_logger::try_init().ok(); async fn listener(addr: Multiaddr, mut ready_tx: mpsc::Sender) { - let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()); + let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()).boxed(); tcp.listen_on(ListenerId::new(1), addr).unwrap(); loop { match tcp.select_next_some().await { @@ -1097,7 +1071,7 @@ mod tests { async fn dialer(addr: Multiaddr, mut ready_rx: mpsc::Receiver) { let dest_addr = ready_rx.next().await.unwrap(); - let mut tcp = GenTcpTransport::::new(GenTcpConfig::new().port_reuse(true)); + let mut tcp = GenTcpTransport::::new(GenTcpConfig::new().port_reuse(true)).boxed(); tcp.listen_on(ListenerId::new(1), addr).unwrap(); match tcp.select_next_some().await { TransportEvent::NewAddress { .. } => { @@ -1155,7 +1129,7 @@ mod tests { T::IfWatcher: Sync, T::Stream: Sync, { - let mut tcp = GenTcpTransport::::new(GenTcpConfig::new().port_reuse(true)); + let mut tcp = GenTcpTransport::::new(GenTcpConfig::new().port_reuse(true)).boxed(); tcp.listen_on(ListenerId::new(1), addr).unwrap(); match tcp.select_next_some().await { TransportEvent::NewAddress { @@ -1207,7 +1181,7 @@ mod tests { T: Provider, T::IfWatcher: Sync, { - let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()); + let mut tcp = GenTcpTransport::::new(GenTcpConfig::new()).boxed(); tcp.listen_on(ListenerId::new(1), addr).unwrap(); tcp.select_next_some() .await