Skip to content

Commit

Permalink
Replace futures-channel with tokio::sync in blocking client (#748)
Browse files Browse the repository at this point in the history
  • Loading branch information
seanmonstar committed Dec 23, 2019
1 parent 0ab5df3 commit 47734f5
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 15 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Expand Up @@ -28,7 +28,7 @@ native-tls-vendored = ["native-tls", "native-tls-crate/vendored"]

rustls-tls = ["hyper-rustls", "tokio-rustls", "webpki-roots", "rustls", "__tls"]

blocking = ["futures-channel", "futures-util/io", "tokio/rt-threaded", "tokio/rt-core"]
blocking = ["futures-util/io", "tokio/rt-threaded", "tokio/rt-core", "tokio/sync"]

cookies = ["cookie_crate", "cookie_store"]

Expand Down Expand Up @@ -89,9 +89,6 @@ rustls = { version = "0.16", features = ["dangerous_configuration"], optional =
tokio-rustls = { version = "0.12", optional = true }
webpki-roots = { version = "0.17", optional = true }

## blocking
futures-channel = { version = "0.3.0", optional = true }

## cookies
cookie_crate = { version = "0.12", package = "cookie", optional = true }
cookie_store = { version = "0.10", optional = true }
Expand Down
20 changes: 9 additions & 11 deletions src/blocking/client.rs
Expand Up @@ -5,10 +5,8 @@ use std::sync::Arc;
use std::thread;
use std::time::Duration;

use futures_channel::{mpsc, oneshot};
use futures_util::{StreamExt, TryFutureExt};

use log::{error, trace};
use tokio::sync::{mpsc, oneshot};

use super::request::{Request, RequestBuilder};
use super::response::Response;
Expand Down Expand Up @@ -562,7 +560,7 @@ impl ClientHandle {
fn new(builder: ClientBuilder) -> crate::Result<ClientHandle> {
let timeout = builder.timeout;
let builder = builder.inner;
let (tx, rx) = mpsc::unbounded::<(async_impl::Request, OneshotResponse)>();
let (tx, rx) = mpsc::unbounded_channel::<(async_impl::Request, OneshotResponse)>();
let (spawn_tx, spawn_rx) = oneshot::channel::<crate::Result<()>>();
let handle = thread::Builder::new()
.name("reqwest-internal-sync-runtime".into())
Expand Down Expand Up @@ -595,7 +593,7 @@ impl ClientHandle {

let mut rx = rx;

while let Some((req, req_tx)) = rx.next().await {
while let Some((req, req_tx)) = rx.recv().await {
let req_fut = client.execute(req);
tokio::spawn(forward(req_fut, req_tx));
}
Expand Down Expand Up @@ -633,7 +631,7 @@ impl ClientHandle {
.tx
.as_ref()
.expect("core thread exited early")
.unbounded_send((req, tx))
.send((req, tx))
.expect("core thread panicked");

let result: Result<crate::Result<async_impl::Response>, wait::Waited<crate::Error>> =
Expand All @@ -644,10 +642,10 @@ impl ClientHandle {
};
wait::timeout(f, self.timeout.0)
} else {
wait::timeout(
rx.map_err(|_canceled| event_loop_panicked()),
self.timeout.0,
)
let f = async move {
rx.await.map_err(|_canceled| event_loop_panicked())
};
wait::timeout(f, self.timeout.0)
};

match result {
Expand Down Expand Up @@ -677,7 +675,7 @@ where
Poll::Ready(val) => Poll::Ready(Some(val)),
Poll::Pending => {
// check if the callback is canceled
futures_core::ready!(tx.poll_canceled(cx));
futures_core::ready!(tx.poll_closed(cx));
Poll::Ready(None)
}
}
Expand Down

0 comments on commit 47734f5

Please sign in to comment.