Skip to content

Commit

Permalink
Migrate to stable Future trait and Tokio 0.2
Browse files Browse the repository at this point in the history
Co-authored-by: Igor Matuszewski <xanewok@gmail.com>
  • Loading branch information
drahnr and Xanewok committed Apr 9, 2021
1 parent affebd4 commit 229bf34
Show file tree
Hide file tree
Showing 35 changed files with 3,064 additions and 3,105 deletions.
368 changes: 257 additions & 111 deletions Cargo.lock

Large diffs are not rendered by default.

20 changes: 8 additions & 12 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ required-features = ["dist-server"]
[dependencies]
anyhow = "1.0"
ar = { version = "0.8", optional = true }
async-trait = "0.1"
atty = "0.2.6"
base64 = "0.13"
bincode = "1"
Expand All @@ -37,12 +38,12 @@ directories = "3"
env_logger = "0.8"
filetime = "0.2"
flate2 = { version = "1.0", optional = true, default-features = false, features = ["rust_backend"] }
futures = "0.1.11"
futures_03 = { package = "futures", version = "0.3", features = ["compat", "thread-pool"] }
futures = "0.3"
futures-locks = "0.6"
hmac = { version = "0.10", optional = true }
http = "0.1"
hyper = { version = "0.12", optional = true }
hyperx = { version = "0.12", optional = true }
http = "0.2"
hyper = { version = "0.13", optional = true }
hyperx = { version = "0.13", optional = true }
jobserver = "0.1"
jsonwebtoken = { version = "7", optional = true }
lazy_static = "1.0.0"
Expand All @@ -59,7 +60,7 @@ percent-encoding = { version = "2", optional = true }
rand = "0.7"
redis = { version = "0.17", optional = true }
regex = "1"
reqwest = { version = "0.9.11", optional = true }
reqwest = { version = "0.10", features = ["json", "blocking"], optional = true }
retry = "1"
ring = { version = "0.16", optional = true, features = ["std"] }
sha-1 = { version = "0.9", optional = true }
Expand All @@ -70,14 +71,10 @@ serde_json = "1.0"
strip-ansi-escapes = "0.1"
tar = "0.4"
tempfile = "3"
tokio = { version = "0.2", features = ["tcp"] }
tokio-compat = "0.1"
tokio-io = "0.1"
tokio-process = "0.2"
tokio = { version = "0.2", features = ["rt-threaded", "blocking", "io-util", "time", "uds", "tcp", "process", "macros"] }
tokio-serde = "0.6"
tokio-util = { version = "0.3", features = ["codec"] }
tower = "0.3"
tokio-timer = "0.2"
toml = "0.5"
untrusted = { version = "0.7", optional = true }
url = { version = "2", optional = true }
Expand Down Expand Up @@ -107,7 +104,6 @@ selenium-rs = "0.1"

[target.'cfg(unix)'.dependencies]
daemonize = "0.4"
tokio-uds = "0.2"

[target.'cfg(windows)'.dependencies]
tokio-named-pipes = "0.1"
Expand Down
82 changes: 32 additions & 50 deletions src/azure/blobstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@
// limitations under the License.

use crate::azure::credentials::*;
use futures::{Future, Stream};
use hmac::{Hmac, Mac, NewMac};
use hyper::header::HeaderValue;
use hyper::Method;
use hyperx::header;
use md5::{Digest, Md5};
use reqwest::r#async::{Client, Request};
use reqwest::Url;
use reqwest::{header::HeaderValue, Client, Method, Request};
use sha2::Sha256;
use std::fmt;
use std::str::FromStr;
Expand Down Expand Up @@ -72,7 +69,7 @@ impl BlobContainer {
})
}

pub fn get(&self, key: &str, creds: &AzureCredentials) -> SFuture<Vec<u8>> {
pub async fn get(&self, key: &str, creds: &AzureCredentials) -> Result<Vec<u8>> {
let url_string = format!("{}{}", self.url, key);
let uri = Url::from_str(&url_string).unwrap();
let dt = chrono::Utc::now();
Expand All @@ -90,10 +87,7 @@ impl BlobContainer {
creds,
);

let uri_copy = uri.clone();
let uri_second_copy = uri.clone();

let mut request = Request::new(Method::GET, uri);
let mut request = Request::new(Method::GET, uri.clone());
request.headers_mut().insert(
"x-ms-date",
HeaderValue::from_str(&date).expect("Date is an invalid header value"),
Expand All @@ -108,46 +102,34 @@ impl BlobContainer {
);
}

Box::new(
self.client
.execute(request)
.fwith_context(move || format!("failed GET: {}", uri_copy))
.and_then(|res| {
if res.status().is_success() {
let content_length = res
.headers()
.get_hyperx::<header::ContentLength>()
.map(|header::ContentLength(len)| len);
Ok((res.into_body(), content_length))
} else {
Err(BadHttpStatusError(res.status()).into())
}
})
.and_then(|(body, content_length)| {
body.fold(Vec::new(), |mut body, chunk| {
body.extend_from_slice(&chunk);
Ok::<_, reqwest::Error>(body)
})
.fcontext("failed to read HTTP body")
.and_then(move |bytes| {
if let Some(len) = content_length {
if len != bytes.len() as u64 {
bail!(format!(
"Bad HTTP body size read: {}, expected {}",
bytes.len(),
len
));
} else {
info!("Read {} bytes from {}", bytes.len(), uri_second_copy);
}
}
Ok(bytes)
})
}),
)
let res = self
.client
.execute(request)
.await
.with_context(|| format!("failed GET: {}", &uri))?;

let (bytes, content_length) = if res.status().is_success() {
let content_length = res.content_length();
(res.bytes().await?, content_length)
} else {
return Err(BadHttpStatusError(res.status()).into());
};

if let Some(len) = content_length {
if len != bytes.len() as u64 {
bail!(format!(
"Bad HTTP body size read: {}, expected {}",
bytes.len(),
len
));
} else {
info!("Read {} bytes from {}", bytes.len(), &uri);
}
}
Ok(bytes.into_iter().collect())
}

pub fn put(&self, key: &str, content: Vec<u8>, creds: &AzureCredentials) -> SFuture<()> {
pub async fn put(&self, key: &str, content: Vec<u8>, creds: &AzureCredentials) -> Result<()> {
let url_string = format!("{}{}", self.url, key);
let uri = Url::from_str(&url_string).unwrap();
let dt = chrono::Utc::now();
Expand Down Expand Up @@ -206,7 +188,7 @@ impl BlobContainer {

*request.body_mut() = Some(content.into());

Box::new(self.client.execute(request).then(|result| match result {
match self.client.execute(request).await {
Ok(res) => {
if res.status().is_success() {
trace!("PUT succeeded");
Expand All @@ -220,7 +202,7 @@ impl BlobContainer {
trace!("PUT failed with error: {:?}", e);
Err(e.into())
}
}))
}
}
}

Expand Down Expand Up @@ -285,7 +267,7 @@ fn canonicalize_resource(uri: &Url, account_name: &str) -> String {
#[cfg(test)]
mod test {
use super::*;
use tokio_compat::runtime::current_thread::Runtime;
use tokio::runtime::Runtime;

#[test]
fn test_signing() {
Expand Down
22 changes: 9 additions & 13 deletions src/bin/sccache-dist/token_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ const MOZ_USERINFO_ENDPOINT: &str = "https://auth.mozilla.auth0.com/userinfo";
// Mozilla-specific check by forwarding the token onto the auth0 userinfo endpoint
pub struct MozillaCheck {
auth_cache: Mutex<HashMap<String, Instant>>, // token, token_expiry
client: reqwest::Client,
client: reqwest::blocking::Client,
required_groups: Vec<String>,
}

Expand All @@ -96,7 +96,7 @@ impl MozillaCheck {
pub fn new(required_groups: Vec<String>) -> Self {
Self {
auth_cache: Mutex::new(HashMap::new()),
client: reqwest::Client::new(),
client: reqwest::blocking::Client::new(),
required_groups,
}
}
Expand Down Expand Up @@ -145,22 +145,18 @@ impl MozillaCheck {
let header = hyperx::header::Authorization(hyperx::header::Bearer {
token: token.to_owned(),
});
let mut res = self
let res = self
.client
.get(url.clone())
.set_header(header)
.send()
.context("Failed to make request to mozilla userinfo")?;
let status = res.status();
let res_text = res
.text()
.context("Failed to interpret response from mozilla userinfo as string")?;
if !res.status().is_success() {
bail!(
"JWT forwarded to {} returned {}: {}",
url,
res.status(),
res_text
)
if !status.is_success() {
bail!("JWT forwarded to {} returned {}: {}", url, status, res_text)
}

// The API didn't return a HTTP error code, let's check the response
Expand Down Expand Up @@ -241,7 +237,7 @@ fn test_auth_verify_check_mozilla_profile() {
// Don't check a token is valid (it may not even be a JWT) just forward it to
// an API and check for success
pub struct ProxyTokenCheck {
client: reqwest::Client,
client: reqwest::blocking::Client,
maybe_auth_cache: Option<Mutex<(HashMap<String, Instant>, Duration)>>,
url: String,
}
Expand All @@ -265,7 +261,7 @@ impl ProxyTokenCheck {
let maybe_auth_cache: Option<Mutex<(HashMap<String, Instant>, Duration)>> =
cache_secs.map(|secs| Mutex::new((HashMap::new(), Duration::from_secs(secs))));
Self {
client: reqwest::Client::new(),
client: reqwest::blocking::Client::new(),
maybe_auth_cache,
url,
}
Expand Down Expand Up @@ -330,7 +326,7 @@ impl ClientAuthCheck for ValidJWTCheck {

impl ValidJWTCheck {
pub fn new(audience: String, issuer: String, jwks_url: &str) -> Result<Self> {
let mut res = reqwest::get(jwks_url).context("Failed to make request to JWKs url")?;
let res = reqwest::blocking::get(jwks_url).context("Failed to make request to JWKs url")?;
if !res.status().is_success() {
bail!("Could not retrieve JWKs, HTTP error: {}", res.status())
}
Expand Down
56 changes: 25 additions & 31 deletions src/cache/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
use crate::azure::BlobContainer;
use crate::azure::*;
use crate::cache::{Cache, CacheRead, CacheWrite, Storage};
use futures::future::Future;
use std::io;
use std::rc::Rc;
use std::sync::Arc;
use std::time::{Duration, Instant};

use crate::errors::*;

pub struct AzureBlobCache {
container: Rc<BlobContainer>,
container: Arc<BlobContainer>,
credentials: AzureCredentials,
}

Expand All @@ -44,53 +43,48 @@ impl AzureBlobCache {
};

Ok(AzureBlobCache {
container: Rc::new(container),
container: Arc::new(container),
credentials,
})
}
}

#[async_trait]
impl Storage for AzureBlobCache {
fn get(&self, key: &str) -> SFuture<Cache> {
Box::new(
self.container
.get(key, &self.credentials)
.then(|result| match result {
Ok(data) => {
let hit = CacheRead::from(io::Cursor::new(data))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got Azure error: {:?}", e);
Ok(Cache::Miss)
}
}),
)
async fn get(&self, key: &str) -> Result<Cache> {
match self.container.get(&key, &self.credentials).await {
Ok(data) => {
let hit = CacheRead::from(io::Cursor::new(data))?;
Ok(Cache::Hit(hit))
}
Err(e) => {
warn!("Got Azure error: {:?}", e);
Ok(Cache::Miss)
}
}
}

fn put(&self, key: &str, entry: CacheWrite) -> SFuture<Duration> {
async fn put(&self, key: &str, entry: CacheWrite) -> Result<Duration> {
let start = Instant::now();
let data = match entry.finish() {
Ok(data) => data,
Err(e) => return f_err(e),
};
let data = entry.finish()?;

let response = self
let _ = self
.container
.put(key, data, &self.credentials)
.fcontext("Failed to put cache entry in Azure");
.await
.context("Failed to put cache entry in Azure")?;

Box::new(response.map(move |_| start.elapsed()))
Ok(start.elapsed())
}

fn location(&self) -> String {
format!("Azure, container: {}", self.container)
}

fn current_size(&self) -> SFuture<Option<u64>> {
f_ok(None)
async fn current_size(&self) -> Result<Option<u64>> {
Ok(None)
}
fn max_size(&self) -> SFuture<Option<u64>> {
f_ok(None)
async fn max_size(&self) -> Result<Option<u64>> {
Ok(None)
}
}

0 comments on commit 229bf34

Please sign in to comment.