diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 9a0b9c1f92..a1353f6032 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -112,9 +112,7 @@ jobs: - name: tests if: matrix.target.os == 'ubuntu-latest' run: | - cargo ci-test - cargo ci-test-rt-linux - cargo ci-test-server-linux + sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux" - name: Clear the cargo caches run: | @@ -141,7 +139,8 @@ jobs: args: cargo-hack - name: tests - run: cargo ci-test-lower-msrv + run: | + sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv" - name: Clear the cargo caches run: | diff --git a/actix-rt/src/lib.rs b/actix-rt/src/lib.rs index e078dd0624..590335c161 100644 --- a/actix-rt/src/lib.rs +++ b/actix-rt/src/lib.rs @@ -15,7 +15,7 @@ //! blocking task thread-pool using [`task::spawn_blocking`]. //! //! # Examples -//! ``` +//! ```no_run //! use std::sync::mpsc; //! use actix_rt::{Arbiter, System}; //! diff --git a/actix-rt/src/system.rs b/actix-rt/src/system.rs index 2b44ad6a4e..23d692a431 100644 --- a/actix-rt/src/system.rs +++ b/actix-rt/src/system.rs @@ -11,7 +11,7 @@ use std::{ use futures_core::ready; use tokio::sync::{mpsc, oneshot}; -use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime}; +use crate::{arbiter::ArbiterHandle, Arbiter}; static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0); @@ -29,6 +29,7 @@ pub struct System { arbiter_handle: ArbiterHandle, } +#[cfg(not(feature = "io-uring"))] impl System { /// Create a new system. /// @@ -37,7 +38,7 @@ impl System { #[allow(clippy::new_ret_no_self)] pub fn new() -> SystemRunner { Self::with_tokio_rt(|| { - default_tokio_runtime() + crate::runtime::default_tokio_runtime() .expect("Default Actix (Tokio) runtime could not be created.") }) } @@ -53,7 +54,7 @@ impl System { let (stop_tx, stop_rx) = oneshot::channel(); let (sys_tx, sys_rx) = mpsc::unbounded_channel(); - let rt = Runtime::from(runtime_factory()); + let rt = crate::runtime::Runtime::from(runtime_factory()); let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() }); let system = System::construct(sys_tx, sys_arbiter.clone()); @@ -72,7 +73,32 @@ impl System { system, } } +} +#[cfg(feature = "io-uring")] +impl System { + /// Create a new system. + /// + /// # Panics + /// Panics if underlying Tokio runtime can not be created. + #[allow(clippy::new_ret_no_self)] + pub fn new() -> SystemRunner { + SystemRunner + } + + /// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure. + /// + /// [tokio-runtime]: tokio::runtime::Runtime + #[doc(hidden)] + pub fn with_tokio_rt(_: F) -> SystemRunner + where + F: Fn() -> tokio::runtime::Runtime, + { + unimplemented!("System::with_tokio_rt is not implemented yet") + } +} + +impl System { /// Constructs new system and registers it on the current thread. pub(crate) fn construct( sys_tx: mpsc::UnboundedSender, @@ -149,16 +175,18 @@ impl System { } } +#[cfg(not(feature = "io-uring"))] /// Runner that keeps a [System]'s event loop alive until stop message is received. #[must_use = "A SystemRunner does nothing unless `run` is called."] #[derive(Debug)] pub struct SystemRunner { - rt: Runtime, + rt: crate::runtime::Runtime, stop_rx: oneshot::Receiver, #[allow(dead_code)] system: System, } +#[cfg(not(feature = "io-uring"))] impl SystemRunner { /// Starts event loop and will return once [System] is [stopped](System::stop). pub fn run(self) -> io::Result<()> { @@ -188,6 +216,45 @@ impl SystemRunner { } } +#[cfg(feature = "io-uring")] +/// Runner that keeps a [System]'s event loop alive until stop message is received. +#[must_use = "A SystemRunner does nothing unless `run` is called."] +#[derive(Debug)] +pub struct SystemRunner; + +#[cfg(feature = "io-uring")] +impl SystemRunner { + /// Starts event loop and will return once [System] is [stopped](System::stop). + pub fn run(self) -> io::Result<()> { + unimplemented!("SystemRunner::run is not implemented yet") + } + + /// Runs the provided future, blocking the current thread until the future completes. + #[inline] + pub fn block_on(&self, fut: F) -> F::Output { + tokio_uring::start(async move { + let (stop_tx, stop_rx) = oneshot::channel(); + let (sys_tx, sys_rx) = mpsc::unbounded_channel(); + + let sys_arbiter = Arbiter::in_new_system(); + let system = System::construct(sys_tx, sys_arbiter.clone()); + + system + .tx() + .send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter)) + .unwrap(); + + // init background system arbiter + let sys_ctrl = SystemController::new(sys_rx, stop_tx); + tokio_uring::spawn(sys_ctrl); + + let res = fut.await; + drop(stop_rx); + res + }) + } +} + #[derive(Debug)] pub(crate) enum SystemCommand { Exit(i32), diff --git a/actix-rt/tests/tests.rs b/actix-rt/tests/tests.rs index 5fe1e89400..8395022182 100644 --- a/actix-rt/tests/tests.rs +++ b/actix-rt/tests/tests.rs @@ -1,12 +1,15 @@ use std::{ future::Future, - sync::mpsc::channel, - thread, time::{Duration, Instant}, }; use actix_rt::{task::JoinError, Arbiter, System}; -use tokio::sync::oneshot; + +#[cfg(not(feature = "io-uring"))] +use { + std::{sync::mpsc::channel, thread}, + tokio::sync::oneshot, +}; #[test] fn await_for_timer() { @@ -103,6 +106,10 @@ fn wait_for_spawns() { assert!(rt.block_on(handle).is_err()); } +// Temporary disabled tests for io-uring feature. +// They should be enabled when possible. + +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_spawn_fn_runs() { let _ = System::new(); @@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() { arbiter.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_handle_spawn_fn_runs() { let sys = System::new(); @@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() { sys.run().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_drop_no_panic_fn() { let _ = System::new(); @@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() { arbiter.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn arbiter_drop_no_panic_fut() { let _ = System::new(); @@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() { arbiter.join().unwrap(); } -#[test] -#[should_panic] -fn no_system_current_panic() { - System::current(); -} - -#[test] -#[should_panic] -fn no_system_arbiter_new_panic() { - Arbiter::new(); -} - +#[cfg(not(feature = "io-uring"))] #[test] fn system_arbiter_spawn() { let runner = System::new(); @@ -205,6 +204,7 @@ fn system_arbiter_spawn() { thread.join().unwrap(); } +#[cfg(not(feature = "io-uring"))] #[test] fn system_stop_stops_arbiters() { let sys = System::new(); @@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() { assert!(!counter.load(Ordering::SeqCst)); } +#[test] +#[should_panic] +fn no_system_current_panic() { + System::current(); +} + +#[test] +#[should_panic] +fn no_system_arbiter_new_panic() { + Arbiter::new(); +} + #[test] fn try_current_no_system() { assert!(System::try_current().is_none()) @@ -330,28 +342,27 @@ fn spawn_local() { #[cfg(all(target_os = "linux", feature = "io-uring"))] #[test] fn tokio_uring_arbiter() { - let system = System::new(); - let (tx, rx) = std::sync::mpsc::channel(); - - Arbiter::new().spawn(async move { - let handle = actix_rt::spawn(async move { - let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); - let buf = b"Hello World!"; + System::new().block_on(async { + let (tx, rx) = std::sync::mpsc::channel(); - let (res, _) = f.write_at(&buf[..], 0).await; - assert!(res.is_ok()); + Arbiter::new().spawn(async move { + let handle = actix_rt::spawn(async move { + let f = tokio_uring::fs::File::create("test.txt").await.unwrap(); + let buf = b"Hello World!"; - f.sync_all().await.unwrap(); - f.close().await.unwrap(); + let (res, _) = f.write_at(&buf[..], 0).await; + assert!(res.is_ok()); - std::fs::remove_file("test.txt").unwrap(); - }); + f.sync_all().await.unwrap(); + f.close().await.unwrap(); - handle.await.unwrap(); - tx.send(true).unwrap(); - }); + std::fs::remove_file("test.txt").unwrap(); + }); - assert!(rx.recv().unwrap()); + handle.await.unwrap(); + tx.send(true).unwrap(); + }); - drop(system); + assert!(rx.recv().unwrap()); + }) } diff --git a/actix-server/tests/test_server.rs b/actix-server/tests/test_server.rs index 788948161d..315e3eff18 100644 --- a/actix-server/tests/test_server.rs +++ b/actix-server/tests/test_server.rs @@ -5,8 +5,6 @@ use std::{net, thread, time::Duration}; use actix_rt::{net::TcpStream, time::sleep}; use actix_server::Server; use actix_service::fn_service; -use actix_utils::future::ok; -use futures_util::future::lazy; fn unused_addr() -> net::SocketAddr { let addr: net::SocketAddr = "127.0.0.1:0".parse().unwrap(); @@ -23,25 +21,28 @@ fn test_bind() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let srv = Server::build() .workers(1) .disable_signals() - .bind("test", addr, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() - .run() - })); + .bind("test", addr, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? + .run(); + + let _ = tx.send((srv.clone(), actix_rt::System::current())); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + srv.await + }) }); - let (_, sys) = rx.recv().unwrap(); + let (srv, sys) = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); + + let _ = srv.stop(true); sys.stop(); - let _ = h.join(); + h.join().unwrap().unwrap(); } #[test] @@ -50,25 +51,30 @@ fn test_listen() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let lst = net::TcpListener::bind(addr).unwrap(); - sys.block_on(async { - Server::build() + let lst = net::TcpListener::bind(addr)?; + actix_rt::System::new().block_on(async { + let srv = Server::build() .disable_signals() .workers(1) - .listen("test", lst, move || fn_service(|_| ok::<_, ()>(()))) - .unwrap() + .listen("test", lst, move || { + fn_service(|_| async { Ok::<_, ()>(()) }) + })? .run(); - let _ = tx.send(actix_rt::System::current()); - }); - let _ = sys.run(); + + let _ = tx.send((srv.clone(), actix_rt::System::current())); + + srv.await + }) }); - let sys = rx.recv().unwrap(); + + let (srv, sys) = rx.recv().unwrap(); thread::sleep(Duration::from_millis(500)); assert!(net::TcpStream::connect(addr).is_ok()); + + let _ = srv.stop(true); sys.stop(); - let _ = h.join(); + h.join().unwrap().unwrap(); } #[test] @@ -84,9 +90,8 @@ fn test_start() { let (tx, rx) = mpsc::channel(); let h = thread::spawn(move || { - let sys = actix_rt::System::new(); - let srv = sys.block_on(lazy(|_| { - Server::build() + actix_rt::System::new().block_on(async { + let srv = Server::build() .backlog(100) .disable_signals() .bind("test", addr, move || { @@ -95,13 +100,13 @@ fn test_start() { f.send(Bytes::from_static(b"test")).await.unwrap(); Ok::<_, ()>(()) }) - }) - .unwrap() - .run() - })); + })? + .run(); + + let _ = tx.send((srv.clone(), actix_rt::System::current())); - let _ = tx.send((srv, actix_rt::System::current())); - let _ = sys.run(); + srv.await + }) }); let (srv, sys) = rx.recv().unwrap(); @@ -134,12 +139,11 @@ fn test_start() { // stop let _ = srv.stop(false); - thread::sleep(Duration::from_millis(100)); - assert!(net::TcpStream::connect(addr).is_err()); - - thread::sleep(Duration::from_millis(100)); sys.stop(); - let _ = h.join(); + h.join().unwrap().unwrap(); + + thread::sleep(Duration::from_secs(1)); + assert!(net::TcpStream::connect(addr).is_err()); } #[actix_rt::test] @@ -209,9 +213,8 @@ async fn test_max_concurrent_connections() { } srv.stop(false).await; - sys.stop(); - let _ = h.join().unwrap(); + h.join().unwrap().unwrap(); } #[actix_rt::test] @@ -266,16 +269,14 @@ async fn test_service_restart() { let num = num.clone(); async move { Ok::<_, ()>(TestService(num)) } }) - }) - .unwrap() + })? .bind("addr2", addr2, move || { let num2 = num2.clone(); fn_factory(move || { let num2 = num2.clone(); async move { Ok::<_, ()>(TestService(num2)) } }) - }) - .unwrap() + })? .workers(1) .run(); @@ -306,9 +307,9 @@ async fn test_service_restart() { assert!(num_clone.load(Ordering::SeqCst) > 5); assert!(num2_clone.load(Ordering::SeqCst) > 5); - sys.stop(); let _ = server.stop(false); - let _ = h.join().unwrap(); + sys.stop(); + h.join().unwrap().unwrap(); } #[ignore] @@ -380,12 +381,12 @@ async fn worker_restart() { actix_rt::System::new().block_on(async { let server = Server::build() .disable_signals() - .bind("addr", addr, move || TestServiceFactory(counter.clone())) - .unwrap() + .bind("addr", addr, move || TestServiceFactory(counter.clone()))? .workers(2) .run(); let _ = tx.send((server.clone(), actix_rt::System::current())); + server.await }) }); @@ -447,7 +448,7 @@ async fn worker_restart() { assert_eq!("3", id); stream.shutdown().await.unwrap(); - sys.stop(); let _ = server.stop(false); - let _ = h.join().unwrap(); + sys.stop(); + h.join().unwrap().unwrap(); }