From 6081f6c564b7f44c33240f1fde67dc32b2a8307f Mon Sep 17 00:00:00 2001 From: Sean McArthur Date: Thu, 19 Dec 2019 12:11:44 -0800 Subject: [PATCH] Replace futures-channel with tokio::sync in blocking client --- Cargo.toml | 5 +---- src/blocking/client.rs | 20 +++++++++----------- 2 files changed, 10 insertions(+), 15 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index be83beca7..11bf0cda4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,7 +25,7 @@ default-tls-vendored = ["default-tls", "native-tls/vendored"] #rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "tls"] -blocking = ["futures-channel", "futures-util/io", "tokio/rt-threaded", "tokio/rt-core"] +blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"] cookies = ["cookie_crate", "cookie_store"] @@ -83,9 +83,6 @@ tokio-tls = { version = "0.3.0", optional = true } #tokio-rustls = { version = "=0.12.0-alpha.2", optional = true } #webpki-roots = { version = "0.17", optional = true } -## blocking -futures-channel = { version = "0.3.0", optional = true } - ## cookies cookie_crate = { version = "0.12", package = "cookie", optional = true } cookie_store = { version = "0.10", optional = true } diff --git a/src/blocking/client.rs b/src/blocking/client.rs index 1d80c8b3e..884c89497 100644 --- a/src/blocking/client.rs +++ b/src/blocking/client.rs @@ -5,10 +5,8 @@ use std::sync::Arc; use std::thread; use std::time::Duration; -use futures_channel::{mpsc, oneshot}; -use futures_util::{StreamExt, TryFutureExt}; - use log::{error, trace}; +use tokio::sync::{mpsc, oneshot}; use super::request::{Request, RequestBuilder}; use super::response::Response; @@ -588,7 +586,7 @@ impl ClientHandle { fn new(builder: ClientBuilder) -> crate::Result { let timeout = builder.timeout; let builder = builder.inner; - let (tx, rx) = mpsc::unbounded::<(async_impl::Request, OneshotResponse)>(); + let (tx, rx) = mpsc::unbounded_channel::<(async_impl::Request, OneshotResponse)>(); let (spawn_tx, spawn_rx) = oneshot::channel::>(); let handle = thread::Builder::new() .name("reqwest-internal-sync-runtime".into()) @@ -621,7 +619,7 @@ impl ClientHandle { let mut rx = rx; - while let Some((req, req_tx)) = rx.next().await { + while let Some((req, req_tx)) = rx.recv().await { let req_fut = client.execute(req); tokio::spawn(forward(req_fut, req_tx)); } @@ -659,7 +657,7 @@ impl ClientHandle { .tx .as_ref() .expect("core thread exited early") - .unbounded_send((req, tx)) + .send((req, tx)) .expect("core thread panicked"); let result: Result, wait::Waited> = @@ -670,10 +668,10 @@ impl ClientHandle { }; wait::timeout(f, self.timeout.0) } else { - wait::timeout( - rx.map_err(|_canceled| event_loop_panicked()), - self.timeout.0, - ) + let f = async move { + rx.await.map_err(|_canceled| event_loop_panicked()) + }; + wait::timeout(f, self.timeout.0) }; match result { @@ -703,7 +701,7 @@ where Poll::Ready(val) => Poll::Ready(Some(val)), Poll::Pending => { // check if the callback is canceled - futures_core::ready!(tx.poll_canceled(cx)); + futures_core::ready!(tx.poll_closed(cx)); Poll::Ready(None) } }