diff --git a/Cargo.toml b/Cargo.toml index 233f78ff5..3dfa8bd33 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,7 +28,7 @@ native-tls-vendored = ["native-tls", "native-tls-crate/vendored"] rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "__tls"] -blocking = ["futures-channel", "futures-util/io", "tokio/rt-threaded", "tokio/rt-core"] +blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"] cookies = ["cookie_crate", "cookie_store"] @@ -89,9 +89,6 @@ rustls = { version = "0.16", features = ["dangerous_configuration"], optional = tokio-rustls = { version = "0.12", optional = true } webpki-roots = { version = "0.17", optional = true } -## blocking -futures-channel = { version = "0.3.0", optional = true } - ## cookies cookie_crate = { version = "0.12", package = "cookie", optional = true } cookie_store = { version = "0.10", optional = true } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 9aa457c5f..0f29d22aa 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -5,10 +5,8 @@ use std::sync::Arc; use std::thread; use std::time::Duration; -use futures_channel::{mpsc, oneshot}; -use futures_util::{StreamExt, TryFutureExt}; - use log::{error, trace}; +use tokio::sync::{mpsc, oneshot}; use super::request::{Request, RequestBuilder}; use super::response::Response; @@ -562,7 +560,7 @@ impl ClientHandle { fn new(builder: ClientBuilder) -> crate::Result { let timeout = builder.timeout; let builder = builder.inner; - let (tx, rx) = mpsc::unbounded::<(async_impl::Request, OneshotResponse)>(); + let (tx, rx) = mpsc::unbounded_channel::<(async_impl::Request, OneshotResponse)>(); let (spawn_tx, spawn_rx) = oneshot::channel::>(); let handle = thread::Builder::new() .name("reqwest-internal-sync-runtime".into()) @@ -595,7 +593,7 @@ impl ClientHandle { let mut rx = rx; - while let Some((req, req_tx)) = rx.next().await { + while let Some((req, req_tx)) = rx.recv().await { let req_fut = client.execute(req); tokio::spawn(forward(req_fut, req_tx)); } @@ -633,7 +631,7 @@ impl ClientHandle { .tx .as_ref() .expect("core thread exited early") - .unbounded_send((req, tx)) + .send((req, tx)) .expect("core thread panicked"); let result: Result, wait::Waited> = @@ -644,10 +642,10 @@ impl ClientHandle { }; wait::timeout(f, self.timeout.0) } else { - wait::timeout( - rx.map_err(|_canceled| event_loop_panicked()), - self.timeout.0, - ) + let f = async move { + rx.await.map_err(|_canceled| event_loop_panicked()) + }; + wait::timeout(f, self.timeout.0) }; match result { @@ -677,7 +675,7 @@ where Poll::Ready(val) => Poll::Ready(Some(val)), Poll::Pending => { // check if the callback is canceled - futures_core::ready!(tx.poll_canceled(cx)); + futures_core::ready!(tx.poll_closed(cx)); Poll::Ready(None) } }