Skip to content

Commit

Permalink
add support for io-uring (#374)
Browse files Browse the repository at this point in the history
Co-authored-by: Rob Ede <robjtede@icloud.com>
  • Loading branch information
fakeshadow and robjtede committed Oct 11, 2021
1 parent c3d697d commit 6fed1c3
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 51 deletions.
20 changes: 19 additions & 1 deletion .cargo/config.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,23 @@
[alias]
chk = "check --workspace --all-features --tests --examples --bins"
lint = "clippy --workspace --all-features --tests --examples --bins -- -Dclippy::todo"
ci-test = "test --workspace --all-features --lib --tests --no-fail-fast -- --nocapture"

ci-doctest = "test --workspace --all-features --doc --no-fail-fast -- --nocapture"

# just check the library (without dev deps)
ci-check-min = "hack --workspace check --no-default-features"
ci-check-lib = "hack --workspace --feature-powerset --exclude-features io-uring check"
ci-check-lib-linux = "hack --workspace --feature-powerset check"

# check everything
ci-check = "hack --workspace --feature-powerset --exclude-features io-uring check --tests --examples"
ci-check-linux = "hack --workspace --feature-powerset check --tests --examples"

# tests avoiding io-uring feature
ci-test = "hack test --workspace --exclude=actix-rt --exclude=actix-server --all-features --lib --tests --no-fail-fast -- --nocapture"
ci-test-rt = " hack --feature-powerset --exclude-features io-uring test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server = "hack --feature-powerset --exclude-features io-uring test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"

# test with io-uring feature
ci-test-rt-linux = " hack --feature-powerset test --package=actix-rt --lib --tests --no-fail-fast -- --nocapture"
ci-test-server-linux = "hack --feature-powerset test --package=actix-server --lib --tests --no-fail-fast -- --nocapture"
58 changes: 34 additions & 24 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,36 +75,47 @@ jobs:
command: install
args: cargo-hack

- name: check minimal
- name: check lib
if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with:
command: hack
args: check --workspace --no-default-features

- name: check minimal + tests
with: { command: ci-check-lib }
- name: check lib
if: matrix.target.os == 'ubuntu-latest'
uses: actions-rs/cargo@v1
with:
command: hack
args: check --workspace --no-default-features --tests --examples

- name: check default
with: { command: ci-check-lib-linux }
- name: check lib
if: matrix.target.triple == 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --tests --examples

with: { command: ci-check-min }

- name: check full
# TODO: compile OpenSSL and run tests on MinGW
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with:
command: check
args: --workspace --all-features --tests --examples
with: { command: ci-check }
- name: check all
if: matrix.target.os == 'ubuntu-latest'
uses: actions-rs/cargo@v1
with: { command: ci-check-linux }

- name: tests
if: matrix.target.triple != 'x86_64-pc-windows-gnu'
uses: actions-rs/cargo@v1
with: { command: ci-test }
if: >
matrix.target.os != 'ubuntu-latest'
&& matrix.target.triple != 'x86_64-pc-windows-gnu'
run: |
cargo ci-test
cargo ci-test-rt
cargo ci-test-server
- name: tests
if: matrix.target.os == 'ubuntu-latest'
run: |
cargo ci-test
cargo ci-test-rt-linux
cargo ci-test-server-linux
- name: Generate coverage file
if: >
Expand All @@ -120,8 +131,7 @@ jobs:
&& matrix.version == 'stable'
&& github.ref == 'refs/heads/master'
uses: codecov/codecov-action@v1
with:
file: cobertura.xml
with: { file: cobertura.xml }

- name: Clear the cargo caches
run: |
Expand Down
2 changes: 2 additions & 0 deletions actix-rt/CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
# Changes

## Unreleased - 2021-xx-xx
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
* The `spawn` method can now resolve with non-unit outputs. [#369]

[#369]: https://github.com/actix/actix-net/pull/369
[#374]: https://github.com/actix/actix-net/pull/374


## 2.2.0 - 2021-03-29
Expand Down
4 changes: 4 additions & 0 deletions actix-rt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,17 @@ path = "src/lib.rs"
[features]
default = ["macros"]
macros = ["actix-macros"]
io-uring = ["tokio-uring"]

[dependencies]
actix-macros = { version = "0.2.0", optional = true }

futures-core = { version = "0.3", default-features = false }
tokio = { version = "1.3", features = ["rt", "net", "parking_lot", "signal", "sync", "time"] }

[target.'cfg(target_os = "linux")'.dependencies]
tokio-uring = { version = "0.1", optional = true }

[dev-dependencies]
tokio = { version = "1.2", features = ["full"] }
hyper = { version = "0.14", default-features = false, features = ["server", "tcp", "http1"] }
72 changes: 62 additions & 10 deletions actix-rt/src/arbiter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ use std::{
};

use futures_core::ready;
use tokio::{sync::mpsc, task::LocalSet};
use tokio::sync::mpsc;

use crate::{
runtime::{default_tokio_runtime, Runtime},
system::{System, SystemCommand},
};
use crate::system::{System, SystemCommand};

pub(crate) static COUNT: AtomicUsize = AtomicUsize::new(0);

Expand Down Expand Up @@ -98,16 +95,19 @@ impl Arbiter {
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
Self::with_tokio_rt(|| {
default_tokio_runtime().expect("Cannot create new Arbiter's Runtime.")
crate::runtime::default_tokio_runtime()
.expect("Cannot create new Arbiter's Runtime.")
})
}

/// Spawn a new Arbiter using the [Tokio Runtime](tokio-runtime) returned from a closure.
///
/// [tokio-runtime]: tokio::runtime::Runtime
#[cfg(not(all(target_os = "linux", feature = "io-uring")))]
#[doc(hidden)]
pub fn with_tokio_rt<F>(runtime_factory: F) -> Arbiter
where
Expand All @@ -127,7 +127,7 @@ impl Arbiter {
.spawn({
let tx = tx.clone();
move || {
let rt = Runtime::from(runtime_factory());
let rt = crate::runtime::Runtime::from(runtime_factory());
let hnd = ArbiterHandle::new(tx);

System::set_current(sys);
Expand Down Expand Up @@ -159,15 +159,67 @@ impl Arbiter {
Arbiter { tx, thread_handle }
}

/// Sets up an Arbiter runner in a new System using the provided runtime local task set.
pub(crate) fn in_new_system(local: &LocalSet) -> ArbiterHandle {
/// Spawn a new Arbiter thread and start its event loop with `tokio-uring` runtime.
///
/// # Panics
/// Panics if a [System] is not registered on the current thread.
#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[allow(clippy::new_without_default)]
pub fn new() -> Arbiter {
let sys = System::current();
let system_id = sys.id();
let arb_id = COUNT.fetch_add(1, Ordering::Relaxed);

let name = format!("actix-rt|system:{}|arbiter:{}", system_id, arb_id);
let (tx, rx) = mpsc::unbounded_channel();

let (ready_tx, ready_rx) = std::sync::mpsc::channel::<()>();

let thread_handle = thread::Builder::new()
.name(name.clone())
.spawn({
let tx = tx.clone();
move || {
let hnd = ArbiterHandle::new(tx);

System::set_current(sys);

HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));

// register arbiter
let _ = System::current()
.tx()
.send(SystemCommand::RegisterArbiter(arb_id, hnd));

ready_tx.send(()).unwrap();

// run arbiter event processing loop
tokio_uring::start(ArbiterRunner { rx });

// deregister arbiter
let _ = System::current()
.tx()
.send(SystemCommand::DeregisterArbiter(arb_id));
}
})
.unwrap_or_else(|err| {
panic!("Cannot spawn Arbiter's thread: {:?}. {:?}", &name, err)
});

ready_rx.recv().unwrap();

Arbiter { tx, thread_handle }
}

/// Sets up an Arbiter runner in a new System using the environment's local set.
pub(crate) fn in_new_system() -> ArbiterHandle {
let (tx, rx) = mpsc::unbounded_channel();

let hnd = ArbiterHandle::new(tx);

HANDLE.with(|cell| *cell.borrow_mut() = Some(hnd.clone()));

local.spawn_local(ArbiterRunner { rx });
crate::spawn(ArbiterRunner { rx });

hnd
}
Expand Down
7 changes: 7 additions & 0 deletions actix-rt/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,20 @@
//! arbiter.stop();
//! arbiter.join().unwrap();
//! ```
//!
//! # `io-uring` Support
//! There is experimental support for using io-uring with this crate by enabling the
//! `io-uring` feature. For now, it is semver exempt.

#![deny(rust_2018_idioms, nonstandard_style)]
#![allow(clippy::type_complexity)]
#![warn(missing_docs)]
#![doc(html_logo_url = "https://actix.rs/img/logo.png")]
#![doc(html_favicon_url = "https://actix.rs/favicon.ico")]

#[cfg(all(not(target_os = "linux"), feature = "io-uring"))]
compile_error!("io_uring is a linux only feature.");

use std::future::Future;

use tokio::task::JoinHandle;
Expand Down
5 changes: 0 additions & 5 deletions actix-rt/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,6 @@ impl Runtime {
})
}

/// Reference to local task set.
pub(crate) fn local_set(&self) -> &LocalSet {
&self.local
}

/// Offload a future onto the single-threaded runtime.
///
/// The returned join handle can be used to await the future's result.
Expand Down
2 changes: 1 addition & 1 deletion actix-rt/src/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl System {
let (sys_tx, sys_rx) = mpsc::unbounded_channel();

let rt = Runtime::from(runtime_factory());
let sys_arbiter = Arbiter::in_new_system(rt.local_set());
let sys_arbiter = rt.block_on(async { Arbiter::in_new_system() });
let system = System::construct(sys_tx, sys_arbiter.clone());

system
Expand Down
46 changes: 39 additions & 7 deletions actix-rt/tests/tests.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,6 @@
use std::{
future::Future,
sync::{
atomic::{AtomicBool, Ordering},
mpsc::channel,
Arc,
},
sync::mpsc::channel,
thread,
time::{Duration, Instant},
};
Expand Down Expand Up @@ -221,8 +217,8 @@ fn system_stop_stops_arbiters() {
System::current().stop();
sys.run().unwrap();

// account for slightly slow thread de-spawns (only observed on windows)
thread::sleep(Duration::from_millis(100));
// account for slightly slow thread de-spawns
thread::sleep(Duration::from_millis(500));

// arbiter should be dead and return false
assert!(!Arbiter::current().spawn_fn(|| {}));
Expand All @@ -231,6 +227,7 @@ fn system_stop_stops_arbiters() {
arb.join().unwrap();
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn new_system_with_tokio() {
let (tx, rx) = channel();
Expand Down Expand Up @@ -263,8 +260,14 @@ fn new_system_with_tokio() {
assert_eq!(rx.recv().unwrap(), 42);
}

#[cfg(not(feature = "io-uring"))]
#[test]
fn new_arbiter_with_tokio() {
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};

let _ = System::new();

let arb = Arbiter::with_tokio_rt(|| {
Expand Down Expand Up @@ -323,3 +326,32 @@ fn spawn_local() {
h(actix_rt::spawn(async { 1 }));
})
}

#[cfg(all(target_os = "linux", feature = "io-uring"))]
#[test]
fn tokio_uring_arbiter() {
let system = System::new();
let (tx, rx) = std::sync::mpsc::channel();

Arbiter::new().spawn(async move {
let handle = actix_rt::spawn(async move {
let f = tokio_uring::fs::File::create("test.txt").await.unwrap();
let buf = b"Hello World!";

let (res, _) = f.write_at(&buf[..], 0).await;
assert!(res.is_ok());

f.sync_all().await.unwrap();
f.close().await.unwrap();

std::fs::remove_file("test.txt").unwrap();
});

handle.await.unwrap();
tx.send(true).unwrap();
});

assert!(rx.recv().unwrap());

drop(system);
}
2 changes: 2 additions & 0 deletions actix-server/CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@
## Unreleased - 2021-xx-xx
* Remove `config` module. `ServiceConfig`, `ServiceRuntime` public types are removed due to this change. [#349]
* Remove `ServerBuilder::configure` [#349]
* Add `io-uring` feature for enabling async file I/O on linux. [#374]
* Server no long listens to SIGHUP signal.
It actually did not take any action when receiving SIGHUP, the only thing SIGHUP did was to stop
the Server from receiving any future signal, because the `Signals` future stops on the first
signal received [#389]

[#374]: https://github.com/actix/actix-net/pull/374
[#349]: https://github.com/actix/actix-net/pull/349
[#389]: https://github.com/actix/actix-net/pull/389

Expand Down
1 change: 1 addition & 0 deletions actix-server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ path = "src/lib.rs"

[features]
default = []
io-uring = ["actix-rt/io-uring"]

[dependencies]
actix-rt = { version = "2.0.0", default-features = false }
Expand Down

0 comments on commit 6fed1c3

Please sign in to comment.