Skip to content

Commit

Permalink
feat: Update tokio to 1.0
Browse files Browse the repository at this point in the history
Followup on mozilla#985 to pull sccache the last little bit into tokio 1.0
  • Loading branch information
Markus Westerlind committed Nov 12, 2021
1 parent 0192fdd commit 16c7817
Show file tree
Hide file tree
Showing 12 changed files with 696 additions and 903 deletions.
1,468 changes: 629 additions & 839 deletions Cargo.lock

Large diffs are not rendered by default.

18 changes: 9 additions & 9 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ base64 = "0.13"
bincode = "1"
blake3 = "0.3"
byteorder = "1.0"
bytes = "0.5"
bytes = "1"
chrono = { version = "0.4", optional = true }
clap = "2.23.0"
counted-array = "0.1"
Expand All @@ -42,7 +42,7 @@ futures = "0.3"
futures-locks = "0.6"
hmac = { version = "0.10", optional = true }
http = "0.2"
hyper = { version = "0.13", optional = true }
hyper = { version = "0.14", optional = true, features = ["server"] }
hyperx = { version = "0.13", optional = true }
jobserver = "0.1"
jsonwebtoken = { version = "7", optional = true }
Expand All @@ -58,9 +58,9 @@ number_prefix = "0.4"
openssl = { version = "0.10", optional = true }
percent-encoding = { version = "2", optional = true }
rand = "0.7"
redis = { version = "0.17", optional = true, default-features = false, features = ["aio", "tls", "tokio-comp", "tokio-tls-comp"] }
redis = { version = "0.21", optional = true, default-features = false, features = ["aio", "tokio-comp", "tokio-native-tls-comp"] }
regex = "1"
reqwest = { version = "0.10", features = ["json", "blocking"], optional = true }
reqwest = { version = "0.11", features = ["json", "blocking"], optional = true }
retry = "1"
ring = { version = "0.16", optional = true, features = ["std"] }
sha-1 = { version = "0.9", optional = true }
Expand All @@ -71,10 +71,10 @@ serde_json = "1.0"
strip-ansi-escapes = "0.1"
tar = "0.4"
tempfile = "3"
tokio = { version = "0.2", features = ["rt-threaded", "blocking", "io-util", "time", "uds", "tcp", "process", "macros"] }
tokio-serde = "0.6"
tokio-util = { version = "0.3", features = ["codec"] }
tower = "0.3"
tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net", "process", "macros"] }
tokio-serde = "0.8"
tokio-util = { version = "0.6", features = ["codec"] }
tower = "0.4"
toml = "0.5"
untrusted = { version = "0.7", optional = true }
url = { version = "2", optional = true }
Expand Down Expand Up @@ -106,7 +106,7 @@ selenium-rs = "0.1"
daemonize = "0.4"

[target.'cfg(windows)'.dependencies]
parity-tokio-ipc = "0.8"
parity-tokio-ipc = "0.9"

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3"
Expand Down
2 changes: 1 addition & 1 deletion src/azure/blobstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ mod test {
container_name.to_string(),
);

let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();

let container = BlobContainer::new(creds.azure_blob_endpoint(), container_name).unwrap();

Expand Down
2 changes: 1 addition & 1 deletion src/bin/sccache-dist/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -433,7 +433,7 @@ fn init_logging() {
if env::var("RUST_LOG").is_ok() {
match env_logger::try_init() {
Ok(_) => (),
Err(e) => panic!(format!("Failed to initalize logging: {:?}", e)),
Err(e) => panic!("Failed to initalize logging: {:?}", e),
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/cache/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl RedisCache {
let _ = parsed.set_password(Some("*****"));
}
Ok(RedisCache {
display_url: parsed.into_string(),
display_url: parsed.to_string(),
client: Client::open(url)?,
})
}
Expand Down
13 changes: 6 additions & 7 deletions src/commands.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ fn run_server_process() -> Result<ServerStartup> {
trace!("run_server_process");
let tempdir = tempfile::Builder::new().prefix("sccache").tempdir()?;
let socket_path = tempdir.path().join("sock");
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
let exe_path = env::current_exe()?;
let workdir = exe_path.parent().expect("executable path has no parent?!");
let _child = process::Command::new(&exe_path)
Expand All @@ -88,11 +88,10 @@ fn run_server_process() -> Result<ServerStartup> {
.spawn()?;

let startup = async move {
let mut listener = tokio::net::UnixListener::bind(&socket_path)?;
let socket = listener.incoming().next().await;
let socket = socket.unwrap(); // incoming() never returns None
let listener = tokio::net::UnixListener::bind(&socket_path)?;
let (socket, _) = listener.accept().await?;

read_server_startup_status(socket?).await
read_server_startup_status(socket).await
};

let timeout = Duration::from_millis(SERVER_STARTUP_TIMEOUT_MS.into());
Expand Down Expand Up @@ -155,7 +154,7 @@ fn run_server_process() -> Result<ServerStartup> {
trace!("run_server_process");

// Create a mini event loop and register our named pipe server
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
let pipe_name = format!(r"\\.\pipe\{}", Uuid::new_v4().to_simple_ref());

// Spawn a server which should come back and connect to us
Expand Down Expand Up @@ -661,7 +660,7 @@ pub fn run_command(cmd: Command) -> Result<i32> {
use crate::compiler;

trace!("Command::PackageToolchain({})", executable.display());
let mut runtime = Runtime::new()?;
let runtime = Runtime::new()?;
let jobserver = unsafe { Client::new() };
let creator = ProcessCommandCreator::new(&jobserver);
let env: Vec<_> = env::vars_os().collect();
Expand Down
10 changes: 5 additions & 5 deletions src/compiler/compiler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ LLVM version: 6.0",
drop(env_logger::try_init());
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down Expand Up @@ -1492,7 +1492,7 @@ LLVM version: 6.0",
drop(env_logger::try_init());
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down Expand Up @@ -1598,7 +1598,7 @@ LLVM version: 6.0",
drop(env_logger::try_init());
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = Runtime::new().unwrap();
let runtime = Runtime::new().unwrap();
let pool = runtime.handle().clone();
let storage = MockStorage::new();
let storage: Arc<MockStorage> = Arc::new(storage);
Expand Down Expand Up @@ -1675,7 +1675,7 @@ LLVM version: 6.0",
drop(env_logger::try_init());
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = single_threaded_runtime();
let runtime = single_threaded_runtime();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down Expand Up @@ -1785,7 +1785,7 @@ LLVM version: 6.0",
drop(env_logger::try_init());
let creator = new_creator();
let f = TestFixture::new();
let mut runtime = single_threaded_runtime();
let runtime = single_threaded_runtime();
let pool = runtime.handle().clone();
let storage = DiskCache::new(&f.tempdir.path().join("cache"), u64::MAX, &pool);
let storage = Arc::new(storage);
Expand Down
21 changes: 13 additions & 8 deletions src/dist/client_auth.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,10 +503,13 @@ pub fn get_token_oauth2_code_grant_pkce(
mut auth_url: Url,
token_url: &str,
) -> Result<String> {
let mut runtime = Runtime::new()?;
let server = runtime
.enter(try_bind)?
.serve(make_service!(code_grant_pkce::serve));
let runtime = Runtime::new()?;
let builder = {
let _guard = runtime.enter();
try_bind()?
};
let server = builder.serve(make_service!(code_grant_pkce::serve));

let port = server.local_addr().port();

let redirect_uri = format!("http://localhost:{}/redirect", port);
Expand Down Expand Up @@ -554,10 +557,12 @@ pub fn get_token_oauth2_code_grant_pkce(

// https://auth0.com/docs/api-auth/tutorials/implicit-grant
pub fn get_token_oauth2_implicit(client_id: &str, mut auth_url: Url) -> Result<String> {
let mut runtime = Runtime::new()?;
let server = runtime
.enter(try_bind)?
.serve(make_service!(implicit::serve));
let runtime = Runtime::new()?;
let builder = {
let _guard = runtime.enter();
try_bind()?
};
let server = builder.serve(make_service!(implicit::serve));

let port = server.local_addr().port();

Expand Down
3 changes: 1 addition & 2 deletions src/jobserver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,7 @@ impl Client {
let helper = inner
.clone()
.into_helper_thread(move |token| {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.build()
.unwrap();
rt.block_on(async {
Expand Down
4 changes: 2 additions & 2 deletions src/mock_command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,8 +164,8 @@ impl CommandChild for Child {
}

async fn wait(self) -> io::Result<ExitStatus> {
let Child { inner, token } = self;
inner.await.map(|ret| {
let Child { mut inner, token } = self;
inner.wait().await.map(|ret| {
drop(token);
ret
})
Expand Down
49 changes: 25 additions & 24 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use crate::protocol::{Compile, CompileFinished, CompileResponse, Request, Respon
use crate::util;
#[cfg(feature = "dist-client")]
use anyhow::Context as _;
use bytes::{buf::ext::BufMutExt, Bytes, BytesMut};
use bytes::{buf::BufMut, Bytes, BytesMut};
use filetime::FileTime;
use futures::channel::mpsc;
use futures::future::FutureExt;
Expand All @@ -54,11 +54,11 @@ use std::time::Duration;
#[cfg(feature = "dist-client")]
use std::time::Instant;
use std::u64;
use tokio::runtime::Runtime;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpListener,
time::{self, delay_for, Delay},
runtime::Runtime,
time::{self, sleep, Sleep},
};
use tokio_serde::Framed;
use tokio_util::codec::{length_delimited, LengthDelimitedCodec};
Expand Down Expand Up @@ -402,10 +402,9 @@ impl DistClientContainer {
pub fn start_server(config: &Config, port: u16) -> Result<()> {
info!("start_server: port: {}", port);
let client = unsafe { Client::new() };
let runtime = tokio::runtime::Builder::new()
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.threaded_scheduler()
.core_threads(std::cmp::max(20, 2 * num_cpus::get()))
.worker_threads(std::cmp::max(20, 2 * num_cpus::get()))
.build()?;
let pool = runtime.handle().clone();
let dist_client = DistClientContainer::new(config, &pool);
Expand Down Expand Up @@ -449,7 +448,7 @@ pub struct SccacheServer<C: CommandCreatorSync> {
impl<C: CommandCreatorSync> SccacheServer<C> {
pub fn new(
port: u16,
mut runtime: Runtime,
runtime: Runtime,
client: Client,
dist_client: DistClientContainer,
storage: Arc<dyn Storage>,
Expand Down Expand Up @@ -515,7 +514,7 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
C: Send,
{
let SccacheServer {
mut runtime,
runtime,
listener,
rx,
service,
Expand All @@ -525,17 +524,19 @@ impl<C: CommandCreatorSync> SccacheServer<C> {

// Create our "server future" which will simply handle all incoming
// connections in separate tasks.
let server = listener.try_for_each(move |socket| {
trace!("incoming connection");
let conn = service.clone().bind(socket).map_err(|res| {
error!("Failed to bind socket: {}", res);
});

// We're not interested if the task panicked; immediately process
// another connection
let _ = tokio::spawn(conn);
async { Ok(()) }
});
let server = async move {
loop {
let (socket, _) = listener.accept().await?;
trace!("incoming connection");
let conn = service.clone().bind(socket).map_err(|res| {
error!("Failed to bind socket: {}", res);
});

// We're not interested if the task panicked; immediately process
// another connection
let _ = tokio::spawn(conn);
}
};

// Right now there's a whole bunch of ways to shut down this server for
// various purposes. These include:
Expand All @@ -557,7 +558,7 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
ShutdownOrInactive {
rx,
timeout: if timeout != Duration::new(0, 0) {
Some(delay_for(timeout))
Some(Box::pin(sleep(timeout)))
} else {
None
},
Expand All @@ -571,7 +572,7 @@ impl<C: CommandCreatorSync> SccacheServer<C> {
futures::select! {
server = server.fuse() => server,
_res = shutdown.fuse() => Ok(()),
_res = shutdown_idle.fuse() => Ok(()),
_res = shutdown_idle.fuse() => Ok::<_, io::Error>(()),
}
})?;

Expand Down Expand Up @@ -1695,7 +1696,7 @@ impl<I: AsyncRead + AsyncWrite + Unpin> Sink<Frame<Response, Response>> for Scca

struct ShutdownOrInactive {
rx: mpsc::Receiver<ServerMessage>,
timeout: Option<Delay>,
timeout: Option<Pin<Box<Sleep>>>,
timeout_dur: Duration,
}

Expand All @@ -1710,7 +1711,7 @@ impl Future for ShutdownOrInactive {
Poll::Ready(Some(ServerMessage::Shutdown)) => return Poll::Ready(()),
Poll::Ready(Some(ServerMessage::Request)) => {
if self.timeout_dur != Duration::new(0, 0) {
self.timeout = Some(delay_for(self.timeout_dur));
self.timeout = Some(Box::pin(sleep(self.timeout_dur)));
}
}
// All services have shut down, in theory this isn't possible...
Expand All @@ -1719,7 +1720,7 @@ impl Future for ShutdownOrInactive {
}
match self.timeout {
None => Poll::Pending,
Some(ref mut timeout) => Pin::new(timeout).poll(cx),
Some(ref mut timeout) => timeout.as_mut().poll(cx),
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions src/test/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,10 +231,9 @@ impl TestFixture {
}

pub fn single_threaded_runtime() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new()
tokio::runtime::Builder::new_current_thread()
.enable_all()
.basic_scheduler()
.core_threads(1)
.worker_threads(1)
.build()
.unwrap()
}
Expand All @@ -252,7 +251,7 @@ where
T: Future<Output = O>,
{
fn wait(self) -> O {
let mut rt = single_threaded_runtime();
let rt = single_threaded_runtime();
rt.block_on(self)
}
}
Expand Down

0 comments on commit 16c7817

Please sign in to comment.