Skip to content
This repository has been archived by the owner on Feb 28, 2023. It is now read-only.

Commit

Permalink
feat: Update tokio to 1.0
Browse files Browse the repository at this point in the history
Followup on mozilla/sccache#985 to pull sccache the last little bit into tokio 1.0
  • Loading branch information
Markus Westerlind authored and drahnr committed Nov 13, 2021
1 parent 2c4b14f commit 203231b
Show file tree
Hide file tree
Showing 12 changed files with 319 additions and 524 deletions.
698 changes: 246 additions & 452 deletions Cargo.lock

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ base64 = "0.13"
bincode = "1"
blake3 = "0.3"
byteorder = "1.0"
bytes = "0.5"
bytes = "1"
chrono = { version = "0.4", optional = true }
clap = "2.23.0"
counted-array = "0.1"
Expand All @@ -46,8 +46,8 @@ futures-locks = "0.6"
fs-err = "2.6"
hmac = { version = "0.10", optional = true }
http = "0.2"
hyper = { version = "0.13", optional = true }
hyper-tls = { version = "0.4", optional = true }
hyper = { version = "0.14", optional = true, features = ["server", "client"] }
hyper-tls = { version = "0.5", optional = true }
hyperx = { version = "0.13", optional = true }
jobserver = "0.1"
jsonwebtoken = { version = "7", optional = true }
Expand All @@ -64,13 +64,13 @@ num_cpus = "1.0"
number_prefix = "0.4"
percent-encoding = { version = "2", optional = true }
rand = "0.8"
redis = { version = "0.17", optional = true, default-features = false, features = ["aio", "tls", "tokio-comp", "tokio-tls-comp"] }
redis = { version = "0.21", optional = true, default-features = false, features = ["aio", "tokio-comp"] }
regex = "1"
reqwest = { version = "0.10", features = ["json", "blocking"], optional = true }
reqwest = { version = "0.11", features = ["json", "blocking"], optional = true }
retry = "1"
ring = { version = "0.16", optional = true, features = ["std"] }
rusoto_core = { version = "0.45", optional = true }
rusoto_s3 = { version = "0.45", optional = true }
rusoto_core = { version = "0.47", optional = true }
rusoto_s3 = { version = "0.47", optional = true }
sha-1 = { version = "0.9", optional = true }
sha2 = { version = "0.9", optional = true }
serde = "1.0"
Expand All @@ -79,10 +79,10 @@ serde_json = "1.0"
strip-ansi-escapes = "0.1"
tar = "0.4"
tempfile = "3"
tokio = { version = "0.2", features = ["rt-threaded", "blocking", "io-util", "time", "uds", "tcp", "process", "macros"] }
tokio-serde = "0.6"
tokio-util = { version = "0.3", features = ["codec"] }
tower = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net", "process", "macros"] }
tokio-serde = "0.8"
tokio-util = { version = "0.6", features = ["codec"] }
tower = "0.4"
toml = "0.5"
untrusted = { version = "0.7", optional = true }
url = { version = "2", optional = true }
Expand Down Expand Up @@ -119,7 +119,7 @@ serial_test = "0.5"
daemonize = "0.4"

[target.'cfg(windows)'.dependencies]
parity-tokio-ipc = "0.8"
parity-tokio-ipc = "0.9"

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion src/azure/blobstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ mod test {
container_name.to_string(),
);

let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();

let container = BlobContainer::new(creds.azure_blob_endpoint(), container_name).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl RedisCache {
let _ = parsed.set_password(Some("*****"));
}
Ok(RedisCache {
display_url: parsed.into_string(),
display_url: parsed.to_string(),
client: Client::open(url)?,
})
}
Expand Down
9 changes: 4 additions & 5 deletions src/cache/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use rusoto_s3::{GetObjectOutput, GetObjectRequest, PutObjectRequest, S3Client, S
use std::io;
use std::str::FromStr;
use std::time::{Duration, Instant};
use tokio::io::AsyncReadExt as _;
use tokio::io::AsyncReadExt;

use crate::errors::*;

Expand Down Expand Up @@ -84,13 +84,12 @@ impl S3Cache {

async fn read_object_output(output: GetObjectOutput) -> Result<Cache> {
let body = output.body.context("no HTTP body")?;
let mut body_bytes = Vec::new();
let mut body_reader = body.into_async_read();
let mut body = Vec::new();
body_reader
.read_to_end(&mut body)
body_reader.read_to_end(&mut body_bytes)
.await
.context("failed to read HTTP body")?;
let hit = CacheRead::from(io::Cursor::new(body))?;
let hit = CacheRead::from(io::Cursor::new(body_bytes))?;
Ok(Cache::Hit(hit))
}

Expand Down
14 changes: 6 additions & 8 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,12 @@ async fn read_server_startup_status<R: AsyncReadExt + Unpin>(
/// for it to start up.
#[cfg(not(windows))]
fn run_server_process() -> Result<ServerStartup> {
use futures::StreamExt;
use std::time::Duration;

trace!("run_server_process");
let tempdir = tempfile::Builder::new().prefix("cachepot").tempdir()?;
let socket_path = tempdir.path().join("sock");
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
let exe_path = env::current_exe()?;
let workdir = exe_path.parent().expect("executable path has no parent?!");
let _child = process::Command::new(&exe_path)
Expand All @@ -88,11 +87,10 @@ fn run_server_process() -> Result<ServerStartup> {
.spawn()?;

let startup = async move {
let mut listener = tokio::net::UnixListener::bind(&socket_path)?;
let socket = listener.incoming().next().await;
let socket = socket.unwrap(); // incoming() never returns None
let listener = tokio::net::UnixListener::bind(&socket_path)?;
let (socket, _) = listener.accept().await?;

read_server_startup_status(socket?).await
read_server_startup_status(socket).await
};

let timeout = Duration::from_millis(SERVER_STARTUP_TIMEOUT_MS.into());
Expand Down Expand Up @@ -155,7 +153,7 @@ fn run_server_process() -> Result<ServerStartup> {
trace!("run_server_process");

// Create a mini event loop and register our named pipe server
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
let pipe_name = format!(r"\\.\pipe\{}", Uuid::new_v4().to_simple_ref());

// Spawn a server which should come back and connect to us
Expand Down Expand Up @@ -661,7 +659,7 @@ pub fn run_command(cmd: Command) -> Result<i32> {
use crate::compiler;

trace!("Command::PackageToolchain({})", executable.display());
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
let jobserver = unsafe { Client::new() };
let creator = ProcessCommandCreator::new(&jobserver);
let env: Vec<_> = env::vars_os().collect();
Expand Down
10 changes: 5 additions & 5 deletions src/compiler/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ LLVM version: 6.0",
let _ = env_logger::Builder::new().is_test(true).try_init();
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down Expand Up @@ -1493,7 +1493,7 @@ LLVM version: 6.0",
let _ = env_logger::Builder::new().is_test(true).try_init();
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down Expand Up @@ -1599,7 +1599,7 @@ LLVM version: 6.0",
let _ = env_logger::Builder::new().is_test(true).try_init();
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let pool = runtime.handle().clone();
let storage = MockStorage::new();
let storage: Arc<MockStorage> = Arc::new(storage);
Expand Down Expand Up @@ -1676,7 +1676,7 @@ LLVM version: 6.0",
let _ = env_logger::Builder::new().is_test(true).try_init();
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = single_threaded_runtime();
let runtime = single_threaded_runtime();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down Expand Up @@ -1786,7 +1786,7 @@ LLVM version: 6.0",
let _ = env_logger::Builder::new().is_test(true).try_init();
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = single_threaded_runtime();
let runtime = single_threaded_runtime();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down
21 changes: 13 additions & 8 deletions src/dist/client_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,13 @@ pub fn get_token_oauth2_code_grant_pkce(
mut auth_url: Url,
token_url: &str,
) -> Result<String> {
let mut runtime = Runtime::new()?;
let server = runtime
.enter(try_bind)?
.serve(make_service!(code_grant_pkce::serve));
let runtime = Runtime::new()?;
let builder = {
let _guard = runtime.enter();
try_bind()?
};
let server = builder.serve(make_service!(code_grant_pkce::serve));

let port = server.local_addr().port();

let redirect_uri = format!("http://localhost:{}/redirect", port);
Expand Down Expand Up @@ -554,10 +557,12 @@ pub fn get_token_oauth2_code_grant_pkce(

// https://auth0.com/docs/api-auth/tutorials/implicit-grant
pub fn get_token_oauth2_implicit(client_id: &str, mut auth_url: Url) -> Result<String> {
let mut runtime = Runtime::new()?;
let server = runtime
.enter(try_bind)?
.serve(make_service!(implicit::serve));
let runtime = Runtime::new()?;
let builder = {
let _guard = runtime.enter();
try_bind()?
};
let server = builder.serve(make_service!(implicit::serve));

let port = server.local_addr().port();

Expand Down
3 changes: 1 addition & 2 deletions src/jobserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl Client {
let helper = inner
.clone()
.into_helper_thread(move |token| {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {
Expand Down
4 changes: 2 additions & 2 deletions src/mock_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ impl CommandChild for Child {
}

async fn wait(self) -> io::Result<ExitStatus> {
let Child { inner, token } = self;
inner.await.map(|ret| {
let Child { mut inner, token } = self;
inner.wait().await.map(|ret| {
drop(token);
ret
})
Expand Down
49 changes: 25 additions & 24 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use crate::util;
use crate::util::fs::metadata;
#[cfg(feature = "dist-client")]
use anyhow::Context as _;
use bytes::{buf::ext::BufMutExt, Bytes, BytesMut};
use bytes::{buf::BufMut, Bytes, BytesMut};
use filetime::FileTime;
use futures::channel::mpsc;
use futures::future::FutureExt;
Expand All @@ -54,11 +54,11 @@ use std::time::Duration;
#[cfg(feature = "dist-client")]
use std::time::Instant;
use std::u64;
use tokio::runtime::Runtime;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
time::{self, delay_for, Delay},
runtime::Runtime,
time::{self, sleep, Sleep},
};
use tokio_serde::Framed;
use tokio_util::codec::{length_delimited, LengthDelimitedCodec};
Expand Down Expand Up @@ -402,10 +402,9 @@ impl DistClientContainer {
pub fn start_server(config: &Config, port: u16) -> Result<()> {
info!("start_server: port: {}", port);
let client = unsafe { Client::new() };
let runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.threaded_scheduler()
.core_threads(std::cmp::max(20, 2 * num_cpus::get()))
.worker_threads(std::cmp::max(20, 2 * num_cpus::get()))
.build()?;
let pool = runtime.handle().clone();
let dist_client = DistClientContainer::new(config, &pool);
Expand Down Expand Up @@ -449,7 +448,7 @@ pub struct CachepotServer<C: CommandCreatorSync> {
impl<C: CommandCreatorSync> CachepotServer<C> {
pub fn new(
port: u16,
mut runtime: Runtime,
runtime: Runtime,
client: Client,
dist_client: DistClientContainer,
storage: Arc<dyn Storage>,
Expand Down Expand Up @@ -515,7 +514,7 @@ impl<C: CommandCreatorSync> CachepotServer<C> {
C: Send,
{
let CachepotServer {
mut runtime,
runtime,
listener,
rx,
service,
Expand All @@ -525,17 +524,19 @@ impl<C: CommandCreatorSync> CachepotServer<C> {

// Create our "server future" which will simply handle all incoming
// connections in separate tasks.
let server = listener.try_for_each(move |socket| {
trace!("incoming connection");
let conn = service.clone().bind(socket).map_err(|res| {
error!("Failed to bind socket: {}", res);
});

// We're not interested if the task panicked; immediately process
// another connection
let _ = tokio::spawn(conn);
async { Ok(()) }
});
let server = async move {
loop {
let (socket, _) = listener.accept().await?;
trace!("incoming connection");
let conn = service.clone().bind(socket).map_err(|res| {
error!("Failed to bind socket: {}", res);
});

// We're not interested if the task panicked; immediately process
// another connection
let _ = tokio::spawn(conn);
}
};

// Right now there's a whole bunch of ways to shut down this server for
// various purposes. These include:
Expand All @@ -557,7 +558,7 @@ impl<C: CommandCreatorSync> CachepotServer<C> {
ShutdownOrInactive {
rx,
timeout: if timeout != Duration::new(0, 0) {
Some(delay_for(timeout))
Some(Box::pin(sleep(timeout)))
} else {
None
},
Expand All @@ -571,7 +572,7 @@ impl<C: CommandCreatorSync> CachepotServer<C> {
futures::select! {
server = server.fuse() => server,
_res = shutdown.fuse() => Ok(()),
_res = shutdown_idle.fuse() => Ok(()),
_res = shutdown_idle.fuse() => Ok::<_, io::Error>(()),
}
})?;

Expand Down Expand Up @@ -1698,7 +1699,7 @@ impl<I: AsyncRead + AsyncWrite + Unpin> Sink<Frame<Response, Response>> for Cach

struct ShutdownOrInactive {
rx: mpsc::Receiver<ServerMessage>,
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
timeout_dur: Duration,
}

Expand All @@ -1713,7 +1714,7 @@ impl Future for ShutdownOrInactive {
Poll::Ready(Some(ServerMessage::Shutdown)) => return Poll::Ready(()),
Poll::Ready(Some(ServerMessage::Request)) => {
if self.timeout_dur != Duration::new(0, 0) {
self.timeout = Some(delay_for(self.timeout_dur));
self.timeout = Some(Box::pin(sleep(self.timeout_dur)));
}
}
// All services have shut down, in theory this isn't possible...
Expand All @@ -1722,7 +1723,7 @@ impl Future for ShutdownOrInactive {
}
match self.timeout {
None => Poll::Pending,
Some(ref mut timeout) => Pin::new(timeout).poll(cx),
Some(ref mut timeout) => timeout.as_mut().poll(cx),
}
}
}
Expand Down

0 comments on commit 203231b

Please sign in to comment.