Skip to content

Commit

Permalink
Use a mspc channel to fix the deadlock in HTTPS requests
Browse files Browse the repository at this point in the history
  • Loading branch information
mb64 committed Mar 8, 2019
1 parent b5b44bc commit 122bf60
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 5 deletions.
5 changes: 5 additions & 0 deletions src/client.rs
Expand Up @@ -83,6 +83,11 @@ impl ClientConnection {
}
}

/// true if the connection is HTTPS
pub fn secure(&self) -> bool {
self.secure
}

/// Reads the next line from self.next_header_source.
///
/// Reads until `CRLF` is reached. The next read will start
Expand Down
14 changes: 12 additions & 2 deletions src/lib.rs
Expand Up @@ -123,6 +123,7 @@ use std::io::Error as IoError;
use std::io::Result as IoResult;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::sync::mpsc;
use std::thread;
use std::net;
use std::net::{ToSocketAddrs, TcpStream, Shutdown};
Expand Down Expand Up @@ -327,8 +328,17 @@ impl Server {
let mut client = Some(client);
tasks_pool.spawn(Box::new(move || {
if let Some(client) = client.take() {
for rq in client {
messages.push(rq.into());
// Synchronization is needed for HTTPS requests to avoid a deadlock
if client.secure() {
let (sender, receiver) = mpsc::channel();
for rq in client {
messages.push(rq.with_notify_sender(sender.clone()).into());
receiver.recv().unwrap();
}
} else {
for rq in client {
messages.push(rq.into());
}
}
}
}));
Expand Down
59 changes: 56 additions & 3 deletions src/request.rs
Expand Up @@ -19,6 +19,8 @@ use std::net::SocketAddr;
use std::fmt;
use std::str::FromStr;

use std::sync::mpsc::Sender;

use {Header, HTTPVersion, Method, Response, StatusCode};
use util::EqualReader;
use chunked_transfer::Decoder;
Expand Down Expand Up @@ -79,6 +81,33 @@ pub struct Request {

// true if a `100 Continue` response must be sent when `as_reader()` is called
must_send_continue: bool,

// If Some, a message must be sent after responding
notify_when_responded: Option<Sender<()>>,
}

struct NotifyOnDrop<R> {
sender: Sender<()>,
inner: R,
}

impl<R: Read> Read for NotifyOnDrop<R> {
fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
self.inner.read(buf)
}
}
impl<R: Write> Write for NotifyOnDrop<R> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.inner.write(buf)
}
fn flush(&mut self) -> io::Result<()> {
self.inner.flush()
}
}
impl<R> Drop for NotifyOnDrop<R> {
fn drop(&mut self) {
self.sender.send(()).unwrap();
}
}

/// Error that can happen when building a `Request` object.
Expand Down Expand Up @@ -207,6 +236,7 @@ pub fn new_request<R, W>(secure: bool, method: Method, path: String,
headers: headers,
body_length: content_length,
must_send_continue: expects_continue,
notify_when_responded: None,
})
}

Expand Down Expand Up @@ -277,7 +307,12 @@ impl Request {
self.response_writer.as_mut().unwrap().flush().ok(); // TODO: unused result

let stream = CustomStream::new(self.into_reader_impl(), self.into_writer_impl());
Box::new(stream) as Box<ReadWrite + Send>
if let Some(sender) = self.notify_when_responded.take() {
let stream = NotifyOnDrop { sender, inner: stream };
Box::new(stream) as Box<ReadWrite + Send>
} else {
Box::new(stream) as Box<ReadWrite + Send>
}
}

/// Allows to read the body of the request.
Expand Down Expand Up @@ -329,7 +364,13 @@ impl Request {
/// Therefore you should always destroy the `Writer` as soon as possible.
#[inline]
pub fn into_writer(mut self) -> Box<Write + Send + 'static> {
self.into_writer_impl()
let writer = self.into_writer_impl();
if let Some(sender) = self.notify_when_responded.take() {
let writer = NotifyOnDrop { sender, inner: writer };
Box::new(writer) as Box<Write + Send + 'static>
} else {
writer
}
}

fn into_writer_impl(&mut self) -> Box<Write + Send + 'static> {
Expand Down Expand Up @@ -357,7 +398,11 @@ impl Request {
pub fn respond<R>(mut self, response: Response<R>) -> Result<(), IoError>
where R: Read
{
self.respond_impl(response)
let res = self.respond_impl(response);
if let Some(sender) = self.notify_when_responded.take() {
sender.send(()).unwrap();
}
res
}

fn respond_impl<R>(&mut self, response: Response<R>) -> Result<(), IoError>
Expand All @@ -384,6 +429,11 @@ impl Request {

writer.flush()
}

pub(crate) fn with_notify_sender(mut self, sender: Sender<()>) -> Self {
self.notify_when_responded = Some(sender);
self
}
}

impl fmt::Debug for Request {
Expand All @@ -400,6 +450,9 @@ impl Drop for Request {
if self.response_writer.is_some() {
let response = Response::empty(500);
let _ = self.respond_impl(response); // ignoring any potential error
if let Some(sender) = self.notify_when_responded.take() {
sender.send(()).unwrap();
}
}
}
}
Expand Down

0 comments on commit 122bf60

Please sign in to comment.