From c56265d9aad18d5bb720e432184b386c21a005a1 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 2 Nov 2021 23:12:16 +0000 Subject: [PATCH 01/12] get rtless working with manual localset --- actix-rt/src/arbiter.rs | 9 ++ actix-rt/src/system.rs | 2 +- actix-server/examples/tcp-echo.rs | 27 ++++-- actix-server/src/accept.rs | 33 +++++-- actix-server/src/builder.rs | 13 ++- actix-server/src/worker.rs | 143 +++++++++++++++++------------- 6 files changed, 148 insertions(+), 79 deletions(-) diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index 97084f0558..ec336cbf72 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -240,6 +240,15 @@ impl Arbiter { }) } + /// Try to get current running arbiter handle. + /// + /// Returns `None` if no Arbiter has been started. + /// + /// Contrary to [`current`](Self::current), this never panics. + pub fn try_current() -> Option { + HANDLE.with(|cell| cell.borrow().clone()) + } + /// Stop Arbiter from continuing it's event loop. /// /// Returns true if stop message was sent successfully and false if the Arbiter has been dropped. diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 23d692a431..ebe0b34722 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -130,7 +130,7 @@ impl System { /// /// Returns `None` if no System has been started. /// - /// Contrary to `current`, this never panics. + /// Contrary to [`current`](Self::current), this never panics. pub fn try_current() -> Option { CURRENT.with(|cell| cell.borrow().clone()) } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 8b038da4c3..5e2b99ad8b 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -10,7 +10,7 @@ //! the length of each line it echos and the total size of data sent when the connection is closed. use std::{ - env, io, + io, sync::{ atomic::{AtomicUsize, Ordering}, Arc, @@ -25,10 +25,8 @@ use futures_util::future::ok; use log::{error, info}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; -#[actix_rt::main] -async fn main() -> io::Result<()> { - env::set_var("RUST_LOG", "info"); - env_logger::init(); +async fn run() -> io::Result<()> { + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); let count = Arc::new(AtomicUsize::new(0)); @@ -85,6 +83,25 @@ async fn main() -> io::Result<()> { }) })? .workers(1) + // .system_exit() .run() .await } + +fn main() -> io::Result<()> { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + + let ls = tokio::task::LocalSet::new(); + rt.block_on(ls.run_until(run()))?; + + Ok(()) +} + +// #[actix_rt::main] +// async fn main() -> io::Result<()> { +// run().await?; +// Ok(()) +// } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index a872853cb4..bb75a1be5c 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -156,13 +156,17 @@ impl Accept { srv: ServerHandle, handles: Vec, ) { - // Accept runs in its own thread and would want to spawn additional futures to current - // actix system. 
- let sys = System::current(); + // Accept runs in its own thread and might spawn additional futures to current system + let sys = System::try_current(); + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { - System::set_current(sys); + // forward existing actix system context + if let Some(sys) = sys { + System::set_current(sys); + } + let (mut accept, mut sockets) = Accept::new_with_sockets(poll, waker, socks, handles, srv); @@ -479,10 +483,23 @@ impl Accept { // after the sleep a Timer interest is sent to Accept Poll let waker = self.waker.clone(); - System::current().arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); + + match System::try_current() { + Some(sys) => { + sys.arbiter().spawn(async move { + sleep(Duration::from_millis(510)).await; + waker.wake(WakerInterest::Timer); + }); + } + + None => { + let rt = tokio::runtime::Handle::current(); + rt.spawn(async move { + sleep(Duration::from_millis(510)).await; + waker.wake(WakerInterest::Timer); + }); + } + } return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 7e3df9d8b1..36c79655ac 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -7,7 +7,7 @@ use std::{ }; use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; -use log::{error, info}; +use log::{error, info, trace}; use tokio::sync::{ mpsc::{unbounded_channel, UnboundedReceiver}, oneshot, @@ -160,6 +160,8 @@ impl ServerBuilder { { let sockets = bind_addr(addr, self.backlog)?; + trace!("binding server to: {:?}", &sockets); + for lst in sockets { let token = self.next_token(); self.services.push(StreamNewService::create( @@ -171,6 +173,7 @@ impl ServerBuilder { self.sockets .push((token, name.as_ref().to_string(), MioListener::Tcp(lst))); } + Ok(self) } @@ -255,6 +258,8 @@ impl ServerBuilder { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { + trace!("start running server"); + for (_, name, lst) in &self.sockets { info!( r#"Starting service: "{}", workers: {}, listening on: {}"#, @@ -264,6 +269,8 @@ impl ServerBuilder { ); } + trace!("run server"); + // start workers let handles = (0..self.threads) .map(|idx| { @@ -301,8 +308,8 @@ impl ServerBuilder { idx: usize, waker_queue: WakerQueue, ) -> (WorkerHandleAccept, WorkerHandleServer) { + trace!("start server worker {}", idx); let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, waker_queue, self.worker_config) } @@ -384,7 +391,7 @@ impl ServerBuilder { if exit { sleep(Duration::from_millis(300)).await; - System::current().stop(); + System::try_current().as_ref().map(System::stop); } }); } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index f8550e18f2..39ae7914cd 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -14,7 +14,7 @@ use std::{ use actix_rt::{ spawn, time::{sleep, Instant, Sleep}, - Arbiter, + Arbiter, ArbiterHandle, System, }; use futures_core::{future::LocalBoxFuture, ready}; use log::{error, info, trace}; @@ -23,12 +23,14 @@ use tokio::sync::{ oneshot, }; -use crate::join_all; -use crate::service::{BoxedServerService, InternalServiceFactory}; -use crate::socket::MioStream; -use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::{ + join_all, + service::{BoxedServerService, InternalServiceFactory}, + socket::MioStream, + waker_queue::{WakerInterest, WakerQueue}, +}; -/// Stop worker message. 
Returns `true` on successful graceful shutdown. +/// Stop worker message. Returns `true` on successful graceful shutdown /// and `false` if some connections still alive when shutdown execute. pub(crate) struct Stop { graceful: bool, @@ -273,6 +275,8 @@ impl ServerWorker { waker_queue: WakerQueue, config: ServerWorkerConfig, ) -> (WorkerHandleAccept, WorkerHandleServer) { + trace!("starting server worker {}", idx); + let (tx1, rx) = unbounded_channel(); let (tx2, rx2) = unbounded_channel(); @@ -289,63 +293,77 @@ impl ServerWorker { Arbiter::new() }; - #[cfg(not(all(target_os = "linux", feature = "io-uring")))] - let arbiter = Arbiter::with_tokio_rt(move || { - tokio::runtime::Builder::new_current_thread() - .enable_all() - .max_blocking_threads(config.max_blocking_threads) - .build() - .unwrap() - }); + // get actix system context if it is set + let sys = System::try_current(); - arbiter.spawn(async move { - let fut = factories - .iter() - .enumerate() - .map(|(idx, factory)| { - let fut = factory.create(); - async move { fut.await.map(|(t, s)| (idx, t, s)) } - }) - .collect::>(); - - // a second spawn to run !Send future tasks. - spawn(async move { - let res = join_all(fut) - .await - .into_iter() - .collect::, _>>(); - let services = match res { - Ok(res) => res - .into_iter() - .fold(Vec::new(), |mut services, (factory, token, service)| { - assert_eq!(token, services.len()); - services.push(WorkerService { - factory, - service, - status: WorkerServiceStatus::Unavailable, - }); - services - }) - .into_boxed_slice(), - Err(e) => { - error!("Can not start worker: {:?}", e); - Arbiter::current().stop(); - return; - } - }; + std::thread::Builder::new() + .name("eofibef".to_owned()) + .spawn(move || { + // forward existing actix system context + if let Some(sys) = sys { + System::set_current(sys); + } - // a third spawn to make sure ServerWorker runs as non boxed future. - spawn(ServerWorker { - rx, - rx2, - services, - counter: WorkerCounter::new(idx, waker_queue, counter_clone), - factories: factories.into_boxed_slice(), - state: Default::default(), - shutdown_timeout: config.shutdown_timeout, - }); - }); - }); + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .max_blocking_threads(config.max_blocking_threads) + .build() + .unwrap(); + + rt.block_on(tokio::task::LocalSet::new().run_until(async move { + let fut = factories + .iter() + .enumerate() + .map(|(idx, factory)| { + let fut = factory.create(); + async move { fut.await.map(|(t, s)| (idx, t, s)) } + }) + .collect::>(); + + // a second spawn to run !Send future tasks. + spawn(async move { + let res = join_all(fut) + .await + .into_iter() + .collect::, _>>(); + let services = match res { + Ok(res) => res + .into_iter() + .fold(Vec::new(), |mut services, (factory, token, service)| { + assert_eq!(token, services.len()); + services.push(WorkerService { + factory, + service, + status: WorkerServiceStatus::Unavailable, + }); + services + }) + .into_boxed_slice(), + Err(e) => { + error!("Can not start worker: {:?}", e); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); + return; + } + }; + + // a third spawn to make sure ServerWorker runs as non boxed future. 
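                    // Spawning the worker future directly (rather than boxed) avoids an
                    // extra allocation; awaiting the returned join handle also surfaces a
                    // panic in the worker future via the `expect` calls below instead of
                    // losing it silently.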
+ spawn(ServerWorker { + rx, + rx2, + services, + counter: WorkerCounter::new(idx, waker_queue, counter_clone), + factories: factories.into_boxed_slice(), + state: Default::default(), + shutdown_timeout: config.shutdown_timeout, + }) + .await + .expect("task 3 panic"); + }) + .await + .expect("task 2 panic"); + })) + }) + .expect("worker thread error/panic"); handle_pair(idx, tx1, tx2, counter) } @@ -450,8 +468,9 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { + trace!("dropping ServerWorker"); // Stop the Arbiter ServerWorker runs on on drop. - Arbiter::current().stop(); + Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } } From 9662a6e51c57ed7e0d781b74582c2ebe72afa7ff Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Tue, 2 Nov 2021 23:57:50 +0000 Subject: [PATCH 02/12] dont panic in accept loop --- actix-server/src/accept.rs | 25 ++++--- actix-server/src/availability.rs | 121 +++++++++++++++++++++++++++++++ 2 files changed, 134 insertions(+), 12 deletions(-) create mode 100644 actix-server/src/availability.rs diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index bb75a1be5c..07192f455a 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -65,7 +65,7 @@ impl AcceptLoop { let poll = self.poll.take().unwrap(); let waker = self.waker.clone(); - Accept::start(poll, waker, socks, srv, handles); + Accept::start(poll, waker, socks, srv, handles).expect("accept failed to start"); } } @@ -155,10 +155,13 @@ impl Accept { socks: Vec<(usize, MioListener)>, srv: ServerHandle, handles: Vec, - ) { + ) -> io::Result<()> { // Accept runs in its own thread and might spawn additional futures to current system let sys = System::try_current(); + let (mut accept, mut sockets) = + Accept::new_with_sockets(poll, waker, socks, handles, srv)?; + thread::Builder::new() .name("actix-server accept loop".to_owned()) .spawn(move || { @@ -167,12 +170,11 @@ impl Accept { System::set_current(sys); } - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv); - accept.poll_with(&mut sockets); }) .unwrap(); + + Ok(()) } fn new_with_sockets( @@ -181,22 +183,21 @@ impl Accept { socks: Vec<(usize, MioListener)>, handles: Vec, srv: ServerHandle, - ) -> (Accept, Vec) { + ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { let sockets = socks .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections poll.registry() - .register(&mut lst, MioToken(token), Interest::READABLE) - .unwrap_or_else(|e| panic!("Can not register io: {}", e)); + .register(&mut lst, MioToken(token), Interest::READABLE)?; - ServerSocketInfo { + Ok(ServerSocketInfo { token, lst, timeout: None, - } + }) }) - .collect(); + .collect::>()?; let mut avail = Availability::default(); @@ -213,7 +214,7 @@ impl Accept { paused: false, }; - (accept, sockets) + Ok((accept, sockets)) } fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { diff --git a/actix-server/src/availability.rs b/actix-server/src/availability.rs new file mode 100644 index 0000000000..801b08f2b5 --- /dev/null +++ b/actix-server/src/availability.rs @@ -0,0 +1,121 @@ +use crate::worker::WorkerHandleAccept; + +/// Array of u128 with every bit as marker for a worker handle's availability. 
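///
/// Four `u128`s provide 512 bits, one bit per possible worker index. A sketch of the
/// mapping (see `offset` below):
///
/// ```text
/// worker idx 300 => array slot 300 / 128 = 2, bit 300 % 128 = 44
/// ```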
+#[derive(Debug, Default)] +pub(crate) struct Availability([u128; 4]); + +impl Availability { + /// Check if any worker handle is available + #[inline(always)] + pub(crate) fn available(&self) -> bool { + self.0.iter().any(|a| *a != 0) + } + + /// Check if worker handle is available by index + #[inline(always)] + pub(crate) fn get_available(&self, idx: usize) -> bool { + let (offset, idx) = Self::offset(idx); + + self.0[offset] & (1 << idx as u128) != 0 + } + + /// Set worker handle available state by index. + pub(crate) fn set_available(&mut self, idx: usize, avail: bool) { + let (offset, idx) = Self::offset(idx); + + let off = 1 << idx as u128; + if avail { + self.0[offset] |= off; + } else { + self.0[offset] &= !off + } + } + + /// Set all worker handle to available state. + /// This would result in a re-check on all workers' availability. + pub(crate) fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { + handles.iter().for_each(|handle| { + self.set_available(handle.idx(), true); + }) + } + + /// Get offset and adjusted index of given worker handle index. + pub(crate) fn offset(idx: usize) -> (usize, usize) { + if idx < 128 { + (0, idx) + } else if idx < 128 * 2 { + (1, idx - 128) + } else if idx < 128 * 3 { + (2, idx - 128 * 2) + } else if idx < 128 * 4 { + (3, idx - 128 * 3) + } else { + panic!("Max WorkerHandle count is 512") + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn single(aval: &mut Availability, idx: usize) { + aval.set_available(idx, true); + assert!(aval.available()); + + aval.set_available(idx, true); + + aval.set_available(idx, false); + assert!(!aval.available()); + + aval.set_available(idx, false); + assert!(!aval.available()); + } + + fn multi(aval: &mut Availability, mut idx: Vec) { + idx.iter().for_each(|idx| aval.set_available(*idx, true)); + + assert!(aval.available()); + + while let Some(idx) = idx.pop() { + assert!(aval.available()); + aval.set_available(idx, false); + } + + assert!(!aval.available()); + } + + #[test] + fn availability() { + let mut aval = Availability::default(); + + single(&mut aval, 1); + single(&mut aval, 128); + single(&mut aval, 256); + single(&mut aval, 511); + + let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); + + multi(&mut aval, idx); + + multi(&mut aval, (0..511).collect()) + } + + #[test] + #[should_panic] + fn overflow() { + let mut aval = Availability::default(); + single(&mut aval, 512); + } + + #[test] + fn pin_point() { + let mut aval = Availability::default(); + + aval.set_available(438, true); + + aval.set_available(479, true); + + assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); + } +} From 7b99e85797ec8d21ae711e746f91e21e95ce0fb6 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 02:56:14 +0000 Subject: [PATCH 03/12] checkpoint based off fake's earlier work --- actix-server/examples/tcp-echo.rs | 12 +- actix-server/src/accept.rs | 305 ++++++++---------------------- actix-server/src/builder.rs | 267 ++++---------------------- actix-server/src/lib.rs | 1 + actix-server/src/server.rs | 298 ++++++++++++++++++++++++----- actix-server/src/signals.rs | 35 ++-- actix-server/src/worker.rs | 35 ++-- actix-server/tests/test_server.rs | 32 ++-- 8 files changed, 420 insertions(+), 565 deletions(-) diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 5e2b99ad8b..3529fab845 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -88,15 +88,9 @@ async fn run() -> io::Result<()> { .await } -fn main() -> 
io::Result<()> { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - - let ls = tokio::task::LocalSet::new(); - rt.block_on(ls.run_until(run()))?; - +#[tokio::main] +async fn main() -> io::Result<()> { + run().await?; Ok(()) } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 07192f455a..1f2ecafa49 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,17 +1,20 @@ use std::time::Duration; use std::{io, thread}; -use actix_rt::{ - time::{sleep, Instant}, - System, -}; +use actix_rt::time::Instant; +use actix_rt::{time::sleep, System}; use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::server::ServerHandle; -use crate::socket::MioListener; -use crate::waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}; -use crate::worker::{Conn, WorkerHandleAccept}; +use crate::worker::ServerWorker; +use crate::{ + availability::Availability, + server::ServerHandle, + socket::MioListener, + waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, + worker::{Conn, WorkerHandleAccept, WorkerHandleServer}, + ServerBuilder, +}; struct ServerSocketInfo { token: usize, @@ -20,59 +23,13 @@ struct ServerSocketInfo { /// Timeout is used to mark the deadline when this socket's listener should be registered again /// after an error. - timeout: Option, -} - -/// Accept loop would live with `ServerBuilder`. -/// -/// It's tasked with construct `Poll` instance and `WakerQueue` which would be distributed to -/// `Accept` and `Worker`. -/// -/// It would also listen to `ServerCommand` and push interests to `WakerQueue`. -pub(crate) struct AcceptLoop { - srv: Option, - poll: Option, - waker: WakerQueue, -} - -impl AcceptLoop { - pub fn new(srv: ServerHandle) -> Self { - let poll = Poll::new().unwrap_or_else(|e| panic!("Can not create `mio::Poll`: {}", e)); - let waker = WakerQueue::new(poll.registry()) - .unwrap_or_else(|e| panic!("Can not create `mio::Waker`: {}", e)); - - Self { - srv: Some(srv), - poll: Some(poll), - waker, - } - } - - pub(crate) fn waker_owned(&self) -> WakerQueue { - self.waker.clone() - } - - pub fn wake(&self, i: WakerInterest) { - self.waker.wake(i); - } - - pub(crate) fn start( - &mut self, - socks: Vec<(usize, MioListener)>, - handles: Vec, - ) { - let srv = self.srv.take().expect("Can not re-use AcceptInfo"); - let poll = self.poll.take().unwrap(); - let waker = self.waker.clone(); - - Accept::start(poll, waker, socks, srv, handles).expect("accept failed to start"); - } + timeout: Option, } /// poll instance of the server. -struct Accept { +pub(crate) struct Accept { poll: Poll, - waker: WakerQueue, + waker_queue: WakerQueue, handles: Vec, srv: ServerHandle, next: usize, @@ -80,111 +37,58 @@ struct Accept { paused: bool, } -/// Array of u128 with every bit as marker for a worker handle's availability. -#[derive(Debug, Default)] -struct Availability([u128; 4]); - -impl Availability { - /// Check if any worker handle is available - #[inline(always)] - fn available(&self) -> bool { - self.0.iter().any(|a| *a != 0) - } - - /// Check if worker handle is available by index - #[inline(always)] - fn get_available(&self, idx: usize) -> bool { - let (offset, idx) = Self::offset(idx); - - self.0[offset] & (1 << idx as u128) != 0 - } - - /// Set worker handle available state by index. 
- fn set_available(&mut self, idx: usize, avail: bool) { - let (offset, idx) = Self::offset(idx); - - let off = 1 << idx as u128; - if avail { - self.0[offset] |= off; - } else { - self.0[offset] &= !off - } - } - - /// Set all worker handle to available state. - /// This would result in a re-check on all workers' availability. - fn set_available_all(&mut self, handles: &[WorkerHandleAccept]) { - handles.iter().for_each(|handle| { - self.set_available(handle.idx(), true); - }) - } - - /// Get offset and adjusted index of given worker handle index. - fn offset(idx: usize) -> (usize, usize) { - if idx < 128 { - (0, idx) - } else if idx < 128 * 2 { - (1, idx - 128) - } else if idx < 128 * 3 { - (2, idx - 128 * 2) - } else if idx < 128 * 4 { - (3, idx - 128 * 3) - } else { - panic!("Max WorkerHandle count is 512") - } - } -} - -/// This function defines errors that are per-connection. Which basically -/// means that if we get this error from `accept()` system call it means -/// next connection might be ready to be accepted. -/// -/// All other errors will incur a timeout before next `accept()` is performed. -/// The timeout is useful to handle resource exhaustion errors like ENFILE -/// and EMFILE. Otherwise, could enter into tight loop. -fn connection_error(e: &io::Error) -> bool { - e.kind() == io::ErrorKind::ConnectionRefused - || e.kind() == io::ErrorKind::ConnectionAborted - || e.kind() == io::ErrorKind::ConnectionReset -} - impl Accept { pub(crate) fn start( - poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - srv: ServerHandle, - handles: Vec, - ) -> io::Result<()> { - // Accept runs in its own thread and might spawn additional futures to current system - let sys = System::try_current(); + sockets: Vec<(usize, MioListener)>, + builder: &ServerBuilder, + ) -> io::Result<(WakerQueue, Vec)> { + let handle_server = ServerHandle::new(builder.cmd_tx.clone()); + + // construct poll instance and its waker + let poll = Poll::new()?; + let waker_queue = WakerQueue::new(poll.registry())?; + + // start workers and collect handles + let (handles_accept, handles_server) = (0..builder.threads) + .map(|idx| { + // clone service factories + let factories = builder + .factories + .iter() + .map(|f| f.clone_factory()) + .collect::>(); + + // start worker using service factories + ServerWorker::start(idx, factories, waker_queue.clone(), builder.worker_config) + }) + .collect::>>()? 
+ .into_iter() + .unzip(); - let (mut accept, mut sockets) = - Accept::new_with_sockets(poll, waker, socks, handles, srv)?; + let (mut accept, mut sockets) = Accept::new_with_sockets( + poll, + waker_queue.clone(), + sockets, + handles_accept, + handle_server, + )?; thread::Builder::new() - .name("actix-server accept loop".to_owned()) - .spawn(move || { - // forward existing actix system context - if let Some(sys) = sys { - System::set_current(sys); - } - - accept.poll_with(&mut sockets); - }) - .unwrap(); + .name("actix-server acceptor".to_owned()) + .spawn(move || accept.poll_with(&mut sockets)) + .map_err(|err| io::Error::new(io::ErrorKind::Other, err))?; - Ok(()) + Ok((waker_queue, handles_server)) } fn new_with_sockets( poll: Poll, - waker: WakerQueue, - socks: Vec<(usize, MioListener)>, - handles: Vec, - srv: ServerHandle, + waker_queue: WakerQueue, + sockets: Vec<(usize, MioListener)>, + accept_handles: Vec, + server_handle: ServerHandle, ) -> io::Result<(Accept, Box<[ServerSocketInfo]>)> { - let sockets = socks + let sockets = sockets .into_iter() .map(|(token, mut lst)| { // Start listening for incoming connections @@ -202,13 +106,13 @@ impl Accept { let mut avail = Availability::default(); // Assume all handles are avail at construct time. - avail.set_available_all(&handles); + avail.set_available_all(&accept_handles); let accept = Accept { poll, - waker, - handles, - srv, + waker_queue, + handles: accept_handles, + srv: server_handle, next: 0, avail, paused: false, @@ -217,8 +121,9 @@ impl Accept { Ok((accept, sockets)) } + /// blocking wait for readiness events triggered by mio fn poll_with(&mut self, sockets: &mut [ServerSocketInfo]) { - let mut events = mio::Events::with_capacity(128); + let mut events = mio::Events::with_capacity(256); loop { if let Err(e) = self.poll.poll(&mut events, None) { @@ -254,7 +159,7 @@ impl Accept { loop { // take guard with every iteration so no new interest can be added // until the current task is done. - let mut guard = self.waker.guard(); + let mut guard = self.waker_queue.guard(); match guard.pop_front() { // worker notify it becomes available. Some(WakerInterest::WorkerAvailable(idx)) => { @@ -325,6 +230,7 @@ impl Accept { fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { let now = Instant::now(); + sockets .iter_mut() // Only sockets that had an associated timeout were deregistered. @@ -387,12 +293,12 @@ impl Accept { fn deregister_all(&self, sockets: &mut [ServerSocketInfo]) { // This is a best effort implementation with following limitation: // - // Every ServerSocketInfo with associate timeout will be skipped and it's timeout - // is removed in the process. + // Every ServerSocketInfo with associated timeout will be skipped and it's timeout is + // removed in the process. // - // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short - // gap (less than 500ms) would cause all timing out ServerSocketInfos be reregistered - // before expected timing. + // Therefore WakerInterest::Pause followed by WakerInterest::Resume in a very short gap + // (less than 500ms) would cause all timing out ServerSocketInfos be re-registered before + // expected timing. sockets .iter_mut() // Take all timeout. 
@@ -483,7 +389,7 @@ impl Accept { info.timeout = Some(Instant::now() + Duration::from_millis(500)); // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker.clone(); + let waker = self.waker_queue.clone(); match System::try_current() { Some(sys) => { @@ -539,67 +445,14 @@ impl Accept { } } -#[cfg(test)] -mod test { - use super::Availability; - - fn single(aval: &mut Availability, idx: usize) { - aval.set_available(idx, true); - assert!(aval.available()); - - aval.set_available(idx, true); - - aval.set_available(idx, false); - assert!(!aval.available()); - - aval.set_available(idx, false); - assert!(!aval.available()); - } - - fn multi(aval: &mut Availability, mut idx: Vec) { - idx.iter().for_each(|idx| aval.set_available(*idx, true)); - - assert!(aval.available()); - - while let Some(idx) = idx.pop() { - assert!(aval.available()); - aval.set_available(idx, false); - } - - assert!(!aval.available()); - } - - #[test] - fn availability() { - let mut aval = Availability::default(); - - single(&mut aval, 1); - single(&mut aval, 128); - single(&mut aval, 256); - single(&mut aval, 511); - - let idx = (0..511).filter(|i| i % 3 == 0 && i % 5 == 0).collect(); - - multi(&mut aval, idx); - - multi(&mut aval, (0..511).collect()) - } - - #[test] - #[should_panic] - fn overflow() { - let mut aval = Availability::default(); - single(&mut aval, 512); - } - - #[test] - fn pin_point() { - let mut aval = Availability::default(); - - aval.set_available(438, true); - - aval.set_available(479, true); - - assert_eq!(aval.0[3], 1 << (438 - 384) | 1 << (479 - 384)); - } +/// This function defines errors that are per-connection; if we get this error from the `accept()` +/// system call it means the next connection might be ready to be accepted. +/// +/// All other errors will incur a timeout before next `accept()` call is attempted. The timeout is +/// useful to handle resource exhaustion errors like `ENFILE` and `EMFILE`. Otherwise, it could +/// enter into a temporary spin loop. 
+fn connection_error(e: &io::Error) -> bool { + e.kind() == io::ErrorKind::ConnectionRefused + || e.kind() == io::ErrorKind::ConnectionAborted + || e.kind() == io::ErrorKind::ConnectionReset } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 36c79655ac..62ae57e428 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,43 +1,31 @@ -use std::{ - future::Future, - io, mem, - pin::Pin, - task::{Context, Poll}, - time::Duration, +use std::{io, time::Duration}; + +use actix_rt::net::TcpStream; +use log::trace; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; + +use crate::{ + server::{ServerCommand, ServerHandle}, + service::{InternalServiceFactory, ServiceFactory, StreamNewService}, + socket::{ + MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, + }, + worker::ServerWorkerConfig, + Server, }; -use actix_rt::{self as rt, net::TcpStream, time::sleep, System}; -use log::{error, info, trace}; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver}, - oneshot, -}; - -use crate::accept::AcceptLoop; -use crate::join_all; -use crate::server::{ServerCommand, ServerHandle}; -use crate::service::{InternalServiceFactory, ServiceFactory, StreamNewService}; -use crate::signals::{Signal, Signals}; -use crate::socket::{MioListener, StdSocketAddr, StdTcpListener, ToSocketAddrs}; -use crate::socket::{MioTcpListener, MioTcpSocket}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleAccept, WorkerHandleServer}; - /// Server builder pub struct ServerBuilder { - threads: usize, - token: usize, - backlog: u32, - handles: Vec<(usize, WorkerHandleServer)>, - services: Vec>, - sockets: Vec<(usize, String, MioListener)>, - accept: AcceptLoop, - exit: bool, - no_signals: bool, - cmd: UnboundedReceiver, - server: ServerHandle, - notify: Vec>, - worker_config: ServerWorkerConfig, + pub(super) threads: usize, + pub(super) token: usize, + pub(super) backlog: u32, + pub(super) factories: Vec>, + pub(super) sockets: Vec<(usize, String, MioListener)>, + pub(super) exit: bool, + pub(super) listen_os_signals: bool, + pub(super) cmd_tx: UnboundedSender, + pub(super) cmd_rx: UnboundedReceiver, + pub(super) worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -50,21 +38,18 @@ impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { let (tx, rx) = unbounded_channel(); - let server = ServerHandle::new(tx); + let _server = ServerHandle::new(tx.clone()); ServerBuilder { threads: num_cpus::get(), token: 0, - handles: Vec::new(), - services: Vec::new(), + factories: Vec::new(), sockets: Vec::new(), - accept: AcceptLoop::new(server.clone()), backlog: 2048, exit: false, - no_signals: false, - cmd: rx, - notify: Vec::new(), - server, + listen_os_signals: true, + cmd_tx: tx, + cmd_rx: rx, worker_config: ServerWorkerConfig::default(), } } @@ -128,15 +113,16 @@ impl ServerBuilder { self.max_concurrent_connections(num) } + // TODO: wtf is this for /// Stop Actix system. pub fn system_exit(mut self) -> Self { self.exit = true; self } - /// Disable signal handling. + /// Disable OS signal handling. 
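    ///
    /// When signals are disabled the server can only be shut down through a
    /// `ServerHandle`. A minimal sketch (the service factory and address are
    /// illustrative only):
    ///
    /// ```ignore
    /// let srv = Server::build()
    ///     .disable_signals()
    ///     .bind("service", ("127.0.0.1", 8080), factory)?
    ///     .run();
    ///
    /// let handle = srv.handle();
    /// // ... from some other task:
    /// handle.stop(true).await; // graceful shutdown
    /// ```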
pub fn disable_signals(mut self) -> Self { - self.no_signals = true; + self.listen_os_signals = false; self } @@ -164,7 +150,7 @@ impl ServerBuilder { for lst in sockets { let token = self.next_token(); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory.clone(), @@ -215,7 +201,7 @@ impl ServerBuilder { lst.set_nonblocking(true)?; let token = self.next_token(); let addr = StdSocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory, @@ -240,7 +226,7 @@ impl ServerBuilder { let addr = lst.local_addr()?; let token = self.next_token(); - self.services.push(StreamNewService::create( + self.factories.push(StreamNewService::create( name.as_ref().to_string(), token, factory, @@ -254,177 +240,11 @@ impl ServerBuilder { } /// Starts processing incoming connections and return server controller. - pub fn run(mut self) -> ServerHandle { + pub fn run(self) -> Server { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { - trace!("start running server"); - - for (_, name, lst) in &self.sockets { - info!( - r#"Starting service: "{}", workers: {}, listening on: {}"#, - name, - self.threads, - lst.local_addr() - ); - } - - trace!("run server"); - - // start workers - let handles = (0..self.threads) - .map(|idx| { - let (handle_accept, handle_server) = - self.start_worker(idx, self.accept.waker_owned()); - self.handles.push((idx, handle_server)); - - handle_accept - }) - .collect(); - - // start accept thread - self.accept.start( - mem::take(&mut self.sockets) - .into_iter() - .map(|t| (t.0, t.2)) - .collect(), - handles, - ); - - // handle signals - if !self.no_signals { - Signals::start(self.server.clone()); - } - - // start http server actor - let server = self.server.clone(); - rt::spawn(self); - server - } - } - - fn start_worker( - &self, - idx: usize, - waker_queue: WakerQueue, - ) -> (WorkerHandleAccept, WorkerHandleServer) { - trace!("start server worker {}", idx); - let services = self.services.iter().map(|v| v.clone_factory()).collect(); - ServerWorker::start(idx, services, waker_queue, self.worker_config) - } - - fn handle_cmd(&mut self, item: ServerCommand) { - match item { - ServerCommand::Pause(tx) => { - self.accept.wake(WakerInterest::Pause); - let _ = tx.send(()); - } - ServerCommand::Resume(tx) => { - self.accept.wake(WakerInterest::Resume); - let _ = tx.send(()); - } - ServerCommand::Signal(sig) => { - // Signals support - // Handle `SIGINT`, `SIGTERM`, `SIGQUIT` signals and stop actix system - match sig { - Signal::Int => { - info!("SIGINT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - - Signal::Term => { - info!("SIGTERM received; starting graceful shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: true, - completion: None, - }) - } - - Signal::Quit => { - info!("SIGQUIT received; starting forced shutdown"); - self.exit = true; - self.handle_cmd(ServerCommand::Stop { - graceful: false, - completion: None, - }) - } - } - } - ServerCommand::Notify(tx) => { - self.notify.push(tx); - } - ServerCommand::Stop { - graceful, - completion, - } => { - let exit = self.exit; - - // stop accept thread - self.accept.wake(WakerInterest::Stop); - let notify = std::mem::take(&mut self.notify); - - // stop 
workers - let stop = self - .handles - .iter() - .map(move |worker| worker.1.stop(graceful)) - .collect(); - - rt::spawn(async move { - if graceful { - // wait for all workers to shut down - let _ = join_all(stop).await; - } - - if let Some(tx) = completion { - let _ = tx.send(()); - } - - for tx in notify { - let _ = tx.send(()); - } - - if exit { - sleep(Duration::from_millis(300)).await; - System::try_current().as_ref().map(System::stop); - } - }); - } - ServerCommand::WorkerFaulted(idx) => { - let mut found = false; - for i in 0..self.handles.len() { - if self.handles[i].0 == idx { - self.handles.swap_remove(i); - found = true; - break; - } - } - - if found { - error!("Worker {} has died; restarting", idx); - - let mut new_idx = self.handles.len(); - 'found: loop { - for i in 0..self.handles.len() { - if self.handles[i].0 == new_idx { - new_idx += 1; - continue 'found; - } - } - break; - } - - let (handle_accept, handle_server) = - self.start_worker(new_idx, self.accept.waker_owned()); - self.handles.push((new_idx, handle_server)); - self.accept.wake(WakerInterest::Worker(handle_accept)); - } - } + Server::new(self) } } @@ -435,19 +255,6 @@ impl ServerBuilder { } } -impl Future for ServerBuilder { - type Output = (); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - loop { - match Pin::new(&mut self.cmd).poll_recv(cx) { - Poll::Ready(Some(it)) => self.as_mut().get_mut().handle_cmd(it), - _ => return Poll::Pending, - } - } - } -} - pub(super) fn bind_addr( addr: S, backlog: u32, diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 5bfc8faf11..d0c6de3ea6 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -5,6 +5,7 @@ #![doc(html_favicon_url = "https://actix.rs/favicon.ico")] mod accept; +mod availability; mod builder; mod server; mod service; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 46ffb3cd84..633bf1c00f 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,64 +1,170 @@ use std::future::Future; -use std::io; use std::pin::Pin; use std::task::{Context, Poll}; +use std::time::Duration; +use std::{io, mem}; -use tokio::sync::mpsc::UnboundedSender; +use actix_rt::time::sleep; +use actix_rt::System; +use futures_core::future::LocalBoxFuture; +use log::{error, info, trace}; +use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; use tokio::sync::oneshot; +use crate::accept::Accept; use crate::builder::ServerBuilder; -use crate::signals::Signal; +use crate::join_all; +use crate::service::InternalServiceFactory; +use crate::signals::{Signal, Signals}; +use crate::waker_queue::{WakerInterest, WakerQueue}; +use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}; #[derive(Debug)] pub(crate) enum ServerCommand { WorkerFaulted(usize), Pause(oneshot::Sender<()>), Resume(oneshot::Sender<()>), - Signal(Signal), Stop { /// True if shut down should be graceful. graceful: bool, completion: Option>, }, - /// Notify of server stop - Notify(oneshot::Sender<()>), } -#[derive(Debug)] -#[non_exhaustive] -pub struct Server; +// TODO: docs + must use + +/// Server +/// +/// # Shutdown Signals +/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a +/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. +/// +/// A graceful shutdown will wait for all workers to stop first. 
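///
/// A sketch of typical usage (service factory and address illustrative):
///
/// ```ignore
/// let srv = Server::build()
///     .bind("service", ("127.0.0.1", 8080), factory)?
///     .run();
///
/// // `Server` is itself a future that resolves once the server has shut down
/// srv.await
/// ```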
+#[must_use = "futures do nothing unless you `.await` or poll them"] +pub enum Server { + Server(ServerInner), + Error(Option), +} impl Server { /// Start server building process. pub fn build() -> ServerBuilder { ServerBuilder::default() } + + pub(crate) fn new(mut builder: ServerBuilder) -> Self { + trace!("start running server"); + + let sockets = mem::take(&mut builder.sockets) + .into_iter() + .map(|t| (t.0, t.2)) + .collect(); + + // Give log information on what runtime will be used. + let is_tokio = tokio::runtime::Handle::try_current().is_ok(); + let is_actix = actix_rt::System::try_current().is_some(); + + match (is_tokio, is_actix) { + (true, false) => info!("Tokio runtime found. Starting in existing Tokio runtime"), + (_, true) => info!("Actix runtime found. Starting in Actix runtime"), + (_, _) => info!( + "Actix/Tokio runtime not found. Starting in newt Tokio current-thread runtime" + ), + } + + for (_, name, lst) in &builder.sockets { + info!( + r#"Starting service: "{}", workers: {}, listening on: {}"#, + name, + builder.threads, + lst.local_addr() + ); + } + + trace!("run server"); + + match Accept::start(sockets, &builder) { + Ok((waker_queue, worker_handles)) => { + // construct OS signals listener future + let signals = (!builder.listen_os_signals).then(Signals::new); + + Self::Server(ServerInner { + cmd_tx: builder.cmd_tx.clone(), + cmd_rx: builder.cmd_rx, + signals, + waker_queue, + worker_handles, + worker_config: builder.worker_config, + services: builder.factories, + exit: builder.exit, + stop_task: None, + }) + } + + Err(err) => Self::Error(Some(err)), + } + } + + pub fn handle(&self) -> ServerHandle { + match self { + Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()), + Server::Error(err) => { + // TODO: i don't think this is the best way to handle server startup fail + panic!( + "server handle can not be obtained because server failed to start up: {:?}", + err + ); + } + } + } } -/// Server handle. -/// -/// # Shutdown Signals -/// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a -/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. -/// -/// A graceful shutdown will wait for all workers to stop first. -#[derive(Debug)] -pub struct ServerHandle( - UnboundedSender, - Option>, -); +impl Future for Server { + type Output = io::Result<()>; -impl ServerHandle { - pub(crate) fn new(tx: UnboundedSender) -> Self { - ServerHandle(tx, None) + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + match self.as_mut().get_mut() { + Server::Error(err) => Poll::Ready(Err(err + .take() + .expect("Server future cannot be polled after error"))), + + Server::Server(inner) => { + // poll Signals + if let Some(ref mut signals) = inner.signals { + if let Poll::Ready(signal) = Pin::new(signals).poll(cx) { + inner.stop_task = inner.handle_signal(signal); + // drop signals listener + inner.signals = None; + } + } + + // eager drain command channel and handle command + loop { + match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { + Poll::Ready(Some(cmd)) => { + inner.stop_task = inner.handle_cmd(cmd); + } + _ => return Poll::Pending, + } + } + } + } } +} - pub(crate) fn signal(&self, sig: Signal) { - let _ = self.0.send(ServerCommand::Signal(sig)); +/// Server handle. 
+#[derive(Debug, Clone)] +pub struct ServerHandle { + tx_cmd: UnboundedSender, +} + +impl ServerHandle { + pub(crate) fn new(tx_cmd: UnboundedSender) -> Self { + ServerHandle { tx_cmd } } pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.0.send(ServerCommand::WorkerFaulted(idx)); + let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); } /// Pause accepting incoming connections @@ -67,7 +173,7 @@ impl ServerHandle { /// All opened connection remains active. pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Pause(tx)); + let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); async { let _ = rx.await; } @@ -76,7 +182,7 @@ impl ServerHandle { /// Resume accepting incoming connections pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Resume(tx)); + let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); async { let _ = rx.await; } @@ -87,7 +193,7 @@ impl ServerHandle { /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); - let _ = self.0.send(ServerCommand::Stop { + let _ = self.tx_cmd.send(ServerCommand::Stop { graceful, completion: Some(tx), }); @@ -97,29 +203,129 @@ impl ServerHandle { } } -impl Clone for ServerHandle { - fn clone(&self) -> Self { - Self(self.0.clone(), None) - } +pub struct ServerInner { + worker_handles: Vec, + worker_config: ServerWorkerConfig, + services: Vec>, + exit: bool, + cmd_tx: UnboundedSender, + cmd_rx: UnboundedReceiver, + signals: Option, + waker_queue: WakerQueue, + stop_task: Option>, } -impl Future for ServerHandle { - type Output = io::Result<()>; +impl ServerInner { + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + match item { + ServerCommand::Pause(tx) => { + self.waker_queue.wake(WakerInterest::Pause); + let _ = tx.send(()); + None + } + + ServerCommand::Resume(tx) => { + self.waker_queue.wake(WakerInterest::Resume); + let _ = tx.send(()); + None + } - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let this = self.get_mut(); + ServerCommand::Stop { + graceful, + completion, + } => { + let exit = self.exit; - if this.1.is_none() { - let (tx, rx) = oneshot::channel(); - if this.0.send(ServerCommand::Notify(tx)).is_err() { - return Poll::Ready(Ok(())); + // stop accept thread + self.waker_queue.wake(WakerInterest::Stop); + + // stop workers + let stop = self + .worker_handles + .iter() + .map(|worker| worker.stop(graceful)) + .collect::>(); + + Some(Box::pin(async move { + if graceful { + // wait for all workers to shut down + let _ = join_all(stop).await; + } + + if let Some(tx) = completion { + let _ = tx.send(()); + } + + if exit { + sleep(Duration::from_millis(300)).await; + System::try_current().as_ref().map(System::stop); + } + })) + } + + ServerCommand::WorkerFaulted(idx) => { + // TODO: maybe just return if not found ? 
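                // for now, an unknown index is treated as a bug; `WorkerFaulted` should
                // only ever be emitted for a worker this server started itself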
+ assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx)); + + error!("Worker {} has died; restarting", idx); + + let factories = self + .services + .iter() + .map(|service| service.clone_factory()) + .collect(); + + match ServerWorker::start( + idx, + factories, + self.waker_queue.clone(), + self.worker_config, + ) { + Ok((handle_accept, handle_server)) => { + *self + .worker_handles + .iter_mut() + .find(|wrk| wrk.idx == idx) + .unwrap() = handle_server; + + self.waker_queue.wake(WakerInterest::Worker(handle_accept)); + } + Err(_) => todo!(), + }; + + None } - this.1 = Some(rx); } + } + + fn handle_signal(&mut self, signal: Signal) -> Option> { + match signal { + Signal::Int => { + info!("SIGINT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } + + Signal::Term => { + info!("SIGTERM received; starting graceful shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: true, + completion: None, + }) + } - match Pin::new(this.1.as_mut().unwrap()).poll(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(_) => Poll::Ready(Ok(())), + Signal::Quit => { + info!("SIGQUIT received; starting forced shutdown"); + self.exit = true; + self.handle_cmd(ServerCommand::Stop { + graceful: false, + completion: None, + }) + } } } } diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index b80fa7597e..008218080d 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -2,11 +2,9 @@ use std::future::Future; use std::pin::Pin; use std::task::{Context, Poll}; -use crate::server::ServerHandle; - /// Types of process signals. -#[allow(dead_code)] -#[derive(PartialEq, Clone, Copy, Debug)] +// #[allow(dead_code)] +#[derive(Debug, Clone, Copy, PartialEq)] pub(crate) enum Signal { /// `SIGINT` Int, @@ -20,8 +18,6 @@ pub(crate) enum Signal { /// Process signal listener. pub(crate) struct Signals { - srv: ServerHandle, - #[cfg(not(unix))] signals: futures_core::future::LocalBoxFuture<'static, std::io::Result<()>>, @@ -30,14 +26,13 @@ pub(crate) struct Signals { } impl Signals { - /// Spawns a signal listening future that is able to send commands to the `Server`. - pub(crate) fn start(srv: ServerHandle) { + /// Constructs an OS signal listening future. + pub(crate) fn new() -> Self { #[cfg(not(unix))] { - actix_rt::spawn(Signals { - srv, + Signals { signals: Box::pin(actix_rt::signal::ctrl_c()), - }); + } } #[cfg(unix)] @@ -66,33 +61,29 @@ impl Signals { }) .collect::>(); - actix_rt::spawn(Signals { srv, signals }); + Signals { signals } } } } impl Future for Signals { - type Output = (); + type Output = Signal; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { #[cfg(not(unix))] - match self.signals.as_mut().poll(cx) { - Poll::Ready(_) => { - self.srv.signal(Signal::Int); - Poll::Ready(()) - } - Poll::Pending => Poll::Pending, + { + self.signals.as_mut().poll(cx).map(|_| Signal::Int) } #[cfg(unix)] { for (sig, fut) in self.signals.iter_mut() { + // TODO: match on if let Some ? 
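                // the first signal stream to become ready decides the output; the
                // received item is ignored because `sig` already identifies the signal
                // kind for that stream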
if Pin::new(fut).poll_recv(cx).is_ready() { - let sig = *sig; - self.srv.signal(sig); - return Poll::Ready(()); + return Poll::Ready(*sig); } } + Poll::Pending } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 39ae7914cd..24f79e60d4 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -1,6 +1,6 @@ use std::{ future::Future, - mem, + io, mem, pin::Pin, rc::Rc, sync::{ @@ -43,19 +43,20 @@ pub(crate) struct Conn { pub token: usize, } +/// fn handle_pair( idx: usize, - tx1: UnboundedSender, - tx2: UnboundedSender, + tx_conn: UnboundedSender, + tx_stop: UnboundedSender, counter: Counter, ) -> (WorkerHandleAccept, WorkerHandleServer) { let accept = WorkerHandleAccept { idx, - tx: tx1, + tx_conn, counter, }; - let server = WorkerHandleServer { idx, tx: tx2 }; + let server = WorkerHandleServer { idx, tx_stop }; (accept, server) } @@ -151,13 +152,13 @@ impl Drop for WorkerCounterGuard { } } -/// Handle to worker that can send connection message to worker and share the -/// availability of worker to other thread. +/// Handle to worker that can send connection message to worker and share the availability of worker +/// to other threads. /// /// Held by [Accept](crate::accept::Accept). pub(crate) struct WorkerHandleAccept { idx: usize, - tx: UnboundedSender, + tx_conn: UnboundedSender, counter: Counter, } @@ -168,8 +169,8 @@ impl WorkerHandleAccept { } #[inline(always)] - pub(crate) fn send(&self, msg: Conn) -> Result<(), Conn> { - self.tx.send(msg).map_err(|msg| msg.0) + pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> { + self.tx_conn.send(conn).map_err(|msg| msg.0) } #[inline(always)] @@ -183,15 +184,14 @@ impl WorkerHandleAccept { /// Held by [ServerBuilder](crate::builder::ServerBuilder). #[derive(Debug)] pub(crate) struct WorkerHandleServer { - #[allow(dead_code)] - idx: usize, - tx: UnboundedSender, + pub(crate) idx: usize, + tx_stop: UnboundedSender, } impl WorkerHandleServer { pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver { let (tx, rx) = oneshot::channel(); - let _ = self.tx.send(Stop { graceful, tx }); + let _ = self.tx_stop.send(Stop { graceful, tx }); rx } } @@ -274,7 +274,7 @@ impl ServerWorker { factories: Vec>, waker_queue: WakerQueue, config: ServerWorkerConfig, - ) -> (WorkerHandleAccept, WorkerHandleServer) { + ) -> io::Result<(WorkerHandleAccept, WorkerHandleServer)> { trace!("starting server worker {}", idx); let (tx1, rx) = unbounded_channel(); @@ -296,6 +296,8 @@ impl ServerWorker { // get actix system context if it is set let sys = System::try_current(); + // TODO: wait for server startup with sync channel + std::thread::Builder::new() .name("eofibef".to_owned()) .spawn(move || { @@ -339,6 +341,7 @@ impl ServerWorker { services }) .into_boxed_slice(), + Err(e) => { error!("Can not start worker: {:?}", e); Arbiter::try_current().as_ref().map(ArbiterHandle::stop); @@ -365,7 +368,7 @@ impl ServerWorker { }) .expect("worker thread error/panic"); - handle_pair(idx, tx1, tx2, counter) + Ok(handle_pair(idx, tx1, tx2, counter)) } fn restart_service(&mut self, idx: usize, factory_id: usize) { diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 5919438bc8..ee9790a575 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -30,7 +30,7 @@ fn test_bind() { })? 
.run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -61,7 +61,7 @@ fn test_listen() { })? .run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -103,7 +103,7 @@ fn test_start() { })? .run(); - let _ = tx.send((srv.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); srv.await }) @@ -166,7 +166,7 @@ async fn test_max_concurrent_connections() { let h = thread::spawn(move || { actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() // Set a relative higher backlog. .backlog(12) // max connection for a worker is 3. @@ -187,9 +187,9 @@ async fn test_max_concurrent_connections() { })? .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); @@ -260,7 +260,7 @@ async fn test_service_restart() { let h = thread::spawn(move || { let num = num.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .backlog(1) .disable_signals() .bind("addr1", addr1, move || { @@ -280,12 +280,12 @@ async fn test_service_restart() { .workers(1) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); - server.await + let _ = tx.send((srv.handle(), actix_rt::System::current())); + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); for _ in 0..5 { TcpStream::connect(addr1) @@ -307,7 +307,7 @@ async fn test_service_restart() { assert!(num_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5); - let _ = server.stop(false); + let _ = srv.stop(false); sys.stop(); h.join().unwrap().unwrap(); } @@ -379,19 +379,19 @@ async fn worker_restart() { let h = thread::spawn(move || { let counter = counter.clone(); actix_rt::System::new().block_on(async { - let server = Server::build() + let srv = Server::build() .disable_signals() .bind("addr", addr, move || TestServiceFactory(counter.clone()))? 
.workers(2) .run(); - let _ = tx.send((server.clone(), actix_rt::System::current())); + let _ = tx.send((srv.handle(), actix_rt::System::current())); - server.await + srv.await }) }); - let (server, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); sleep(Duration::from_secs(3)).await; @@ -448,7 +448,7 @@ async fn worker_restart() { assert_eq!("3", id); stream.shutdown().await.unwrap(); - let _ = server.stop(false); + let _ = srv.stop(false); sys.stop(); h.join().unwrap().unwrap(); } From d89c7e2de4f3f1536598f1da792c0ea9c369f2f7 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 03:42:19 +0000 Subject: [PATCH 04/12] align accept timeout logic with fake's impl --- actix-server/Cargo.toml | 2 +- actix-server/examples/tcp-echo.rs | 1 + actix-server/src/accept.rs | 99 ++++++++++++++++--------------- actix-server/src/builder.rs | 34 +++++------ actix-server/src/join_all.rs | 76 ++++++++++++++++++++++++ actix-server/src/lib.rs | 81 +------------------------ actix-server/src/server.rs | 79 +++++++++++++++--------- actix-server/src/test_server.rs | 44 ++++++++------ actix-server/src/waker_queue.rs | 9 +-- actix-server/src/worker.rs | 6 +- 10 files changed, 227 insertions(+), 204 deletions(-) create mode 100644 actix-server/src/join_all.rs diff --git a/actix-server/Cargo.toml b/actix-server/Cargo.toml index 46a0ad1d62..a4c403f154 100755 --- a/actix-server/Cargo.toml +++ b/actix-server/Cargo.toml @@ -38,4 +38,4 @@ actix-rt = "2.0.0" bytes = "1" env_logger = "0.9" futures-util = { version = "0.3.7", default-features = false, features = ["sink"] } -tokio = { version = "1.5.1", features = ["io-util"] } +tokio = { version = "1.5.1", features = ["io-util", "rt-multi-thread", "macros"] } diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 3529fab845..01f58479ac 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -94,6 +94,7 @@ async fn main() -> io::Result<()> { Ok(()) } +// alternatively: // #[actix_rt::main] // async fn main() -> io::Result<()> { // run().await?; diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 1f2ecafa49..3bca430540 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -1,21 +1,20 @@ -use std::time::Duration; -use std::{io, thread}; +use std::{io, thread, time::Duration}; use actix_rt::time::Instant; -use actix_rt::{time::sleep, System}; use log::{debug, error, info}; use mio::{Interest, Poll, Token as MioToken}; -use crate::worker::ServerWorker; use crate::{ availability::Availability, server::ServerHandle, socket::MioListener, waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, - worker::{Conn, WorkerHandleAccept, WorkerHandleServer}, + worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}, ServerBuilder, }; +const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510); + struct ServerSocketInfo { token: usize, @@ -34,6 +33,8 @@ pub(crate) struct Accept { srv: ServerHandle, next: usize, avail: Availability, + /// use the smallest duration from sockets timeout. 
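    ///
    /// It is set when an `accept()` error deregisters a socket and checked in
    /// `process_timeout` after every poll wake-up, so the listener can be re-registered
    /// once its deadline has passed.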
+ timeout: Option, paused: bool, } @@ -115,6 +116,7 @@ impl Accept { srv: server_handle, next: 0, avail, + timeout: None, paused: false, }; @@ -149,6 +151,9 @@ impl Accept { } } } + + // check for timeout and re-register sockets + self.process_timeout(sockets); } } @@ -171,6 +176,7 @@ impl Accept { self.accept_all(sockets); } } + // a new worker thread is made and it's handle would be added to Accept Some(WakerInterest::Worker(handle)) => { drop(guard); @@ -182,12 +188,7 @@ impl Accept { self.accept_all(sockets); } } - // got timer interest and it's time to try register socket(s) again - Some(WakerInterest::Timer) => { - drop(guard); - self.process_timer(sockets) - } Some(WakerInterest::Pause) => { drop(guard); @@ -197,6 +198,7 @@ impl Accept { self.deregister_all(sockets); } } + Some(WakerInterest::Resume) => { drop(guard); @@ -210,6 +212,7 @@ impl Accept { self.accept_all(sockets); } } + Some(WakerInterest::Stop) => { if !self.paused { self.deregister_all(sockets); @@ -217,6 +220,7 @@ impl Accept { return true; } + // waker queue is drained None => { // Reset the WakerQueue before break so it does not grow infinitely @@ -228,26 +232,44 @@ impl Accept { } } - fn process_timer(&self, sockets: &mut [ServerSocketInfo]) { - let now = Instant::now(); + fn process_timeout(&mut self, sockets: &mut [ServerSocketInfo]) { + // always remove old timeouts + if self.timeout.take().is_some() { + let now = Instant::now(); + + sockets + .iter_mut() + // Only sockets that had an associated timeout were deregistered. + .filter(|info| info.timeout.is_some()) + .for_each(|info| { + let inst = info.timeout.take().unwrap(); + + if now < inst { + // still timed out; try to set new timeout + info.timeout = Some(inst); + self.set_timeout(inst - now); + } else if !self.paused { + // timeout expired; register socket again + self.register_logged(info); + } - sockets - .iter_mut() - // Only sockets that had an associated timeout were deregistered. - .filter(|info| info.timeout.is_some()) - .for_each(|info| { - let inst = info.timeout.take().unwrap(); - - if now < inst { - info.timeout = Some(inst); - } else if !self.paused { - self.register_logged(info); - } + // Drop the timeout if server is paused and socket timeout is expired. + // When server recovers from pause it will register all sockets without + // a timeout value so this socket register will be delayed till then. + }); + } + } - // Drop the timeout if server is paused and socket timeout is expired. - // When server recovers from pause it will register all sockets without - // a timeout value so this socket register will be delayed till then. - }); + /// Update accept timeout with `duration` if it is shorter than current timeout. 
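    ///
    /// A sketch of the intended behaviour:
    ///
    /// ```text
    /// timeout = None        + set_timeout(500ms) => Some(500ms)
    /// timeout = Some(500ms) + set_timeout(200ms) => Some(200ms) // shorter wins
    /// timeout = Some(200ms) + set_timeout(500ms) => Some(200ms) // unchanged
    /// ```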
+ fn set_timeout(&mut self, duration: Duration) { + match self.timeout { + Some(ref mut timeout) => { + if *timeout > duration { + *timeout = duration; + } + } + None => self.timeout = Some(duration), + } } #[cfg(not(target_os = "windows"))] @@ -387,26 +409,7 @@ impl Accept { // the poll would need it mark which socket and when it's // listener should be registered info.timeout = Some(Instant::now() + Duration::from_millis(500)); - - // after the sleep a Timer interest is sent to Accept Poll - let waker = self.waker_queue.clone(); - - match System::try_current() { - Some(sys) => { - sys.arbiter().spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); - } - - None => { - let rt = tokio::runtime::Handle::current(); - rt.spawn(async move { - sleep(Duration::from_millis(510)).await; - waker.wake(WakerInterest::Timer); - }); - } - } + self.set_timeout(TIMEOUT_DURATION_ON_ERROR); return; } diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 62ae57e428..34288d1140 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -1,11 +1,11 @@ use std::{io, time::Duration}; use actix_rt::net::TcpStream; -use log::trace; +use log::{info, trace}; use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ - server::{ServerCommand, ServerHandle}, + server::ServerCommand, service::{InternalServiceFactory, ServiceFactory, StreamNewService}, socket::{ MioListener, MioTcpListener, MioTcpSocket, StdSocketAddr, StdTcpListener, ToSocketAddrs, @@ -14,18 +14,18 @@ use crate::{ Server, }; -/// Server builder +/// Server builder. pub struct ServerBuilder { - pub(super) threads: usize, - pub(super) token: usize, - pub(super) backlog: u32, - pub(super) factories: Vec>, - pub(super) sockets: Vec<(usize, String, MioListener)>, - pub(super) exit: bool, - pub(super) listen_os_signals: bool, - pub(super) cmd_tx: UnboundedSender, - pub(super) cmd_rx: UnboundedReceiver, - pub(super) worker_config: ServerWorkerConfig, + pub(crate) threads: usize, + pub(crate) token: usize, + pub(crate) backlog: u32, + pub(crate) factories: Vec>, + pub(crate) sockets: Vec<(usize, String, MioListener)>, + pub(crate) exit: bool, + pub(crate) listen_os_signals: bool, + pub(crate) cmd_tx: UnboundedSender, + pub(crate) cmd_rx: UnboundedReceiver, + pub(crate) worker_config: ServerWorkerConfig, } impl Default for ServerBuilder { @@ -37,8 +37,7 @@ impl Default for ServerBuilder { impl ServerBuilder { /// Create new Server builder instance pub fn new() -> ServerBuilder { - let (tx, rx) = unbounded_channel(); - let _server = ServerHandle::new(tx.clone()); + let (cmd_tx, cmd_rx) = unbounded_channel(); ServerBuilder { threads: num_cpus::get(), @@ -48,8 +47,8 @@ impl ServerBuilder { backlog: 2048, exit: false, listen_os_signals: true, - cmd_tx: tx, - cmd_rx: rx, + cmd_tx, + cmd_rx, worker_config: ServerWorkerConfig::default(), } } @@ -244,6 +243,7 @@ impl ServerBuilder { if self.sockets.is_empty() { panic!("Server should have at least one bound socket"); } else { + info!("Starting {} workers", self.threads); Server::new(self) } } diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs new file mode 100644 index 0000000000..aa47d72dff --- /dev/null +++ b/actix-server/src/join_all.rs @@ -0,0 +1,76 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +// a poor man's join future. joined future is only used when starting/stopping the server. 
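+// e.g. (illustrative): `join_all(vec![fut_a, fut_b]).await` resolves to `vec![out_a, out_b]`
+// once every inner future has completed, preserving input order.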
+// pin_project and pinned futures are overkill for this task. +pub(crate) struct JoinAll { + fut: Vec>, +} + +pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { + let fut = fut + .into_iter() + .map(|f| JoinFuture::Future(Box::pin(f))) + .collect(); + + JoinAll { fut } +} + +enum JoinFuture { + Future(Pin>>), + Result(Option), +} + +impl Unpin for JoinAll {} + +impl Future for JoinAll { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Future(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + +#[cfg(test)] +mod test { + use super::*; + + use actix_utils::future::ready; + + #[actix_rt::test] + async fn test_join_all() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } +} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index d0c6de3ea6..7862b81016 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,6 +7,7 @@ mod accept; mod availability; mod builder; +mod join_all; mod server; mod service; mod signals; @@ -22,83 +23,3 @@ pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; - -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; - -/// Start server building process -pub fn new() -> ServerBuilder { - ServerBuilder::default() -} - -// a poor man's join future. joined future is only used when starting/stopping the server. -// pin_project and pinned futures are overkill for this task. 
-pub(crate) struct JoinAll { - fut: Vec>, -} - -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { - let fut = fut - .into_iter() - .map(|f| JoinFuture::Future(Box::pin(f))) - .collect(); - - JoinAll { fut } -} - -enum JoinFuture { - Future(Pin>>), - Result(Option), -} - -impl Unpin for JoinAll {} - -impl Future for JoinAll { - type Output = Vec; - - fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let mut ready = true; - - let this = self.get_mut(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Future(f) = fut { - match f.as_mut().poll(cx) { - Poll::Ready(t) => { - *fut = JoinFuture::Result(Some(t)); - } - Poll::Pending => ready = false, - } - } - } - - if ready { - let mut res = Vec::new(); - for fut in this.fut.iter_mut() { - if let JoinFuture::Result(f) = fut { - res.push(f.take().unwrap()); - } - } - - Poll::Ready(res) - } else { - Poll::Pending - } - } -} - -#[cfg(test)] -mod test { - use super::*; - - use actix_utils::future::ready; - - #[actix_rt::test] - async fn test_join_all() { - let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; - let mut res = join_all(futs).await.into_iter(); - assert_eq!(Ok(1), res.next().unwrap()); - assert_eq!(Err(3), res.next().unwrap()); - assert_eq!(Ok(9), res.next().unwrap()); - } -} diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 633bf1c00f..3b37248e90 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -1,32 +1,46 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; -use std::time::Duration; -use std::{io, mem}; - -use actix_rt::time::sleep; -use actix_rt::System; +use std::{ + future::Future, + io, mem, + pin::Pin, + task::{Context, Poll}, + time::Duration, +}; + +use actix_rt::{time::sleep, System}; use futures_core::future::LocalBoxFuture; use log::{error, info, trace}; -use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender}; -use tokio::sync::oneshot; - -use crate::accept::Accept; -use crate::builder::ServerBuilder; -use crate::join_all; -use crate::service::InternalServiceFactory; -use crate::signals::{Signal, Signals}; -use crate::waker_queue::{WakerInterest, WakerQueue}; -use crate::worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}; +use tokio::sync::{ + mpsc::{UnboundedReceiver, UnboundedSender}, + oneshot, +}; + +use crate::{ + accept::Accept, + builder::ServerBuilder, + join_all::join_all, + service::InternalServiceFactory, + signals::{Signal, Signals}, + waker_queue::{WakerInterest, WakerQueue}, + worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, +}; #[derive(Debug)] pub(crate) enum ServerCommand { + /// TODO WorkerFaulted(usize), + + /// Contains return channel to notify caller of successful state change. Pause(oneshot::Sender<()>), + + /// Contains return channel to notify caller of successful state change. Resume(oneshot::Sender<()>), + + /// TODO Stop { /// True if shut down should be graceful. graceful: bool, + + /// Return channel to notify caller that shutdown is complete. completion: Option>, }, } @@ -105,14 +119,17 @@ impl Server { } } + /// Get a handle for ServerFuture that can be used to change state of actix server. + /// + /// See [ServerHandle](ServerHandle) for usage. 
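+    ///
+    /// # Examples
+    /// A sketch of typical use (bind arguments elided):
+    /// ```ignore
+    /// let srv = Server::build().bind(/* ... */)?.run();
+    /// let handle = srv.handle();
+    ///
+    /// // the handle can control the server from another task
+    /// handle.stop(true).await;
+    /// ```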
pub fn handle(&self) -> ServerHandle { match self { Server::Server(inner) => ServerHandle::new(inner.cmd_tx.clone()), Server::Error(err) => { // TODO: i don't think this is the best way to handle server startup fail panic!( - "server handle can not be obtained because server failed to start up: {:?}", - err + "server handle can not be obtained because server failed to start up: {}", + err.as_ref().unwrap() ); } } @@ -138,10 +155,16 @@ impl Future for Server { } } - // eager drain command channel and handle command + // handle stop tasks and eager drain command channel loop { + if let Some(ref mut fut) = inner.stop_task { + // only resolve stop task and exit + return fut.as_mut().poll(cx).map(|_| Ok(())); + } + match Pin::new(&mut inner.cmd_rx).poll_recv(cx) { Poll::Ready(Some(cmd)) => { + // if stop task is required, set it and loop inner.stop_task = inner.handle_cmd(cmd); } _ => return Poll::Pending, @@ -167,10 +190,9 @@ impl ServerHandle { let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); } - /// Pause accepting incoming connections + /// Pause accepting incoming connections. /// - /// If socket contains some pending connection, they might be dropped. - /// All opened connection remains active. + /// May drop socket pending connection. All open connections remain active. pub fn pause(&self) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); @@ -179,7 +201,7 @@ impl ServerHandle { } } - /// Resume accepting incoming connections + /// Resume accepting incoming connections. pub fn resume(&self) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); @@ -189,8 +211,6 @@ impl ServerHandle { } /// Stop incoming connection processing, stop all workers and exit. - /// - /// If server starts with `spawn()` method, then spawned thread get terminated. pub fn stop(&self, graceful: bool) -> impl Future { let (tx, rx) = oneshot::channel(); let _ = self.tx_cmd.send(ServerCommand::Stop { @@ -264,7 +284,7 @@ impl ServerInner { } ServerCommand::WorkerFaulted(idx) => { - // TODO: maybe just return if not found ? + // TODO: maybe just return with warning log if not found ? assert!(self.worker_handles.iter().any(|wrk| wrk.idx == idx)); error!("Worker {} has died; restarting", idx); @@ -290,7 +310,8 @@ impl ServerInner { self.waker_queue.wake(WakerInterest::Worker(handle_accept)); } - Err(_) => todo!(), + + Err(err) => error!("can not restart worker {}: {}", idx, err), }; None diff --git a/actix-server/src/test_server.rs b/actix-server/src/test_server.rs index ad6ee8ee35..7cf0d0a648 100644 --- a/actix-server/src/test_server.rs +++ b/actix-server/src/test_server.rs @@ -1,9 +1,9 @@ use std::sync::mpsc; -use std::{net, thread}; +use std::{io, net, thread}; use actix_rt::{net::TcpStream, System}; -use crate::{Server, ServerBuilder, ServiceFactory}; +use crate::{Server, ServerBuilder, ServerHandle, ServiceFactory}; /// A testing server. 
/// @@ -34,7 +34,8 @@ pub struct TestServerRuntime { addr: net::SocketAddr, host: String, port: u16, - system: System, + server_handle: ServerHandle, + thread_handle: Option>>, } impl TestServer { @@ -46,20 +47,22 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { - let sys = System::new(); - factory(Server::build()).workers(1).disable_signals().run(); - - tx.send(System::current()).unwrap(); - sys.run() + let thread_handle = thread::spawn(move || { + System::new().block_on(async { + let server = factory(Server::build()).workers(1).disable_signals().run(); + tx.send(server.handle()).unwrap(); + server.await + }) }); - let system = rx.recv().unwrap(); + + let server_handle = rx.recv().unwrap(); TestServerRuntime { - system, addr: "127.0.0.1:0".parse().unwrap(), host: "127.0.0.1".to_string(), port: 0, + server_handle, + thread_handle: Some(thread_handle), } } @@ -68,24 +71,25 @@ impl TestServer { let (tx, rx) = mpsc::channel(); // run server in separate thread - thread::spawn(move || { + let thread_handle = thread::spawn(move || { let sys = System::new(); let tcp = net::TcpListener::bind("127.0.0.1:0").unwrap(); let local_addr = tcp.local_addr().unwrap(); sys.block_on(async { - Server::build() + let server = Server::build() .listen("test", tcp, factory) .unwrap() .workers(1) .disable_signals() .run(); - tx.send((System::current(), local_addr)).unwrap(); - }); - sys.run() + + tx.send((server.handle(), local_addr)).unwrap(); + server.await + }) }); - let (system, addr) = rx.recv().unwrap(); + let (server_handle, addr) = rx.recv().unwrap(); let host = format!("{}", addr.ip()); let port = addr.port(); @@ -94,7 +98,8 @@ impl TestServer { addr, host, port, - system, + server_handle, + thread_handle: Some(thread_handle), } } @@ -127,7 +132,8 @@ impl TestServerRuntime { /// Stop server. fn stop(&mut self) { - self.system.stop(); + let _ = self.server_handle.stop(false); + self.thread_handle.take().unwrap().join().unwrap().unwrap(); } /// Connect to server, returning a Tokio `TcpStream`. diff --git a/actix-server/src/waker_queue.rs b/actix-server/src/waker_queue.rs index 3f8669d4fb..a7280901e6 100644 --- a/actix-server/src/waker_queue.rs +++ b/actix-server/src/waker_queue.rs @@ -78,12 +78,7 @@ pub(crate) enum WakerInterest { Pause, Resume, Stop, - /// `Timer` is an interest sent as a delayed future. When an error happens on accepting - /// connection `Accept` would deregister socket listener temporary and wake up the poll and - /// register them again after the delayed future resolve. - Timer, - /// `Worker` is an interest happen after a worker runs into faulted state(This is determined - /// by if work can be sent to it successfully).`Accept` would be waked up and add the new - /// `WorkerHandleAccept`. + /// `Worker` is an interest that is triggered after a worker faults. This is determined by + /// trying to send work to it. `Accept` would be waked up and add the new `WorkerHandleAccept`. 
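+    /// (Flow sketch, pieced together from this series: a failed `WorkerHandleAccept::send`
+    /// reports `ServerCommand::WorkerFaulted`, the server restarts that worker, and the
+    /// fresh handle arrives back at `Accept` through this interest.)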
Worker(WorkerHandleAccept), } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 24f79e60d4..1c9d61357f 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,7 @@ use tokio::sync::{ }; use crate::{ - join_all, + join_all::join_all, service::{BoxedServerService, InternalServiceFactory}, socket::MioStream, waker_queue::{WakerInterest, WakerQueue}, @@ -224,7 +224,7 @@ impl WorkerService { } } -#[derive(Copy, Clone, Debug, PartialEq, Eq)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] enum WorkerServiceStatus { Available, Unavailable, @@ -235,7 +235,7 @@ enum WorkerServiceStatus { } /// Config for worker behavior passed down from server builder. -#[derive(Copy, Clone)] +#[derive(Debug, Clone, Copy)] pub(crate) struct ServerWorkerConfig { shutdown_timeout: Duration, max_blocking_threads: usize, From 36a61469e479725c535e1d4164971f0c07126ab5 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 03:48:23 +0000 Subject: [PATCH 05/12] add changelog --- actix-server/CHANGES.md | 5 +++++ actix-server/src/lib.rs | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/actix-server/CHANGES.md b/actix-server/CHANGES.md index 83aeecd4d0..913a48e05f 100644 --- a/actix-server/CHANGES.md +++ b/actix-server/CHANGES.md @@ -1,11 +1,16 @@ # Changes ## Unreleased - 2021-xx-xx +* Server can be started in regular Tokio runtime. [#408] +* Expose new `Server` type whose `Future` impl resolves when server stops. [#408] * Rename `Server` to `ServerHandle`. [#407] +* Add `Server::handle` to obtain handle to server. [#408] * Rename `ServerBuilder::{maxconn => max_concurrent_connections}`. [#407] +* Deprecate crate-level `new` shortcut for server builder. [#408] * Minimum supported Rust version (MSRV) is now 1.52. [#407]: https://github.com/actix/actix-net/pull/407 +[#408]: https://github.com/actix/actix-net/pull/408 ## 2.0.0-beta.6 - 2021-10-11 diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 7862b81016..86db82d223 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -23,3 +23,10 @@ pub use self::test_server::TestServer; #[doc(hidden)] pub use self::socket::FromStream; + +/// Start server building process +#[doc(hidden)] +#[deprecated(since = "2.0.0", note = "Use `Server::build()`.")] +pub fn new() -> ServerBuilder { + ServerBuilder::default() +} From 010b557492978486b75e88b965a4c7b811e56a2a Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 13:59:35 +0000 Subject: [PATCH 06/12] update rt changelog --- actix-rt/CHANGES.md | 3 +++ actix-rt/src/arbiter.rs | 2 +- actix-rt/src/system.rs | 2 +- actix-server/src/server.rs | 4 ++-- 4 files changed, 7 insertions(+), 4 deletions(-) diff --git a/actix-rt/CHANGES.md b/actix-rt/CHANGES.md index 642cf27a16..0ac9f24a54 100644 --- a/actix-rt/CHANGES.md +++ b/actix-rt/CHANGES.md @@ -1,6 +1,9 @@ # Changes ## Unreleased - 2021-xx-xx +* Add `Arbiter::try_current` for situations where thread may or may not have Arbiter context. [#408] + +[#408]: https://github.com/actix/actix-net/pull/408 ## 2.3.0 - 2021-10-11 diff --git a/actix-rt/src/arbiter.rs b/actix-rt/src/arbiter.rs index ec336cbf72..43b1bdc361 100644 --- a/actix-rt/src/arbiter.rs +++ b/actix-rt/src/arbiter.rs @@ -244,7 +244,7 @@ impl Arbiter { /// /// Returns `None` if no Arbiter has been started. /// - /// Contrary to [`current`](Self::current), this never panics. + /// Unlike [`current`](Self::current), this never panics. 
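+    ///
+    /// # Examples
+    /// A minimal sketch:
+    /// ```
+    /// use actix_rt::Arbiter;
+    ///
+    /// // on a thread with no Arbiter running this yields None instead of panicking
+    /// assert!(Arbiter::try_current().is_none());
+    /// ```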
pub fn try_current() -> Option { HANDLE.with(|cell| cell.borrow().clone()) } diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index ebe0b34722..e32d620918 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -130,7 +130,7 @@ impl System { /// /// Returns `None` if no System has been started. /// - /// Contrary to [`current`](Self::current), this never panics. + /// Unlike [`current`](Self::current), this never panics. pub fn try_current() -> Option { CURRENT.with(|cell| cell.borrow().clone()) } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 3b37248e90..4122de6b05 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -260,7 +260,7 @@ impl ServerInner { self.waker_queue.wake(WakerInterest::Stop); // stop workers - let stop = self + let workers_stop = self .worker_handles .iter() .map(|worker| worker.stop(graceful)) @@ -269,7 +269,7 @@ impl ServerInner { Some(Box::pin(async move { if graceful { // wait for all workers to shut down - let _ = join_all(stop).await; + let _ = join_all(workers_stop).await; } if let Some(tx) = completion { From c33fb4ef9aba131c13382b71565e66d4f2761f82 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 14:55:24 +0000 Subject: [PATCH 07/12] fix join_all --- actix-server/src/join_all.rs | 72 +++++++++++++++++++++++++++++++++++- actix-server/src/server.rs | 10 ++--- actix-server/src/worker.rs | 5 ++- 3 files changed, 77 insertions(+), 10 deletions(-) diff --git a/actix-server/src/join_all.rs b/actix-server/src/join_all.rs index aa47d72dff..ae68871ccc 100644 --- a/actix-server/src/join_all.rs +++ b/actix-server/src/join_all.rs @@ -4,13 +4,15 @@ use std::{ task::{Context, Poll}, }; +use futures_core::future::{BoxFuture, LocalBoxFuture}; + // a poor man's join future. joined future is only used when starting/stopping the server. // pin_project and pinned futures are overkill for this task. pub(crate) struct JoinAll { fut: Vec>, } -pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAll { +pub(crate) fn join_all(fut: Vec + Send + 'static>) -> JoinAll { let fut = fut .into_iter() .map(|f| JoinFuture::Future(Box::pin(f))) @@ -20,7 +22,7 @@ pub(crate) fn join_all(fut: Vec + 'static>) -> JoinAl } enum JoinFuture { - Future(Pin>>), + Future(BoxFuture<'static, T>), Result(Option), } @@ -59,6 +61,63 @@ impl Future for JoinAll { } } +pub(crate) fn join_all_local( + fut: Vec + 'static>, +) -> JoinAllLocal { + let fut = fut + .into_iter() + .map(|f| JoinLocalFuture::LocalFuture(Box::pin(f))) + .collect(); + + JoinAllLocal { fut } +} + +// a poor man's join future. joined future is only used when starting/stopping the server. +// pin_project and pinned futures are overkill for this task. 
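+//
+// note: `join_all` boxes `Send` futures since the server future may now be driven by a
+// multi-threaded Tokio runtime; `JoinAllLocal` below is its `!Send` twin for futures that
+// must stay on one worker thread.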
+pub(crate) struct JoinAllLocal { + fut: Vec>, +} + +enum JoinLocalFuture { + LocalFuture(LocalBoxFuture<'static, T>), + Result(Option), +} + +impl Unpin for JoinAllLocal {} + +impl Future for JoinAllLocal { + type Output = Vec; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let mut ready = true; + + let this = self.get_mut(); + for fut in this.fut.iter_mut() { + if let JoinLocalFuture::LocalFuture(f) = fut { + match f.as_mut().poll(cx) { + Poll::Ready(t) => { + *fut = JoinLocalFuture::Result(Some(t)); + } + Poll::Pending => ready = false, + } + } + } + + if ready { + let mut res = Vec::new(); + for fut in this.fut.iter_mut() { + if let JoinLocalFuture::Result(f) = fut { + res.push(f.take().unwrap()); + } + } + + Poll::Ready(res) + } else { + Poll::Pending + } + } +} + #[cfg(test)] mod test { use super::*; @@ -73,4 +132,13 @@ mod test { assert_eq!(Err(3), res.next().unwrap()); assert_eq!(Ok(9), res.next().unwrap()); } + + #[actix_rt::test] + async fn test_join_all_local() { + let futs = vec![ready(Ok(1)), ready(Err(3)), ready(Ok(9))]; + let mut res = join_all_local(futs).await.into_iter(); + assert_eq!(Ok(1), res.next().unwrap()); + assert_eq!(Err(3), res.next().unwrap()); + assert_eq!(Ok(9), res.next().unwrap()); + } } diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 4122de6b05..e75cc53761 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -7,7 +7,7 @@ use std::{ }; use actix_rt::{time::sleep, System}; -use futures_core::future::LocalBoxFuture; +use futures_core::future::BoxFuture; use log::{error, info, trace}; use tokio::sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, @@ -45,8 +45,6 @@ pub(crate) enum ServerCommand { }, } -// TODO: docs + must use - /// Server /// /// # Shutdown Signals @@ -232,11 +230,11 @@ pub struct ServerInner { cmd_rx: UnboundedReceiver, signals: Option, waker_queue: WakerQueue, - stop_task: Option>, + stop_task: Option>, } impl ServerInner { - fn handle_cmd(&mut self, item: ServerCommand) -> Option> { + fn handle_cmd(&mut self, item: ServerCommand) -> Option> { match item { ServerCommand::Pause(tx) => { self.waker_queue.wake(WakerInterest::Pause); @@ -319,7 +317,7 @@ impl ServerInner { } } - fn handle_signal(&mut self, signal: Signal) -> Option> { + fn handle_signal(&mut self, signal: Signal) -> Option> { match signal { Signal::Int => { info!("SIGINT received; starting forced shutdown"); diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 1c9d61357f..0fedd8b9ca 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -24,7 +24,7 @@ use tokio::sync::{ }; use crate::{ - join_all::join_all, + join_all::join_all_local, service::{BoxedServerService, InternalServiceFactory}, socket::MioStream, waker_queue::{WakerInterest, WakerQueue}, @@ -324,10 +324,11 @@ impl ServerWorker { // a second spawn to run !Send future tasks. 
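 // (sketch of why: the boxed service factory futures are `!Send`, so they are joined with
 // `join_all_local` and polled on this worker's own thread rather than a thread pool)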
spawn(async move { - let res = join_all(fut) + let res = join_all_local(fut) .await .into_iter() .collect::, _>>(); + let services = match res { Ok(res) => res .into_iter() From 512de2163f4edb47dd9a5de28d89aeaf3ffda827 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 15:05:51 +0000 Subject: [PATCH 08/12] fix signal handling --- actix-server/src/server.rs | 8 ++------ actix-server/src/signals.rs | 24 +++++++++++++++++++++--- actix-server/src/worker.rs | 5 ++--- 3 files changed, 25 insertions(+), 12 deletions(-) diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index e75cc53761..58ee647f78 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -8,7 +8,7 @@ use std::{ use actix_rt::{time::sleep, System}; use futures_core::future::BoxFuture; -use log::{error, info, trace}; +use log::{error, info}; use tokio::sync::{ mpsc::{UnboundedReceiver, UnboundedSender}, oneshot, @@ -65,8 +65,6 @@ impl Server { } pub(crate) fn new(mut builder: ServerBuilder) -> Self { - trace!("start running server"); - let sockets = mem::take(&mut builder.sockets) .into_iter() .map(|t| (t.0, t.2)) @@ -93,12 +91,10 @@ impl Server { ); } - trace!("run server"); - match Accept::start(sockets, &builder) { Ok((waker_queue, worker_handles)) => { // construct OS signals listener future - let signals = (!builder.listen_os_signals).then(Signals::new); + let signals = (builder.listen_os_signals).then(Signals::new); Self::Server(ServerInner { cmd_tx: builder.cmd_tx.clone(), diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs index 008218080d..e00c0a1c22 100644 --- a/actix-server/src/signals.rs +++ b/actix-server/src/signals.rs @@ -1,6 +1,11 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::{Context, Poll}; +use std::{ + fmt, + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +use log::trace; /// Types of process signals. // #[allow(dead_code)] @@ -16,6 +21,16 @@ pub(crate) enum Signal { Quit, } +impl fmt::Display for Signal { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + Signal::Int => "SIGINT", + Signal::Term => "SIGTERM", + Signal::Quit => "SIGQUIT", + }) + } +} + /// Process signal listener. pub(crate) struct Signals { #[cfg(not(unix))] @@ -28,6 +43,8 @@ pub(crate) struct Signals { impl Signals { /// Constructs an OS signal listening future. pub(crate) fn new() -> Self { + trace!("setting up OS signal listener"); + #[cfg(not(unix))] { Signals { @@ -80,6 +97,7 @@ impl Future for Signals { for (sig, fut) in self.signals.iter_mut() { // TODO: match on if let Some ? if Pin::new(fut).poll_recv(cx).is_ready() { + trace!("{} received", sig); return Poll::Ready(*sig); } } diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs index 0fedd8b9ca..c53217248c 100644 --- a/actix-server/src/worker.rs +++ b/actix-server/src/worker.rs @@ -460,7 +460,7 @@ struct Shutdown { /// Start time of shutdown. start_from: Instant, - /// Notify of the shutdown outcome (force/grace) to stop caller. + /// Notify caller of the shutdown outcome (graceful/force). tx: oneshot::Sender, } @@ -472,8 +472,7 @@ impl Default for WorkerState { impl Drop for ServerWorker { fn drop(&mut self) { - trace!("dropping ServerWorker"); - // Stop the Arbiter ServerWorker runs on on drop. 
+ trace!("stopping ServerWorker Arbiter"); Arbiter::try_current().as_ref().map(ArbiterHandle::stop); } } From 351b37bdc84609a3b115e0b7145285f9dd52712c Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 15:25:43 +0000 Subject: [PATCH 09/12] stop ignoring a test --- actix-server/src/builder.rs | 92 +++++++++++++++---------------- actix-server/tests/test_server.rs | 1 - 2 files changed, 46 insertions(+), 47 deletions(-) diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index 34288d1140..f0eef5c060 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -112,7 +112,6 @@ impl ServerBuilder { self.max_concurrent_connections(num) } - // TODO: wtf is this for /// Stop Actix system. pub fn system_exit(mut self) -> Self { self.exit = true; @@ -162,8 +161,53 @@ impl ServerBuilder { Ok(self) } + /// Add new service to the server. + pub fn listen>( + mut self, + name: N, + lst: StdTcpListener, + factory: F, + ) -> io::Result + where + F: ServiceFactory, + { + lst.set_nonblocking(true)?; + let addr = lst.local_addr()?; + + let token = self.next_token(); + self.factories.push(StreamNewService::create( + name.as_ref().to_string(), + token, + factory, + addr, + )); + + self.sockets + .push((token, name.as_ref().to_string(), MioListener::from(lst))); + + Ok(self) + } + + /// Starts processing incoming connections and return server controller. + pub fn run(self) -> Server { + if self.sockets.is_empty() { + panic!("Server should have at least one bound socket"); + } else { + info!("Starting {} workers", self.threads); + Server::new(self) + } + } + + fn next_token(&mut self) -> usize { + let token = self.token; + self.token += 1; + token + } +} + +#[cfg(unix)] +impl ServerBuilder { /// Add new unix domain service to the server. - #[cfg(unix)] pub fn bind_uds(self, name: N, addr: U, factory: F) -> io::Result where F: ServiceFactory, @@ -186,7 +230,6 @@ impl ServerBuilder { /// Add new unix domain service to the server. /// /// Useful when running as a systemd service and a socket FD is acquired externally. - #[cfg(unix)] pub fn listen_uds>( mut self, name: N, @@ -210,49 +253,6 @@ impl ServerBuilder { .push((token, name.as_ref().to_string(), MioListener::from(lst))); Ok(self) } - - /// Add new service to the server. - pub fn listen>( - mut self, - name: N, - lst: StdTcpListener, - factory: F, - ) -> io::Result - where - F: ServiceFactory, - { - lst.set_nonblocking(true)?; - let addr = lst.local_addr()?; - - let token = self.next_token(); - self.factories.push(StreamNewService::create( - name.as_ref().to_string(), - token, - factory, - addr, - )); - - self.sockets - .push((token, name.as_ref().to_string(), MioListener::from(lst))); - - Ok(self) - } - - /// Starts processing incoming connections and return server controller. 
- pub fn run(self) -> Server { - if self.sockets.is_empty() { - panic!("Server should have at least one bound socket"); - } else { - info!("Starting {} workers", self.threads); - Server::new(self) - } - } - - fn next_token(&mut self) -> usize { - let token = self.token; - self.token += 1; - token - } } pub(super) fn bind_addr( diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index ee9790a575..76d99e7e55 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -312,7 +312,6 @@ async fn test_service_restart() { h.join().unwrap().unwrap(); } -#[ignore] #[actix_rt::test] async fn worker_restart() { use actix_service::{Service, ServiceFactory}; From 5fe957d275aa213637027e7764ff3324f69b0a8b Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 16:02:59 +0000 Subject: [PATCH 10/12] run worker restart on linux --- actix-server/tests/test_server.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 76d99e7e55..78bc64e4ff 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -312,6 +312,7 @@ async fn test_service_restart() { h.join().unwrap().unwrap(); } +#[cfg_attr(not(target_os = "linux"), ignore)] #[actix_rt::test] async fn worker_restart() { use actix_service::{Service, ServiceFactory}; From 3ba69afe9f57d64cf0200d08beb2c348a85c1279 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 16:37:43 +0000 Subject: [PATCH 11/12] document server struct --- actix-server/examples/tcp-echo.rs | 3 +- actix-server/src/accept.rs | 3 +- actix-server/src/builder.rs | 2 +- actix-server/src/handle.rs | 53 ++++++++++++++ actix-server/src/lib.rs | 4 +- actix-server/src/server.rs | 115 +++++++++++++++++------------- 6 files changed, 123 insertions(+), 57 deletions(-) create mode 100644 actix-server/src/handle.rs diff --git a/actix-server/examples/tcp-echo.rs b/actix-server/examples/tcp-echo.rs index 01f58479ac..930ebf0af1 100644 --- a/actix-server/examples/tcp-echo.rs +++ b/actix-server/examples/tcp-echo.rs @@ -23,7 +23,7 @@ use actix_service::{fn_service, ServiceFactoryExt as _}; use bytes::BytesMut; use futures_util::future::ok; use log::{error, info}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; +use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; async fn run() -> io::Result<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init(); @@ -83,7 +83,6 @@ async fn run() -> io::Result<()> { }) })? .workers(1) - // .system_exit() .run() .await } diff --git a/actix-server/src/accept.rs b/actix-server/src/accept.rs index 3bca430540..bdeb60047d 100644 --- a/actix-server/src/accept.rs +++ b/actix-server/src/accept.rs @@ -6,11 +6,10 @@ use mio::{Interest, Poll, Token as MioToken}; use crate::{ availability::Availability, - server::ServerHandle, socket::MioListener, waker_queue::{WakerInterest, WakerQueue, WAKER_TOKEN}, worker::{Conn, ServerWorker, WorkerHandleAccept, WorkerHandleServer}, - ServerBuilder, + ServerBuilder, ServerHandle, }; const TIMEOUT_DURATION_ON_ERROR: Duration = Duration::from_millis(510); diff --git a/actix-server/src/builder.rs b/actix-server/src/builder.rs index f0eef5c060..0d4abe784c 100644 --- a/actix-server/src/builder.rs +++ b/actix-server/src/builder.rs @@ -14,7 +14,7 @@ use crate::{ Server, }; -/// Server builder. +/// [Server] builder. 
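+///
+/// # Examples
+/// A sketch of typical use (`make_factory` is a placeholder for a real service factory):
+/// ```ignore
+/// let server = Server::build()
+///     .workers(4)
+///     .bind("service-name", ("127.0.0.1", 8080), make_factory)?
+///     .run();
+/// ```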
pub struct ServerBuilder { pub(crate) threads: usize, pub(crate) token: usize, diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs new file mode 100644 index 0000000000..55f2bb25d1 --- /dev/null +++ b/actix-server/src/handle.rs @@ -0,0 +1,53 @@ +use std::future::Future; + +use tokio::sync::{mpsc::UnboundedSender, oneshot}; + +use crate::server::ServerCommand; + +/// Server handle. +#[derive(Debug, Clone)] +pub struct ServerHandle { + tx_cmd: UnboundedSender, +} + +impl ServerHandle { + pub(crate) fn new(tx_cmd: UnboundedSender) -> Self { + ServerHandle { tx_cmd } + } + + pub(crate) fn worker_faulted(&self, idx: usize) { + let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); + } + + /// Pause accepting incoming connections. + /// + /// May drop socket pending connection. All open connections remain active. + pub fn pause(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); + async { + let _ = rx.await; + } + } + + /// Resume accepting incoming connections. + pub fn resume(&self) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); + async { + let _ = rx.await; + } + } + + /// Stop incoming connection processing, stop all workers and exit. + pub fn stop(&self, graceful: bool) -> impl Future { + let (tx, rx) = oneshot::channel(); + let _ = self.tx_cmd.send(ServerCommand::Stop { + graceful, + completion: Some(tx), + }); + async { + let _ = rx.await; + } + } +} diff --git a/actix-server/src/lib.rs b/actix-server/src/lib.rs index 86db82d223..6ac8ba7eed 100644 --- a/actix-server/src/lib.rs +++ b/actix-server/src/lib.rs @@ -7,6 +7,7 @@ mod accept; mod availability; mod builder; +mod handle; mod join_all; mod server; mod service; @@ -17,7 +18,8 @@ mod waker_queue; mod worker; pub use self::builder::ServerBuilder; -pub use self::server::{Server, ServerHandle}; +pub use self::handle::ServerHandle; +pub use self::server::Server; pub use self::service::ServiceFactory; pub use self::test_server::TestServer; diff --git a/actix-server/src/server.rs b/actix-server/src/server.rs index 58ee647f78..f1edcb232f 100644 --- a/actix-server/src/server.rs +++ b/actix-server/src/server.rs @@ -22,6 +22,7 @@ use crate::{ signals::{Signal, Signals}, waker_queue::{WakerInterest, WakerQueue}, worker::{ServerWorker, ServerWorkerConfig, WorkerHandleServer}, + ServerHandle, }; #[derive(Debug)] @@ -45,13 +46,73 @@ pub(crate) enum ServerCommand { }, } -/// Server +/// General purpose TCP server that runs services receiving Tokio `TcpStream`s. +/// +/// Handles creating worker threads, restarting faulted workers, connection accepting, and +/// back-pressure logic. +/// +/// Creates a worker per CPU core (or the number specified in [`ServerBuilder::workers`]) and +/// distributes connections with a round-robin strategy. +/// +/// The [Server] must be awaited to process stop commands and listen for OS signals. It will resolve +/// when the server has fully shut down. /// /// # Shutdown Signals /// On UNIX systems, `SIGQUIT` will start a graceful shutdown and `SIGTERM` or `SIGINT` will start a -/// forced shutdown. On Windows, a CTRL-C signal will start a forced shutdown. +/// forced shutdown. On Windows, a Ctrl-C signal will start a forced shutdown. /// /// A graceful shutdown will wait for all workers to stop first. +/// +/// # Examples +/// The following is a TCP echo server. Test using `telnet 127.0.0.1 8080`. 
+/// +/// ```no_run +/// use std::io; +/// +/// use actix_rt::net::TcpStream; +/// use actix_server::Server; +/// use actix_service::{fn_service, ServiceFactoryExt as _}; +/// use bytes::BytesMut; +/// use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _}; +/// +/// #[actix_rt::main] +/// async fn main() -> io::Result<()> { +/// let bind_addr = ("127.0.0.1", 8080); +/// +/// Server::build() +/// .bind("echo", bind_addr, move || { +/// fn_service(move |mut stream: TcpStream| { +/// async move { +/// let mut size = 0; +/// let mut buf = BytesMut::new(); +/// +/// loop { +/// match stream.read_buf(&mut buf).await { +/// // end of stream; bail from loop +/// Ok(0) => break, +/// +/// // write bytes back to stream +/// Ok(bytes_read) => { +/// stream.write_all(&buf[size..]).await.unwrap(); +/// size += bytes_read; +/// } +/// +/// Err(err) => { +/// eprintln!("Stream Error: {:?}", err); +/// return Err(()); +/// } +/// } +/// } +/// +/// Ok(()) +/// } +/// }) +/// .map_err(|err| eprintln!("Service Error: {:?}", err)) +/// })? +/// .run() +/// .await +/// } +/// ``` #[must_use = "futures do nothing unless you `.await` or poll them"] pub enum Server { Server(ServerInner), @@ -59,7 +120,7 @@ pub enum Server { } impl Server { - /// Start server building process. + /// Create server build. pub fn build() -> ServerBuilder { ServerBuilder::default() } @@ -169,54 +230,6 @@ impl Future for Server { } } -/// Server handle. -#[derive(Debug, Clone)] -pub struct ServerHandle { - tx_cmd: UnboundedSender, -} - -impl ServerHandle { - pub(crate) fn new(tx_cmd: UnboundedSender) -> Self { - ServerHandle { tx_cmd } - } - - pub(crate) fn worker_faulted(&self, idx: usize) { - let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx)); - } - - /// Pause accepting incoming connections. - /// - /// May drop socket pending connection. All open connections remain active. - pub fn pause(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.tx_cmd.send(ServerCommand::Pause(tx)); - async { - let _ = rx.await; - } - } - - /// Resume accepting incoming connections. - pub fn resume(&self) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.tx_cmd.send(ServerCommand::Resume(tx)); - async { - let _ = rx.await; - } - } - - /// Stop incoming connection processing, stop all workers and exit. - pub fn stop(&self, graceful: bool) -> impl Future { - let (tx, rx) = oneshot::channel(); - let _ = self.tx_cmd.send(ServerCommand::Stop { - graceful, - completion: Some(tx), - }); - async { - let _ = rx.await; - } - } -} - pub struct ServerInner { worker_handles: Vec, worker_config: ServerWorkerConfig, From a12871cebc79d8d74d8b7c2ebd4362fa933d3751 Mon Sep 17 00:00:00 2001 From: Rob Ede Date: Wed, 3 Nov 2021 17:22:27 +0000 Subject: [PATCH 12/12] allow dead code on signals --- actix-server/src/handle.rs | 14 +++++++------- actix-server/src/signals.rs | 1 + actix-server/src/worker.rs | 18 +++++++++--------- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/actix-server/src/handle.rs b/actix-server/src/handle.rs index 55f2bb25d1..49d8eb017d 100644 --- a/actix-server/src/handle.rs +++ b/actix-server/src/handle.rs @@ -7,16 +7,16 @@ use crate::server::ServerCommand; /// Server handle. 
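 ///
 /// (Illustrative note: the handle is `Clone`; every clone feeds the same command channel,
 /// so `pause`, `resume`, and `stop` may be issued from any thread or task.)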
#[derive(Debug, Clone)]
 pub struct ServerHandle {
-    tx_cmd: UnboundedSender<ServerCommand>,
+    cmd_tx: UnboundedSender<ServerCommand>,
 }
 
 impl ServerHandle {
-    pub(crate) fn new(tx_cmd: UnboundedSender<ServerCommand>) -> Self {
-        ServerHandle { tx_cmd }
+    pub(crate) fn new(cmd_tx: UnboundedSender<ServerCommand>) -> Self {
+        ServerHandle { cmd_tx }
     }
 
     pub(crate) fn worker_faulted(&self, idx: usize) {
-        let _ = self.tx_cmd.send(ServerCommand::WorkerFaulted(idx));
+        let _ = self.cmd_tx.send(ServerCommand::WorkerFaulted(idx));
     }
 
     /// Pause accepting incoming connections.
@@ -24,7 +24,7 @@ impl ServerHandle {
     /// May drop socket pending connection. All open connections remain active.
     pub fn pause(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.tx_cmd.send(ServerCommand::Pause(tx));
+        let _ = self.cmd_tx.send(ServerCommand::Pause(tx));
         async {
             let _ = rx.await;
         }
@@ -33,7 +33,7 @@ impl ServerHandle {
     /// Resume accepting incoming connections.
     pub fn resume(&self) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.tx_cmd.send(ServerCommand::Resume(tx));
+        let _ = self.cmd_tx.send(ServerCommand::Resume(tx));
         async {
             let _ = rx.await;
         }
@@ -42,7 +42,7 @@ impl ServerHandle {
     /// Stop incoming connection processing, stop all workers and exit.
     pub fn stop(&self, graceful: bool) -> impl Future<Output = ()> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.tx_cmd.send(ServerCommand::Stop {
+        let _ = self.cmd_tx.send(ServerCommand::Stop {
             graceful,
             completion: Some(tx),
         });
diff --git a/actix-server/src/signals.rs b/actix-server/src/signals.rs
index e00c0a1c22..4013d7f220 100644
--- a/actix-server/src/signals.rs
+++ b/actix-server/src/signals.rs
@@ -10,6 +10,7 @@ use log::trace;
 /// Types of process signals.
 // #[allow(dead_code)]
 #[derive(Debug, Clone, Copy, PartialEq)]
+#[allow(dead_code)] // variants are never constructed on non-unix
 pub(crate) enum Signal {
     /// `SIGINT`
     Int,
diff --git a/actix-server/src/worker.rs b/actix-server/src/worker.rs
index c53217248c..c156444bd5 100644
--- a/actix-server/src/worker.rs
+++ b/actix-server/src/worker.rs
@@ -43,20 +43,20 @@ pub(crate) struct Conn {
     pub token: usize,
 }
 
-///
+/// Create accept and server worker handles.
 fn handle_pair(
     idx: usize,
-    tx_conn: UnboundedSender<Conn>,
-    tx_stop: UnboundedSender<Stop>,
+    conn_tx: UnboundedSender<Conn>,
+    stop_tx: UnboundedSender<Stop>,
     counter: Counter,
 ) -> (WorkerHandleAccept, WorkerHandleServer) {
     let accept = WorkerHandleAccept {
         idx,
-        tx_conn,
+        conn_tx,
         counter,
     };
 
-    let server = WorkerHandleServer { idx, tx_stop };
+    let server = WorkerHandleServer { idx, stop_tx };
 
     (accept, server)
 }
@@ -158,7 +158,7 @@ impl Drop for WorkerCounterGuard {
 /// Held by [Accept](crate::accept::Accept).
 pub(crate) struct WorkerHandleAccept {
     idx: usize,
-    tx_conn: UnboundedSender<Conn>,
+    conn_tx: UnboundedSender<Conn>,
     counter: Counter,
 }
 
@@ -170,7 +170,7 @@ impl WorkerHandleAccept {
 
     #[inline(always)]
     pub(crate) fn send(&self, conn: Conn) -> Result<(), Conn> {
-        self.tx_conn.send(conn).map_err(|msg| msg.0)
+        self.conn_tx.send(conn).map_err(|msg| msg.0)
     }
 
     #[inline(always)]
@@ -185,13 +185,13 @@
 #[derive(Debug)]
 pub(crate) struct WorkerHandleServer {
     pub(crate) idx: usize,
-    tx_stop: UnboundedSender<Stop>,
+    stop_tx: UnboundedSender<Stop>,
 }
 
 impl WorkerHandleServer {
     pub(crate) fn stop(&self, graceful: bool) -> oneshot::Receiver<bool> {
         let (tx, rx) = oneshot::channel();
-        let _ = self.tx_stop.send(Stop { graceful, tx });
+        let _ = self.stop_tx.send(Stop { graceful, tx });
         rx
     }
 }
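
The `_tx` suffix renames above make the channel wiring easier to follow. As a reader's aid, here is a self-contained sketch of the paired-handle design that `handle_pair` sets up: one sender pushes connections in, the other carries stop requests with an acknowledgement channel. The names and the `#[tokio::main]` harness are illustrative only, not actix-server code:

```rust
use tokio::sync::{mpsc, oneshot};

// Stand-in for the `Stop` message: a graceful flag plus an ack channel.
struct Stop {
    graceful: bool,
    tx: oneshot::Sender<bool>,
}

#[tokio::main]
async fn main() {
    // One channel per handle, mirroring `conn_tx`/`stop_tx` in `handle_pair`.
    let (conn_tx, mut conn_rx) = mpsc::unbounded_channel::<usize>();
    let (stop_tx, mut stop_rx) = mpsc::unbounded_channel::<Stop>();

    let worker = tokio::spawn(async move {
        loop {
            tokio::select! {
                Some(token) = conn_rx.recv() => {
                    // accept side: work is pushed in via the "accept handle"
                    println!("worker received conn token {}", token);
                }
                Some(Stop { graceful, tx }) = stop_rx.recv() => {
                    // server side: acknowledge shutdown, like `WorkerHandleServer::stop`
                    let _ = tx.send(graceful);
                    break;
                }
            }
        }
    });

    conn_tx.send(1).unwrap();

    let (tx, rx) = oneshot::channel();
    stop_tx.send(Stop { graceful: true, tx }).unwrap();
    assert!(rx.await.unwrap());

    worker.await.unwrap();
}
```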