From f00ed9ef4efc29f56c14f4553633bdf6b9812292 Mon Sep 17 00:00:00 2001 From: Guillaume Koenig Date: Sat, 26 Jan 2019 18:55:56 +0100 Subject: [PATCH] Add unblock method for graceful shutdown --- src/lib.rs | 13 +++++++++++-- src/util/messages_queue.rs | 37 +++++++++++++++++++++++++++++-------- tests/unblock-test.rs | 34 ++++++++++++++++++++++++++++++++++ 3 files changed, 74 insertions(+), 10 deletions(-) create mode 100644 tests/unblock-test.rs diff --git a/src/lib.rs b/src/lib.rs index fb91ba3f6..c67db508c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,6 +120,7 @@ extern crate openssl; use std::error::Error; use std::io::Error as IoError; +use std::io::ErrorKind as IoErrorKind; use std::io::Result as IoResult; use std::sync::Arc; use std::sync::atomic::AtomicBool; @@ -375,8 +376,9 @@ impl Server { /// Blocks until an HTTP request has been submitted and returns it. pub fn recv(&self) -> IoResult { match self.messages.pop() { - Message::Error(err) => return Err(err), - Message::NewRequest(rq) => return Ok(rq), + Some(Message::Error(err)) => return Err(err), + Some(Message::NewRequest(rq)) => return Ok(rq), + None => return Err(IoError::new(IoErrorKind::Other, "thread unblocked")), } } @@ -397,6 +399,13 @@ impl Server { None => return Ok(None) } } + + /// Unblock thread stuck in recv() or incoming_requests(). + /// If there are several such threads, only one is unblocked. + /// This method allows graceful shutdown of server. + pub fn unblock(&self) { + self.messages.unblock(); + } } impl<'a> Iterator for IncomingRequests<'a> { diff --git a/src/util/messages_queue.rs b/src/util/messages_queue.rs index 9ecec19d4..208918c45 100644 --- a/src/util/messages_queue.rs +++ b/src/util/messages_queue.rs @@ -2,8 +2,13 @@ use std::collections::VecDeque; use std::time::{Duration, Instant}; use std::sync::{Arc, Mutex, Condvar}; +enum Control { + Elem(T), + Unblock, +} + pub struct MessagesQueue where T: Send { - queue: Mutex>, + queue: Mutex>>, condvar: Condvar, } @@ -18,17 +23,27 @@ impl MessagesQueue where T: Send { /// Pushes an element to the queue. pub fn push(&self, value: T) { let mut queue = self.queue.lock().unwrap(); - queue.push_back(value); + queue.push_back(Control::Elem(value)); + self.condvar.notify_one(); + } + + /// Unblock one thread stuck in pop loop. + pub fn unblock(&self) { + let mut queue = self.queue.lock().unwrap(); + queue.push_back(Control::Unblock); self.condvar.notify_one(); } /// Pops an element. Blocks until one is available. - pub fn pop(&self) -> T { + /// Returns None in case unblock() was issued. + pub fn pop(&self) -> Option { let mut queue = self.queue.lock().unwrap(); loop { - if let Some(elem) = queue.pop_front() { - return elem; + match queue.pop_front() { + Some(Control::Elem(value)) => return Some(value), + Some(Control::Unblock) => return None, + None => (), } queue = self.condvar.wait(queue).unwrap(); @@ -38,17 +53,23 @@ impl MessagesQueue where T: Send { /// Tries to pop an element without blocking. pub fn try_pop(&self) -> Option { let mut queue = self.queue.lock().unwrap(); - queue.pop_front() + match queue.pop_front() { + Some(Control::Elem(value)) => Some(value), + Some(Control::Unblock) | None => None, + } } /// Tries to pop an element without blocking /// more than the specified timeout duration + /// or unblock() was issued pub fn pop_timeout(&self, timeout: Duration) -> Option { let mut queue = self.queue.lock().unwrap(); let mut duration = timeout; loop { - if let Some(elem) = queue.pop_front() { - return Some(elem); + match queue.pop_front() { + Some(Control::Elem(value)) => return Some(value), + Some(Control::Unblock) => return None, + None => (), } let now = Instant::now(); let (_queue, result) = self.condvar.wait_timeout(queue, timeout).unwrap(); diff --git a/tests/unblock-test.rs b/tests/unblock-test.rs new file mode 100644 index 000000000..001568a48 --- /dev/null +++ b/tests/unblock-test.rs @@ -0,0 +1,34 @@ +extern crate tiny_http; + +use std::sync::Arc; +use std::thread; + +#[test] +fn unblock_server() { + let server = tiny_http::Server::http("0.0.0.0:0").unwrap(); + let s = Arc::new(server); + + let s1 = s.clone(); + thread::spawn(move || s1.unblock()); + + // Without unblock this would hang forever + for _rq in s.incoming_requests() {} +} + +#[test] +fn unblock_threads() { + let server = tiny_http::Server::http("0.0.0.0:0").unwrap(); + let s = Arc::new(server); + + let s1 = s.clone(); + let s2 = s.clone(); + let h1 = thread::spawn(move || for _rq in s1.incoming_requests() {}); + let h2 = thread::spawn(move || for _rq in s2.incoming_requests() {}); + + // Graceful shutdown; removing even one of the + // unblock calls prevents termination + s.unblock(); + s.unblock(); + h1.join().unwrap(); + h2.join().unwrap(); +}