forked from tokio-rs/tokio
/
io_inspect.rs
194 lines (176 loc) · 5.42 KB
/
io_inspect.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
use futures::future::poll_fn;
use std::{
io::IoSlice,
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadBuf};
use tokio_util::io::{InspectReader, InspectWriter};
/// An AsyncRead implementation that works byte-by-byte, to catch out callers
/// who don't allow for `buf` being part-filled before the call
struct SmallReader {
contents: Vec<u8>,
}
impl Unpin for SmallReader {}
impl AsyncRead for SmallReader {
fn poll_read(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
if let Some(byte) = self.contents.pop() {
buf.put_slice(&[byte])
}
Poll::Ready(Ok(()))
}
}
#[tokio::test]
async fn read_tee() {
let contents = b"This could be really long, you know".to_vec();
let reader = SmallReader {
contents: contents.clone(),
};
let mut altout: Vec<u8> = Vec::new();
let mut teeout = Vec::new();
{
let mut tee = InspectReader::new(reader, |bytes| altout.extend(bytes));
tee.read_to_end(&mut teeout).await.unwrap();
}
assert_eq!(teeout, altout);
assert_eq!(altout.len(), contents.len());
}
/// An AsyncWrite implementation that works byte-by-byte for poll_write, and
/// that reads the whole of the first buffer plus one byte from the second in
/// poll_write_vectored.
///
/// This is designed to catch bugs in handling partially written buffers
#[derive(Debug)]
struct SmallWriter {
contents: Vec<u8>,
}
impl Unpin for SmallWriter {}
impl AsyncWrite for SmallWriter {
fn poll_write(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, std::io::Error>> {
// Just write one byte at a time
if buf.is_empty() {
return Poll::Ready(Ok(0));
}
self.contents.push(buf[0]);
Poll::Ready(Ok(1))
}
fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_shutdown(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<(), std::io::Error>> {
Poll::Ready(Ok(()))
}
fn poll_write_vectored(
mut self: Pin<&mut Self>,
_cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<Result<usize, std::io::Error>> {
// Write all of the first buffer, then one byte from the second buffer
// This should trip up anything that doesn't correctly handle multiple
// buffers.
if bufs.is_empty() {
return Poll::Ready(Ok(0));
}
let mut written_len = bufs[0].len();
self.contents.extend_from_slice(&bufs[0]);
if bufs.len() > 1 {
let buf = bufs[1];
if !buf.is_empty() {
written_len += 1;
self.contents.push(buf[0]);
}
}
Poll::Ready(Ok(written_len))
}
fn is_write_vectored(&self) -> bool {
true
}
}
#[tokio::test]
async fn write_tee() {
let mut altout: Vec<u8> = Vec::new();
let mut writeout = SmallWriter {
contents: Vec::new(),
};
{
let mut tee = InspectWriter::new(&mut writeout, |bytes| altout.extend(bytes));
tee.write_all(b"A testing string, very testing")
.await
.unwrap();
}
assert_eq!(altout, writeout.contents);
}
// This is inefficient, but works well enough for test use.
// If you want something similar for real code, you'll want to avoid all the
// fun of manipulating `bufs` - ideally, by the time you read this,
// IoSlice::advance_slices will be stable, and you can use that.
async fn write_all_vectored<W: AsyncWrite + Unpin>(
mut writer: W,
mut bufs: Vec<Vec<u8>>,
) -> Result<usize, std::io::Error> {
let mut res = 0;
while !bufs.is_empty() {
let mut written = poll_fn(|cx| {
let bufs: Vec<IoSlice> = bufs.iter().map(|v| IoSlice::new(v)).collect();
Pin::new(&mut writer).poll_write_vectored(cx, &bufs)
})
.await?;
res += written;
while written > 0 {
let buf_len = bufs[0].len();
if buf_len <= written {
bufs.remove(0);
written -= buf_len;
} else {
let buf = &mut bufs[0];
let drain_len = written.min(buf.len());
buf.drain(..drain_len);
written -= drain_len;
}
}
}
Ok(res)
}
#[tokio::test]
async fn write_tee_vectored() {
let mut altout: Vec<u8> = Vec::new();
let mut writeout = SmallWriter {
contents: Vec::new(),
};
let original = b"A very long string split up";
let bufs: Vec<Vec<u8>> = original
.split(|b| b.is_ascii_whitespace())
.map(Vec::from)
.collect();
assert!(bufs.len() > 1);
let expected: Vec<u8> = {
let mut out = Vec::new();
for item in &bufs {
out.extend_from_slice(item)
}
out
};
{
let mut bufcount = 0;
let tee = InspectWriter::new(&mut writeout, |bytes| {
bufcount += 1;
altout.extend(bytes)
});
assert!(tee.is_write_vectored());
write_all_vectored(tee, bufs.clone()).await.unwrap();
assert!(bufcount >= bufs.len());
}
assert_eq!(altout, writeout.contents);
assert_eq!(writeout.contents, expected);
}