Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use futures::future::select instead of select! macro #100

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
38 changes: 21 additions & 17 deletions libsignal-service-actix/src/websocket.rs
Expand Up @@ -5,7 +5,7 @@ use awc::{
ws::Frame,
};
use bytes::Bytes;
use futures::{channel::mpsc::*, prelude::*};
use futures::{channel::mpsc::*, future::Either, pin_mut, prelude::*};
use url::Url;

use libsignal_service::{
Expand Down Expand Up @@ -53,32 +53,33 @@ impl From<WsProtocolError> for AwcWebSocketError {

/// Process the WebSocket, until it times out.
async fn process<S: Stream>(
socket_stream: S,
mut socket_stream: S,
mut incoming_sink: Sender<WebSocketStreamItem>,
) -> Result<(), AwcWebSocketError>
where
S: Unpin,
S: Stream<Item = Result<Frame, WsProtocolError>>,
{
let mut socket_stream = socket_stream.fuse();

let mut ka_interval = actix::clock::interval_at(
actix::clock::Instant::now(),
push_service::KEEPALIVE_TIMEOUT_SECONDS,
);

loop {
let tick = ka_interval.tick().fuse();
futures::pin_mut!(tick);
futures::select! {
_ = tick => {
let tick = ka_interval.tick();
pin_mut!(tick);
match future::select(tick, socket_stream.next()).await {
Either::Left((_tick, _)) => {
log::trace!("Triggering keep-alive");
if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await {
if let Err(e) = incoming_sink
.send(WebSocketStreamItem::KeepAliveRequest)
.await
{
log::info!("Websocket sink has closed: {:?}.", e);
break;
};
},
frame = socket_stream.next() => {
}
Either::Right((frame, _tick)) => {
let frame = if let Some(frame) = frame {
frame
} else {
Expand All @@ -94,32 +95,35 @@ where
log::warn!("Received Ping({:?})", msg);

continue;
},
}
Frame::Pong(msg) => {
log::trace!("Received Pong({:?})", msg);

continue;
},
}
Frame::Text(frame) => {
log::warn!("Frame::Text {:?}", frame);

// this is a protocol violation, maybe break; is better?
continue;
},
}

Frame::Close(c) => {
log::warn!("Websocket closing: {:?}", c);

break;
},
}
};

// Match SendError
if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(frame)).await {
if let Err(e) = incoming_sink
.send(WebSocketStreamItem::Message(frame))
.await
{
log::info!("Websocket sink has closed: {:?}.", e);
break;
}
},
}
}
}
Ok(())
Expand Down
36 changes: 21 additions & 15 deletions libsignal-service-hyper/src/websocket.rs
Expand Up @@ -5,7 +5,7 @@ use async_tungstenite::{
tungstenite::{Error as TungsteniteError, Message},
};
use bytes::Bytes;
use futures::{channel::mpsc::*, prelude::*};
use futures::{channel::mpsc::*, future::Either, pin_mut, prelude::*};
use hyper::StatusCode;
use tokio::time::Instant;
use tokio_rustls::rustls;
Expand Down Expand Up @@ -74,30 +74,33 @@ impl From<TungsteniteWebSocketError> for ServiceError {

// Process the WebSocket, until it times out.
async fn process<S: Stream>(
socket_stream: S,
mut socket_stream: S,
mut incoming_sink: Sender<WebSocketStreamItem>,
) -> Result<(), TungsteniteWebSocketError>
where
S: Unpin,
S: Stream<Item = Result<Message, TungsteniteError>>,
{
let mut socket_stream = socket_stream.fuse();

let mut ka_interval = tokio::time::interval_at(
Instant::now(),
push_service::KEEPALIVE_TIMEOUT_SECONDS,
);

loop {
tokio::select! {
_ = ka_interval.tick() => {
let tick = ka_interval.tick();
pin_mut!(tick);
match future::select(tick, socket_stream.next()).await {
Either::Left((_tick, _)) => {
log::trace!("Triggering keep-alive");
if let Err(e) = incoming_sink.send(WebSocketStreamItem::KeepAliveRequest).await {
if let Err(e) = incoming_sink
.send(WebSocketStreamItem::KeepAliveRequest)
.await
{
log::info!("Websocket sink has closed: {:?}.", e);
break;
};
},
frame = socket_stream.next() => {
}
Either::Right((frame, _)) => {
let frame = if let Some(frame) = frame {
frame
} else {
Expand All @@ -111,32 +114,35 @@ where
log::warn!("Received Ping({:?})", msg);

continue;
},
}
Message::Pong(msg) => {
log::trace!("Received Pong({:?})", msg);

continue;
},
}
Message::Text(frame) => {
log::warn!("Message::Text {:?}", frame);

// this is a protocol violation, maybe break; is better?
continue;
},
}

Message::Close(c) => {
log::warn!("Websocket closing: {:?}", c);

break;
},
}
};

// Match SendError
if let Err(e) = incoming_sink.send(WebSocketStreamItem::Message(Bytes::from(frame))).await {
if let Err(e) = incoming_sink
.send(WebSocketStreamItem::Message(Bytes::from(frame)))
.await
{
log::info!("Websocket sink has closed: {:?}.", e);
break;
}
},
}
}
}
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion libsignal-service/src/account_manager.rs
Expand Up @@ -398,7 +398,7 @@ impl<Service: PushService> AccountManager<Service> {

/// Set profile attributes
///
/// Signal Android does not allow unsetting voice/video.
/// Signal Android does not allow un-setting voice/video.
#[allow(clippy::too_many_arguments)]
pub async fn set_account_attributes(
&mut self,
Expand Down
34 changes: 18 additions & 16 deletions libsignal-service/src/messagepipe.rs
Expand Up @@ -6,6 +6,7 @@ use futures::{
mpsc::{self, Sender},
oneshot,
},
future::Either,
prelude::*,
stream::{FusedStream, FuturesUnordered},
};
Expand Down Expand Up @@ -129,43 +130,44 @@ impl<WS: WebSocketService> MessagePipe<WS> {
let mut background_work = FuturesUnordered::<LocalBoxFuture<()>>::new();
// a pending task is added, as to never end the background worker until
// it's dropped.
background_work.push(futures::future::pending().boxed_local());
background_work.push(future::pending().boxed_local());

loop {
futures::select! {
// WebsocketConnection::onMessage(ByteString)
frame = self.stream.next() => match frame {
match future::select(self.stream.next(), background_work.next())
.await
{
Either::Left((frame, _)) => match frame {
Some(WebSocketStreamItem::Message(frame)) => {
let env = self.process_frame(frame).await.transpose();
if let Some(env) = env {
sink.send(env).await?;
}
},
}
Some(WebSocketStreamItem::KeepAliveRequest) => {
let request = self.send_keep_alive().await;
match request {
Ok(request) => {
let request = request.map(|response| {
if let Err(e) = response {
log::warn!("Error from keep alive: {:?}", e);
log::warn!(
"Error from keep alive: {:?}",
e
);
}
});
background_work.push(request.boxed_local());
},
Err(e) => log::warn!("Could not send keep alive: {}", e),
}
Err(e) => {
log::warn!("Could not send keep alive: {}", e)
}
}
},
}
None => {
log::debug!("WebSocket stream ended.");
break;
},
},
_ = background_work.next() => {
// no op
}
},
complete => {
log::info!("select! complete");
}
Either::Right((_background_work, _)) => {}
}
}

Expand Down
16 changes: 7 additions & 9 deletions libsignal-service/src/provisioning/pipe.rs
@@ -1,6 +1,7 @@
use bytes::{Bytes, BytesMut};
use futures::{
channel::mpsc::{self, Sender},
future::Either,
prelude::*,
stream::FuturesUnordered,
};
Expand Down Expand Up @@ -93,25 +94,22 @@ impl<WS: WebSocketService> ProvisioningPipe<WS> {
background_work.push(futures::future::pending().boxed_local());

loop {
futures::select! {
match future::select(self.stream.next(), background_work.next())
.await
{
// WebsocketConnection::onMessage(ByteString)
frame = self.stream.next() => match frame {
Either::Left((frame, _)) => match frame {
Some(WebSocketStreamItem::Message(frame)) => {
let env = self.process_frame(frame).await.transpose();
if let Some(env) = env {
sink.send(env).await?;
}
},
}
// TODO: implement keep-alive?
Some(WebSocketStreamItem::KeepAliveRequest) => continue,
None => break,
},
_ = background_work.next() => {
// no op
},
complete => {
log::info!("select! complete");
}
Either::Right((_background_work, _)) => (),
}
}

Expand Down