diff --git a/src/lib.rs b/src/lib.rs index a76307985..40b5491c4 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -106,6 +106,7 @@ extern crate openssl; use std::error::Error; use std::io::Error as IoError; +use std::io::ErrorKind as IoErrorKind; use std::io::Result as IoResult; use std::net; use std::net::{Shutdown, TcpStream, ToSocketAddrs}; @@ -383,8 +384,9 @@ impl Server { /// Blocks until an HTTP request has been submitted and returns it. pub fn recv(&self) -> IoResult { match self.messages.pop() { - Message::Error(err) => Err(err), - Message::NewRequest(rq) => Ok(rq), + Some(Message::Error(err)) => return Err(err), + Some(Message::NewRequest(rq)) => return Ok(rq), + None => return Err(IoError::new(IoErrorKind::Other, "thread unblocked")), } } @@ -405,6 +407,13 @@ impl Server { None => Ok(None), } } + + /// Unblock thread stuck in recv() or incoming_requests(). + /// If there are several such threads, only one is unblocked. + /// This method allows graceful shutdown of server. + pub fn unblock(&self) { + self.messages.unblock(); + } } impl<'a> Iterator for IncomingRequests<'a> { diff --git a/src/util/messages_queue.rs b/src/util/messages_queue.rs index dda91c212..9b95b174c 100644 --- a/src/util/messages_queue.rs +++ b/src/util/messages_queue.rs @@ -2,11 +2,16 @@ use std::collections::VecDeque; use std::sync::{Arc, Condvar, Mutex}; use std::time::{Duration, Instant}; +enum Control { + Elem(T), + Unblock, +} + pub struct MessagesQueue where T: Send, { - queue: Mutex>, + queue: Mutex>>, condvar: Condvar, } @@ -24,17 +29,27 @@ where /// Pushes an element to the queue. pub fn push(&self, value: T) { let mut queue = self.queue.lock().unwrap(); - queue.push_back(value); + queue.push_back(Control::Elem(value)); + self.condvar.notify_one(); + } + + /// Unblock one thread stuck in pop loop. + pub fn unblock(&self) { + let mut queue = self.queue.lock().unwrap(); + queue.push_back(Control::Unblock); self.condvar.notify_one(); } /// Pops an element. Blocks until one is available. - pub fn pop(&self) -> T { + /// Returns None in case unblock() was issued. + pub fn pop(&self) -> Option { let mut queue = self.queue.lock().unwrap(); loop { - if let Some(elem) = queue.pop_front() { - return elem; + match queue.pop_front() { + Some(Control::Elem(value)) => return Some(value), + Some(Control::Unblock) => return None, + None => (), } queue = self.condvar.wait(queue).unwrap(); @@ -44,17 +59,23 @@ where /// Tries to pop an element without blocking. pub fn try_pop(&self) -> Option { let mut queue = self.queue.lock().unwrap(); - queue.pop_front() + match queue.pop_front() { + Some(Control::Elem(value)) => Some(value), + Some(Control::Unblock) | None => None, + } } /// Tries to pop an element without blocking /// more than the specified timeout duration + /// or unblock() was issued pub fn pop_timeout(&self, timeout: Duration) -> Option { let mut queue = self.queue.lock().unwrap(); let mut duration = timeout; loop { - if let Some(elem) = queue.pop_front() { - return Some(elem); + match queue.pop_front() { + Some(Control::Elem(value)) => return Some(value), + Some(Control::Unblock) => return None, + None => (), } let now = Instant::now(); let (_queue, result) = self.condvar.wait_timeout(queue, timeout).unwrap(); diff --git a/tests/unblock-test.rs b/tests/unblock-test.rs new file mode 100644 index 000000000..001568a48 --- /dev/null +++ b/tests/unblock-test.rs @@ -0,0 +1,34 @@ +extern crate tiny_http; + +use std::sync::Arc; +use std::thread; + +#[test] +fn unblock_server() { + let server = tiny_http::Server::http("0.0.0.0:0").unwrap(); + let s = Arc::new(server); + + let s1 = s.clone(); + thread::spawn(move || s1.unblock()); + + // Without unblock this would hang forever + for _rq in s.incoming_requests() {} +} + +#[test] +fn unblock_threads() { + let server = tiny_http::Server::http("0.0.0.0:0").unwrap(); + let s = Arc::new(server); + + let s1 = s.clone(); + let s2 = s.clone(); + let h1 = thread::spawn(move || for _rq in s1.incoming_requests() {}); + let h2 = thread::spawn(move || for _rq in s2.incoming_requests() {}); + + // Graceful shutdown; removing even one of the + // unblock calls prevents termination + s.unblock(); + s.unblock(); + h1.join().unwrap(); + h2.join().unwrap(); +}