Skip to content

Commit

Permalink
minimal support of System type with io-uring (#395)
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow committed Oct 21, 2021
1 parent 70ea532 commit a1d15f2
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 92 deletions.
7 changes: 3 additions & 4 deletions .github/workflows/ci.yml
Expand Up @@ -112,9 +112,7 @@ jobs:
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"
- name: Clear the cargo caches
run: |
Expand All @@ -141,7 +139,8 @@ jobs:
args: cargo-hack

- name: tests
run: cargo ci-test-lower-msrv
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"
- name: Clear the cargo caches
run: |
Expand Down
2 changes: 1 addition & 1 deletion actix-rt/src/lib.rs
Expand Up @@ -15,7 +15,7 @@
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
//! ```no_run
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!
Expand Down
75 changes: 71 additions & 4 deletions actix-rt/src/system.rs
Expand Up @@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};

use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, Arbiter};

static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);

Expand All @@ -29,6 +29,7 @@ pub struct System {
arbiter_handle: ArbiterHandle,
}

#[cfg(not(feature = "io-uring"))]
impl System {
/// Create a new system.
///
Expand All @@ -37,7 +38,7 @@ impl System {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
crate::runtime::default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
Expand All @@ -53,7 +54,7 @@ impl System {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();

let rt = Runtime::from(runtime_factory());
let rt = crate::runtime::Runtime::from(runtime_factory());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());

Expand All @@ -72,7 +73,32 @@ impl System {
system,
}
}
}

#[cfg(feature = "io-uring")]
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
SystemRunner
}

/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
unimplemented!("System::with_tokio_rt is not implemented yet")
}
}

impl System {
/// Constructs new system and registers it on the current thread.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
Expand Down Expand Up @@ -149,16 +175,18 @@ impl System {
}
}

#[cfg(not(feature = "io-uring"))]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
#[allow(dead_code)]
system: System,
}

#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
Expand Down Expand Up @@ -188,6 +216,45 @@ impl SystemRunner {
}
}

#[cfg(feature = "io-uring")]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;

#[cfg(feature = "io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented yet")
}

/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();

let sys_arbiter = Arbiter::in_new_system();
let system = System::construct(sys_tx, sys_arbiter.clone());

system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();

// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
tokio_uring::spawn(sys_ctrl);

let res = fut.await;
drop(stop_rx);
res
})
}
}

#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),
Expand Down
77 changes: 44 additions & 33 deletions actix-rt/tests/tests.rs
@@ -1,12 +1,15 @@
use std::{
future::Future,
sync::mpsc::channel,
thread,
time::{Duration, Instant},
};

use actix_rt::{task::JoinError, Arbiter, System};
use tokio::sync::oneshot;

#[cfg(not(feature = "io-uring"))]
use {
std::{sync::mpsc::channel, thread},
tokio::sync::oneshot,
};

#[test]
fn await_for_timer() {
Expand Down Expand Up @@ -103,6 +106,10 @@ fn wait_for_spawns() {
assert!(rt.block_on(handle).is_err());
}

// Temporary disabled tests for io-uring feature.
// They should be enabled when possible.

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
Expand All @@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
Expand All @@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
sys.run().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
Expand All @@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
arbiter.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
Expand All @@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}

#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}

#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
Expand Down Expand Up @@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
thread.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
Expand Down Expand Up @@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}

#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}

#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}

#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
Expand Down Expand Up @@ -330,28 +342,27 @@ fn spawn_local() {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();

Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
System::new().block_on(async {
let (tx, rx) = std::sync::mpsc::channel();

let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";

f.sync_all().await.unwrap();
f.close().await.unwrap();
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());

std::fs::remove_file("test.txt").unwrap();
});
f.sync_all().await.unwrap();
f.close().await.unwrap();

handle.await.unwrap();
tx.send(true).unwrap();
});
std::fs::remove_file("test.txt").unwrap();
});

assert!(rx.recv().unwrap());
handle.await.unwrap();
tx.send(true).unwrap();
});

drop(system);
assert!(rx.recv().unwrap());
})
}

0 comments on commit a1d15f2

Please sign in to comment.