Skip to content

Commit

Permalink
add async func in trait support (#2)
Browse files Browse the repository at this point in the history
* add async func in trait support

* extract the complex type

* remove the unused code

* directly return impl Future as much as possible instead of async wrapping

---------

Co-authored-by: Rain Jiang <rain.jiang@bytedance.com>
Co-authored-by: ihciah <ihciah@gmail.com>
  • Loading branch information
3 people committed Nov 10, 2023
1 parent 99fec91 commit 55ffbd2
Show file tree
Hide file tree
Showing 6 changed files with 30 additions and 70 deletions.
5 changes: 2 additions & 3 deletions service-async/Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "service-async"
version = "0.1.13"
version = "0.2.0"
edition = "2021"

authors = ["ChiHai <ihciah@gmail.com>"]
Expand All @@ -13,10 +13,9 @@ repository = "https://github.com/ihciah/service-async"

[dependencies]
param = { version = "0.1.2", path = "../param" }
futures-util = "0.3"

[target.'cfg(unix)'.dev-dependencies]
monoio = { version = "0.1.5" }
monoio = { version = "0.2.0" }

[target.'cfg(not(unix))'.dev-dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
54 changes: 18 additions & 36 deletions service-async/examples/demo.rs
@@ -1,8 +1,5 @@
#![feature(impl_trait_in_assoc_type)]

use std::{
convert::Infallible,
future::Future,
sync::atomic::{AtomicUsize, Ordering},
};

Expand All @@ -27,18 +24,13 @@ struct SvcA {
impl Service<()> for SvcA {
type Response = ();
type Error = Infallible;
type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Self: 'cx;

fn call(&self, _req: ()) -> Self::Future<'_> {
async move {
println!(
"SvcA called! pass_flag = {}, not_pass_flag = {}",
self.pass_flag, self.not_pass_flag
);
Ok(())
}

async fn call(&self, _req: ()) -> Result<Self::Response, Self::Error> {
println!(
"SvcA called! pass_flag = {}, not_pass_flag = {}",
self.pass_flag, self.not_pass_flag
);
Ok(())
}
}

Expand Down Expand Up @@ -77,18 +69,13 @@ where
{
type Response = ();
type Error = Infallible;
type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Self: 'cx;

fn call(&self, req: usize) -> Self::Future<'_> {
async move {
let old = self.counter.fetch_add(req, Ordering::AcqRel);
let new = old + req;
println!("SvcB called! {old}->{new}");
self.inner.call(()).await?;
Ok(())
}

async fn call(&self, req: usize) -> Result<Self::Response, Self::Error> {
let old = self.counter.fetch_add(req, Ordering::AcqRel);
let new = old + req;
println!("SvcB called! {old}->{new}");
self.inner.call(()).await?;
Ok(())
}
}

Expand Down Expand Up @@ -127,16 +114,11 @@ where
{
type Response = ();
type Error = Infallible;
type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Self: 'cx, I: 'cx;

fn call(&self, req: I) -> Self::Future<'_> {
async move {
println!("SvcC called!");
self.inner.call(req).await?;
Ok(())
}
async fn call(&self, req: I) -> Result<Self::Response, Self::Error> {
println!("SvcC called!");
self.inner.call(req).await?;
Ok(())
}
}

Expand Down
18 changes: 7 additions & 11 deletions service-async/src/boxed.rs
@@ -1,10 +1,10 @@
use std::{
any::{Any, TypeId},
future::Future,
marker::PhantomData,
pin::Pin,
};

pub use futures_util::future::LocalBoxFuture;

use crate::{MakeService, Service};

pub struct BoxedService<Request, Response, E> {
Expand Down Expand Up @@ -57,12 +57,9 @@ impl<Request, Response, E> Drop for BoxedService<Request, Response, E> {
impl<Request, Response, E> Service<Request> for BoxedService<Request, Response, E> {
type Response = Response;
type Error = E;
type Future<'cx> = LocalBoxFuture<'cx, Result<Response, E>>
where
Self: 'cx, Request: 'cx;

#[inline]
fn call(&self, req: Request) -> Self::Future<'_> {
fn call(&self, req: Request) -> impl Future<Output = Result<Self::Response, Self::Error>> {
unsafe { (self.vtable.call)(self.svc, req) }
}
}
Expand All @@ -81,15 +78,14 @@ where
}
}

type LocalStaticBoxedFuture<T, E> = Pin<Box<dyn Future<Output = Result<T, E>> + 'static>>;

struct ServiceVtable<T, U, E> {
call: unsafe fn(raw: *const (), req: T) -> LocalBoxFuture<'static, Result<U, E>>,
call: unsafe fn(raw: *const (), req: T) -> LocalStaticBoxedFuture<U, E>,
drop: unsafe fn(raw: *const ()),
}

unsafe fn call<R, S>(
svc: *const (),
req: R,
) -> LocalBoxFuture<'static, Result<S::Response, S::Error>>
unsafe fn call<R, S>(svc: *const (), req: R) -> LocalStaticBoxedFuture<S::Response, S::Error>
where
R: 'static,
S: Service<R> + 'static,
Expand Down
6 changes: 1 addition & 5 deletions service-async/src/either.rs
Expand Up @@ -72,13 +72,9 @@ where
{
type Response = A::Response;
type Error = A::Error;
type Future<'cx> = Either<A::Future<'cx>, B::Future<'cx>>
where
Self: 'cx,
R: 'cx;

#[inline]
fn call(&self, req: R) -> Self::Future<'_> {
fn call(&self, req: R) -> impl Future<Output = Result<Self::Response, Self::Error>> {
match self {
Either::Left(s) => Either::Left(s.call(req)),
Either::Right(s) => Either::Right(s.call(req)),
Expand Down
10 changes: 1 addition & 9 deletions service-async/src/lib.rs
@@ -1,5 +1,3 @@
#![feature(impl_trait_in_assoc_type)]

use std::{future::Future, sync::Arc};

pub mod either;
Expand All @@ -21,14 +19,8 @@ pub trait Service<Request> {
/// Errors produced by the service.
type Error;

/// The future response value.
type Future<'cx>: Future<Output = Result<Self::Response, Self::Error>>
where
Self: 'cx,
Request: 'cx;

/// Process the request and return the response asynchronously.
fn call(&self, req: Request) -> Self::Future<'_>;
fn call(&self, req: Request) -> impl Future<Output = Result<Self::Response, Self::Error>>;
}

pub trait MakeService {
Expand Down
7 changes: 1 addition & 6 deletions service-async/src/map.rs
Expand Up @@ -34,13 +34,8 @@ where

type Error = T::Error;

type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Self: 'cx,
R: 'cx;

#[inline]
fn call(&self, req: R) -> Self::Future<'_> {
fn call(&self, req: R) -> impl Future<Output = Result<Self::Response, Self::Error>> {
let req = self.f.map_target(req);
self.inner.call(req)
}
Expand Down

0 comments on commit 55ffbd2

Please sign in to comment.