How to handle the closing of a sync connection? #201

Open
ghost opened this issue Dec 19, 2018 · 1 comment
Comments

ghost commented Dec 19, 2018

No description provided.

ghost changed the title from "How to handle the closing of a connection?" to "How to handle the closing of a sync connection?" on Dec 19, 2018

MightyPork commented Feb 25, 2019

I'm facing the same problem.

I managed to shut down the sender by injecting an OwnedMessage::Close(None) into the mpsc channel. But how do I shut down the receiver?

Edit: I found a solution! This should really be better documented.

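use std::sync::mpsc;
use std::thread;
use std::time::Duration;
use websocket::{Message, OwnedMessage};

// `client` is an already-connected sync websocket client
let (mut ws_reader, mut ws_writer) = client.split().unwrap();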
let (pipe_tx, pipe_rx) = mpsc::channel();
let pipe_tx1 = pipe_tx.clone(); // control pipe for controlled shutdown

let send_loop = thread::spawn(move || {
    'sender: loop {
        let message = match pipe_rx.recv() {
            Ok(m) => m,
            Err(_) => {
                // all senders were dropped
                break 'sender; // !!! break instead of return
            }
        };

        match message {
            OwnedMessage::Close(_) => {
                let _ = ws_writer.send_message(&message);
                break 'sender; // !!! break instead of return
            }
            _ => (),
        }

        match ws_writer.send_message(&message) {
            Ok(()) => (),
            Err(_) => {
                let _ = ws_writer.send_message(&Message::close());
                break 'sender; // !!! break instead of return
            }
        }
    }

    // !!! this is the magic line: shutting down the whole socket
    // also lets the receive loop below terminate
    let _ = ws_writer.shutdown_all();
});

let receive_loop = thread::spawn(move || {
    for message in ws_reader.incoming_messages() {
        let message = match message {
            Ok(m) => m,
            Err(_) => {
                let _ = pipe_tx.send(OwnedMessage::Close(None));
                return;
            }
        };

        match message {
            OwnedMessage::Close(_) => {
                let _ = pipe_tx.send(OwnedMessage::Close(None));
                return;
            }
            OwnedMessage::Ping(data) => {
                match pipe_tx.send(OwnedMessage::Pong(data)) {
                    Ok(()) => (),
                    Err(_) => {
                        return;
                    }
                }
            }
            _ => println!("Received: {:?}", message), // TODO handle data
        }
    }
});

thread::sleep(Duration::from_millis(5000));

println!("Trying to close");
let _ = pipe_tx1.send(OwnedMessage::Close(None));

println!("Waiting for threads");

let _ = send_loop.join();
println!("Sender closed");
let _ = receive_loop.join();
println!("Receiver closed");

println!("Terminated.");
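
For anyone wondering why the shutdown call matters, here is a minimal std-only sketch of the same pattern. It assumes that shutdown_all() amounts to shutting down both directions of the underlying TCP socket, which is my reading rather than something confirmed in this thread. `Command` and `run_demo` are hypothetical names made up for the illustration; `Command` stands in for OwnedMessage, and a plain TcpStream stands in for the websocket reader/writer pair. The point it shows: a reader thread blocked in read() only wakes up once the writer side shuts the socket down.

```rust
use std::io::{Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::sync::mpsc;
use std::thread;

// Hypothetical stand-in for OwnedMessage: just data vs. close.
enum Command {
    Data(Vec<u8>),
    Close,
}

/// Returns (bytes the peer received, bytes the local reader received).
fn run_demo() -> std::io::Result<(usize, usize)> {
    // A silent peer on localhost: it accepts one connection and only reads.
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;
    let server = thread::spawn(move || {
        let (mut peer, _) = listener.accept().expect("accept failed");
        let mut buf = [0u8; 64];
        let mut seen = 0usize;
        while let Ok(n) = peer.read(&mut buf) {
            if n == 0 {
                break; // client closed the connection
            }
            seen += n;
        }
        seen
    });

    // Two handles to the same socket, like the reader/writer from split().
    let mut writer = TcpStream::connect(addr)?;
    let mut reader = writer.try_clone()?;
    let (tx, rx) = mpsc::channel::<Command>();

    let send_loop = thread::spawn(move || {
        for cmd in rx {
            match cmd {
                Command::Data(bytes) => {
                    if writer.write_all(&bytes).is_err() {
                        break;
                    }
                }
                Command::Close => break, // break, not return
            }
        }
        // Counterpart of the "magic line": wakes the reader blocked in read().
        let _ = writer.shutdown(Shutdown::Both);
    });

    let receive_loop = thread::spawn(move || {
        let mut buf = [0u8; 64];
        let mut total = 0usize;
        loop {
            match reader.read(&mut buf) {
                // EOF or error: the socket was shut down, so stop.
                Ok(0) | Err(_) => break total,
                Ok(n) => total += n,
            }
        }
    });

    // Controlled shutdown from the outside via the channel.
    tx.send(Command::Data(b"hello".to_vec())).expect("send loop gone");
    tx.send(Command::Close).expect("send loop gone");

    send_loop.join().expect("send loop panicked");
    let reader_got = receive_loop.join().expect("receive loop panicked");
    let peer_got = server.join().expect("server panicked");
    Ok((peer_got, reader_got))
}

fn main() {
    let (peer_got, reader_got) = run_demo().expect("demo failed");
    println!("peer received {peer_got} bytes, reader received {reader_got} bytes");
}
```

If you drop the shutdown call from the send loop, the receive loop stays blocked in read() because the peer never sends anything and never closes, so the join on it would hang. That matches the behaviour described above, at least under the stated assumption.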
