Skip to content
This repository has been archived by the owner on Mar 29, 2024. It is now read-only.

Upgrade tonic to 0.5, prost to 0.8, and bump version to 0.6.0 #51

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 4 additions & 4 deletions Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "etcd-rs"
version = "0.5.0"
version = "0.6.0"
authors = ["ccc <currantxx@gmail.com>"]
edition = "2018"
keywords = ["etcd", "future", "async"]
Expand All @@ -11,9 +11,9 @@ documentation = "https://docs.rs/etcd-rs"
license = "MIT"

[dependencies]
tonic = { version = "0.4", features = ["tls"] }
tonic = { version = "0.5", features = ["tls"] }
bytes = "1.0"
prost = "0.7"
prost = "0.8"
tokio = "1.0"
tokio-stream = "^0.1"
async-stream = "0.3"
Expand All @@ -26,4 +26,4 @@ http = "0.2"
tokio = { version = "1.0", features = ["full"] }

[build-dependencies]
tonic-build = "0.4"
tonic-build = "0.5"
11 changes: 8 additions & 3 deletions src/auth/mod.rs
Expand Up @@ -4,18 +4,23 @@ pub use authenticate::{AuthenticateRequest, AuthenticateResponse};

use tonic::transport::Channel;

use crate::client::Interceptor;
use crate::proto::etcdserverpb::auth_client::AuthClient;
use crate::Result;

/// Auth client.
#[derive(Clone)]
pub struct Auth {
client: AuthClient<Channel>,
interceptor: Interceptor,
}

impl Auth {
pub(crate) fn new(client: AuthClient<Channel>) -> Self {
Self { client }
pub(crate) fn new(client: AuthClient<Channel>, interceptor: Interceptor) -> Self {
Self {
client,
interceptor,
}
}

/// Performs an authenticating operation.
Expand All @@ -25,7 +30,7 @@ impl Auth {
pub async fn authenticate(&mut self, req: AuthenticateRequest) -> Result<AuthenticateResponse> {
let resp = self
.client
.authenticate(tonic::Request::new(req.into()))
.authenticate(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
Expand Down
58 changes: 25 additions & 33 deletions src/client.rs
@@ -1,8 +1,9 @@
use std::sync::Arc;

use futures::Stream;
use tonic::metadata::Ascii;
use tonic::transport::ClientTlsConfig;
use tonic::{metadata::MetadataValue, transport::Channel, Interceptor, Request};
use tonic::{metadata::MetadataValue, transport::Channel, Request};

use crate::proto::etcdserverpb::{
auth_client::AuthClient, kv_client::KvClient, lease_client::LeaseClient,
Expand All @@ -25,6 +26,20 @@ pub struct Client {
inner: Arc<Inner>,
}

#[derive(Clone)]
pub(crate) struct Interceptor {
token: Option<MetadataValue<Ascii>>,
}

impl Interceptor {
pub(crate) fn intercept<T>(&self, mut req: Request<T>) -> Request<T> {
if let Some(token) = self.token.as_ref() {
req.metadata_mut().insert("authorization", token.clone());
}
req
}
}

#[allow(dead_code)]
pub(crate) struct Inner {
channel: Channel,
Expand Down Expand Up @@ -54,7 +69,7 @@ impl Client {

let channel = Self::get_channel(&cfg)?;

let mut auth_client = Auth::new(AuthClient::new(channel));
let mut auth_client = Auth::new(AuthClient::new(channel), Interceptor { token: None });

let token = match cfg.auth.as_ref() {
Some((name, password)) => auth_client
Expand All @@ -74,41 +89,18 @@ impl Client {
pub async fn connect(cfg: ClientConfig) -> Result<Self> {
// If authentication provided, generates token before connecting.
let token = Self::generate_auth_token(&cfg).await?;
let token = token.map(|token| MetadataValue::from_str(&token).unwrap());

let auth_interceptor = token.map(|token| {
let token = MetadataValue::from_str(&token).unwrap();
Interceptor::new(move |mut req: Request<()>| {
req.metadata_mut().insert("authorization", token.clone());
Ok(req)
})
});
let interceptor = Interceptor { token };

let channel = Self::get_channel(&cfg)?;

let inner = {
let (auth_client, kv_client, watch_client, lease_client) =
if let Some(auth_interceptor) = auth_interceptor {
(
AuthClient::with_interceptor(channel.clone(), auth_interceptor.clone()),
KvClient::with_interceptor(channel.clone(), auth_interceptor.clone()),
WatchClient::with_interceptor(channel.clone(), auth_interceptor.clone()),
LeaseClient::with_interceptor(channel.clone(), auth_interceptor),
)
} else {
(
AuthClient::new(channel.clone()),
KvClient::new(channel.clone()),
WatchClient::new(channel.clone()),
LeaseClient::new(channel.clone()),
)
};
Inner {
channel,
auth_client: Auth::new(auth_client),
kv_client: Kv::new(kv_client),
watch_client: Watch::new(watch_client),
lease_client: Lease::new(lease_client),
}
let inner = Inner {
auth_client: Auth::new(AuthClient::new(channel.clone()), interceptor.clone()),
kv_client: Kv::new(KvClient::new(channel.clone()), interceptor.clone()),
watch_client: Watch::new(WatchClient::new(channel.clone()), interceptor.clone()),
lease_client: Lease::new(LeaseClient::new(channel.clone()), interceptor.clone()),
channel,
};

Ok(Self {
Expand Down
26 changes: 20 additions & 6 deletions src/kv/mod.rs
Expand Up @@ -10,6 +10,7 @@ pub use txn::{TxnCmp, TxnOp, TxnOpResponse, TxnRequest, TxnResponse};

use tonic::transport::Channel;

use crate::client::Interceptor;
use crate::proto::etcdserverpb::kv_client::KvClient;
use crate::proto::mvccpb;
use crate::Result as Res;
Expand All @@ -18,23 +19,33 @@ use crate::Result as Res;
#[derive(Clone)]
pub struct Kv {
client: KvClient<Channel>,
interceptor: Interceptor,
}

impl Kv {
pub(crate) fn new(client: KvClient<Channel>) -> Self {
Self { client }
pub(crate) fn new(client: KvClient<Channel>, interceptor: Interceptor) -> Self {
Self {
client,
interceptor,
}
}

/// Performs a key-value saving operation.
pub async fn put(&mut self, req: PutRequest) -> Res<PutResponse> {
let resp = self.client.put(tonic::Request::new(req.into())).await?;
let resp = self
.client
.put(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
}

/// Performs a key-value fetching operation.
pub async fn range(&mut self, req: RangeRequest) -> Res<RangeResponse> {
let resp = self.client.range(tonic::Request::new(req.into())).await?;
let resp = self
.client
.range(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
}
Expand All @@ -43,15 +54,18 @@ impl Kv {
pub async fn delete(&mut self, req: DeleteRequest) -> Res<DeleteResponse> {
let resp = self
.client
.delete_range(tonic::Request::new(req.into()))
.delete_range(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
}

/// Performs a transaction operation.
pub async fn txn(&mut self, req: TxnRequest) -> Res<TxnResponse> {
let resp = self.client.txn(tonic::Request::new(req.into())).await?;
let resp = self
.client
.txn(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
}
Expand Down
24 changes: 17 additions & 7 deletions src/lease/mod.rs
Expand Up @@ -77,9 +77,12 @@ pub use grant::{LeaseGrantRequest, LeaseGrantResponse};
pub use keep_alive::{LeaseKeepAliveRequest, LeaseKeepAliveResponse};
pub use revoke::{LeaseRevokeRequest, LeaseRevokeResponse};

use crate::lazy::{Lazy, Shutdown};
use crate::proto::etcdserverpb;
use crate::proto::etcdserverpb::lease_client::LeaseClient;
use crate::{
client::Interceptor,
lazy::{Lazy, Shutdown},
};
use crate::{Error, Result};

mod grant;
Expand All @@ -95,12 +98,14 @@ struct LeaseKeepAliveTunnel {
}

impl LeaseKeepAliveTunnel {
fn new(mut client: LeaseClient<Channel>) -> Self {
fn new(mut client: LeaseClient<Channel>, interceptor: Interceptor) -> Self {
let (req_sender, req_receiver) = unbounded_channel::<etcdserverpb::LeaseKeepAliveRequest>();
let (resp_sender, resp_receiver) = unbounded_channel::<Result<LeaseKeepAliveResponse>>();

let (shutdown_tx, shutdown_rx) = oneshot::channel();
let request = tonic::Request::new(UnboundedReceiverStream::new(req_receiver));
let request = interceptor.intercept(tonic::Request::new(UnboundedReceiverStream::new(
req_receiver,
)));

// monitor inbound watch response and transfer to the receiver
tokio::spawn(async move {
Expand Down Expand Up @@ -151,25 +156,30 @@ impl Shutdown for LeaseKeepAliveTunnel {
pub struct Lease {
client: LeaseClient<Channel>,
keep_alive_tunnel: Arc<Lazy<LeaseKeepAliveTunnel>>,
interceptor: Interceptor,
}

impl Lease {
pub(crate) fn new(client: LeaseClient<Channel>) -> Self {
pub(crate) fn new(client: LeaseClient<Channel>, interceptor: Interceptor) -> Self {
let keep_alive_tunnel = {
let client = client.clone();
Arc::new(Lazy::new(move || LeaseKeepAliveTunnel::new(client.clone())))
let interceptor = interceptor.clone();
Arc::new(Lazy::new(move || {
LeaseKeepAliveTunnel::new(client.clone(), interceptor.clone())
}))
};
Self {
client,
keep_alive_tunnel,
interceptor,
}
}

/// Performs a lease granting operation.
pub async fn grant(&mut self, req: LeaseGrantRequest) -> Result<LeaseGrantResponse> {
let resp = self
.client
.lease_grant(tonic::Request::new(req.into()))
.lease_grant(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
Expand All @@ -179,7 +189,7 @@ impl Lease {
pub async fn revoke(&mut self, req: LeaseRevokeRequest) -> Result<LeaseRevokeResponse> {
let resp = self
.client
.lease_revoke(tonic::Request::new(req.into()))
.lease_revoke(self.interceptor.intercept(tonic::Request::new(req.into())))
.await?;

Ok(resp.into_inner().into())
Expand Down
18 changes: 11 additions & 7 deletions src/watch/mod.rs
Expand Up @@ -56,13 +56,16 @@ use tonic::transport::Channel;

pub use watch::{WatchCancelRequest, WatchCreateRequest, WatchResponse};

use crate::lazy::{Lazy, Shutdown};
use crate::proto::etcdserverpb;
use crate::proto::etcdserverpb::watch_client::WatchClient;
use crate::proto::mvccpb;
use crate::Error;
use crate::KeyValue;
use crate::Result;
use crate::{
client::Interceptor,
lazy::{Lazy, Shutdown},
};

mod watch;

Expand All @@ -75,7 +78,7 @@ struct WatchTunnel {
}

impl WatchTunnel {
fn new(mut client: WatchClient<Channel>) -> Self {
fn new(mut client: WatchClient<Channel>, interceptor: Interceptor) -> Self {
let (req_sender, req_receiver) = unbounded_channel::<etcdserverpb::WatchRequest>();
let (resp_sender, resp_receiver) = unbounded_channel::<Result<Option<WatchResponse>>>();

Expand All @@ -86,7 +89,7 @@ impl WatchTunnel {
tokio::spawn(async move {
let mut shutdown_rx = shutdown_rx.fuse();
let mut inbound = futures::select! {
res = client.watch(request).fuse() => {
res = client.watch(interceptor.intercept(request)).fuse() => {
match res {
Err(e) => {
resp_sender.send(Err(From::from(e))).unwrap();
Expand Down Expand Up @@ -142,18 +145,19 @@ impl Shutdown for WatchTunnel {
/// Watch client.
#[derive(Clone)]
pub struct Watch {
client: WatchClient<Channel>,
tunnel: Arc<Lazy<WatchTunnel>>,
}

impl Watch {
pub(crate) fn new(client: WatchClient<Channel>) -> Self {
pub(crate) fn new(client: WatchClient<Channel>, interceptor: Interceptor) -> Self {
let tunnel = {
let client = client.clone();
Arc::new(Lazy::new(move || WatchTunnel::new(client.clone())))
Arc::new(Lazy::new(move || {
WatchTunnel::new(client.clone(), interceptor.clone())
}))
};

Self { client, tunnel }
Self { tunnel }
}

/// Performs a watch operation.
Expand Down