forked from tokio-rs/tokio
/
process_stdio.rs
243 lines (195 loc) · 7.19 KB
/
process_stdio.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
#![warn(rust_2018_idioms)]
#![cfg(all(feature = "full", not(target_os = "wasi")))]
use tokio::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::join;
use tokio::process::{Child, Command};
use tokio_test::assert_ok;
use futures::future::{self, FutureExt};
use std::convert::TryInto;
use std::env;
use std::io;
use std::process::{ExitStatus, Stdio};
fn cat() -> Command {
let mut cmd = Command::new(env!("CARGO_BIN_EXE_test-cat"));
cmd.stdin(Stdio::piped()).stdout(Stdio::piped());
cmd
}
async fn feed_cat(mut cat: Child, n: usize) -> io::Result<ExitStatus> {
let mut stdin = cat.stdin.take().unwrap();
let stdout = cat.stdout.take().unwrap();
// Produce n lines on the child's stdout.
let write = async {
for i in 0..n {
let bytes = format!("line {}\n", i).into_bytes();
stdin.write_all(&bytes).await.unwrap();
}
drop(stdin);
};
let read = async {
let mut reader = BufReader::new(stdout).lines();
let mut num_lines = 0;
// Try to read `n + 1` lines, ensuring the last one is empty
// (i.e. EOF is reached after `n` lines.
loop {
let data = reader
.next_line()
.await
.unwrap_or_else(|_| Some(String::new()))
.expect("failed to read line");
let num_read = data.len();
let done = num_lines >= n;
match (done, num_read) {
(false, 0) => panic!("broken pipe"),
(true, n) if n != 0 => panic!("extraneous data"),
_ => {
let expected = format!("line {}", num_lines);
assert_eq!(expected, data);
}
};
num_lines += 1;
if num_lines >= n {
break;
}
}
};
// Compose reading and writing concurrently.
future::join3(write, read, cat.wait())
.map(|(_, _, status)| status)
.await
}
/// Check for the following properties when feeding stdin and
/// consuming stdout of a cat-like process:
///
/// - A number of lines that amounts to a number of bytes exceeding a
/// typical OS buffer size can be fed to the child without
/// deadlock. This tests that we also consume the stdout
/// concurrently; otherwise this would deadlock.
///
/// - We read the same lines from the child that we fed it.
///
/// - The child does produce EOF on stdout after the last line.
#[tokio::test]
async fn feed_a_lot() {
let child = cat().spawn().unwrap();
let status = feed_cat(child, 10000).await.unwrap();
assert_eq!(status.code(), Some(0));
}
#[tokio::test]
async fn wait_with_output_captures() {
let mut child = cat().spawn().unwrap();
let mut stdin = child.stdin.take().unwrap();
let write_bytes = b"1234";
let future = async {
stdin.write_all(write_bytes).await?;
drop(stdin);
let out = child.wait_with_output();
out.await
};
let output = future.await.unwrap();
assert!(output.status.success());
assert_eq!(output.stdout, write_bytes);
assert_eq!(output.stderr.len(), 0);
}
#[tokio::test]
async fn status_closes_any_pipes() {
// Cat will open a pipe between the parent and child.
// If `status_async` doesn't ensure the handles are closed,
// we would end up blocking forever (and time out).
let child = cat().status();
assert_ok!(child.await);
}
#[tokio::test]
async fn try_wait() {
let mut child = cat().spawn().unwrap();
let id = child.id().expect("missing id");
assert!(id > 0);
assert_eq!(None, assert_ok!(child.try_wait()));
// Drop the child's stdio handles so it can terminate
drop(child.stdin.take());
drop(child.stderr.take());
drop(child.stdout.take());
assert_ok!(child.wait().await);
// test that the `.try_wait()` method is fused just like the stdlib
assert!(assert_ok!(child.try_wait()).unwrap().success());
// Can't get id after process has exited
assert_eq!(child.id(), None);
}
#[tokio::test]
async fn pipe_from_one_command_to_another() {
let mut first = cat().spawn().expect("first cmd");
let mut third = cat().spawn().expect("third cmd");
// Convert ChildStdout to Stdio
let second_stdin: Stdio = first
.stdout
.take()
.expect("first.stdout")
.try_into()
.expect("first.stdout into Stdio");
// Convert ChildStdin to Stdio
let second_stdout: Stdio = third
.stdin
.take()
.expect("third.stdin")
.try_into()
.expect("third.stdin into Stdio");
let mut second = cat()
.stdin(second_stdin)
.stdout(second_stdout)
.spawn()
.expect("first cmd");
let msg = "hello world! please pipe this message through";
let mut stdin = first.stdin.take().expect("first.stdin");
let write = async move { stdin.write_all(msg.as_bytes()).await };
let mut stdout = third.stdout.take().expect("third.stdout");
let read = async move {
let mut data = String::new();
stdout.read_to_string(&mut data).await.map(|_| data)
};
let (read, write, first_status, second_status, third_status) =
join!(read, write, first.wait(), second.wait(), third.wait());
assert_eq!(msg, read.expect("read result"));
write.expect("write result");
assert!(first_status.expect("first status").success());
assert!(second_status.expect("second status").success());
assert!(third_status.expect("third status").success());
}
#[tokio::test]
async fn vectored_writes() {
use bytes::{Buf, Bytes};
use std::{io::IoSlice, pin::Pin};
use tokio::io::AsyncWrite;
let mut cat = cat().spawn().unwrap();
let mut stdin = cat.stdin.take().unwrap();
let are_writes_vectored = stdin.is_write_vectored();
let mut stdout = cat.stdout.take().unwrap();
let write = async {
let mut input = Bytes::from_static(b"hello\n").chain(Bytes::from_static(b"world!\n"));
let mut writes_completed = 0;
futures::future::poll_fn(|cx| loop {
let mut slices = [IoSlice::new(&[]); 2];
let vectored = input.chunks_vectored(&mut slices);
if vectored == 0 {
return std::task::Poll::Ready(std::io::Result::Ok(()));
}
let n = futures::ready!(Pin::new(&mut stdin).poll_write_vectored(cx, &slices))?;
writes_completed += 1;
input.advance(n);
})
.await?;
drop(stdin);
std::io::Result::Ok(writes_completed)
};
let read = async {
let mut buffer = Vec::with_capacity(6 + 7);
stdout.read_to_end(&mut buffer).await?;
std::io::Result::Ok(buffer)
};
let (write, read, status) = future::join3(write, read, cat.wait()).await;
assert!(status.unwrap().success());
let writes_completed = write.unwrap();
// on unix our small payload should always fit in whatever default sized pipe with a single
// syscall. if multiple are used, then the forwarding does not work, or we are on a platform
// for which the `std` does not support vectored writes.
assert_eq!(writes_completed == 1, are_writes_vectored);
assert_eq!(&read.unwrap(), b"hello\nworld!\n");
}