Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: insufficient buf size when reading windows named pipe message #1778

Open
wants to merge 5 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
76 changes: 64 additions & 12 deletions src/sys/windows/named_pipe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@ use std::sync::{Arc, Mutex};
use std::{fmt, mem, slice};

use windows_sys::Win32::Foundation::{
ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_NO_DATA, ERROR_PIPE_CONNECTED,
ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
ERROR_BROKEN_PIPE, ERROR_IO_INCOMPLETE, ERROR_IO_PENDING, ERROR_MORE_DATA, ERROR_NO_DATA,
ERROR_PIPE_CONNECTED, ERROR_PIPE_LISTENING, HANDLE, INVALID_HANDLE_VALUE,
};
use windows_sys::Win32::Storage::FileSystem::{
ReadFile, WriteFile, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, PIPE_ACCESS_DUPLEX,
};
use windows_sys::Win32::System::Pipes::{
ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PIPE_TYPE_BYTE,
ConnectNamedPipe, CreateNamedPipeW, DisconnectNamedPipe, PeekNamedPipe, PIPE_TYPE_BYTE,
PIPE_UNLIMITED_INSTANCES,
};
use windows_sys::Win32::System::IO::{
Expand All @@ -27,6 +27,8 @@ use crate::sys::windows::{Event, Handle, Overlapped};
use crate::Registry;
use crate::{Interest, Token};

const MAX_BUFFER_SZ: usize = 65536;

/// Non-blocking windows named pipe.
///
/// This structure internally contains a `HANDLE` which represents the named
Expand Down Expand Up @@ -307,6 +309,25 @@ impl Inner {
Ok(transferred as usize)
}
}

/// Calls the `PeekNamedPipe` function to get the remaining size of message in NamedPipe
#[inline]
unsafe fn remaining_size(&self) -> io::Result<usize> {
let mut remaining = 0;
let r = PeekNamedPipe(
self.handle.raw(),
std::ptr::null_mut(),
0,
std::ptr::null_mut(),
std::ptr::null_mut(),
&mut remaining,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it guaranteed that ERROR_MORE_DATA is never returned for stream oriented named pipes?

Because according to the PeekNamedPipe docs (https://learn.microsoft.com/en-us/windows/win32/api/namedpipeapi/nf-namedpipeapi-peeknamedpipe) this will return 0 for streams. Which would mean we're effectively creating an infinite loop where we try to read using a buffer of length 0.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be surprised if ERROR_MORE_DATA is ever returned by stream mode named pipe.

But that's a fair point. We can just err out if PeekNamedPipe returned 0.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case PeekNamedPipe returns 0 we should probably just return the short-read buffer, not an error.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That will happen only when it's stream mode, and we do not expect it to happen, no?

I think we should at least have a debug_assert in this case

);
if r == 0 {
Err(io::Error::last_os_error())
} else {
Ok(remaining as usize)
}
}
}

#[test]
Expand Down Expand Up @@ -349,6 +370,7 @@ enum State {
Pending(Vec<u8>, usize),
Ok(Vec<u8>, usize),
Err(io::Error),
InsufficientBufferSize(Vec<u8>, usize),
}

// Odd tokens are for named pipes
Expand Down Expand Up @@ -535,7 +557,7 @@ impl<'a> Read for &'a NamedPipe {
}

// We previously read something into `data`, try to copy out some
// data. If we copy out all the data schedule a new read and
// data. If we copy out all the data, schedule a new read
// otherwise store the buffer to get read later.
State::Ok(data, cur) => {
let n = {
Expand All @@ -552,6 +574,10 @@ impl<'a> Read for &'a NamedPipe {
Ok(n)
}

// We scheduled another read with a bigger buffer after the first read (see `read_done`)
// This is not possible in theory, just like `State::None` case, but return would block for now.
State::InsufficientBufferSize(..) => Err(would_block()),

// Looks like an in-flight read hit an error, return that here while
// we schedule a new one.
State::Err(e) => {
Expand Down Expand Up @@ -703,19 +729,26 @@ impl Inner {
/// scheduled.
fn schedule_read(me: &Arc<Inner>, io: &mut Io, events: Option<&mut Vec<Event>>) -> bool {
// Check to see if a read is already scheduled/completed
match io.read {
State::None => {}
_ => return true,
}

let mut buf = match mem::replace(&mut io.read, State::None) {
State::None => me.get_buffer(),
State::InsufficientBufferSize(mut buf, rem) => {
let sz_rem = std::cmp::min(rem, MAX_BUFFER_SZ);
buf.reserve_exact(sz_rem);
buf
}
e @ _ => {
io.read = e;
return true;
}
};

// Allocate a buffer and schedule the read.
let mut buf = me.get_buffer();
let e = unsafe {
let overlapped = me.read.as_ptr() as *mut _;
let slice = slice::from_raw_parts_mut(buf.as_mut_ptr(), buf.capacity());
me.read_overlapped(slice, overlapped)
me.read_overlapped(&mut slice[buf.len()..], overlapped)
};

match e {
// See `NamedPipe::connect` above for the rationale behind `forget`
Ok(_) => {
Expand Down Expand Up @@ -874,9 +907,28 @@ fn read_done(status: &OVERLAPPED_ENTRY, events: Option<&mut Vec<Event>>) {
match me.result(status.overlapped()) {
Ok(n) => {
debug_assert_eq!(status.bytes_transferred() as usize, n);
buf.set_len(status.bytes_transferred() as usize);
// Extend the len depending on the initial len is necessary
// when we call `ReadFile` again after resizing
// our internal buffer
buf.set_len(buf.len() + status.bytes_transferred() as usize);
Darksonn marked this conversation as resolved.
Show resolved Hide resolved
io.read = State::Ok(buf, 0);
}
Err(e) if e.raw_os_error() == Some(ERROR_MORE_DATA as i32) => {
buf.set_len(status.bytes_transferred() as usize);
match me.remaining_size() {
Ok(rem) if rem == 0 => {
io.read = State::Ok(buf, 0);
}
Ok(rem) => {
io.read = State::InsufficientBufferSize(buf, rem);
Inner::schedule_read(&me, &mut io, None);
return;
}
Err(e) => {
io.read = State::Err(e);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure about this. This makes it an unrecoverable error, but the original error is not. I feel like this is making the situation worse in case PeekNamedPipe returns an error.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How is the original error recoverable?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The original ERROR_MORE_DATA error is not really an error, it we actually read bytes, so it can be ignored.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah ok, i thought you were referring to the err flow before the change.

Guess we can truncate it in this case

}
}
}
Err(e) => {
debug_assert_eq!(status.bytes_transferred(), 0);
io.read = State::Err(e);
Expand Down
133 changes: 130 additions & 3 deletions tests/win_named_pipe.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,25 @@
#![cfg(all(windows, feature = "os-poll", feature = "os-ext"))]

use std::ffi::OsStr;
use std::fs::OpenOptions;
use std::io::{self, Read, Write};
use std::io::{self, ErrorKind, Read, Write};
use std::iter;
use std::os::windows::ffi::OsStrExt;
use std::os::windows::fs::OpenOptionsExt;
use std::os::windows::io::{FromRawHandle, IntoRawHandle};
use std::os::windows::io::{FromRawHandle, IntoRawHandle, RawHandle};
use std::time::Duration;

use mio::windows::NamedPipe;
use mio::{Events, Interest, Poll, Token};
use rand::Rng;
use windows_sys::Win32::{Foundation::ERROR_NO_DATA, Storage::FileSystem::FILE_FLAG_OVERLAPPED};
use windows_sys::Win32::Foundation::ERROR_NO_DATA;
use windows_sys::Win32::Storage::FileSystem::{
CreateFileW, FILE_FLAG_FIRST_PIPE_INSTANCE, FILE_FLAG_OVERLAPPED, OPEN_EXISTING,
PIPE_ACCESS_DUPLEX,
};
use windows_sys::Win32::System::Pipes::{
CreateNamedPipeW, PIPE_READMODE_MESSAGE, PIPE_TYPE_MESSAGE, PIPE_UNLIMITED_INSTANCES,
};

fn _assert_kinds() {
fn _assert_send<T: Send>() {}
Expand Down Expand Up @@ -43,6 +53,38 @@ fn client(name: &str) -> NamedPipe {
unsafe { NamedPipe::from_raw_handle(file.into_raw_handle()) }
}

fn pipe_msg_mode() -> (NamedPipe, NamedPipe) {
let num: u64 = rand::thread_rng().gen();
let name = format!(r"\\.\pipe\my-pipe-{}", num);
let name: Vec<_> = OsStr::new(&name).encode_wide().chain(Some(0)).collect();
unsafe {
let h = CreateNamedPipeW(
name.as_ptr(),
PIPE_ACCESS_DUPLEX | FILE_FLAG_FIRST_PIPE_INSTANCE | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_MESSAGE | PIPE_READMODE_MESSAGE,
PIPE_UNLIMITED_INSTANCES,
65536,
65536,
0,
std::ptr::null_mut(),
);

let server = NamedPipe::from_raw_handle(h as RawHandle);

let h = CreateFileW(
name.as_ptr(),
PIPE_ACCESS_DUPLEX,
0,
std::ptr::null_mut(),
OPEN_EXISTING,
FILE_FLAG_OVERLAPPED,
0,
);
let client = NamedPipe::from_raw_handle(h as RawHandle);
(server, client)
}
}

fn pipe() -> (NamedPipe, NamedPipe) {
let (pipe, name) = server();
(pipe, client(&name))
Expand Down Expand Up @@ -108,6 +150,91 @@ fn write_then_read() {
assert_eq!(&buf[..4], b"1234");
}

#[test]
fn read_sz_greater_than_default_buf_size() {
let (mut server, mut client) = pipe_msg_mode();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));

let mut events = Events::with_capacity(128);
let msg = (0..4106)
.map(|e| e.to_string())
.collect::<Vec<_>>()
.join("");

t!(poll.poll(&mut events, None));
assert_eq!(t!(client.write(msg.as_bytes())), 15314);

loop {
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
if let Some(event) = events.iter().find(|e| e.token() == Token(0)) {
if event.is_readable() {
break;
}
}
}

let mut buf = [0; 15314];
assert_eq!(t!(server.read(&mut buf)), 15314);
assert_eq!(&buf[..15314], msg.as_bytes());
}

#[test]
fn multi_read_sz_greater_than_default_buf_size() {
let (mut server, mut client) = pipe_msg_mode();
let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut server,
Token(0),
Interest::READABLE | Interest::WRITABLE,
));

std::thread::spawn(move || {
let msgs = vec!["hello".repeat(10), "world".repeat(100), "mio".repeat(1000)];

let mut poll = t!(Poll::new());
t!(poll.registry().register(
&mut client,
Token(1),
Interest::READABLE | Interest::WRITABLE,
));
let mut events = Events::with_capacity(128);
for msg in msgs.iter() {
t!(poll.poll(&mut events, None));
t!(client.write(msg.as_bytes()));
}
});

let mut events = Events::with_capacity(128);
let msgs = vec!["hello".repeat(10), "world".repeat(100), "mio".repeat(1000)];
for m in msgs.into_iter() {
let m = m.as_bytes();
loop {
t!(poll.poll(&mut events, None));
let events = events.iter().collect::<Vec<_>>();
if let Some(event) = events.iter().find(|e| e.token() == Token(0)) {
let mut buf = [0; 3000];
let Ok(read) = server.read(&mut buf) else {
continue;
};
assert_eq!(read, m.len());
assert_eq!(buf[..read], *m);
break;
}
}
}
}

#[test]
fn connect_before_client() {
let (mut server, name) = server();
Expand Down