diff --git a/src/write/encoder.rs b/src/write/encoder.rs
index b54fca45..87930163 100644
--- a/src/write/encoder.rs
+++ b/src/write/encoder.rs
@@ -119,7 +119,7 @@ impl<'a, W: Write> EncoderWriter<'a, W> {
                 &mut self.output[..],
             );
             self.panicked = true;
-            let _ = self.w.write(&self.output[..encoded_len])?;
+            let _ = self.w.write_all(&self.output[..encoded_len])?;
             self.panicked = false;
             // write succeeded, do not write the encoding of extra again if finish() is retried
             self.extra_len = 0;
@@ -225,14 +225,14 @@ impl<'a, W: Write> Write for EncoderWriter<'a, W> {
             self.config.char_set.encode_table(),
         );
         self.panicked = true;
-        let r = self.w.write(&self.output[..encoded_size]);
+        let r = self.w.write_all(&self.output[..encoded_size]);
         self.panicked = false;
 
         match r {
             Ok(_) => Ok(extra_input_read_len + input_chunks_to_encode_len),
-            Err(_) => {
+            Err(err) => {
                 // in case we filled and encoded `extra`, reset extra_len
                 self.extra_len = orig_extra_len;
-                r
+                Err(err)
             }
         }
diff --git a/src/write/encoder_tests.rs b/src/write/encoder_tests.rs
index 6897c5cd..8049500d 100644
--- a/src/write/encoder_tests.rs
+++ b/src/write/encoder_tests.rs
@@ -363,7 +363,8 @@ fn retrying_writes_that_error_with_interrupted_works() {
                 retry_interrupted_write_all(
                     &mut stream_encoder,
                     &orig_data[bytes_consumed..bytes_consumed + input_len],
-                ).unwrap();
+                )
+                .unwrap();
 
                 bytes_consumed += input_len;
             }
@@ -453,6 +454,67 @@ fn do_encode_random_config_matches_normal_encode(max_input_len: usize) {
     }
 }
 
+#[test]
+fn writes_to_slow_writer_works() {
+    let mut rng = rand::thread_rng();
+    let mut orig_data = Vec::<u8>::new();
+    let mut stream_encoded = Vec::<u8>::new();
+    let mut normal_encoded = String::new();
+
+    for _ in 0..1_000 {
+        orig_data.clear();
+        stream_encoded.clear();
+        normal_encoded.clear();
+
+        let orig_len: usize = rng.gen_range(100, 101); //20_000);
+        for _ in 0..orig_len {
+            orig_data.push(rng.gen());
+        }
+
+        // encode the normal way
+        let config = random_config(&mut rng);
+        encode_config_buf(&orig_data, config, &mut normal_encoded);
+
+        // encode via the stream encoder
+        {
+            let mut slow_writer = SlowWriter {
+                w: &mut stream_encoded,
+                n: rng.gen_range(1, 30),
+            };
+
+            let mut stream_encoder = EncoderWriter::new(&mut slow_writer, config);
+            let mut bytes_consumed = 0;
+            while bytes_consumed < orig_len {
+                let input_len: usize = cmp::min(rng.gen_range(0, 512), orig_len - bytes_consumed);
+
+                // write a little bit of the data
+                retry_interrupted_write_all(
+                    &mut stream_encoder,
+                    &orig_data[bytes_consumed..bytes_consumed + input_len],
+                )
+                .unwrap();
+
+                bytes_consumed += input_len;
+            }
+
+            loop {
+                let res = stream_encoder.finish();
+                match res {
+                    Ok(_) => break,
+                    Err(e) => match e.kind() {
+                        io::ErrorKind::Interrupted => continue,
+                        _ => Err(e).unwrap(), // bail
+                    },
+                }
+            }
+
+            assert_eq!(orig_len, bytes_consumed);
+        }
+
+        assert_eq!(normal_encoded, str::from_utf8(&stream_encoded).unwrap());
+    }
+}
+
 /// A `Write` implementation that returns Interrupted some fraction of the time, randomly.
 struct InterruptingWriter<'a, W: 'a + Write, R: 'a + Rng> {
     w: &'a mut W,
@@ -479,3 +541,20 @@ impl<'a, W: Write, R: Rng> Write for InterruptingWriter<'a, W, R> {
         self.w.flush()
     }
 }
+
+/// A `Write` implementation that can only write `n` bytes at once.
+struct SlowWriter<'a, W: 'a + Write> {
+    w: &'a mut W,
+    n: usize,
+}
+
+impl<'a, W: Write> Write for SlowWriter<'a, W> {
+    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
+        let end = ::std::cmp::min(self.n, buf.len());
+        self.w.write(&buf[..end])
+    }
+
+    fn flush(&mut self) -> io::Result<()> {
+        self.w.flush()
+    }
+}