Skip to content

Commit

Permalink
*: Replace Future impl with poll_next method (#23)
Browse files Browse the repository at this point in the history
Remove Future impl on all platform IfWatcher's, instead add `poll_next`
method. Implement `Stream` and `FusedStream` for user-facing IfWatcher.
  • Loading branch information
elenaf9 committed Aug 10, 2022
1 parent 2d53554 commit 8ddf200
Show file tree
Hide file tree
Showing 8 changed files with 62 additions and 57 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0] [Unreleased]

### Changed
- Add `IfWatcher::poll_next`. Implement `Stream` instead of `Future` for `IfWatcher`. See [PR 23].

[PR 23]: https://github.com/mxinden/if-watch/pull/23

## [1.1.1]

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "if-watch"
version = "1.1.1"
version = "2.0.0"
authors = ["David Craven <david@craven.ch>", "Parity Technologies Limited <admin@parity.io>"]
edition = "2021"
keywords = ["asynchronous", "routing"]
Expand Down
5 changes: 3 additions & 2 deletions examples/if_watch.rs
@@ -1,12 +1,13 @@
use futures::StreamExt;
use if_watch::IfWatcher;
use std::pin::Pin;

fn main() {
env_logger::init();
futures::executor::block_on(async {
let mut set = IfWatcher::new().await.unwrap();
loop {
println!("Got event {:?}", Pin::new(&mut set).await);
let event = set.select_next_some().await;
println!("Got event {:?}", event);
}
});
}
20 changes: 8 additions & 12 deletions src/apple.rs
Expand Up @@ -7,7 +7,6 @@ use futures::channel::mpsc;
use futures::stream::Stream;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -59,22 +58,19 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
while let Poll::Ready(_) = Pin::new(&mut self.rx).poll_next(cx) {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.rx).poll_next(cx).is_pending() {
return Poll::Pending;
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/fallback.rs
Expand Up @@ -48,12 +48,8 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
Expand Down
24 changes: 18 additions & 6 deletions src/lib.rs
Expand Up @@ -2,8 +2,9 @@
#![deny(missing_docs)]
#![deny(warnings)]

use futures::stream::FusedStream;
use futures::Stream;
pub use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -63,25 +64,36 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.0.iter()
}

/// Poll for an address change event.
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
Pin::new(&mut self.0).poll_next(cx)
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;
impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next(cx).map(Some)
}
}

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;

#[test]
fn test_ip_watch() {
futures::executor::block_on(async {
let mut set = IfWatcher::new().await.unwrap();
let event = Pin::new(&mut set).await.unwrap();
let event = set.select_next_some().await.unwrap();
println!("Got event {:?}", event);
});
}
Expand Down
33 changes: 15 additions & 18 deletions src/linux.rs
Expand Up @@ -2,6 +2,7 @@ use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net};
use fnv::FnvHashSet;
use futures::channel::mpsc::UnboundedReceiver;
use futures::future::Either;
use futures::ready;
use futures::stream::{Stream, TryStreamExt};
use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR};
use rtnetlink::packet::address::nlas::Nla;
Expand All @@ -12,7 +13,6 @@ use std::collections::VecDeque;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -93,20 +93,17 @@ impl IfWatcher {
}
}
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
log::trace!("polling IfWatcher {:p}", self.deref_mut());
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(std::io::Error::new(
ErrorKind::BrokenPipe,
"rtnetlink socket closed",
)));
}
while let Poll::Ready(Some((message, _))) = Pin::new(&mut self.messages).poll_next(cx) {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(socket_err()));
}
let (message, _) =
ready!(Pin::new(&mut self.messages).poll_next(cx)).ok_or_else(socket_err)?;
match message.payload {
NetlinkPayload::Error(err) => return Poll::Ready(Err(err.to_io())),
NetlinkPayload::InnerMessage(msg) => match msg {
Expand All @@ -117,13 +114,13 @@ impl Future for IfWatcher {
_ => {}
}
}
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
Poll::Pending
}
}

fn socket_err() -> std::io::Error {
std::io::Error::new(ErrorKind::BrokenPipe, "rtnetlink socket closed")
}

fn iter_nets(msg: AddressMessage) -> impl Iterator<Item = IpNet> {
let prefix = msg.header.prefix_len;
let family = msg.header.family;
Expand Down
22 changes: 9 additions & 13 deletions src/win.rs
Expand Up @@ -4,7 +4,6 @@ use futures::task::AtomicWaker;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::ffi::c_void;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -68,23 +67,20 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.waker.register(cx.waker());
if self.resync.swap(false, Ordering::Relaxed) {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if !self.resync.swap(false, Ordering::Relaxed) {
self.waker.register(cx.waker());
return Poll::Pending;
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
}
}

Expand Down

0 comments on commit 8ddf200

Please sign in to comment.