From a14fa0ab963be252c0c608e2516ef30252d6a7e2 Mon Sep 17 00:00:00 2001 From: Mark Barbone Date: Sat, 17 Nov 2018 17:42:12 -0500 Subject: [PATCH] Use a mspc channel to fix the deadlock in HTTPS requests --- src/client.rs | 5 +++++ src/lib.rs | 14 ++++++++++-- src/request.rs | 59 +++++++++++++++++++++++++++++++++++++++++++++++--- 3 files changed, 73 insertions(+), 5 deletions(-) diff --git a/src/client.rs b/src/client.rs index 3eb502a06..07b3d9375 100644 --- a/src/client.rs +++ b/src/client.rs @@ -84,6 +84,11 @@ impl ClientConnection { } } + /// true if the connection is HTTPS + pub fn secure(&self) -> bool { + self.secure + } + /// Reads the next line from self.next_header_source. /// /// Reads until `CRLF` is reached. The next read will start diff --git a/src/lib.rs b/src/lib.rs index a768c80b6..84ad0583b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -124,6 +124,7 @@ use std::io::Error as IoError; use std::io::Result as IoResult; use std::sync::Arc; use std::sync::atomic::AtomicBool; +use std::sync::mpsc; use std::thread; use std::net; use std::net::{ToSocketAddrs, TcpStream, Shutdown}; @@ -330,8 +331,17 @@ impl Server { let mut client = Some(client); tasks_pool.spawn(Box::new(move || { if let Some(client) = client.take() { - for rq in client { - messages.push(rq.into()); + // Synchronization is needed for HTTPS requests to avoid a deadlock + if client.secure() { + let (sender, receiver) = mpsc::channel(); + for rq in client { + messages.push(rq.with_notify_sender(sender.clone()).into()); + receiver.recv().unwrap(); + } + } else { + for rq in client { + messages.push(rq.into()); + } } } })); diff --git a/src/request.rs b/src/request.rs index 965f85fe1..8ffabad58 100644 --- a/src/request.rs +++ b/src/request.rs @@ -21,6 +21,8 @@ use std::net::SocketAddr; use std::fmt; use std::str::FromStr; +use std::sync::mpsc::Sender; + use {Header, HTTPVersion, Method, Response, StatusCode}; use util::EqualReader; use chunked_transfer::Decoder; @@ -81,6 +83,33 @@ pub struct Request { // true if a `100 Continue` response must be sent when `as_reader()` is called must_send_continue: bool, + + // If Some, a message must be sent after responding + notify_when_responded: Option>, +} + +struct NotifyOnDrop { + sender: Sender<()>, + inner: R, +} + +impl Read for NotifyOnDrop { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + self.inner.read(buf) + } +} +impl Write for NotifyOnDrop { + fn write(&mut self, buf: &[u8]) -> io::Result { + self.inner.write(buf) + } + fn flush(&mut self) -> io::Result<()> { + self.inner.flush() + } +} +impl Drop for NotifyOnDrop { + fn drop(&mut self) { + self.sender.send(()).unwrap(); + } } /// Error that can happen when building a `Request` object. @@ -209,6 +238,7 @@ pub fn new_request(secure: bool, method: Method, path: String, headers: headers, body_length: content_length, must_send_continue: expects_continue, + notify_when_responded: None, }) } @@ -279,7 +309,12 @@ impl Request { self.response_writer.as_mut().unwrap().flush().ok(); // TODO: unused result let stream = CustomStream::new(self.into_reader_impl(), self.into_writer_impl()); - Box::new(stream) as Box + if let Some(sender) = self.notify_when_responded.take() { + let stream = NotifyOnDrop { sender, inner: stream }; + Box::new(stream) as Box + } else { + Box::new(stream) as Box + } } /// Allows to read the body of the request. @@ -331,7 +366,13 @@ impl Request { /// Therefore you should always destroy the `Writer` as soon as possible. #[inline] pub fn into_writer(mut self) -> Box { - self.into_writer_impl() + let writer = self.into_writer_impl(); + if let Some(sender) = self.notify_when_responded.take() { + let writer = NotifyOnDrop { sender, inner: writer }; + Box::new(writer) as Box + } else { + writer + } } fn into_writer_impl(&mut self) -> Box { @@ -359,7 +400,11 @@ impl Request { pub fn respond(mut self, response: Response) -> Result<(), IoError> where R: Read { - self.respond_impl(response) + let res = self.respond_impl(response); + if let Some(sender) = self.notify_when_responded.take() { + sender.send(()).unwrap(); + } + res } fn respond_impl(&mut self, response: Response) -> Result<(), IoError> @@ -386,6 +431,11 @@ impl Request { writer.flush() } + + pub(crate) fn with_notify_sender(mut self, sender: Sender<()>) -> Self { + self.notify_when_responded = Some(sender); + self + } } impl fmt::Debug for Request { @@ -402,6 +452,9 @@ impl Drop for Request { if self.response_writer.is_some() { let response = Response::empty(500); let _ = self.respond_impl(response); // ignoring any potential error + if let Some(sender) = self.notify_when_responded.take() { + sender.send(()).unwrap(); + } } } }