Skip to content

Commit

Permalink
Upgrade tokio 0.2 -> 1.0.2
Browse files Browse the repository at this point in the history
Recently tokio got a first stable release and many libraries &
applications already migrated to the newest version.

This changes upgrades tokio version to 1.0.2:

* Tokio renamed some of its features, e.g `rt-util` and `rt-core` now
  combined into `rt`.

* `stream` feature got extracted to a separate crate
  [tokio-stream](https://docs.rs/tokio-stream/0.1.2/tokio_stream),
  waiting for eventual `Stream` landing to the Rust std
  library. [RFC](rust-lang/rfcs#2996)

[TODO]

Actix's integration test_actix_ws_integration test still fails due to,
I guess, async rt is not being initialized. Apart from that, the tests
are green. Please feel free to take over this effort.
  • Loading branch information
akhramov authored and LegNeato committed Jun 26, 2021
1 parent de4c0e9 commit 9be6b31
Show file tree
Hide file tree
Showing 26 changed files with 50 additions and 38 deletions.
2 changes: 1 addition & 1 deletion docs/book/tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ iron = "0.5"
mount = "0.4"
skeptic = "0.13"
serde_json = "1.0"
tokio = { version = "0.2", features = ["blocking", "macros", "rt-core", "rt-util", "stream"] }
tokio = { version = "1.0.2", features = ["macros", "rt"] }
uuid = "0.8"

[build-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion examples/actix_subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ publish = false
actix-web = "3.3"
actix-cors = "0.5"
futures = "0.3"
tokio = { version = "0.2", features = ["macros", "rt-core"] }
tokio = { version = "1.0.2", features = ["rt", "rt-multi-thread", "macros", "time"] }
tokio-stream = "0.1.2"
env_logger = "0.8"
serde = "1.0"
serde_json = "1.0"
Expand Down
5 changes: 4 additions & 1 deletion examples/actix_subscriptions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use juniper::{
};
use juniper_actix::{graphql_handler, playground_handler, subscriptions::subscriptions_handler};
use juniper_graphql_ws::ConnectionConfig;
use tokio_stream::wrappers::IntervalStream;

type Schema = RootNode<'static, Query, EmptyMutation<Database>, Subscription>;

Expand Down Expand Up @@ -65,7 +66,9 @@ impl Subscription {
use rand::{rngs::StdRng, Rng, SeedableRng};
let mut rng = StdRng::from_entropy();

let stream = tokio::time::interval(Duration::from_secs(3)).map(move |_| {
println!("found it");
let interval = tokio::time::interval(Duration::from_secs(3));
let stream = IntervalStream::new(interval).map(move |_| {
counter += 1;

if counter == 2 {
Expand Down
2 changes: 1 addition & 1 deletion examples/basic_subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ authors = ["Jordao Rosario <jordao.rosario01@gmail.com>"]
futures = "0.3"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "0.2", features = ["rt-core", "macros", "stream"] }
tokio = { version = "1.0.2", features = ["rt", "macros"] }

juniper = { path = "../../juniper" }
juniper_subscriptions = { path = "../../juniper_subscriptions" }
2 changes: 1 addition & 1 deletion examples/basic_subscriptions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type StringStream = Pin<Box<dyn Stream<Item = Result<String, FieldError>> + Send
impl Subscription {
async fn hello_world() -> StringStream {
let stream =
tokio::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
futures::stream::iter(vec![Ok(String::from("Hello")), Ok(String::from("World!"))]);
Box::pin(stream)
}
}
Expand Down
2 changes: 1 addition & 1 deletion examples/warp_async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ env_logger = "0.8.1"
futures = "0.3.1"
log = "0.4.8"
reqwest = { version = "0.11", features = ["rustls-tls"] }
tokio = { version = "0.2", features = ["rt-core", "macros"] }
tokio = { version = "1.0.2", features = ["rt", "macros"] }
warp = "0.2"
3 changes: 2 additions & 1 deletion examples/warp_subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ futures = "0.3.1"
log = "0.4.8"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "0.2", features = ["rt-core", "macros"] }
tokio = { version = "1.0.2", features = ["rt", "rt-multi-thread", "macros", "time"] }
tokio-stream = "0.1.2"
warp = "0.2.1"

juniper = { path = "../../juniper" }
Expand Down
4 changes: 3 additions & 1 deletion examples/warp_subscriptions/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use juniper::{
};
use juniper_graphql_ws::ConnectionConfig;
use juniper_warp::{playground_filter, subscriptions::serve_graphql_ws};
use tokio_stream::wrappers::IntervalStream;
use warp::{http::Response, Filter};

#[derive(Clone)]
Expand Down Expand Up @@ -109,7 +110,8 @@ struct Subscription;
impl Subscription {
async fn users() -> UsersStream {
let mut counter = 0;
let stream = tokio::time::interval(Duration::from_secs(5)).map(move |_| {
let interval = tokio::time::interval(Duration::from_secs(5));
let stream = IntervalStream::new(interval).map(move |_| {
counter += 1;
if counter == 2 {
Err(FieldError::new(
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/async_await/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,4 @@ publish = false
[dependencies]
juniper = { path = "../../juniper" }
futures = "0.3.1"
tokio = { version = "0.2", features = ["rt-core", "time", "macros"] }
tokio = { version = "1.0.2", features = ["rt", "time", "macros"] }
4 changes: 2 additions & 2 deletions integration_tests/async_await/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ impl User {
}

async fn delayed() -> bool {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
true
}
}
Expand All @@ -68,7 +68,7 @@ impl Query {
}

async fn delayed() -> bool {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
true
}
}
Expand Down
2 changes: 1 addition & 1 deletion integration_tests/codegen_fail/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,5 @@ futures = "0.3.1"

[dev-dependencies]
serde_json = { version = "1" }
tokio = { version = "0.2", features = ["rt-core", "time", "macros"] }
tokio = { version = "1.0.2", features = ["rt", "time", "macros"] }
trybuild = "1.0.25"
2 changes: 1 addition & 1 deletion integration_tests/juniper_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ juniper_subscriptions = { path = "../../juniper_subscriptions" }
async-trait = "0.1.39"
serde_json = "1.0"
fnv = "1.0"
tokio = { version = "0.2", features = ["macros", "rt-core", "time"] }
tokio = { version = "1.0.2", features = ["macros", "rt", "time"] }
2 changes: 1 addition & 1 deletion juniper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ uuid = { version = "0.8", default-features = false, optional = true }
bencher = "0.1.2"
pretty_assertions = "0.7.1"
serde_json = "1.0.2"
tokio = { version = "0.2", features = ["macros", "rt-core", "time"] }
tokio = { version = "1.0.2", features = ["macros", "rt", "time"] }

[[bench]]
name = "bench"
Expand Down
10 changes: 5 additions & 5 deletions juniper/src/executor_tests/async_await/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ impl User {
}

async fn delayed() -> bool {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
true
}
}
Expand All @@ -65,7 +65,7 @@ impl Query {
}

async fn delayed() -> bool {
tokio::time::delay_for(std::time::Duration::from_millis(100)).await;
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
true
}
}
Expand All @@ -74,10 +74,10 @@ impl Query {
async fn async_simple() {
let schema = RootNode::new(Query, EmptyMutation::new(), EmptySubscription::new());
let doc = r#"
query {
query {
fieldSync
fieldAsyncPlain
delayed
fieldAsyncPlain
delayed
user(id: "user1") {
name
}
Expand Down
2 changes: 1 addition & 1 deletion juniper/src/tests/subscriptions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type Schema =
RootNode<'static, MyQuery, EmptyMutation<MyContext>, MySubscription, DefaultScalarValue>;

fn run<O>(f: impl std::future::Future<Output = O>) -> O {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();

rt.block_on(f)
}
Expand Down
1 change: 0 additions & 1 deletion juniper_actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ futures = "0.3.5"
serde = { version = "1.0.116", features = ["derive"] }
serde_json = "1.0.57"
thiserror = "1.0"
tokio = { version = "0.2", features = ["time"] }

[dev-dependencies]
actix-rt = "1.1"
Expand Down
11 changes: 8 additions & 3 deletions juniper_actix/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ pub mod subscriptions {
};
use actix_web_actors::ws;

use tokio::sync::Mutex;
use futures::lock::Mutex;

use juniper::{
futures::{
Expand Down Expand Up @@ -777,7 +777,7 @@ mod subscription_tests {
EmptyMutation, LocalBoxFuture,
};
use juniper_graphql_ws::ConnectionConfig;
use tokio::time::timeout;
use actix_rt::time::timeout;

use super::subscriptions::subscriptions_handler;

Expand All @@ -800,6 +800,9 @@ mod subscription_tests {
});
let mut framed = server.ws_at("/subscriptions").await.unwrap();


println!("wooowe");

for message in &messages {
match message {
WsIntegrationMessage::Send(body) => {
Expand All @@ -809,12 +812,14 @@ mod subscription_tests {
.map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?;
}
WsIntegrationMessage::Expect(body, message_timeout) => {
println!("right???");
let frame = timeout(Duration::from_millis(*message_timeout), framed.next())
.await
.map_err(|_| anyhow::anyhow!("Timed-out waiting for message"))?
.ok_or_else(|| anyhow::anyhow!("Empty message received"))?
.map_err(|e| anyhow::anyhow!("WS error: {:?}", e))?;

println!("dead...");
match frame {
ws::Frame::Text(ref bytes) => {
let expected_value =
Expand Down Expand Up @@ -865,7 +870,7 @@ mod subscription_tests {
subscriptions_handler(req, stream, schema, config).await
}

#[actix_web::rt::test]
#[actix_rt::test]
async fn test_actix_ws_integration() {
run_ws_test_suite(&mut TestActixWsIntegration::default()).await;
}
Expand Down
2 changes: 1 addition & 1 deletion juniper_benchmarks/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ juniper = { path = "../juniper" }

[dev-dependencies]
criterion = "0.3"
tokio = { version = "0.2", features = ["rt-core", "rt-threaded"] }
tokio = { version = "1.0.2", features = ["rt", "rt-multi-thread"] }

[[bench]]
name = "benchmark"
Expand Down
2 changes: 1 addition & 1 deletion juniper_graphql_ws/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ keywords = ["apollo", "graphql", "graphql-ws", "juniper"]
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
juniper_subscriptions = { version = "0.15.5", path = "../juniper_subscriptions" }
serde = { version = "1.0.8", features = ["derive"], default-features = false }
tokio = { version = "0.2", features = ["macros", "rt-core", "time"], default-features = false }
tokio = { version = "1.0.2", features = ["macros", "rt", "time"], default-features = false }

[dev-dependencies]
serde_json = "1.0"
8 changes: 4 additions & 4 deletions juniper_graphql_ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl<S: Schema, I: Init<S::ScalarValue, S::Context>> ConnectionState<S, I> {
.boxed();
s = s
.chain(stream::unfold((), move |_| async move {
tokio::time::delay_for(keep_alive_interval).await;
tokio::time::sleep(keep_alive_interval).await;
Some((
Reaction::ServerMessage(ServerMessage::ConnectionKeepAlive),
(),
Expand Down Expand Up @@ -658,7 +658,7 @@ mod test {
impl Subscription {
/// never never emits anything.
async fn never(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
tokio::time::delay_for(Duration::from_secs(10000))
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream()
.boxed()
Expand All @@ -668,7 +668,7 @@ mod test {
async fn context(context: &Context) -> BoxStream<'static, FieldResult<i32>> {
stream::once(future::ready(Ok(context.0)))
.chain(
tokio::time::delay_for(Duration::from_secs(10000))
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream(),
)
Expand All @@ -682,7 +682,7 @@ mod test {
Value::null(),
))))
.chain(
tokio::time::delay_for(Duration::from_secs(10000))
tokio::time::sleep(Duration::from_secs(10000))
.map(|_| unreachable!())
.into_stream(),
)
Expand Down
6 changes: 3 additions & 3 deletions juniper_hyper/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ futures = "0.3.1"
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
hyper = "0.13"
serde_json = "1.0"
tokio = "0.2"
tokio = "1.0.2"
url = "2"

[dev-dependencies]
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
pretty_env_logger = "0.4"
reqwest = { version = "0.11", features = ["blocking", "rustls-tls"] }
tokio = { version = "0.2", features = ["macros"] }
reqwest = { version = "0.11", features = ["rustls-tls", "blocking"] }
tokio = { version = "1.0.2", features = ["macros"] }
2 changes: 1 addition & 1 deletion juniper_hyper/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -421,7 +421,7 @@ mod tests {
});

let (shutdown_fut, shutdown) = futures::future::abortable(async {
tokio::time::delay_for(Duration::from_secs(60)).await;
tokio::time::sleep(Duration::from_secs(60)).await;
});

let server = Server::bind(&addr)
Expand Down
1 change: 1 addition & 0 deletions juniper_rocket_async/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ futures = "0.3.1"
juniper = { version = "0.15.6", path = "../juniper", default-features = false }
rocket = { version = "0.5.0-rc.1", default-features = false }
serde_json = "1.0.2"
tokio = { version = "1.0.2", features = ["macros", "rt"] }

[dev-dependencies]
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
2 changes: 1 addition & 1 deletion juniper_subscriptions/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,4 @@ juniper = { version = "0.15.6", path = "../juniper", default-features = false }

[dev-dependencies]
serde_json = "1.0"
tokio = { version = "0.2", features = ["macros", "rt-core"] }
tokio = { version = "1.0.2", features = ["macros", "rt"] }
4 changes: 2 additions & 2 deletions juniper_warp/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ juniper_graphql_ws = { version = "0.2.5", path = "../juniper_graphql_ws", option
serde = { version = "1.0.75", features = ["derive"] }
serde_json = "1.0.24"
thiserror = "1.0"
tokio = { version = "0.2", features = ["blocking", "rt-core"] }
tokio = { version = "1.0.2", features = ["rt"] }
warp = "0.2"

[dev-dependencies]
env_logger = "0.8"
juniper = { version = "0.15.6", path = "../juniper", features = ["expose-test-schema"] }
log = "0.4"
percent-encoding = "2.1"
tokio = { version = "0.2", features = ["blocking", "macros", "rt-core"] }
tokio = { version = "1.0.2", features = ["macros", "rt"] }
url = "2"
2 changes: 1 addition & 1 deletion juniper_warp/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -717,7 +717,7 @@ mod tests_http_harness {
}

fn make_request(&self, req: warp::test::RequestBuilder) -> TestResponse {
let mut rt = tokio::runtime::Runtime::new().expect("Failed to create tokio::Runtime");
let rt = tokio::runtime::Runtime::new().expect("Failed to create tokio::Runtime");
make_test_response(rt.block_on(async move {
req.filter(&self.filter).await.unwrap_or_else(|rejection| {
let code = if rejection.is_not_found() {
Expand Down

0 comments on commit 9be6b31

Please sign in to comment.