Skip to content

Commit

Permalink
Initial work towards supporting Groups v2
Browse files Browse the repository at this point in the history
  • Loading branch information
gferon committed Mar 2, 2021
1 parent 65d0ba3 commit f4166a5
Show file tree
Hide file tree
Showing 10 changed files with 701 additions and 135 deletions.
2 changes: 1 addition & 1 deletion libsignal-service-actix/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ libsignal-protocol = { git = "https://github.com/Michael-F-Bryan/libsignal-proto
awc = { version = "3.0.0-beta.1", features=["rustls"] }
actix = "0.11.0-beta.1"
actix-http = "3.0.0-beta.1"
actix-rt = "2.0.0"
actix-rt = "2.0"
mpart-async = "0.5.0"
serde_json = "1.0"
futures = "0.3"
Expand Down
238 changes: 151 additions & 87 deletions libsignal-service-actix/src/push_service.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
use std::{sync::Arc, time::Duration};

use actix_http::http::HeaderValue;
use awc::{
error::PayloadError, http::StatusCode, Client, ClientResponse, Connector,
};
use bytes::Bytes;
use futures::prelude::*;
use libsignal_service::{
configuration::*, messagepipe::WebSocketService, push_service::*,
configuration::*, messagepipe::WebSocketService, prelude::prost,
push_service::*,
};
use serde::{Deserialize, Serialize};
use url::Url;

use crate::websocket::AwcWebSocket;

#[derive(Clone)]
pub struct AwcPushService {
cfg: ServiceConfiguration,
base_url: Url,
credentials: Option<(String, String)>,
client: awc::Client,
}

Expand All @@ -26,30 +27,36 @@ impl PushService for AwcPushService {
type ByteStream = Box<dyn futures::io::AsyncRead + Unpin>;
type WebSocket = AwcWebSocket;

async fn get<T>(&mut self, path: &str) -> Result<T, ServiceError>
async fn get_json<T>(
&mut self,
endpoint: Endpoint,
path: &str,
credentials: Option<(String, String)>,
) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>,
{
// In principle, we should be using http::uri::Uri,
// but that doesn't seem like an owned type where we can do this kind of
// constructions on.
// https://docs.rs/http/0.2.1/http/uri/struct.Uri.html
let url = self.base_url.join(path).expect("valid url");
let url = self.cfg.base_url(endpoint).join(path)?;

log::debug!("AwcPushService::get({:?})", url);
use awc::error::{ConnectError, SendRequestError};
let mut response = self.client.get(url.as_str()).send().await.map_err(
|e| match e {
SendRequestError::Connect(ConnectError::Timeout) => {
ServiceError::Timeout {
reason: e.to_string(),
}
}
_ => ServiceError::SendError {
let mut request = self.client.get(url.as_str());
if let Some((ident, pass)) =
credentials.as_ref().or(self.credentials.as_ref())
{
request = request.basic_auth(ident, Some(pass));
}

let mut response = request.send().await.map_err(|e| match e {
SendRequestError::Connect(ConnectError::Timeout) => {
ServiceError::Timeout {
reason: e.to_string(),
},
}
}
_ => ServiceError::SendError {
reason: e.to_string(),
},
)?;
})?;

log::debug!("AwcPushService::get response: {:?}", response);

Expand Down Expand Up @@ -84,31 +91,33 @@ impl PushService for AwcPushService {
}

/// Deletes a resource through the HTTP DELETE verb.
async fn delete<T>(&mut self, path: &str) -> Result<T, ServiceError>
async fn delete_json<T>(
&mut self,
endpoint: Endpoint,
path: &str,
) -> Result<T, ServiceError>
where
for<'de> T: Deserialize<'de>,
{
// In principle, we should be using http::uri::Uri,
// but that doesn't seem like an owned type where we can do this kind of
// constructions on.
// https://docs.rs/http/0.2.1/http/uri/struct.Uri.html
let url = self.base_url.join(path).expect("valid url");
let url = self.cfg.base_url(endpoint).join(path)?;

log::debug!("AwcPushService::delete({:?})", url);
use awc::error::{ConnectError, SendRequestError};
let mut response =
self.client.delete(url.as_str()).send().await.map_err(
|e| match e {
SendRequestError::Connect(ConnectError::Timeout) => {
ServiceError::Timeout {
reason: e.to_string(),
}
}
_ => ServiceError::SendError {
reason: e.to_string(),
},
},
)?;
let mut request = self.client.delete(url.as_str());
if let Some((ident, pass)) = &self.credentials {
request = request.basic_auth(ident, Some(pass));
}

let mut response = request.send().await.map_err(|e| match e {
SendRequestError::Connect(ConnectError::Timeout) => {
ServiceError::Timeout {
reason: e.to_string(),
}
}
_ => ServiceError::SendError {
reason: e.to_string(),
},
})?;

log::debug!("AwcPushService::delete response: {:?}", response);

Expand Down Expand Up @@ -145,30 +154,29 @@ impl PushService for AwcPushService {
}
}

async fn put<D, S>(
async fn put_json<D, S>(
&mut self,
endpoint: Endpoint,
path: &str,
value: S,
) -> Result<D, ServiceError>
where
for<'de> D: Deserialize<'de>,
S: Serialize,
{
// In principle, we should be using http::uri::Uri,
// but that doesn't seem like an owned type where we can do this kind of
// constructions on.
// https://docs.rs/http/0.2.1/http/uri/struct.Uri.html
let url = self.base_url.join(path).expect("valid url");
let url = self.cfg.base_url(endpoint).join(path)?;

log::debug!("AwcPushService::put({:?})", url);
let mut response = self
.client
.put(url.as_str())
.send_json(&value)
.await
.map_err(|e| ServiceError::SendError {
let mut request = self.client.put(url.as_str());
if let Some((ident, pass)) = &self.credentials {
request = request.basic_auth(ident, Some(pass));
}

let mut response = request.send_json(&value).await.map_err(|e| {
ServiceError::SendError {
reason: e.to_string(),
})?;
}
})?;

log::debug!("AwcPushService::put response: {:?}", response);

Expand Down Expand Up @@ -210,24 +218,96 @@ impl PushService for AwcPushService {
}
}

async fn get_protobuf<T>(
&mut self,
endpoint: Endpoint,
path: &str,
credentials: Option<(String, String)>,
) -> Result<T, ServiceError>
where
T: Default + prost::Message,
{
let url = self.cfg.base_url(endpoint).join(path)?;
log::debug!("AwcPushService::get_protobuf({:?})", url);

let mut request = self.client.get(url.as_str());
if let Some((ident, pass)) =
credentials.as_ref().or(self.credentials.as_ref())
{
request = request.basic_auth(ident, Some(&pass));
};

let mut response =
request.send().await.map_err(|e| ServiceError::SendError {
reason: e.to_string(),
})?;

let text =
response
.body()
.await
.map_err(|e| ServiceError::ResponseError {
reason: e.to_string(),
})?;
Ok(prost::Message::decode(text)?)
}

async fn put_protobuf<D, S>(
&mut self,
endpoint: Endpoint,
path: &str,
value: S,
) -> Result<D, ServiceError>
where
D: Default + prost::Message,
S: Sized + prost::Message,
{
let url = self.cfg.base_url(endpoint).join(path)?;
log::debug!("AwcPushService::put_protobuf({:?})", url);

let mut buf = vec![];
value.encode(&mut buf)?;

let mut request = self.client.put(url.as_str());
if let Some((ident, pass)) = self.credentials.as_ref() {
request = request.basic_auth(ident, Some(pass));
}

let mut response = request
.content_type(HeaderValue::from_static("application/x-protobuf"))
.send_body(buf)
.await
.map_err(|e| ServiceError::SendError {
reason: e.to_string(),
})?;

let text =
response
.body()
.await
.map_err(|e| ServiceError::ResponseError {
reason: e.to_string(),
})?;
Ok(prost::Message::decode(text)?)
}

async fn get_from_cdn(
&mut self,
cdn_id: u32,
path: &str,
) -> Result<Self::ByteStream, ServiceError> {
use futures::stream::TryStreamExt;

let url = Url::parse(&self.cfg.cdn_urls[&cdn_id])
.expect("valid cdn base url")
.join(path)
.expect("valid CDN path");
let url = self.cfg.base_url(Endpoint::Cdn(cdn_id)).join(path)?;

log::debug!("AwcPushService::get_stream({:?})", url);
let mut request = self.client.get(url.as_str());
if let Some((ident, pass)) = self.credentials.as_ref() {
request = request.basic_auth(ident, Some(pass));
}

let mut response =
self.client.get(url.as_str()).send().await.map_err(|e| {
ServiceError::SendError {
reason: e.to_string(),
}
request.send().await.map_err(|e| ServiceError::SendError {
reason: e.to_string(),
})?;

log::debug!("AwcPushService::get_stream response: {:?}", response);
Expand Down Expand Up @@ -256,13 +336,13 @@ impl PushService for AwcPushService {
value: &[(&str, &str)],
file: Option<(&str, &'s mut C)>,
) -> Result<(), ServiceError> {
let url = Url::parse(&self.cfg.cdn_urls[&0])
.expect("valid cdn base url")
.join(path)
.expect("valid CDN path");
let url = self.cfg.base_url(Endpoint::Cdn(0)).join(path)?;

log::debug!("AwcPushService::post_to_cdn({:?})", url);
let client = self.client.post(url.as_str());
let mut request = self.client.post(url.as_str());
if let Some((ident, pass)) = self.credentials.as_ref() {
request = request.basic_auth(ident, Some(pass));
}

let mut form = mpart_async::client::MultipartRequest::default();

Expand Down Expand Up @@ -309,7 +389,7 @@ impl PushService for AwcPushService {
body_contents.len()
);

let mut response = client
let mut response = request
.content_type(&content_type)
.content_length(body_contents.len() as u64)
.send_body(body_contents)
Expand Down Expand Up @@ -338,7 +418,7 @@ impl PushService for AwcPushService {
> {
Ok(AwcWebSocket::with_client(
&mut self.client,
&self.base_url,
self.cfg.base_url(Endpoint::Service),
path,
credentials.as_ref(),
)
Expand All @@ -354,11 +434,7 @@ impl PushService for AwcPushService {
/// * 10s timeout on TCP connection
/// * 65s timeout on HTTP request
/// * provided user-agent
pub fn get_client(
cfg: &ServiceConfiguration,
credentials: Option<Credentials>,
user_agent: &str,
) -> Client {
fn get_client(cfg: &ServiceConfiguration, user_agent: &str) -> Client {
let mut ssl_config = rustls::ClientConfig::new();
ssl_config.alpn_protocols = vec![b"http/1.1".to_vec()];
ssl_config
Expand All @@ -371,37 +447,25 @@ pub fn get_client(
.rustls(Arc::new(ssl_config))
.timeout(Duration::from_secs(10)) // https://github.com/actix/actix-web/issues/1047
.finish();
let mut client = awc::ClientBuilder::new()
let client = awc::ClientBuilder::new()
.connector(connector)
.header("X-Signal-Agent", user_agent)
.timeout(Duration::from_secs(65)); // as in Signal-Android

if let Some(credentials) = credentials {
if let Some((ident, pass)) = credentials.authorization() {
client = client.basic_auth(ident, Some(pass));
}
};

client.finish()
}

impl AwcPushService {
/// Creates a new AwcPushService
///
/// Panics on invalid service url.
pub fn new(
cfg: ServiceConfiguration,
credentials: Option<Credentials>,
user_agent: &str,
) -> Self {
let base_url =
Url::parse(&cfg.service_urls[0]).expect("valid service url");

let client = get_client(&cfg, credentials, user_agent);

let client = get_client(&cfg, user_agent);
Self {
cfg,
base_url,
credentials: credentials.and_then(|c| c.authorization()),
client,
}
}
Expand Down

0 comments on commit f4166a5

Please sign in to comment.