Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate to stable Future trait, async/await and Tokio 0.2 runtime #985

Merged
merged 8 commits into from
Nov 10, 2021
Merged
692 changes: 365 additions & 327 deletions Cargo.lock

Large diffs are not rendered by default.

29 changes: 13 additions & 16 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,26 @@ required-features = ["dist-server"]
[dependencies]
anyhow = "1.0"
ar = { version = "0.8", optional = true }
async-trait = "0.1"
atty = "0.2.6"
base64 = "0.13"
bincode = "1"
blake3 = "0.3"
byteorder = "1.0"
bytes = "0.5"
chrono = { version = "0.4", optional = true }
clap = "2.23.0"
counted-array = "0.1"
directories = "3"
env_logger = "0.8"
filetime = "0.2"
flate2 = { version = "1.0", optional = true, default-features = false, features = ["rust_backend"] }
futures = "0.1.11"
futures_03 = { package = "futures", version = "0.3", features = ["compat", "thread-pool"] }
futures = "0.3"
futures-locks = "0.6"
hmac = { version = "0.10", optional = true }
http = "0.1"
hyper = { version = "0.12", optional = true }
hyperx = { version = "0.12", optional = true }
http = "0.2"
hyper = { version = "0.13", optional = true }
hyperx = { version = "0.13", optional = true }
jobserver = "0.1"
jsonwebtoken = { version = "7", optional = true }
lazy_static = "1.0.0"
Expand All @@ -58,7 +60,7 @@ percent-encoding = { version = "2", optional = true }
rand = "0.7"
redis = { version = "0.17", optional = true }
regex = "1"
reqwest = { version = "0.9.11", optional = true }
reqwest = { version = "0.10", features = ["json", "blocking"], optional = true }
retry = "1"
ring = { version = "0.16", optional = true, features = ["std"] }
sha-1 = { version = "0.9", optional = true }
Expand All @@ -69,13 +71,10 @@ serde_json = "1.0"
strip-ansi-escapes = "0.1"
tar = "0.4"
tempfile = "3"
tokio-compat = "0.1"
tokio-io = "0.1"
tokio-process = "0.2"
tokio-serde-bincode = "0.1"
tower = "0.1"
tokio-tcp = "0.1"
tokio-timer = "0.2"
tokio = { version = "0.2", features = ["rt-threaded", "blocking", "io-util", "time", "uds", "tcp", "process", "macros"] }
tokio-serde = "0.6"
tokio-util = { version = "0.3", features = ["codec"] }
tower = "0.3"
toml = "0.5"
untrusted = { version = "0.7", optional = true }
url = { version = "2", optional = true }
Expand Down Expand Up @@ -105,11 +104,9 @@ selenium-rs = "0.1"

[target.'cfg(unix)'.dependencies]
daemonize = "0.4"
tokio-uds = "0.2"

[target.'cfg(windows)'.dependencies]
tokio-named-pipes = "0.1"
tokio-reactor = "0.1"
parity-tokio-ipc = "0.8"

[target.'cfg(windows)'.dependencies.winapi]
version = "0.3"
Expand Down
82 changes: 32 additions & 50 deletions src/azure/blobstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
// limitations under the License.

use crate::azure::credentials::*;
use futures::{Future, Stream};
use hmac::{Hmac, Mac, NewMac};
use hyper::header::HeaderValue;
use hyper::Method;
use hyperx::header;
use md5::{Digest, Md5};
use reqwest::r#async::{Client, Request};
use reqwest::Url;
use reqwest::{header::HeaderValue, Client, Method, Request};
use sha2::Sha256;
use std::fmt;
use std::str::FromStr;
Expand Down Expand Up @@ -72,7 +69,7 @@ impl BlobContainer {
})
}

pub fn get(&self, key: &str, creds: &AzureCredentials) -> SFuture<Vec<u8>> {
pub async fn get(&self, key: &str, creds: &AzureCredentials) -> Result<Vec<u8>> {
let url_string = format!("{}{}", self.url, key);
let uri = Url::from_str(&url_string).unwrap();
let dt = chrono::Utc::now();
Expand All @@ -90,10 +87,7 @@ impl BlobContainer {
creds,
);

let uri_copy = uri.clone();
let uri_second_copy = uri.clone();

let mut request = Request::new(Method::GET, uri);
let mut request = Request::new(Method::GET, uri.clone());
request.headers_mut().insert(
"x-ms-date",
HeaderValue::from_str(&date).expect("Date is an invalid header value"),
Expand All @@ -108,46 +102,34 @@ impl BlobContainer {
);
}

Box::new(
self.client
.execute(request)
.fwith_context(move || format!("failed GET: {}", uri_copy))
.and_then(|res| {
if res.status().is_success() {
let content_length = res
.headers()
.get_hyperx::<header::ContentLength>()
.map(|header::ContentLength(len)| len);
Ok((res.into_body(), content_length))
} else {
Err(BadHttpStatusError(res.status()).into())
}
})
.and_then(|(body, content_length)| {
body.fold(Vec::new(), |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, reqwest::Error>(body)
})
.fcontext("failed to read HTTP body")
.and_then(move |bytes| {
if let Some(len) = content_length {
if len != bytes.len() as u64 {
bail!(format!(
"Bad HTTP body size read: {}, expected {}",
bytes.len(),
len
));
} else {
info!("Read {} bytes from {}", bytes.len(), uri_second_copy);
}
}
Ok(bytes)
})
}),
)
let res = self
.client
.execute(request)
.await
.with_context(|| format!("failed GET: {}", &uri))?;

let (bytes, content_length) = if res.status().is_success() {
let content_length = res.content_length();
(res.bytes().await?, content_length)
} else {
return Err(BadHttpStatusError(res.status()).into());
};

if let Some(len) = content_length {
if len != bytes.len() as u64 {
bail!(format!(
"Bad HTTP body size read: {}, expected {}",
bytes.len(),
len
));
} else {
info!("Read {} bytes from {}", bytes.len(), &uri);
}
}
Ok(bytes.into_iter().collect())
}

pub fn put(&self, key: &str, content: Vec<u8>, creds: &AzureCredentials) -> SFuture<()> {
pub async fn put(&self, key: &str, content: Vec<u8>, creds: &AzureCredentials) -> Result<()> {
let url_string = format!("{}{}", self.url, key);
let uri = Url::from_str(&url_string).unwrap();
let dt = chrono::Utc::now();
Expand Down Expand Up @@ -206,7 +188,7 @@ impl BlobContainer {

*request.body_mut() = Some(content.into());

Box::new(self.client.execute(request).then(|result| match result {
match self.client.execute(request).await {
Ok(res) => {
if res.status().is_success() {
trace!("PUT succeeded");
Expand All @@ -220,7 +202,7 @@ impl BlobContainer {
trace!("PUT failed with error: {:?}", e);
Err(e.into())
}
}))
}
}
}

Expand Down Expand Up @@ -285,7 +267,7 @@ fn canonicalize_resource(uri: &Url, account_name: &str) -> String {
#[cfg(test)]
mod test {
use super::*;
use tokio_compat::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;

#[test]
fn test_signing() {
Expand Down
22 changes: 9 additions & 13 deletions src/bin/sccache-dist/token_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ const MOZ_USERINFO_ENDPOINT: &str = "https://auth.mozilla.auth0.com/userinfo";
// Mozilla-specific check by forwarding the token onto the auth0 userinfo endpoint
pub struct MozillaCheck {
auth_cache: Mutex<HashMap<String, Instant>>, // token, token_expiry
client: reqwest::Client,
client: reqwest::blocking::Client,
required_groups: Vec<String>,
}

Expand All @@ -96,7 +96,7 @@ impl MozillaCheck {
pub fn new(required_groups: Vec<String>) -> Self {
Self {
auth_cache: Mutex::new(HashMap::new()),
client: reqwest::Client::new(),
client: reqwest::blocking::Client::new(),
required_groups,
}
}
Expand Down Expand Up @@ -145,22 +145,18 @@ impl MozillaCheck {
let header = hyperx::header::Authorization(hyperx::header::Bearer {
token: token.to_owned(),
});
let mut res = self
let res = self
.client
.get(url.clone())
.set_header(header)
.send()
.context("Failed to make request to mozilla userinfo")?;
let status = res.status();
let res_text = res
.text()
.context("Failed to interpret response from mozilla userinfo as string")?;
if !res.status().is_success() {
bail!(
"JWT forwarded to {} returned {}: {}",
url,
res.status(),
res_text
)
if !status.is_success() {
bail!("JWT forwarded to {} returned {}: {}", url, status, res_text)
}

// The API didn't return a HTTP error code, let's check the response
Expand Down Expand Up @@ -241,7 +237,7 @@ fn test_auth_verify_check_mozilla_profile() {
// Don't check a token is valid (it may not even be a JWT) just forward it to
// an API and check for success
pub struct ProxyTokenCheck {
client: reqwest::Client,
client: reqwest::blocking::Client,
maybe_auth_cache: Option<Mutex<(HashMap<String, Instant>, Duration)>>,
url: String,
}
Expand All @@ -265,7 +261,7 @@ impl ProxyTokenCheck {
let maybe_auth_cache: Option<Mutex<(HashMap<String, Instant>, Duration)>> =
cache_secs.map(|secs| Mutex::new((HashMap::new(), Duration::from_secs(secs))));
Self {
client: reqwest::Client::new(),
client: reqwest::blocking::Client::new(),
maybe_auth_cache,
url,
}
Expand Down Expand Up @@ -330,7 +326,7 @@ impl ClientAuthCheck for ValidJWTCheck {

impl ValidJWTCheck {
pub fn new(audience: String, issuer: String, jwks_url: &str) -> Result<Self> {
let mut res = reqwest::get(jwks_url).context("Failed to make request to JWKs url")?;
let res = reqwest::blocking::get(jwks_url).context("Failed to make request to JWKs url")?;
if !res.status().is_success() {
bail!("Could not retrieve JWKs, HTTP error: {}", res.status())
}
Expand Down
56 changes: 25 additions & 31 deletions src/cache/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
use crate::azure::BlobContainer;
use crate::azure::*;
use crate::cache::{Cache, CacheRead, CacheWrite, Storage};
use futures::future::Future;
use std::io;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::errors::*;

pub struct AzureBlobCache {
container: Rc<BlobContainer>,
container: Arc<BlobContainer>,
credentials: AzureCredentials,
}

Expand All @@ -44,53 +43,48 @@ impl AzureBlobCache {
};

Ok(AzureBlobCache {
container: Rc::new(container),
container: Arc::new(container),
credentials,
})
}
}

#[async_trait]
impl Storage for AzureBlobCache {
fn get(&self, key: &str) -> SFuture<Cache> {
Box::new(
self.container
.get(key, &self.credentials)
.then(|result| match result {
Ok(data) => {
let hit = CacheRead::from(io::Cursor::new(data))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got Azure error: {:?}", e);
Ok(Cache::Miss)
}
}),
)
async fn get(&self, key: &str) -> Result<Cache> {
match self.container.get(key, &self.credentials).await {
Ok(data) => {
let hit = CacheRead::from(io::Cursor::new(data))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got Azure error: {:?}", e);
Ok(Cache::Miss)
}
}
}

fn put(&self, key: &str, entry: CacheWrite) -> SFuture<Duration> {
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
let start = Instant::now();
let data = match entry.finish() {
Ok(data) => data,
Err(e) => return f_err(e),
};
let data = entry.finish()?;

let response = self
let _ = self
.container
.put(key, data, &self.credentials)
.fcontext("Failed to put cache entry in Azure");
.await
.context("Failed to put cache entry in Azure")?;

Box::new(response.map(move |_| start.elapsed()))
Ok(start.elapsed())
}

fn location(&self) -> String {
format!("Azure, container: {}", self.container)
}

fn current_size(&self) -> SFuture<Option<u64>> {
f_ok(None)
async fn current_size(&self) -> Result<Option<u64>> {
Ok(None)
}
fn max_size(&self) -> SFuture<Option<u64>> {
f_ok(None)
async fn max_size(&self) -> Result<Option<u64>> {
Ok(None)
}
}