diff --git a/protocols/mdns/src/behaviour/iface.rs b/protocols/mdns/src/behaviour/iface.rs index 40ebdabac59..c5bacced138 100644 --- a/protocols/mdns/src/behaviour/iface.rs +++ b/protocols/mdns/src/behaviour/iface.rs @@ -201,7 +201,9 @@ where params: &impl PollParameters, ) -> Option<(PeerId, Multiaddr, Instant)> { // Poll receive socket. - while let Poll::Ready(data) = self.recv_socket.poll_read(cx, &mut self.recv_buffer) { + while let Poll::Ready(data) = + Pin::new(&mut self.recv_socket).poll_read(cx, &mut self.recv_buffer) + { match data { Ok((len, from)) => { if let Some(packet) = MdnsPacket::new_from_bytes(&self.recv_buffer[..len], from) @@ -221,7 +223,7 @@ where // Send responses. while let Some(packet) = self.send_buffer.pop_front() { - match self.send_socket.poll_write( + match Pin::new(&mut self.send_socket).poll_write( cx, &packet, SocketAddr::new(self.multicast_addr, 5353), diff --git a/protocols/mdns/src/behaviour/socket.rs b/protocols/mdns/src/behaviour/socket.rs index 86d5ec97c12..4406ed33fde 100644 --- a/protocols/mdns/src/behaviour/socket.rs +++ b/protocols/mdns/src/behaviour/socket.rs @@ -20,12 +20,13 @@ use std::{ io::Error, + marker::Unpin, net::{SocketAddr, UdpSocket}, task::{Context, Poll}, }; /// Interface that must be implemented by the different runtimes to use the [`UdpSocket`] in async mode -pub trait AsyncSocket: Send + 'static { +pub trait AsyncSocket: Unpin + Send + 'static { /// Create the async socket from the [`std::net::UdpSocket`] fn from_std(socket: UdpSocket) -> std::io::Result where @@ -93,7 +94,7 @@ pub mod asio { #[cfg(feature = "tokio")] pub mod tokio { use super::*; - use ::tokio::net::UdpSocket as TkUdpSocket; + use ::tokio::{io::ReadBuf, net::UdpSocket as TkUdpSocket}; /// Tokio ASync Socket` pub type TokioUdpSocket = TkUdpSocket; @@ -109,8 +110,12 @@ pub mod tokio { cx: &mut Context, buf: &mut [u8], ) -> Poll> { - futures::ready!(self.poll_recv_ready(cx))?; - Poll::Ready(self.try_recv_from(buf)) + let mut rbuf = ReadBuf::new(buf); + match self.poll_recv_from(cx, &mut rbuf) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Ok(addr)) => Poll::Ready(Ok((rbuf.filled().len(), addr))), + } } fn poll_write( @@ -119,10 +124,10 @@ pub mod tokio { packet: &[u8], to: SocketAddr, ) -> Poll> { - futures::ready!(self.poll_send_ready(cx))?; - match self.try_send_to(packet, to) { - Ok(_len) => Poll::Ready(Ok(())), - Err(err) => Poll::Ready(Err(err)), + match self.poll_send_to(cx, packet, to) { + Poll::Pending => Poll::Pending, + Poll::Ready(Err(err)) => Poll::Ready(Err(err)), + Poll::Ready(Ok(_len)) => Poll::Ready(Ok(())), } } }