forked from yewstack/yew
-
Notifications
You must be signed in to change notification settings - Fork 0
/
io.rs
116 lines (98 loc) · 3.88 KB
/
io.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
//! This module contains types for I/O functionality.
// This module should remain private until impl trait type alias becomes available so
// `BufReader` can be produced with an existential type.
use std::borrow::Cow;
use futures::stream::Stream;
use crate::platform::sync::mpsc::{self, UnboundedReceiverStream, UnboundedSender};
// Default buffer size: 8 * 1024 = 8192.
// Same as std::io::BufWriter and futures::io::BufWriter.
// NOTE(review): presumably passed as the `capacity` argument of `buffer` — confirm at call sites.
pub(crate) const DEFAULT_BUF_SIZE: usize = 8 * 1024;
/// A string-based counterpart to [`futures::io::BufWriter`] that yields its output into a
/// Stream instead of writing to an underlying writer.
///
/// Text accumulates in `buf` and is handed to `tx` as owned `String` chunks.
pub(crate) struct BufWriter {
/// Text accumulated since the last chunk was sent.
buf: String,
/// Sending half of the channel on which chunks are emitted.
tx: UnboundedSender<String>,
/// Number of bytes reserved in `buf` when a write finds it empty.
capacity: usize,
}
/// Creates a connected writer / stream pair.
///
/// Everything the returned [`BufWriter`] sends is yielded by the returned stream.
/// `capacity` is stored on the writer and used as the amount to reserve once text is written.
pub(crate) fn buffer(capacity: usize) -> (BufWriter, impl Stream<Item = String>) {
    let (sender, receiver) = mpsc::unbounded_channel::<String>();

    // `String::new()` does not allocate, so a writer that never receives any text
    // never pays for a buffer.
    let writer = BufWriter {
        buf: String::new(),
        tx: sender,
        capacity,
    };
    let stream = UnboundedReceiverStream::new(receiver);

    (writer, stream)
}
// Implementation Notes:
//
// When jemalloc is used and a reasonable buffer length is chosen,
// performance of this buffer is related to the number of allocations
// instead of the amount of memory that is allocated.
//
// A Bytes-based implementation was also tested and yielded performance similar to the
// String-based buffer.
//
// Having a String-based buffer avoids unsafe / cost of conversion between String and Bytes
// when text based content is needed (e.g.: post-processing).
//
// `Bytes::from` can be used to convert a `String` to `Bytes` if web server asks for an
// `impl Stream<Item = Bytes>`. This conversion incurs no memory allocation.
//
// Yielding the output with a Stream provides a couple of advantages:
//
// 1. All child components of a VList can have their own buffer and be rendered concurrently.
// 2. If a fixed buffer is used, the rendering process can become blocked if the buffer is filled.
// Using a stream avoids this side effect and allows the renderer to finish rendering
// without being actively polled.
impl BufWriter {
    /// Returns the capacity this writer was configured with.
    #[inline]
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Sends the buffered text, if any, as a single chunk.
    ///
    /// The outcome of the send is deliberately ignored.
    #[inline]
    fn drain(&mut self) {
        if self.buf.is_empty() {
            return;
        }
        // `split_off(0)` moves the whole content out and leaves `self.buf` empty.
        let pending = self.buf.split_off(0);
        let _ = self.tx.send(pending);
    }

    /// Reserves `self.capacity` bytes, but only while the buffer holds no text.
    #[inline]
    fn reserve(&mut self) {
        if !self.buf.is_empty() {
            return;
        }
        self.buf.reserve(self.capacity);
    }

    /// Returns `true` if the internal buffer has capacity to fit a string of certain length
    /// on top of the text it already holds.
    #[inline]
    fn has_capacity_of(&self, next_part_len: usize) -> bool {
        let required = self.buf.len() + next_part_len;
        self.buf.capacity() >= required
    }

    /// Writes a string into the buffer, draining the buffer first when the string does not fit.
    ///
    /// A string that still does not fit after draining is sent on its own as a separate chunk
    /// instead of being buffered.
    pub fn write(&mut self, s: Cow<'_, str>) {
        let incoming_len = s.len();

        // Make sure an empty buffer has its backing storage before testing for room.
        self.reserve();

        if !self.has_capacity_of(incoming_len) {
            // Not enough room behind the text already buffered: emit that text first.
            self.drain();

            if !self.has_capacity_of(incoming_len) {
                // Even an empty buffer cannot hold this part, so forward it directly.
                // The buffer needs no further draining here: it was emptied just above.
                let _ = self.tx.send(s.into_owned());
                return;
            }
        }

        // The part fits, either from the start or after draining.
        self.buf.push_str(&s);
    }
}
impl Drop for BufWriter {
    /// Sends whatever text is still buffered when the writer is dropped.
    ///
    /// Nothing is sent for an empty buffer, and the outcome of the send is ignored.
    fn drop(&mut self) {
        if self.buf.is_empty() {
            return;
        }
        // Move the text out, leaving an empty, unallocated `String` behind.
        let leftover = std::mem::take(&mut self.buf);
        let _ = self.tx.send(leftover);
    }
}