Skip to content

Commit

Permalink
Merge branch 'master' into feature/actix_tls_connect_native_tls
Browse files Browse the repository at this point in the history
  • Loading branch information
fakeshadow committed Mar 26, 2021
2 parents b14f54a + a3c9ebc commit e76097d
Show file tree
Hide file tree
Showing 3 changed files with 103 additions and 27 deletions.
60 changes: 60 additions & 0 deletions actix-rt/examples/multi_thread_system.rs
@@ -0,0 +1,60 @@
//! An example on how to build a multi-thread tokio runtime for Actix System.
//! Then spawn async task that can make use of work stealing of tokio runtime.

use actix_rt::System;

fn main() {
System::with_tokio_rt(|| {
// build system with a multi-thread tokio runtime.
tokio::runtime::Builder::new_multi_thread()
.worker_threads(2)
.enable_all()
.build()
.unwrap()
})
.block_on(async_main());
}

// async main function that acts like #[actix_web::main] or #[tokio::main]
async fn async_main() {
let (tx, rx) = tokio::sync::oneshot::channel();

// get a handle to system arbiter and spawn async task on it
System::current().arbiter().spawn(async {
// use tokio::spawn to get inside the context of multi thread tokio runtime
let h1 = tokio::spawn(async {
println!("thread id is {:?}", std::thread::current().id());
std::thread::sleep(std::time::Duration::from_secs(2));
});

// work stealing occurs for this task spawn
let h2 = tokio::spawn(async {
println!("thread id is {:?}", std::thread::current().id());
});

h1.await.unwrap();
h2.await.unwrap();
let _ = tx.send(());
});

rx.await.unwrap();

let (tx, rx) = tokio::sync::oneshot::channel();
let now = std::time::Instant::now();

// without additional tokio::spawn, all spawned tasks run on single thread
System::current().arbiter().spawn(async {
println!("thread id is {:?}", std::thread::current().id());
std::thread::sleep(std::time::Duration::from_secs(2));
let _ = tx.send(());
});

// previous spawn task has blocked the system arbiter thread
// so this task will wait for 2 seconds until it can be run
System::current().arbiter().spawn(async move {
println!("thread id is {:?}", std::thread::current().id());
assert!(now.elapsed() > std::time::Duration::from_secs(2));
});

rx.await.unwrap();
}
3 changes: 3 additions & 0 deletions actix-tls/CHANGES.md
Expand Up @@ -3,8 +3,11 @@
## Unreleased - 2021-xx-xx
* Add `connect::ssl::native_tls` module for native tls support. [#295]
* Rename `accept::{nativetls => native_tls}`. [#295]
* Changed `connect::ssl::rustls::RustlsConnectorService` to return error when `DNSNameRef`
generation failed instead of panic. [#296]

[#295]: https://github.com/actix/actix-net/pull/295
[#296]: https://github.com/actix/actix-net/pull/296


## 3.0.0-beta.4 - 2021-02-24
Expand Down
67 changes: 40 additions & 27 deletions actix-tls/src/connect/ssl/rustls.rs
@@ -1,6 +1,6 @@
use std::{
fmt,
future::Future,
io,
pin::Pin,
sync::Arc,
task::{Context, Poll},
Expand All @@ -10,7 +10,7 @@ pub use tokio_rustls::rustls::Session;
pub use tokio_rustls::{client::TlsStream, rustls::ClientConfig};
pub use webpki_roots::TLS_SERVER_ROOTS;

use actix_codec::{AsyncRead, AsyncWrite};
use actix_rt::net::ActixStream;
use actix_service::{Service, ServiceFactory};
use futures_core::{future::LocalBoxFuture, ready};
use log::trace;
Expand Down Expand Up @@ -44,12 +44,13 @@ impl Clone for RustlsConnector {
}
}

impl<T: Address, U> ServiceFactory<Connection<T, U>> for RustlsConnector
impl<T, U> ServiceFactory<Connection<T, U>> for RustlsConnector
where
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
T: Address,
U: ActixStream,
{
type Response = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Error = io::Error;
type Config = ();
type Service = RustlsConnectorService;
type InitError = ();
Expand All @@ -76,43 +77,55 @@ impl Clone for RustlsConnectorService {
impl<T, U> Service<Connection<T, U>> for RustlsConnectorService
where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
U: ActixStream,
{
type Response = Connection<T, TlsStream<U>>;
type Error = std::io::Error;
type Future = ConnectAsyncExt<T, U>;
type Error = io::Error;
type Future = RustlsConnectorServiceFuture<T, U>;

actix_service::always_ready!();

fn call(&self, stream: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", stream.host());
let (io, stream) = stream.replace_io(());
let host = DNSNameRef::try_from_ascii_str(stream.host())
.expect("rustls currently only handles hostname-based connections. See https://github.com/briansmith/webpki/issues/54");
ConnectAsyncExt {
fut: TlsConnector::from(self.connector.clone()).connect(host, io),
stream: Some(stream),
fn call(&self, connection: Connection<T, U>) -> Self::Future {
trace!("SSL Handshake start for: {:?}", connection.host());
let (stream, connection) = connection.replace_io(());

match DNSNameRef::try_from_ascii_str(connection.host()) {
Ok(host) => RustlsConnectorServiceFuture::Future {
connect: TlsConnector::from(self.connector.clone()).connect(host, stream),
connection: Some(connection),
},
Err(_) => RustlsConnectorServiceFuture::InvalidDns,
}
}
}

pub struct ConnectAsyncExt<T, U> {
fut: Connect<U>,
stream: Option<Connection<T, ()>>,
pub enum RustlsConnectorServiceFuture<T, U> {
/// See issue https://github.com/briansmith/webpki/issues/54
InvalidDns,
Future {
connect: Connect<U>,
connection: Option<Connection<T, ()>>,
},
}

impl<T, U> Future for ConnectAsyncExt<T, U>
impl<T, U> Future for RustlsConnectorServiceFuture<T, U>
where
T: Address,
U: AsyncRead + AsyncWrite + Unpin + fmt::Debug,
U: ActixStream,
{
type Output = Result<Connection<T, TlsStream<U>>, std::io::Error>;
type Output = Result<Connection<T, TlsStream<U>>, io::Error>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.get_mut();
let stream = ready!(Pin::new(&mut this.fut).poll(cx))?;
let s = this.stream.take().unwrap();
trace!("SSL Handshake success: {:?}", s.host());
Poll::Ready(Ok(s.replace_io(stream).1))
match self.get_mut() {
Self::InvalidDns => Poll::Ready(Err(
io::Error::new(io::ErrorKind::Other, "rustls currently only handles hostname-based connections. See https://github.com/briansmith/webpki/issues/54")
)),
Self::Future { connect, connection } => {
let stream = ready!(Pin::new(connect).poll(cx))?;
let connection = connection.take().unwrap();
trace!("SSL Handshake success: {:?}", connection.host());
Poll::Ready(Ok(connection.replace_io(stream).1))
}
}
}
}

0 comments on commit e76097d

Please sign in to comment.