Skip to content

Commit

Permalink
Merge pull request #184 from rawler/unblock-shutdown
Browse files Browse the repository at this point in the history
Add unblock method for graceful shutdown
  • Loading branch information
rawler committed Oct 25, 2020
2 parents 325af94 + 42211a8 commit 36af7c5
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 10 deletions.
13 changes: 11 additions & 2 deletions src/lib.rs
Expand Up @@ -106,6 +106,7 @@ extern crate openssl;

use std::error::Error;
use std::io::Error as IoError;
use std::io::ErrorKind as IoErrorKind;
use std::io::Result as IoResult;
use std::net;
use std::net::{Shutdown, TcpStream, ToSocketAddrs};
Expand Down Expand Up @@ -383,8 +384,9 @@ impl Server {
/// Blocks until an HTTP request has been submitted and returns it.
pub fn recv(&self) -> IoResult<Request> {
match self.messages.pop() {
Message::Error(err) => Err(err),
Message::NewRequest(rq) => Ok(rq),
Some(Message::Error(err)) => return Err(err),
Some(Message::NewRequest(rq)) => return Ok(rq),
None => return Err(IoError::new(IoErrorKind::Other, "thread unblocked")),
}
}

Expand All @@ -405,6 +407,13 @@ impl Server {
None => Ok(None),
}
}

/// Unblock thread stuck in recv() or incoming_requests().
/// If there are several such threads, only one is unblocked.
/// This method allows graceful shutdown of server.
pub fn unblock(&self) {
self.messages.unblock();
}
}

impl<'a> Iterator for IncomingRequests<'a> {
Expand Down
37 changes: 29 additions & 8 deletions src/util/messages_queue.rs
Expand Up @@ -2,11 +2,16 @@ use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

enum Control<T> {
Elem(T),
Unblock,
}

pub struct MessagesQueue<T>
where
T: Send,
{
queue: Mutex<VecDeque<T>>,
queue: Mutex<VecDeque<Control<T>>>,
condvar: Condvar,
}

Expand All @@ -24,17 +29,27 @@ where
/// Pushes an element to the queue.
pub fn push(&self, value: T) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(value);
queue.push_back(Control::Elem(value));
self.condvar.notify_one();
}

/// Unblock one thread stuck in pop loop.
pub fn unblock(&self) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(Control::Unblock);
self.condvar.notify_one();
}

/// Pops an element. Blocks until one is available.
pub fn pop(&self) -> T {
/// Returns None in case unblock() was issued.
pub fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();

loop {
if let Some(elem) = queue.pop_front() {
return elem;
match queue.pop_front() {
Some(Control::Elem(value)) => return Some(value),
Some(Control::Unblock) => return None,
None => (),
}

queue = self.condvar.wait(queue).unwrap();
Expand All @@ -44,17 +59,23 @@ where
/// Tries to pop an element without blocking.
pub fn try_pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop_front()
match queue.pop_front() {
Some(Control::Elem(value)) => Some(value),
Some(Control::Unblock) | None => None,
}
}

/// Tries to pop an element without blocking
/// more than the specified timeout duration
/// or unblock() was issued
pub fn pop_timeout(&self, timeout: Duration) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
let mut duration = timeout;
loop {
if let Some(elem) = queue.pop_front() {
return Some(elem);
match queue.pop_front() {
Some(Control::Elem(value)) => return Some(value),
Some(Control::Unblock) => return None,
None => (),
}
let now = Instant::now();
let (_queue, result) = self.condvar.wait_timeout(queue, timeout).unwrap();
Expand Down
34 changes: 34 additions & 0 deletions tests/unblock-test.rs
@@ -0,0 +1,34 @@
extern crate tiny_http;

use std::sync::Arc;
use std::thread;

#[test]
fn unblock_server() {
let server = tiny_http::Server::http("0.0.0.0:0").unwrap();
let s = Arc::new(server);

let s1 = s.clone();
thread::spawn(move || s1.unblock());

// Without unblock this would hang forever
for _rq in s.incoming_requests() {}
}

#[test]
fn unblock_threads() {
let server = tiny_http::Server::http("0.0.0.0:0").unwrap();
let s = Arc::new(server);

let s1 = s.clone();
let s2 = s.clone();
let h1 = thread::spawn(move || for _rq in s1.incoming_requests() {});
let h2 = thread::spawn(move || for _rq in s2.incoming_requests() {});

// Graceful shutdown; removing even one of the
// unblock calls prevents termination
s.unblock();
s.unblock();
h1.join().unwrap();
h2.join().unwrap();
}

0 comments on commit 36af7c5

Please sign in to comment.