Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add timestream support #1918

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Swap `pin-project` for the lighter weight `pin-project-lite`
- Disable `chrono`'s `oldtime` feature
- Remove dependency on `regex`
- Fix incorrect parse logic for timestamp headers in JSON based protocols
- Update to botocore 1.19.12
- Add Amazon Timestream write support, crate `rusoto_timestream_write`.

## [0.45.0] - 2020-07-22

Expand Down
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ members = [
"rusoto/services/support",
"rusoto/services/swf",
"rusoto/services/textract",
"rusoto/services/timestream-query",
"rusoto/services/timestream-write",
"rusoto/services/transcribe",
"rusoto/services/transfer",
"rusoto/services/translate",
Expand Down
28 changes: 20 additions & 8 deletions integration_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name = "rusoto_tests"
description = "AWS SDK for Rust - Integration Tests"
version = "0.47.0"
authors = [
"Anthony DiMarco <ocramida@gmail.com>",
"Jimmy Cuadra <jimmy@jimmycuadra.com>",
"Matthew Mayer <matthewkmayer@gmail.com>",
"Nikita Pekin <contact@nikitapek.in>"
"Anthony DiMarco <ocramida@gmail.com>",
"Jimmy Cuadra <jimmy@jimmycuadra.com>",
"Matthew Mayer <matthewkmayer@gmail.com>",
"Nikita Pekin <contact@nikitapek.in>",
]
license = "MIT"
repository = "https://github.com/rusoto/rusoto"
Expand Down Expand Up @@ -338,8 +338,8 @@ optional = true
path = "../rusoto/services/iot-jobs-data"

[dependencies.rusoto_iotsecuretunneling]
optional = true
path = "../rusoto/services/iotsecuretunneling"
optional = true
path = "../rusoto/services/iotsecuretunneling"

[dependencies.rusoto_kafka]
optional = true
Expand Down Expand Up @@ -589,6 +589,14 @@ path = "../rusoto/services/support"
optional = true
path = "../rusoto/services/swf"

[dependencies.rusoto_timestream_write]
optional = true
path = "../rusoto/services/timestream-write"

[dependencies.rusoto_timestream_query]
optional = true
path = "../rusoto/services/timestream-query"
Comment on lines +596 to +598
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to add timestream-write here too?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added


[dependencies.rusoto_transcribe]
optional = true
path = "../rusoto/services/transcribe"
Expand Down Expand Up @@ -805,6 +813,8 @@ all = [
"sts",
"support",
"swf",
"timestream-write",
"timestream-query",
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

"transcribe",
"transfer",
"translate",
Expand All @@ -819,8 +829,8 @@ all = [
"apigatewaymanagementapi",
"apigatewayv2",
"ram",
"qldb"
]
"qldb",
]
core = []
acm = ["rusoto_acm"]
acm-pca = ["rusoto_acm_pca"]
Expand Down Expand Up @@ -963,6 +973,8 @@ storagegateway = ["rusoto_storagegateway"]
sts = ["rusoto_sts", "rusoto_ec2"]
support = ["rusoto_support"]
swf = ["rusoto_swf"]
timestream-write = ["rusoto_timestream_write"]
timestream-query = ["rusoto_timestream_query"]
transcribe = ["rusoto_transcribe"]
transfer = ["rusoto_transfer"]
translate = ["rusoto_translate"]
Expand Down
11 changes: 11 additions & 0 deletions integration_tests/tests/timestream-query.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#![cfg(feature = "timestream-query")]

use rusoto_core::Region;
use rusoto_timestream_query::{TimestreamQuery, TimestreamQueryClient};

#[tokio::test]
async fn should_describe_endpoints() {
let client = TimestreamQueryClient::new(Region::UsEast1);

client.describe_endpoints().await.unwrap();
}
49 changes: 49 additions & 0 deletions rusoto/services/timestream-query/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[package]
authors = ["Anthony DiMarco <ocramida@gmail.com>", "Jimmy Cuadra <jimmy@jimmycuadra.com>", "Matthew Mayer <matthewkmayer@gmail.com>", "Nikita Pekin <contact@nikitapek.in>"]
description = "AWS SDK for Rust - Amazon Timestream Query @ 2018-11-01"
documentation = "https://docs.rs/rusoto_timestream_query"
keywords = ["AWS", "Amazon", "timestream-query"]
license = "MIT"
name = "rusoto_timestream_query"
readme = "README.md"
repository = "https://github.com/rusoto/rusoto"
version = "0.47.0"
homepage = "https://www.rusoto.org/"
edition = "2018"
exclude = ["test_resources/*"]
[package.metadata.docs.rs]
targets = []

[build-dependencies]

[dependencies]
async-trait = "0.1"
bytes = "1.0"
serde_json = "1.0"

[dependencies.futures]
version = "0.3"

[dependencies.rusoto_core]
version = "0.47.0"
path = "../../core"
default-features = false

[dependencies.serde]
version = "1.0"
features = ["derive"]

[dev-dependencies]
tokio = "1.0"

[dev-dependencies.rusoto_mock]
version = "0.47.0"
path = "../../../mock"
default-features = false

[features]
default = ["native-tls"]
deserialize_structs = ["bytes/serde"]
native-tls = ["rusoto_core/native-tls"]
rustls = ["rusoto_core/rustls"]
serialize_structs = ["bytes/serde"]
58 changes: 58 additions & 0 deletions rusoto/services/timestream-query/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@

# Rusoto TimestreamQuery
Rust SDK for Amazon Timestream Query

<a href="https://docs.rs/rusoto_timestream_query/0.46.0" title="API Docs"><img src="https://img.shields.io/badge/API-docs-blue.svg" alt="api-docs-badge"></img></a>
<a href="https://crates.io/crates/rusoto_timestream_query/0.46.0" title="Crates.io"><img src="https://img.shields.io/crates/v/rusoto_core.svg" alt="crates-io"></img></a>
<a href="#license" title="License: MIT"><img src="https://img.shields.io/badge/license-MIT-blue.svg" alt="license-badge"></img></a>
<a href="https://discordapp.com/invite/WMJ4DWp"><img src="https://img.shields.io/discord/670751965273391124"></img></a>

You may be looking for:

* [An overview of Rusoto][rusoto-overview]
* [AWS services supported by Rusoto][supported-aws-services]
* [API documentation][api-documentation]
* [Getting help with Rusoto][rusoto-help]

## Requirements

Rust stable or beta are required to use Rusoto. Nightly is tested, but not guaranteed to be supported. Older
versions _may_ be supported. The currently supported Rust versions can be found in the Rusoto project
[`travis.yml`](https://github.com/rusoto/rusoto/blob/master/.travis.yml).

On Linux, OpenSSL is required if using the `native-tls` feature.

## Installation

To use `rusoto_timestream_query` in your application, add it as a dependency in your `Cargo.toml`:

```toml
[dependencies]
rusoto_timestream_query = "0.46.0"
```

## Crate Features
- `native-tls` - use platform-specific TLS implementation.
- `rustls` - use rustls TLS implementation.
- `serialize_structs` - output structs of most operations get `derive(Serialize)`.
- `deserialize_structs` - input structs of most operations get `derive(Deserialize)`.

Note: the crate will use the `native-tls` TLS implementation by default.

## Contributing

See [CONTRIBUTING][contributing].

## License

Rusoto is distributed under the terms of the MIT license.

See [LICENSE][license] for details.

[api-documentation]: https://docs.rs/rusoto_timestream_query "API documentation"
[license]: https://github.com/rusoto/rusoto/blob/master/LICENSE "MIT License"
[contributing]: https://github.com/rusoto/rusoto/blob/master/CONTRIBUTING.md "Contributing Guide"
[rusoto-help]: https://www.rusoto.org/help.html "Getting help with Rusoto"
[rusoto-overview]: https://www.rusoto.org/ "Rusoto overview"
[supported-aws-services]: https://www.rusoto.org/supported-aws-services.html "List of AWS services supported by Rusoto"

158 changes: 158 additions & 0 deletions rusoto/services/timestream-query/src/custom/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
use crate::{
CancelQueryError, CancelQueryRequest, CancelQueryResponse, DescribeEndpointsError, QueryError,
QueryRequest, QueryResponse, TimestreamQuery, TimestreamQueryClient,
};
use rusoto_core::{
credential::ProvideAwsCredentials, Client, DispatchSignedRequest, Region, RusotoError,
};
use std::cmp::max;
use std::ops::Add;
use std::time::{Duration, Instant};

#[derive(Clone)]
struct Endpoint {
address: String,
expiry: Instant,
}

/// An endpoint-discovery-aware client for the Timestream Query API.
///
/// Amazon Timestream utilizes a segregated architecture to ensure better scaling and traffic isolation properties.
/// Each system segment is managed through multiple endpoints, and your applications must use the correct endpoint
/// while accessing the service. When using this client, these endpoint management tasks are transparently handled for you.
/// However, when accessing the Timestream Query API using the default [`TimestreamQuery`] methods,
/// you will need to manage and map the correct endpoints yourself. This process is called the endpoint discovery pattern,
/// and is described [here](https://docs.aws.amazon.com/timestream/latest/developerguide/Using-API.endpoint-discovery.html).
///
/// # Examples
///
/// ```
/// use rusoto_core::{Region, RusotoError};
/// use rusoto_timestream_query::{QueryError, QueryResponse, QueryRequest, TimestreamQueryEndpointClient};
///
/// async fn query(query: QueryRequest) -> Result<QueryResponse, RusotoError<QueryError>> {
/// let mut client = TimestreamQueryEndpointClient::new(Region::UsEast1);
/// client.query(query).await
/// }
/// ```
pub struct TimestreamQueryEndpointClient {
client: TimestreamQueryClient,
endpoint: Option<Endpoint>,
region: Region,
}

impl TimestreamQueryEndpointClient {
/// Creates a client backed by the default tokio event loop.
///
/// The client will use the default credentials provider and tls client.
pub fn new(region: Region) -> Self {
Self {
client: TimestreamQueryClient::new(region.clone()),
endpoint: None,
region,
}
}

pub fn new_with<P, D>(request_dispatcher: D, credentials_provider: P, region: Region) -> Self
where
P: ProvideAwsCredentials + Send + Sync + 'static,
D: DispatchSignedRequest + Send + Sync + 'static,
{
Self {
client: TimestreamQueryClient::new_with(
request_dispatcher,
credentials_provider,
region.clone(),
),
endpoint: None,
region,
}
}

pub fn new_with_client(client: Client, region: Region) -> Self {
Self {
client: TimestreamQueryClient::new_with_client(client, region.clone()),
endpoint: None,
region,
}
}

async fn update_endpoint(&mut self) -> Result<(), RusotoError<DescribeEndpointsError>> {
match self.endpoint {
Some(ref e) if e.expiry > Instant::now() => Ok(()),
_ => {
let client = TimestreamQueryClient::new(self.region.clone());
let endpoint = client.describe_endpoints().await?.endpoints.pop().ok_or(
RusotoError::Service(DescribeEndpointsError::InternalServer(
"DescribeEndpoints API returned empty endpoint list".to_string(),
)),
)?;

self.endpoint = Some(Endpoint {
address: endpoint.address.clone(),
// Subtract 1 minute from the TTL to ensure the endpoint data is refreshed before it expires.
expiry: Instant::now().add(Duration::from_secs(max(
5,
endpoint.cache_period_in_minutes as u64 * 60 - 60,
))),
});
self.client = TimestreamQueryClient::new(Region::Custom {
name: self.region.name().to_string(),
endpoint: endpoint.address,
});

Ok(())
}
}
}

/// <p> Cancels a query that has been issued. Cancellation is guaranteed only if the query has not completed execution before the cancellation request was issued. Because cancellation is an idempotent operation, subsequent cancellation requests will return a <code>CancellationMessage</code>, indicating that the query has already been canceled. </p>
pub async fn cancel_query(
&mut self,
input: CancelQueryRequest,
) -> Result<CancelQueryResponse, RusotoError<CancelQueryError>> {
self.update_endpoint().await.map_err(|e| match e {
// Remap any DescribeEndpointError to ThrottlingErrors
RusotoError::Service(DescribeEndpointsError::Throttling(s)) => {
RusotoError::Service(CancelQueryError::Throttling(s))
}
RusotoError::Service(DescribeEndpointsError::InternalServer(s)) => {
RusotoError::Service(CancelQueryError::InternalServer(s))
}
// Pass on all non-service related errors as is.
RusotoError::Blocking => RusotoError::Blocking,
RusotoError::Credentials(e) => RusotoError::Credentials(e),
RusotoError::HttpDispatch(e) => RusotoError::HttpDispatch(e),
RusotoError::ParseError(e) => RusotoError::ParseError(e),
RusotoError::Unknown(e) => RusotoError::Unknown(e),
RusotoError::Validation(e) => RusotoError::Validation(e),
})?;

self.client.cancel_query(input).await
}

/// <p> Query is a synchronous operation that enables you to execute a query. Query will timeout after 60 seconds. You must update the default timeout in the SDK to support a timeout of 60 seconds. The result set will be truncated to 1MB. Service quotas apply. For more information, see Quotas in the Timestream Developer Guide. </p>
pub async fn query(
&mut self,
input: QueryRequest,
) -> Result<QueryResponse, RusotoError<QueryError>> {
self.update_endpoint().await.map_err(|e| match e {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be misunderstanding this, but does this mean we update the endpoint every time we do a call?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is due to the endpoints for queries are dynamic. The same for the writer. Also see here service.json of boto

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the endpoints are dynamic -- but they are meant to be cached for the TTL that comes with them (i.e point 3 here:https://docs.aws.amazon.com/timestream/latest/developerguide/Using-API.endpoint-discovery.describe-endpoints.implementation.html). On a second pass though, we don't actually update the endpoint with each call, only when we find at the time of calling that the expiry for the endpoint has passed. Which seems correct.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So this is ok then?

// Remap any DescribeEndpointError to TrhottelingErrors
RusotoError::Service(DescribeEndpointsError::Throttling(s)) => {
RusotoError::Service(QueryError::Throttling(s))
}
RusotoError::Service(DescribeEndpointsError::InternalServer(s)) => {
RusotoError::Service(QueryError::InternalServer(s))
}
// Pass on all non-service related errors as is.
RusotoError::Blocking => RusotoError::Blocking,
RusotoError::Credentials(e) => RusotoError::Credentials(e),
RusotoError::HttpDispatch(e) => RusotoError::HttpDispatch(e),
RusotoError::ParseError(e) => RusotoError::ParseError(e),
RusotoError::Unknown(e) => RusotoError::Unknown(e),
RusotoError::Validation(e) => RusotoError::Validation(e),
})?;

self.client.query(input).await
}
}