forked from tokio-rs/tokio-uring
-
Notifications
You must be signed in to change notification settings - Fork 0
/
shared_fd.rs
158 lines (138 loc) · 4.44 KB
/
shared_fd.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use crate::io::Close;
use std::future::poll_fn;
use std::cell::RefCell;
use std::os::unix::io::{FromRawFd, RawFd};
use std::rc::Rc;
use std::task::Waker;
use crate::runtime::driver::op::Op;
use crate::runtime::CONTEXT;
// Tracks in-flight operations on a file descriptor. Ensures all in-flight
// operations complete before submitting the close.
//
// If the runtime is unavailable, will fall back to synchronous Close to ensure
// File resources are not leaked.
#[derive(Clone)]
pub(crate) struct SharedFd {
inner: Rc<Inner>,
}
struct Inner {
// Open file descriptor
fd: RawFd,
// Waker to notify when the close operation completes.
state: RefCell<State>,
}
enum State {
/// Initial state
Init,
/// Waiting for all in-flight operation to complete.
Waiting(Option<Waker>),
/// The FD is closing
Closing(Op<Close>),
/// The FD is fully closed
Closed,
}
impl SharedFd {
pub(crate) fn new(fd: RawFd) -> SharedFd {
SharedFd {
inner: Rc::new(Inner {
fd,
state: RefCell::new(State::Init),
}),
}
}
/// Returns the RawFd
pub(crate) fn raw_fd(&self) -> RawFd {
self.inner.fd
}
/// An FD cannot be closed until all in-flight operation have completed.
/// This prevents bugs where in-flight reads could operate on the incorrect
/// file descriptor.
///
/// TO model this, if there are no in-flight operations, then
pub(crate) async fn close(mut self) {
// Get a mutable reference to Inner, indicating there are no
// in-flight operations on the FD.
if let Some(inner) = Rc::get_mut(&mut self.inner) {
// Submit the close operation
inner.submit_close_op();
}
self.inner.closed().await;
}
}
impl Inner {
/// If there are no in-flight operations, submit the operation.
fn submit_close_op(&mut self) {
// Close the FD
let state = RefCell::get_mut(&mut self.state);
// Submit a close operation
// If either:
// - runtime has already closed, or
// - submitting the Close operation fails
// we fall back on a synchronous `close`. This is safe as, at this point,
// we guarantee all in-flight operations have completed. The most
// common cause for an error is attempting to close the FD while
// off runtime.
//
// This is done by initializing a `File` with the FD and
// dropping it.
//
// TODO: Should we warn?
*state = match CONTEXT.try_with(|cx| cx.is_set()) {
Ok(true) => match Op::close(self.fd) {
Ok(op) => State::Closing(op),
Err(_) => {
let _ = unsafe { std::fs::File::from_raw_fd(self.fd) };
State::Closed
}
},
_ => {
let _ = unsafe { std::fs::File::from_raw_fd(self.fd) };
State::Closed
}
};
}
/// Completes when the FD has been closed.
async fn closed(&self) {
use std::future::Future;
use std::pin::Pin;
use std::task::Poll;
poll_fn(|cx| {
let mut state = self.state.borrow_mut();
match &mut *state {
State::Init => {
*state = State::Waiting(Some(cx.waker().clone()));
Poll::Pending
}
State::Waiting(Some(waker)) => {
if !waker.will_wake(cx.waker()) {
*waker = cx.waker().clone();
}
Poll::Pending
}
State::Waiting(None) => {
*state = State::Waiting(Some(cx.waker().clone()));
Poll::Pending
}
State::Closing(op) => {
// Nothing to do if the close opeation failed.
let _ = ready!(Pin::new(op).poll(cx));
*state = State::Closed;
Poll::Ready(())
}
State::Closed => Poll::Ready(()),
}
})
.await;
}
}
impl Drop for Inner {
fn drop(&mut self) {
// Submit the close operation, if needed
match RefCell::get_mut(&mut self.state) {
State::Init | State::Waiting(..) => {
self.submit_close_op();
}
_ => {}
}
}
}