/
line_writer.rs
127 lines (112 loc) · 3.71 KB
/
line_writer.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
use super::buf_writer::BufWriter;
use futures_core::ready;
use futures_core::task::{Context, Poll};
use futures_io::AsyncWrite;
use pin_project_lite::pin_project;
use pin_utils::pin_mut;
use std::io;
use std::pin::Pin;
pin_project! {
/// Private helper that wraps a [`BufWriter`] and carries the line-aware write logic
/// implemented in its `AsyncWrite` impl.
#[derive(Debug)]
struct LineWriterShim<W: AsyncWrite> {
// Structurally pinned so the `AsyncWrite` methods of the `BufWriter` can be called on it.
// TODO(review): the right type for this field is still an open question. As declared, the
// shim owns its `BufWriter<W>` by value, so it cannot be built from a `BufWriter` that is
// only borrowed from another struct.
#[pin]
buffer: BufWriter<W>,
}
}
impl<W: AsyncWrite> LineWriterShim<W> {
    /// Returns the bytes currently held in the wrapped [`BufWriter`]'s buffer.
    fn buffered(&self) -> &[u8] {
        self.buffer.buffer()
    }

    /// Flushes the wrapped buffer when its last buffered byte is `b'\n'`.
    ///
    /// Returns `Poll::Ready(Ok(()))` without touching the writer when the buffer is empty or
    /// does not end with a newline; otherwise returns whatever `BufWriter::flush_buf` reports.
    fn flush_if_completed_line(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        // Look at the buffer *before* projecting: `project()` takes the pinned `self` by
        // value, so `self` can no longer be used once the projection exists.
        let ends_with_newline = self.buffered().ends_with(b"\n");
        if ends_with_newline {
            self.project().buffer.flush_buf(cx)
        } else {
            Poll::Ready(Ok(()))
        }
    }
}
impl<W: AsyncWrite> AsyncWrite for LineWriterShim<W> {
    /// Line-aware write.
    ///
    /// * If `buf` contains no `b'\n'`, a previously completed line still sitting in the
    ///   buffer is flushed first and `buf` is then handed to the wrapped `BufWriter`.
    /// * Otherwise the buffer is flushed, everything up to and including the last newline
    ///   is written through the `BufWriter`, and as much of the remainder as is appropriate
    ///   is copied into the buffer with `write_to_buf`.
    ///
    /// Returns the total number of bytes of `buf` that were accepted, `Poll::Pending` if any
    /// intermediate step is not ready yet, or the first I/O error encountered.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<io::Result<usize>> {
        // One past the last newline in `buf`, i.e. the length of the "complete lines" prefix.
        let newline_index = match memchr::memrchr(b'\n', buf) {
            None => {
                // Re-borrow with `as_mut()` so `self` is still usable for the write below.
                ready!(self.as_mut().flush_if_completed_line(cx))?;
                return self.project().buffer.poll_write(cx, buf);
            }
            Some(newline_index) => newline_index + 1,
        };

        // Project once; every later call re-borrows the pinned field with `as_mut()`
        // because `Pin<&mut _>` is moved by the `AsyncWrite` methods.
        let mut this = self.project();

        // The flush must complete before new lines are written: `ready!` propagates
        // `Poll::Pending` instead of silently discarding it.
        ready!(this.buffer.as_mut().poll_flush(cx))?;

        let lines = &buf[..newline_index];
        // NOTE(review): `flushed` is the count accepted by `BufWriter::poll_write`; whether
        // those bytes have already reached the underlying writer depends on `BufWriter` —
        // confirm this is the intended call for writing complete lines.
        let flushed = ready!(this.buffer.as_mut().poll_write(cx, lines))?;

        // Nothing was accepted: report that as-is rather than buffering a tail.
        if flushed == 0 {
            return Poll::Ready(Ok(0));
        }

        let capacity = this.buffer.capacity();
        let tail = if flushed >= newline_index {
            // All complete lines were accepted; the tail is the trailing partial line.
            &buf[flushed..]
        } else if newline_index - flushed <= capacity {
            // The lines that were not accepted fit into the buffer as a whole.
            &buf[flushed..newline_index]
        } else {
            // Too many bytes are left: take one buffer's worth, cut at the last newline
            // inside that window when there is one. The window is in bounds because more
            // than `capacity` bytes remain in this branch.
            let scan_area = &buf[flushed..];
            let scan_area = &scan_area[..capacity];
            match memchr::memrchr(b'\n', scan_area) {
                Some(newline_index) => &scan_area[..newline_index + 1],
                None => scan_area,
            }
        };

        let buffered = this.buffer.write_to_buf(tail);
        Poll::Ready(Ok(flushed + buffered))
    }

    /// Delegates to the wrapped `BufWriter`'s `poll_flush`.
    fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().buffer.poll_flush(cx)
    }

    /// Delegates to the wrapped `BufWriter`'s `poll_close`.
    fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
        self.project().buffer.poll_close(cx)
    }
}
pin_project! {
/// An `AsyncWrite` adapter that keeps a [`BufWriter`] around the wrapped writer `W`.
///
/// TODO: WIP
#[derive(Debug)]
pub struct LineWriter<W: AsyncWrite> {
// Buffered writer that `poll_write`, `poll_flush` and `poll_close` operate on;
// structurally pinned so its own `AsyncWrite` methods can be called.
#[pin]
inner: BufWriter<W>,
}
}
impl<W: AsyncWrite> LineWriter<W> {
    /// Creates a `LineWriter` around `inner` using the default buffer capacity of
    /// 1024 bytes.
    pub fn new(inner: W) -> LineWriter<W> {
        // 1024 is taken from std::io::buffered::LineWriter
        Self::with_capacity(1024, inner)
    }

    /// Creates a `LineWriter` around `inner` whose internal [`BufWriter`] is created with
    /// the given `capacity`.
    pub fn with_capacity(capacity: usize, inner: W) -> LineWriter<W> {
        let inner = BufWriter::with_capacity(capacity, inner);
        Self { inner }
    }
}
impl<W: AsyncWrite> AsyncWrite for LineWriter<W> {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<io::Result<usize>> {
let lws = LineWriterShim { buffer: Box::new(&self.inner).as_mut() };
pin_mut!(lws);
lws.poll_write(cx, buf)
//lws.poll_write(cx, buf)
//Pin::new(&mut lws).poll_write(cx, buf)
//Box::pin(&mut lws).poll_write(cx, buf)
//Pin::new(&mut *lws).poll_write(cx, buf)
}
fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_flush(cx)
}
fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
self.project().inner.poll_close(cx)
}
}