Skip to content

Commit

Permalink
Reworking: Owned buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
dbischof90 committed Oct 1, 2022
1 parent dae1fb8 commit 7f02930
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 46 deletions.
28 changes: 12 additions & 16 deletions tokio-util/src/io/sink_writer.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
use futures_sink::Sink;
use futures_util::{
future::{ok, Ready},
sink::{SinkMapErr, With},
SinkExt,
};

use pin_project_lite::pin_project;
use std::io;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::AsyncWrite;
Expand All @@ -31,15 +26,15 @@ pin_project! {
/// // Construct a channel pair to send data across and wrap a pollable sink.
/// // Note that the sink must mimic a writable object, e.g. have `std::io::Error`
/// // as its error type.
/// let (tx, mut rx) = tokio::sync::mpsc::channel::<&[u8]>(10);
/// let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
/// let mut writer = SinkWriter::new(
/// PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::Other)),
/// );
/// PollSender::new(tx).sink_map_err(|_| Error::from(ErrorKind::BrokenPipe)),
/// );
/// // Write data to our interface...
/// let data: [u8; 4] = [1, 2, 3, 4];
/// let _ = writer.write(&data).await?;
/// // ... and receive it.
/// assert_eq!(&data, rx.recv().await.unwrap());
/// assert_eq!(data.to_vec(), rx.recv().await.unwrap());
///
/// # Ok(())
/// # }
Expand All @@ -59,7 +54,7 @@ pin_project! {

impl<S> SinkWriter<S>
where
for<'r> S: Sink<&'r [u8], Error = io::Error>,
S: Sink<Vec<u8>, Error = io::Error>,
{
/// Creates a new [`SinkWriter`].
pub fn new(sink: S) -> Self {
Expand Down Expand Up @@ -88,9 +83,10 @@ where
}
}

impl<S> AsyncWrite for SinkWriter<S>
impl<S, E> AsyncWrite for SinkWriter<S>
where
for<'r> S: Sink<&'r [u8], Error = io::Error>,
S: Sink<Vec<u8>, Error = E>,
E: Into<io::Error>
{
fn poll_write(
mut self: Pin<&mut Self>,
Expand All @@ -99,13 +95,13 @@ where
) -> Poll<Result<usize, io::Error>> {
match self.as_mut().project().inner.poll_ready(cx) {
Poll::Ready(Ok(())) => {
if let Err(e) = self.as_mut().project().inner.start_send(buf) {
Poll::Ready(Err(e))
if let Err(e) = self.as_mut().project().inner.start_send(buf.to_vec()) {
Poll::Ready(Err(e.into()))
} else {
Poll::Ready(Ok(buf.len()))
}
}
Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
Poll::Ready(Err(e)) => Poll::Ready(Err(e.into())),
Poll::Pending => {
cx.waker().wake_by_ref();
Poll::Pending
Expand Down
35 changes: 5 additions & 30 deletions tokio-util/tests/io_sink_writer.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,27 @@
#![warn(rust_2018_idioms)]

use futures_util::SinkExt;
use std::fmt::{self, Debug, Display};
use std::io::{self, Error, ErrorKind};
use tokio::io::AsyncWriteExt;
use tokio_util::io::SinkWriter;
use tokio_util::sync::{PollSendError, PollSender};

#[derive(Debug)]
struct PollSendErrorCoupler<T>(PollSendError<T>);

impl<T: Debug> Display for PollSendErrorCoupler<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
Display::fmt(&self, f)
}
}

impl<T: Debug> std::error::Error for PollSendErrorCoupler<T> {}

impl<T> From<PollSendError<T>> for PollSendErrorCoupler<T> {
fn from(e: PollSendError<T>) -> Self {
PollSendErrorCoupler(e)
}
}
impl<T> Into<io::Error> for PollSendErrorCoupler<T> {
fn into(self) -> io::Error {
io::Error::from(ErrorKind::BrokenPipe)
}
// add code here
}
use tokio_util::sync::PollSender;

#[tokio::test]
async fn test_sink_writer() -> Result<(), Error> {
// Construct a channel pair to send data across and wrap a pollable sink.
// Note that the sink must mimic a writable object, e.g. have `std::io::Error`
// as its error type.
let (tx, mut rx) = tokio::sync::mpsc::channel::<&[u8]>(1);
let writer = SinkWriter::new(
let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<u8>>(1);
let mut writer = SinkWriter::new(
PollSender::new(tx).sink_map_err(|_| io::Error::from(ErrorKind::BrokenPipe)),
);

// Write data to our interface...

let data: [u8; 4] = [1, 2, 3, 4];
//let _ = writer.write(&data);
let _ = writer.write(&data).await;

// ... and receive it.
assert_eq!(&data, rx.recv().await.unwrap());
assert_eq!(data.to_vec(), rx.recv().await.unwrap());

Ok(())
}

0 comments on commit 7f02930

Please sign in to comment.