Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add more comments to chat example #1665

Merged
merged 2 commits into from
Jan 8, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
28 changes: 19 additions & 9 deletions examples/chat/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

// Our shared state
struct AppState {
// We require unique usernames. This tracks which usernames have been taken.
user_set: Mutex<HashSet<String>>,
// Channel used to send messages to all connected clients.
tx: broadcast::Sender<String>,
}

Expand All @@ -40,6 +42,7 @@ async fn main() {
.with(tracing_subscriber::fmt::layer())
.init();

// Set up application state for use with with_state().
let user_set = Mutex::new(HashSet::new());
let (tx, _rx) = broadcast::channel(100);

Expand All @@ -65,8 +68,11 @@ async fn websocket_handler(
ws.on_upgrade(|socket| websocket(socket, state))
}

// This function deals with a single websocket connection, i.e., a single
// connected client / user, for which we will spawn two independent tasks (for
// receiving / sending chat messages).
async fn websocket(stream: WebSocket, state: Arc<AppState>) {
// By splitting we can send and receive at the same time.
// By splitting, we can send and receive at the same time.
let (mut sender, mut receiver) = stream.split();

// Username gets set in the receive loop, if it's valid.
Expand All @@ -91,15 +97,17 @@ async fn websocket(stream: WebSocket, state: Arc<AppState>) {
}
}

// Subscribe before sending joined message.
// We subscribe *before* sending the "joined" message, so that we will also
// display it to our client.
let mut rx = state.tx.subscribe();

// Send joined message to all subscribers.
// Now send the "joined" message to all subscribers.
let msg = format!("{} joined.", username);
tracing::debug!("{}", msg);
let _ = state.tx.send(msg);

// This task will receive broadcast messages and send text message to our client.
// Spawn the first task that will receive broadcast messages and send text
// messages over the websocket to our client.
let mut send_task = tokio::spawn(async move {
while let Ok(msg) = rx.recv().await {
// In any websocket error, break loop.
Expand All @@ -109,29 +117,31 @@ async fn websocket(stream: WebSocket, state: Arc<AppState>) {
}
});

// Clone things we want to pass to the receiving task.
// Clone things we want to pass (move) to the receiving task.
let tx = state.tx.clone();
let name = username.clone();

// This task will receive messages from client and send them to broadcast subscribers.
// Spawn a task that takes messages from the websocket, prepends the user
// name, and sends them to all broadcast subscribers.
let mut recv_task = tokio::spawn(async move {
while let Some(Ok(Message::Text(text))) = receiver.next().await {
// Add username before message.
let _ = tx.send(format!("{}: {}", name, text));
}
});

// If any one of the tasks exit, abort the other.
// If any one of the tasks run to completion, we abort the other.
tokio::select! {
_ = (&mut send_task) => recv_task.abort(),
_ = (&mut recv_task) => send_task.abort(),
};

// Send user left message.
// Send "user left" message (similar to "joined" above).
let msg = format!("{} left.", username);
tracing::debug!("{}", msg);
let _ = state.tx.send(msg);
// Remove username from map so new clients can take it.

// Remove username from map so new clients can take it again.
state.user_set.lock().unwrap().remove(&username);
}

Expand Down