Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

minimal support of System type with io-uring #395

Merged
merged 10 commits into from Oct 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
7 changes: 3 additions & 4 deletions .github/workflows/ci.yml
Expand Up @@ -112,9 +112,7 @@ jobs:
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-rt-linux && RUSTUP_TOOLCHAIN=${{ matrix.version }} cargo ci-test-server-linux"

- name: Clear the cargo caches
run: |
Expand All @@ -141,7 +139,8 @@ jobs:
args: cargo-hack

- name: tests
run: cargo ci-test-lower-msrv
run: |
sudo bash -c "ulimit -Sl 512 && ulimit -Hl 512 && PATH=$PATH:/usr/share/rust/.cargo/bin && RUSTUP_TOOLCHAIN=1.46 cargo ci-test-lower-msrv"

- name: Clear the cargo caches
run: |
Expand Down
2 changes: 1 addition & 1 deletion actix-rt/src/lib.rs
Expand Up @@ -15,7 +15,7 @@
//! blocking task thread-pool using [`task::spawn_blocking`].
//!
//! # Examples
//! ```
//! ```no_run
//! use std::sync::mpsc;
//! use actix_rt::{Arbiter, System};
//!
Expand Down
75 changes: 71 additions & 4 deletions actix-rt/src/system.rs
Expand Up @@ -11,7 +11,7 @@ use std::{
use futures_core::ready;
use tokio::sync::{mpsc, oneshot};

use crate::{arbiter::ArbiterHandle, runtime::default_tokio_runtime, Arbiter, Runtime};
use crate::{arbiter::ArbiterHandle, Arbiter};

static SYSTEM_COUNT: AtomicUsize = AtomicUsize::new(0);

Expand All @@ -29,6 +29,7 @@ pub struct System {
arbiter_handle: ArbiterHandle,
}

#[cfg(not(feature = "io-uring"))]
impl System {
/// Create a new system.
///
Expand All @@ -37,7 +38,7 @@ impl System {
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
Self::with_tokio_rt(|| {
default_tokio_runtime()
crate::runtime::default_tokio_runtime()
.expect("Default Actix (Tokio) runtime could not be created.")
})
}
Expand All @@ -53,7 +54,7 @@ impl System {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();

let rt = Runtime::from(runtime_factory());
let rt = crate::runtime::Runtime::from(runtime_factory());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());

Expand All @@ -72,7 +73,32 @@ impl System {
system,
}
}
}

#[cfg(feature = "io-uring")]
impl System {
/// Create a new system.
///
/// # Panics
/// Panics if underlying Tokio runtime can not be created.
#[allow(clippy::new_ret_no_self)]
pub fn new() -> SystemRunner {
SystemRunner
}

/// Create a new System using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[doc(hidden)]
pub fn with_tokio_rt<F>(_: F) -> SystemRunner
where
F: Fn() -> tokio::runtime::Runtime,
{
unimplemented!("System::with_tokio_rt is not implemented yet")
}
}

impl System {
/// Constructs new system and registers it on the current thread.
pub(crate) fn construct(
sys_tx: mpsc::UnboundedSender<SystemCommand>,
Expand Down Expand Up @@ -149,16 +175,18 @@ impl System {
}
}

#[cfg(not(feature = "io-uring"))]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner {
rt: Runtime,
rt: crate::runtime::Runtime,
stop_rx: oneshot::Receiver<i32>,
#[allow(dead_code)]
system: System,
}

#[cfg(not(feature = "io-uring"))]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
Expand Down Expand Up @@ -188,6 +216,45 @@ impl SystemRunner {
}
}

#[cfg(feature = "io-uring")]
/// Runner that keeps a [System]'s event loop alive until stop message is received.
#[must_use = "A SystemRunner does nothing unless `run` is called."]
#[derive(Debug)]
pub struct SystemRunner;

#[cfg(feature = "io-uring")]
impl SystemRunner {
/// Starts event loop and will return once [System] is [stopped](System::stop).
pub fn run(self) -> io::Result<()> {
unimplemented!("SystemRunner::run is not implemented yet")
}

/// Runs the provided future, blocking the current thread until the future completes.
#[inline]
pub fn block_on<F: Future>(&self, fut: F) -> F::Output {
tokio_uring::start(async move {
let (stop_tx, stop_rx) = oneshot::channel();
let (sys_tx, sys_rx) = mpsc::unbounded_channel();

let sys_arbiter = Arbiter::in_new_system();
let system = System::construct(sys_tx, sys_arbiter.clone());

system
.tx()
.send(SystemCommand::RegisterArbiter(usize::MAX, sys_arbiter))
.unwrap();

// init background system arbiter
let sys_ctrl = SystemController::new(sys_rx, stop_tx);
tokio_uring::spawn(sys_ctrl);

let res = fut.await;
drop(stop_rx);
res
})
}
}

#[derive(Debug)]
pub(crate) enum SystemCommand {
Exit(i32),
Expand Down
77 changes: 44 additions & 33 deletions actix-rt/tests/tests.rs
@@ -1,12 +1,15 @@
use std::{
future::Future,
sync::mpsc::channel,
thread,
time::{Duration, Instant},
};

use actix_rt::{task::JoinError, Arbiter, System};
use tokio::sync::oneshot;

#[cfg(not(feature = "io-uring"))]
use {
std::{sync::mpsc::channel, thread},
tokio::sync::oneshot,
};

#[test]
fn await_for_timer() {
Expand Down Expand Up @@ -103,6 +106,10 @@ fn wait_for_spawns() {
assert!(rt.block_on(handle).is_err());
}

// Temporary disabled tests for io-uring feature.
// They should be enabled when possible.

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_spawn_fn_runs() {
let _ = System::new();
Expand All @@ -119,6 +126,7 @@ fn arbiter_spawn_fn_runs() {
arbiter.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_handle_spawn_fn_runs() {
let sys = System::new();
Expand All @@ -141,6 +149,7 @@ fn arbiter_handle_spawn_fn_runs() {
sys.run().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fn() {
let _ = System::new();
Expand All @@ -152,6 +161,7 @@ fn arbiter_drop_no_panic_fn() {
arbiter.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn arbiter_drop_no_panic_fut() {
let _ = System::new();
Expand All @@ -163,18 +173,7 @@ fn arbiter_drop_no_panic_fut() {
arbiter.join().unwrap();
}

#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}

#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn system_arbiter_spawn() {
let runner = System::new();
Expand Down Expand Up @@ -205,6 +204,7 @@ fn system_arbiter_spawn() {
thread.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn system_stop_stops_arbiters() {
let sys = System::new();
Expand Down Expand Up @@ -293,6 +293,18 @@ fn new_arbiter_with_tokio() {
assert!(!counter.load(Ordering::SeqCst));
}

#[test]
#[should_panic]
fn no_system_current_panic() {
System::current();
}

#[test]
#[should_panic]
fn no_system_arbiter_new_panic() {
Arbiter::new();
}

#[test]
fn try_current_no_system() {
assert!(System::try_current().is_none())
Expand Down Expand Up @@ -330,28 +342,27 @@ fn spawn_local() {
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();

Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";
System::new().block_on(async {
let (tx, rx) = std::sync::mpsc::channel();

let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());
Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";

f.sync_all().await.unwrap();
f.close().await.unwrap();
let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());

std::fs::remove_file("test.txt").unwrap();
});
f.sync_all().await.unwrap();
f.close().await.unwrap();

handle.await.unwrap();
tx.send(true).unwrap();
});
std::fs::remove_file("test.txt").unwrap();
});

assert!(rx.recv().unwrap());
handle.await.unwrap();
tx.send(true).unwrap();
});

drop(system);
assert!(rx.recv().unwrap());
})
}