Skip to content

Commit

Permalink
Merge pull request #1 from spebern/master
Browse files Browse the repository at this point in the history
update for newest tokio and rust 2018
  • Loading branch information
mre committed Jan 15, 2019
2 parents 753b90b + 2933446 commit 76b94b1
Show file tree
Hide file tree
Showing 4 changed files with 105 additions and 80 deletions.
10 changes: 6 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,16 @@ authors = ["Matthias Endler <matthias-endler@gmx.net>"]
description = "An adaptor that chunks up elements and flushes them after a timeout or when the buffer is full."
license = "MIT OR Apache-2.0"
name = "tokio-batch"
version = "0.1.1"
version = "0.2.0"
edition = "2018"

[dependencies]
futures = "0.1.16"
tokio-core = "0.1.10"
tokio = "0.1.14"
futures = "0.1.25"
tokio-timer = "0.2.8"

[build-dependencies]
skeptic = "0.13"

[dev-dependencies]
skeptic = "0.13"
skeptic = "0.13"
27 changes: 15 additions & 12 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,28 +13,31 @@ or a predefined timeout was triggered.
## Usage

```rust
extern crate tokio_core;
extern crate futures;
extern crate tokio_batch;

use tokio_core::reactor::Core;
use futures::{stream, Stream};
use std::io;
use std::time::Duration;

use futures::{stream, Future, Stream};
use tokio::runtime::Runtime;
use tokio_batch::*;

fn main() {
let mut core = Core::new().unwrap();
let handle = core.handle();
let mut rt = Runtime::new().unwrap();

let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
let stream = stream::iter_ok::<_, io::Error>(iter);

let chunk_stream = Chunks::new(stream, handle, 5, Duration::new(10, 0));
let chunk_stream = Chunks::new(stream, 5, Duration::new(10, 0));

let v = chunk_stream.collect();
let result = core.run(v).unwrap();
assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], result);
rt.spawn(v.then(|res| {
match res {
Ok(v) => assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], v),
Err(_) => assert!(false),
}
Ok(())
}));

rt.shutdown_on_idle().wait().unwrap();
}
```

Expand All @@ -44,4 +47,4 @@ This was taken and adjusted from
https://github.com/alexcrichton/futures-rs/blob/master/src/stream/chunks.rs
and moved into a separate crate for usability.

Thanks to [@arielb1](https://github.com/arielb1) and [@alexcrichton](https://github.com/alexcrichton/) for their support!
Thanks to [@arielb1](https://github.com/arielb1) and [@alexcrichton](https://github.com/alexcrichton/) for their support!
2 changes: 1 addition & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
extern crate skeptic;
use skeptic;

fn main() {
skeptic::generate_doc_tests(&["README.md"]);
Expand Down
146 changes: 83 additions & 63 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
extern crate futures;
extern crate tokio_core;

use std::prelude::v1::*;
use std::io;
use std::time::Duration;
use std::mem;
use std::prelude::v1::*;
use std::time::{Duration, Instant};

use tokio_core::reactor::{Handle, Timeout};
use futures::{Async, Future, Poll};
use futures::stream::{Fuse, Stream};
use futures::{Async, Future, Poll};
use tokio::timer;
use tokio::timer::Delay;

/// An adaptor that chunks up elements in a vector.
///
Expand All @@ -24,25 +21,37 @@ pub struct Chunks<S>
where
S: Stream,
{
handle: Handle,
clock: Option<Timeout>,
clock: Option<Delay>,
duration: Duration,
items: Vec<S::Item>,
err: Option<S::Error>,
err: Option<Error<S::Error>>,
stream: Fuse<S>,
}

/// Error returned by `Chunks`.
#[derive(Debug)]
pub struct Error<T>(Kind<T>);

/// Chunks error variants
#[derive(Debug)]
enum Kind<T> {
/// Inner value returned an error
Inner(T),

/// Timer returned an error.
Timer(timer::Error),
}

impl<S> Chunks<S>
where
S: Stream,
{
pub fn new(s: S, handle: Handle, capacity: usize, duration: Duration) -> Chunks<S> {
pub fn new(s: S, capacity: usize, duration: Duration) -> Chunks<S> {
assert!(capacity > 0);

Chunks {
handle: handle,
clock: None,
duration: duration,
duration,
items: Vec::with_capacity(capacity),
err: None,
stream: s.fuse(),
Expand Down Expand Up @@ -86,14 +95,13 @@ where
impl<S> Stream for Chunks<S>
where
S: Stream,
<S as Stream>::Error: From<io::Error>,
{
type Item = Vec<<S as Stream>::Item>;
type Error = <S as Stream>::Error;
type Error = Error<S::Error>;

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
if let Some(err) = self.err.take() {
return Err(err);
if let Some(e) = self.err.take() {
return Err(e);
}

let cap = self.items.capacity();
Expand All @@ -106,11 +114,11 @@ where
// the full one.
Ok(Async::Ready(Some(item))) => {
if self.items.is_empty() {
self.clock = Some(Timeout::new(self.duration, &self.handle).unwrap());
self.clock = Some(Delay::new(Instant::now() + self.duration));
}
self.items.push(item);
if self.items.len() >= cap {
return self.flush();
return self.flush().map_err(|e| Error(Kind::Inner(e)));
} else {
continue;
}
Expand All @@ -129,28 +137,32 @@ where

// If we've got buffered items be sure to return them first,
// we'll defer our error for later.
Err(e) => if self.items.is_empty() {
return Err(e);
} else {
self.err = Some(e);
return self.flush();
},
Err(e) => {
if self.items.is_empty() {
return Err(Error(Kind::Inner(e)));
} else {
self.err = Some(Error(Kind::Inner(e)));
return self.flush().map_err(|e| Error(Kind::Inner(e)));
}
}
}

match self.clock.poll() {
Ok(Async::Ready(Some(()))) => {
return self.flush();
return self.flush().map_err(|e| Error(Kind::Inner(e)));
}
Ok(Async::Ready(None)) => {
assert!(self.items.is_empty(), "no clock but there are items");
}
Ok(Async::NotReady) => {}
Err(e) => if self.items.is_empty() {
return Err(From::from(e));
} else {
self.err = Some(From::from(e));
return self.flush();
},
Err(e) => {
if self.items.is_empty() {
return Err(Error(Kind::Timer(e)));
} else {
self.err = Some(Error(Kind::Timer(e)));
return self.flush().map_err(|e| Error(Kind::Inner(e)));
}
}
}

return Ok(Async::NotReady);
Expand All @@ -160,77 +172,80 @@ where

#[cfg(test)]
mod tests {
use tokio_core::reactor::Core;
use futures::{stream, Stream};
use super::*;
use futures::stream;
use std::io;
use std::iter;
use std::time::{Duration, Instant};
use super::*;

#[test]
fn messages_pass_through() {
let mut core = Core::new().unwrap();
let handle = core.handle();

let iter = iter::once(5);
let stream = stream::iter_ok::<_, io::Error>(iter);

let chunk_stream = Chunks::new(stream, handle, 5, Duration::new(10, 0));
let chunk_stream = Chunks::new(stream, 5, Duration::new(10, 0));

let v = chunk_stream.collect();
let result = core.run(v).unwrap();
assert_eq!(vec![vec![5]], result);
tokio::run(v.then(|res| {
match res {
Err(_) => assert!(false),
Ok(v) => assert_eq!(vec![vec![5]], v),
};
Ok(())
}));
}

#[test]
fn message_chunks() {
let mut core = Core::new().unwrap();
let handle = core.handle();

let iter = vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9].into_iter();
let stream = stream::iter_ok::<_, io::Error>(iter);

let chunk_stream = Chunks::new(stream, handle, 5, Duration::new(10, 0));
let chunk_stream = Chunks::new(stream, 5, Duration::new(10, 0));

let v = chunk_stream.collect();
let result = core.run(v).unwrap();
assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], result);
tokio::run(v.then(|res| {
match res {
Err(_) => assert!(false),
Ok(v) => assert_eq!(vec![vec![0, 1, 2, 3, 4], vec![5, 6, 7, 8, 9]], v),
};
Ok(())
}));
}

#[test]
fn message_early_exit() {
let mut core = Core::new().unwrap();
let handle = core.handle();

let iter = vec![1, 2, 3, 4].into_iter();
let stream = stream::iter_ok::<_, io::Error>(iter);

let chunk_stream = Chunks::new(stream, handle, 5, Duration::new(100, 0));
let chunk_stream = Chunks::new(stream, 5, Duration::new(100, 0));

let v = chunk_stream.collect();
let result = core.run(v).unwrap();
assert_eq!(vec![vec![1, 2, 3, 4]], result);
tokio::run(v.then(|res| {
match res {
Err(_) => assert!(false),
Ok(v) => assert_eq!(vec![vec![1, 2, 3, 4]], v),
};
Ok(())
}));
}

#[test]
fn message_timeout() {
let mut core = Core::new().unwrap();
let handle = core.handle();

let iter = vec![1, 2, 3, 4].into_iter();
let stream0 = stream::iter_ok::<_, io::Error>(iter);

let iter = vec![5].into_iter();
let stream1 = stream::iter_ok::<_, io::Error>(iter).and_then(|n| {
Timeout::new(Duration::from_millis(300), &handle)
.unwrap()
Delay::new(Instant::now() + Duration::from_millis(300))
.and_then(move |_| Ok(n))
.map_err(|e| io::Error::new(io::ErrorKind::TimedOut, e))
});

let iter = vec![6, 7, 8].into_iter();
let stream2 = stream::iter_ok::<_, io::Error>(iter);

let stream = stream0.chain(stream1).chain(stream2);
let chunk_stream = Chunks::new(stream, handle.clone(), 5, Duration::from_millis(100));
let chunk_stream = Chunks::new(stream, 5, Duration::from_millis(100));

let now = Instant::now();
let min_times = [Duration::from_millis(80), Duration::from_millis(150)];
Expand All @@ -239,7 +254,7 @@ mod tests {
let mut i = 0;

let v = chunk_stream
.map(|s| {
.map(move |s| {
let now2 = Instant::now();
println!("{:?}", now2 - now);
assert!((now2 - now) < max_times[i]);
Expand All @@ -249,7 +264,12 @@ mod tests {
})
.collect();

let result = core.run(v).unwrap();
assert_eq!(result, results);
tokio::run(v.then(move |res| {
match res {
Err(_) => assert!(false),
Ok(v) => assert_eq!(v, results),
};
Ok(())
}));
}
}

0 comments on commit 76b94b1

Please sign in to comment.