Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add unblock method for graceful shutdown #184

Merged
merged 1 commit into from Oct 25, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
13 changes: 11 additions & 2 deletions src/lib.rs
Expand Up @@ -106,6 +106,7 @@ extern crate openssl;

use std::error::Error;
use std::io::Error as IoError;
use std::io::ErrorKind as IoErrorKind;
use std::io::Result as IoResult;
use std::net;
use std::net::{Shutdown, TcpStream, ToSocketAddrs};
Expand Down Expand Up @@ -383,8 +384,9 @@ impl Server {
/// Blocks until an HTTP request has been submitted and returns it.
pub fn recv(&self) -> IoResult<Request> {
match self.messages.pop() {
Message::Error(err) => Err(err),
Message::NewRequest(rq) => Ok(rq),
Some(Message::Error(err)) => return Err(err),
Some(Message::NewRequest(rq)) => return Ok(rq),
None => return Err(IoError::new(IoErrorKind::Other, "thread unblocked")),
}
}

Expand All @@ -405,6 +407,13 @@ impl Server {
None => Ok(None),
}
}

/// Unblock thread stuck in recv() or incoming_requests().
/// If there are several such threads, only one is unblocked.
/// This method allows graceful shutdown of server.
pub fn unblock(&self) {
self.messages.unblock();
}
}

impl<'a> Iterator for IncomingRequests<'a> {
Expand Down
37 changes: 29 additions & 8 deletions src/util/messages_queue.rs
Expand Up @@ -2,11 +2,16 @@ use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};

enum Control<T> {
Elem(T),
Unblock,
}

pub struct MessagesQueue<T>
where
T: Send,
{
queue: Mutex<VecDeque<T>>,
queue: Mutex<VecDeque<Control<T>>>,
condvar: Condvar,
}

Expand All @@ -24,17 +29,27 @@ where
/// Pushes an element to the queue.
pub fn push(&self, value: T) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(value);
queue.push_back(Control::Elem(value));
self.condvar.notify_one();
}

/// Unblock one thread stuck in pop loop.
pub fn unblock(&self) {
let mut queue = self.queue.lock().unwrap();
queue.push_back(Control::Unblock);
self.condvar.notify_one();
}

/// Pops an element. Blocks until one is available.
pub fn pop(&self) -> T {
/// Returns None in case unblock() was issued.
pub fn pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();

loop {
if let Some(elem) = queue.pop_front() {
return elem;
match queue.pop_front() {
Some(Control::Elem(value)) => return Some(value),
Some(Control::Unblock) => return None,
None => (),
}

queue = self.condvar.wait(queue).unwrap();
Expand All @@ -44,17 +59,23 @@ where
/// Tries to pop an element without blocking.
pub fn try_pop(&self) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
queue.pop_front()
match queue.pop_front() {
Some(Control::Elem(value)) => Some(value),
Some(Control::Unblock) | None => None,
}
}

/// Tries to pop an element without blocking
/// more than the specified timeout duration
/// or unblock() was issued
pub fn pop_timeout(&self, timeout: Duration) -> Option<T> {
let mut queue = self.queue.lock().unwrap();
let mut duration = timeout;
loop {
if let Some(elem) = queue.pop_front() {
return Some(elem);
match queue.pop_front() {
Some(Control::Elem(value)) => return Some(value),
Some(Control::Unblock) => return None,
None => (),
}
let now = Instant::now();
let (_queue, result) = self.condvar.wait_timeout(queue, timeout).unwrap();
Expand Down
34 changes: 34 additions & 0 deletions tests/unblock-test.rs
@@ -0,0 +1,34 @@
extern crate tiny_http;

use std::sync::Arc;
use std::thread;

#[test]
fn unblock_server() {
let server = tiny_http::Server::http("0.0.0.0:0").unwrap();
let s = Arc::new(server);

let s1 = s.clone();
thread::spawn(move || s1.unblock());

// Without unblock this would hang forever
for _rq in s.incoming_requests() {}
}

#[test]
fn unblock_threads() {
let server = tiny_http::Server::http("0.0.0.0:0").unwrap();
let s = Arc::new(server);

let s1 = s.clone();
let s2 = s.clone();
let h1 = thread::spawn(move || for _rq in s1.incoming_requests() {});
let h2 = thread::spawn(move || for _rq in s2.incoming_requests() {});

// Graceful shutdown; removing even one of the
// unblock calls prevents termination
s.unblock();
s.unblock();
h1.join().unwrap();
h2.join().unwrap();
}