Skip to content

Commit

Permalink
Fix S3 query canonicalization (#2800) (#2801)
Browse files Browse the repository at this point in the history
* Fix S3 query canonicalization (#2800)

* Disable listing with spaces on azurite and localstack
  • Loading branch information
tustvold committed Sep 29, 2022
1 parent f845d6e commit da8f742
Show file tree
Hide file tree
Showing 6 changed files with 95 additions and 26 deletions.
16 changes: 2 additions & 14 deletions object_store/src/aws/client.rs
Expand Up @@ -16,6 +16,7 @@
// under the License.

use crate::aws::credential::{AwsCredential, CredentialExt, CredentialProvider};
use crate::aws::STRICT_PATH_ENCODE_SET;
use crate::client::pagination::stream_paginated;
use crate::client::retry::RetryExt;
use crate::multipart::UploadPart;
Expand All @@ -26,26 +27,13 @@ use crate::{
};
use bytes::{Buf, Bytes};
use chrono::{DateTime, Utc};
use percent_encoding::{utf8_percent_encode, AsciiSet, PercentEncode, NON_ALPHANUMERIC};
use percent_encoding::{utf8_percent_encode, PercentEncode};
use reqwest::{Client as ReqwestClient, Method, Response, StatusCode};
use serde::{Deserialize, Serialize};
use snafu::{ResultExt, Snafu};
use std::ops::Range;
use std::sync::Arc;

// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ).
const STRICT_ENCODE_SET: AsciiSet = NON_ALPHANUMERIC
.remove(b'-')
.remove(b'.')
.remove(b'_')
.remove(b'~');

/// This struct is used to maintain the URI path encoding
const STRICT_PATH_ENCODE_SET: AsciiSet = STRICT_ENCODE_SET.remove(b'/');

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down
37 changes: 36 additions & 1 deletion object_store/src/aws/credential.rs
Expand Up @@ -15,20 +15,23 @@
// specific language governing permissions and limitations
// under the License.

use crate::aws::STRICT_ENCODE_SET;
use crate::client::retry::RetryExt;
use crate::client::token::{TemporaryToken, TokenCache};
use crate::util::hmac_sha256;
use crate::{Result, RetryConfig};
use bytes::Buf;
use chrono::{DateTime, Utc};
use futures::TryFutureExt;
use percent_encoding::utf8_percent_encode;
use reqwest::header::{HeaderMap, HeaderValue};
use reqwest::{Client, Method, Request, RequestBuilder, StatusCode};
use serde::Deserialize;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Instant;
use tracing::warn;
use url::Url;

type StdError = Box<dyn std::error::Error + Send + Sync>;

Expand Down Expand Up @@ -103,13 +106,14 @@ impl<'a> RequestSigner<'a> {
request.headers_mut().insert(HASH_HEADER, header_digest);

let (signed_headers, canonical_headers) = canonicalize_headers(request.headers());
let canonical_query = canonicalize_query(request.url());

// https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
let canonical_request = format!(
"{}\n{}\n{}\n{}\n{}\n{}",
request.method().as_str(),
request.url().path(), // S3 doesn't percent encode this like other services
request.url().query().unwrap_or(""), // This assumes the query pairs are in order
canonical_query,
canonical_headers,
signed_headers,
digest
Expand Down Expand Up @@ -207,6 +211,37 @@ fn hex_encode(bytes: &[u8]) -> String {
out
}

/// Canonicalizes query parameters into the AWS canonical form
///
/// <https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>
fn canonicalize_query(url: &Url) -> String {
use std::fmt::Write;

let capacity = match url.query() {
Some(q) if !q.is_empty() => q.len(),
_ => return String::new(),
};
let mut encoded = String::with_capacity(capacity + 1);

let mut headers = url.query_pairs().collect::<Vec<_>>();
headers.sort_unstable_by(|(a, _), (b, _)| a.cmp(b));

let mut first = true;
for (k, v) in headers {
if !first {
encoded.push('&');
}
first = false;
let _ = write!(
encoded,
"{}={}",
utf8_percent_encode(k.as_ref(), &STRICT_ENCODE_SET),
utf8_percent_encode(v.as_ref(), &STRICT_ENCODE_SET)
);
}
encoded
}

/// Canonicalizes headers into the AWS Canonical Form.
///
/// <https://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html>
Expand Down
20 changes: 18 additions & 2 deletions object_store/src/aws/mod.rs
Expand Up @@ -58,6 +58,20 @@ use crate::{
mod client;
mod credential;

// http://docs.aws.amazon.com/general/latest/gr/sigv4-create-canonical-request.html
//
// Do not URI-encode any of the unreserved characters that RFC 3986 defines:
// A-Z, a-z, 0-9, hyphen ( - ), underscore ( _ ), period ( . ), and tilde ( ~ ).
pub(crate) const STRICT_ENCODE_SET: percent_encoding::AsciiSet =
percent_encoding::NON_ALPHANUMERIC
.remove(b'-')
.remove(b'.')
.remove(b'_')
.remove(b'~');

/// This struct is used to maintain the URI path encoding
const STRICT_PATH_ENCODE_SET: percent_encoding::AsciiSet = STRICT_ENCODE_SET.remove(b'/');

/// A specialized `Error` for object store-related errors
#[derive(Debug, Snafu)]
#[allow(missing_docs)]
Expand Down Expand Up @@ -551,7 +565,7 @@ mod tests {
use super::*;
use crate::tests::{
get_nonexistent_object, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy, stream_get,
put_get_delete_list_opts, rename_and_copy, stream_get,
};
use bytes::Bytes;
use std::env;
Expand Down Expand Up @@ -677,9 +691,11 @@ mod tests {
#[tokio::test]
async fn s3_test() {
let config = maybe_skip_integration!();
let is_local = matches!(&config.endpoint, Some(e) if e.starts_with("http://"));
let integration = config.build().unwrap();

put_get_delete_list(&integration).await;
// Localstack doesn't support listing with spaces https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, is_local).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
Expand Down
15 changes: 6 additions & 9 deletions object_store/src/azure/mod.rs
Expand Up @@ -595,7 +595,7 @@ mod tests {
use super::*;
use crate::tests::{
copy_if_not_exists, list_uses_directories_correctly, list_with_delimiter,
put_get_delete_list, rename_and_copy, stream_get,
put_get_delete_list, put_get_delete_list_opts, rename_and_copy, stream_get,
};
use std::env;

Expand Down Expand Up @@ -663,9 +663,10 @@ mod tests {

#[tokio::test]
async fn azure_blob_test() {
let use_emulator = env::var("AZURE_USE_EMULATOR").is_ok();
let integration = maybe_skip_integration!().build().unwrap();

put_get_delete_list(&integration).await;
// Azurite doesn't support listing with spaces - https://github.com/localstack/localstack/issues/6328
put_get_delete_list_opts(&integration, use_emulator).await;
list_uses_directories_correctly(&integration).await;
list_with_delimiter(&integration).await;
rename_and_copy(&integration).await;
Expand All @@ -687,13 +688,9 @@ mod tests {
.with_container_name(
env::var("OBJECT_STORE_BUCKET").expect("must be set OBJECT_STORE_BUCKET"),
)
.with_client_secret_authorization(
env::var("AZURE_STORAGE_CLIENT_ID")
.with_access_key(
env::var("AZURE_STORAGE_ACCESS_KEY")
.expect("must be set AZURE_STORAGE_CLIENT_ID"),
env::var("AZURE_STORAGE_CLIENT_SECRET")
.expect("must be set AZURE_STORAGE_CLIENT_SECRET"),
env::var("AZURE_STORAGE_TENANT_ID")
.expect("must be set AZURE_STORAGE_TENANT_ID"),
);
let integration = builder.build().unwrap();

Expand Down
22 changes: 22 additions & 0 deletions object_store/src/lib.rs
Expand Up @@ -506,6 +506,13 @@ mod tests {
use tokio::io::AsyncWriteExt;

pub(crate) async fn put_get_delete_list(storage: &DynObjectStore) {
put_get_delete_list_opts(storage, false).await
}

pub(crate) async fn put_get_delete_list_opts(
storage: &DynObjectStore,
skip_list_with_spaces: bool,
) {
delete_fixtures(storage).await;

let content_list = flatten_list_stream(storage, None).await.unwrap();
Expand Down Expand Up @@ -701,6 +708,21 @@ mod tests {
assert_eq!(files, vec![path.clone()]);

storage.delete(&path).await.unwrap();

let path = Path::parse("foo bar/I contain spaces.parquet").unwrap();
storage.put(&path, Bytes::from(vec![0, 1])).await.unwrap();
storage.head(&path).await.unwrap();

if !skip_list_with_spaces {
let files = flatten_list_stream(storage, Some(&Path::from("foo bar")))
.await
.unwrap();
assert_eq!(files, vec![path.clone()]);
}
storage.delete(&path).await.unwrap();

let files = flatten_list_stream(storage, None).await.unwrap();
assert!(files.is_empty(), "{:?}", files);
}

fn get_vec_of_bytes(chunk_length: usize, num_chunks: usize) -> Vec<Bytes> {
Expand Down
11 changes: 11 additions & 0 deletions object_store/src/path/mod.rs
Expand Up @@ -534,4 +534,15 @@ mod tests {
needle
);
}

#[test]
fn path_containing_spaces() {
let a = Path::from_iter(["foo bar", "baz"]);
let b = Path::from("foo bar/baz");
let c = Path::parse("foo bar/baz").unwrap();

assert_eq!(a.raw, "foo bar/baz");
assert_eq!(a.raw, b.raw);
assert_eq!(b.raw, c.raw);
}
}

0 comments on commit da8f742

Please sign in to comment.