Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update chat example to use tokio::select! #3587

Merged
merged 1 commit into from Mar 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 0 additions & 1 deletion examples/Cargo.toml
Expand Up @@ -11,7 +11,6 @@ tokio = { version = "1.0.0", features = ["full", "tracing"] }
tokio-util = { version = "0.6.3", features = ["full"] }
tokio-stream = { version = "0.1" }

async-stream = "0.3"
tracing = "0.1"
tracing-subscriber = { version = "0.2.7", default-features = false, features = ["fmt", "ansi", "env-filter", "chrono", "tracing-log"] }
bytes = "1.0.0"
Expand Down
100 changes: 31 additions & 69 deletions examples/chat.rs
Expand Up @@ -28,18 +28,16 @@

use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{mpsc, Mutex};
use tokio_stream::{Stream, StreamExt};
use tokio_util::codec::{Framed, LinesCodec, LinesCodecError};
use tokio_stream::StreamExt;
use tokio_util::codec::{Framed, LinesCodec};

use futures::SinkExt;
use std::collections::HashMap;
use std::env;
use std::error::Error;
use std::io;
use std::net::SocketAddr;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
Expand Down Expand Up @@ -101,6 +99,9 @@ async fn main() -> Result<(), Box<dyn Error>> {
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<String>;

/// Shorthand for the receive half of the message channel.
type Rx = mpsc::UnboundedReceiver<String>;

/// Data that is shared between all peers in the chat server.
///
/// This is the set of `Tx` handles for all connected clients. Whenever a
Expand All @@ -124,7 +125,7 @@ struct Peer {
///
/// This is used to receive messages from peers. When a message is received
/// off of this `Rx`, it will be written to the socket.
rx: Pin<Box<dyn Stream<Item = String> + Send>>,
rx: Rx,
}

impl Shared {
Expand Down Expand Up @@ -156,58 +157,15 @@ impl Peer {
let addr = lines.get_ref().peer_addr()?;

// Create a channel for this peer
let (tx, mut rx) = mpsc::unbounded_channel();
let (tx, rx) = mpsc::unbounded_channel();

// Add an entry for this `Peer` in the shared state map.
state.lock().await.peers.insert(addr, tx);

let rx = Box::pin(async_stream::stream! {
while let Some(item) = rx.recv().await {
yield item;
}
});

Ok(Peer { lines, rx })
}
}

#[derive(Debug)]
enum Message {
/// A message that should be broadcasted to others.
Broadcast(String),

/// A message that should be received by a client
Received(String),
}

// Peer implements `Stream` in a way that polls both the `Rx`, and `Framed` types.
// A message is produced whenever an event is ready until the `Framed` stream returns `None`.
impl Stream for Peer {
type Item = Result<Message, LinesCodecError>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
// First poll the `UnboundedReceiver`.

if let Poll::Ready(Some(v)) = Pin::new(&mut self.rx).poll_next(cx) {
return Poll::Ready(Some(Ok(Message::Received(v))));
}

// Secondly poll the `Framed` stream.
let result: Option<_> = futures::ready!(Pin::new(&mut self.lines).poll_next(cx));

Poll::Ready(match result {
// We've received a message we should broadcast to others.
Some(Ok(message)) => Some(Ok(Message::Broadcast(message))),

// An error occurred.
Some(Err(e)) => Some(Err(e)),

// The stream has been exhausted.
None => None,
})
}
}

/// Process an individual chat client
async fn process(
state: Arc<Mutex<Shared>>,
Expand Down Expand Up @@ -241,28 +199,32 @@ async fn process(
}

// Process incoming messages until our stream is exhausted by a disconnect.
while let Some(result) = peer.next().await {
match result {
// A message was received from the current user, we should
// broadcast this message to the other users.
Ok(Message::Broadcast(msg)) => {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);

state.broadcast(addr, &msg).await;
}
// A message was received from a peer. Send it to the
// current user.
Ok(Message::Received(msg)) => {
loop {
tokio::select! {
// A message was received from a peer. Send it to the current user.
Some(msg) = peer.rx.recv() => {
peer.lines.send(&msg).await?;
}
Err(e) => {
tracing::error!(
"an error occurred while processing messages for {}; error = {:?}",
username,
e
);
}
result = peer.lines.next() => match result {
// A message was received from the current user, we should
// broadcast this message to the other users.
Some(Ok(msg)) => {
let mut state = state.lock().await;
let msg = format!("{}: {}", username, msg);

state.broadcast(addr, &msg).await;
}
// An error occurred.
Some(Err(e)) => {
tracing::error!(
"an error occurred while processing messages for {}; error = {:?}",
username,
e
);
}
// The stream has been exhausted.
None => break,
},
}
}

Expand Down