Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: Replace Future impl with poll_next method #23

Merged
merged 5 commits into from Aug 10, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 7 additions & 0 deletions CHANGELOG.md
Expand Up @@ -4,6 +4,13 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [2.0.0] [Unreleased]

### Changed
- Add `IfWatcher::poll_next`. Implement `Stream` instead of `Future` for `IfWatcher`. See [PR 23].

[PR 23]: https://github.com/mxinden/if-watch/pull/23

## [1.1.1]

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "if-watch"
version = "1.1.1"
version = "2.0.0"
authors = ["David Craven <david@craven.ch>", "Parity Technologies Limited <admin@parity.io>"]
edition = "2021"
keywords = ["asynchronous", "routing"]
Expand Down
5 changes: 3 additions & 2 deletions examples/if_watch.rs
@@ -1,12 +1,13 @@
use futures::StreamExt;
use if_watch::IfWatcher;
use std::pin::Pin;

fn main() {
env_logger::init();
futures::executor::block_on(async {
let mut set = IfWatcher::new().await.unwrap();
loop {
println!("Got event {:?}", Pin::new(&mut set).await);
let event = set.select_next_some().await;
println!("Got event {:?}", event);
}
});
}
20 changes: 8 additions & 12 deletions src/apple.rs
Expand Up @@ -7,7 +7,6 @@ use futures::channel::mpsc;
use futures::stream::Stream;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -59,22 +58,19 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
while let Poll::Ready(_) = Pin::new(&mut self.rx).poll_next(cx) {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.rx).poll_next(cx).is_pending() {
return Poll::Pending;
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
}
}

Expand Down
6 changes: 1 addition & 5 deletions src/fallback.rs
Expand Up @@ -48,12 +48,8 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
Expand Down
24 changes: 18 additions & 6 deletions src/lib.rs
Expand Up @@ -2,8 +2,9 @@
#![deny(missing_docs)]
#![deny(warnings)]

use futures::stream::FusedStream;
use futures::Stream;
pub use ipnet::{IpNet, Ipv4Net, Ipv6Net};
use std::future::Future;
use std::io::Result;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -63,25 +64,36 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.0.iter()
}

/// Poll for an address change event.
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
Pin::new(&mut self.0).poll_next(cx)
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;
impl Stream for IfWatcher {
type Item = Result<IfEvent>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.poll_next(cx).map(Some)
}
}

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx)
impl FusedStream for IfWatcher {
fn is_terminated(&self) -> bool {
false
}
}

#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;

#[test]
fn test_ip_watch() {
futures::executor::block_on(async {
let mut set = IfWatcher::new().await.unwrap();
let event = Pin::new(&mut set).await.unwrap();
let event = set.select_next_some().await.unwrap();
println!("Got event {:?}", event);
});
}
Expand Down
33 changes: 15 additions & 18 deletions src/linux.rs
Expand Up @@ -2,6 +2,7 @@ use crate::{IfEvent, IpNet, Ipv4Net, Ipv6Net};
use fnv::FnvHashSet;
use futures::channel::mpsc::UnboundedReceiver;
use futures::future::Either;
use futures::ready;
use futures::stream::{Stream, TryStreamExt};
use rtnetlink::constants::{RTMGRP_IPV4_IFADDR, RTMGRP_IPV6_IFADDR};
use rtnetlink::packet::address::nlas::Nla;
Expand All @@ -12,7 +13,6 @@ use std::collections::VecDeque;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::net::{Ipv4Addr, Ipv6Addr};
use std::ops::DerefMut;
use std::pin::Pin;
use std::task::{Context, Poll};

Expand Down Expand Up @@ -93,20 +93,17 @@ impl IfWatcher {
}
}
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
log::trace!("polling IfWatcher {:p}", self.deref_mut());
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(std::io::Error::new(
ErrorKind::BrokenPipe,
"rtnetlink socket closed",
)));
}
while let Poll::Ready(Some((message, _))) = Pin::new(&mut self.messages).poll_next(cx) {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if Pin::new(&mut self.conn).poll(cx).is_ready() {
return Poll::Ready(Err(socket_err()));
}
let (message, _) =
ready!(Pin::new(&mut self.messages).poll_next(cx)).ok_or_else(socket_err)?;
match message.payload {
NetlinkPayload::Error(err) => return Poll::Ready(Err(err.to_io())),
NetlinkPayload::InnerMessage(msg) => match msg {
Expand All @@ -117,13 +114,13 @@ impl Future for IfWatcher {
_ => {}
}
}
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
Poll::Pending
}
}

fn socket_err() -> std::io::Error {
std::io::Error::new(ErrorKind::BrokenPipe, "rtnetlink socket closed")
}

fn iter_nets(msg: AddressMessage) -> impl Iterator<Item = IpNet> {
let prefix = msg.header.prefix_len;
let family = msg.header.family;
Expand Down
22 changes: 9 additions & 13 deletions src/win.rs
Expand Up @@ -4,7 +4,6 @@ use futures::task::AtomicWaker;
use if_addrs::IfAddr;
use std::collections::VecDeque;
use std::ffi::c_void;
use std::future::Future;
use std::io::{Error, ErrorKind, Result};
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
Expand Down Expand Up @@ -68,23 +67,20 @@ impl IfWatcher {
pub fn iter(&self) -> impl Iterator<Item = &IpNet> {
self.addrs.iter()
}
}

impl Future for IfWatcher {
type Output = Result<IfEvent>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
self.waker.register(cx.waker());
if self.resync.swap(false, Ordering::Relaxed) {
pub fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Result<IfEvent>> {
loop {
if let Some(event) = self.queue.pop_front() {
return Poll::Ready(Ok(event));
}
if !self.resync.swap(false, Ordering::Relaxed) {
self.waker.register(cx.waker());
return Poll::Pending;
}
if let Err(error) = self.resync() {
return Poll::Ready(Err(error));
}
}
if let Some(event) = self.queue.pop_front() {
Poll::Ready(Ok(event))
} else {
Poll::Pending
}
}
}

Expand Down