from TcpStream to signal example #58

Open
nothingIsSomething opened this issue Jun 17, 2022 · 5 comments

Comments

@nothingIsSomething

nothingIsSomething commented Jun 17, 2022

Hello, I'm new to Rust and very used to RxJS. Could you help me with a simple example, please? I have already read the crate tutorial, but I'm still a bit lost. I'm trying to create a signal from a TcpStream.

use tokio;
use futures;
use futures::{SinkExt, StreamExt};
use futures_signals::signal::Broadcaster;
use tokio_tungstenite::{connect_async};
use futures_signals::signal::SignalExt; // extension trait for signals (gives for_each)
use futures_signals::signal::from_stream;

pub async fn new(url: &str) {
    let (ws_stream, _response) = connect_async(url).await.expect("Failed to connect");
    let (mut write, read) = ws_stream.split();

    // 1. observable
    let signal_from_stream = from_stream(read); //.to_stream();

    // 2. broadcast to clone the signal.
    let broadcaster = Broadcaster::new(signal_from_stream);

    //let signal = broadcaster.signal();

    // 3. create observers
    let observer_A = broadcaster.signal().for_each(|message| {
        println!("Observer A receiving... {}", message);

        let msg = message.unwrap().to_string().unwrap();
        println!("Received: {:?}", msg);
        async {}
    });

    let observer_B = broadcaster.signal().for_each(|message| {
        println!("Observer B receiving... {}", message);

        let msg = message.unwrap().to_string().unwrap();
        println!("Received: {:?}", msg);
        async {}
    });

    tokio::spawn(observer_A);
    tokio::spawn(observer_B);
}
@Pauan
Owner

Pauan commented Jun 17, 2022

What's the issue that you're having? That code seems reasonable to me, though it usually doesn't make sense to convert a Stream into a Signal: the Signal only contains the most recent message, and all other messages are dropped.

@nothingIsSomething
Author

As the tutorial says, the conversion doesn't make sense if you need the intermediate values. In this case I'm only interested in the last message, not in buffering all the incoming messages, so I guess I gain performance with the signal. I still need to learn more about streams and signals; I'm trying to find the differences and similarities with RxJS to understand them faster.

This is the error that I get:

the method `signal` exists for struct `futures_signals::signal::Broadcaster<futures_signals::signal::FromStream<futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>`, but its trait bounds were not satisfied

method cannot be called on `futures_signals::signal::Broadcaster<futures_signals::signal::FromStream<futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>` due to unsatisfied trait bounds

note: the following trait bounds were not satisfied:
      `std::option::Option<std::result::Result<tokio_tungstenite::tungstenite::Message, tokio_tungstenite::tungstenite::Error>>: std::marker::Copy`rustc(E0599)
client.rs(22, 33): method cannot be called on `futures_signals::signal::Broadcaster<futures_signals::signal::FromStream<futures::stream::SplitStream<tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>>>>` due to unsatisfied trait bounds
option.rs(515, 1): doesn't satisfy `_: std::marker::Copy`

@Pauan
Owner

Pauan commented Jun 17, 2022

> I guess I gain in performance with the signal

That depends on what you're trying to do. When you convert a Stream into a Signal, it still pulls every value from the Stream as usual, so the performance is the same; it just drops the intermediate values.

The buffer comes from the Stream itself, so the buffer exists either way. And when you use StreamExt methods like map, no new buffer is created, so the performance should be the same in most situations.

In addition, Broadcaster is fairly costly in performance, so in your code snippet it might actually be faster to use Streams.
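As a rough illustration of this point, here is a plain-std model; it does not use the futures-signals API. The `latest_of` helper and its pull counter are hypothetical names invented for this sketch. It shows that keeping only the latest value still requires pulling every item from the source.

```rust
/// Drains `source`, returning how many items were pulled and the last one.
/// This models a Stream-to-Signal conversion: every item is still pulled,
/// but only the most recent one is kept.
fn latest_of<I: Iterator>(source: I) -> (usize, Option<I::Item>) {
    let mut pulled = 0;
    let mut latest = None;
    for item in source {
        pulled += 1;
        latest = Some(item); // the previously kept value is dropped here
    }
    (pulled, latest)
}
```

For example, `latest_of(vec![10, 20, 30].into_iter().map(|x| x * 2))` pulls three items and keeps only the last one. The `map` adapter is lazy and adds no buffer of its own, which is loosely analogous to the StreamExt case described above.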

> this is the error that I get

That error is saying that the message type doesn't implement Copy, and so you can't use it with signal().

If the message type implements Clone then you can use signal_cloned() instead.

If the message type doesn't implement Copy or Clone then you have to use signal_ref(|x| { ... }) which gives you a reference to the message.
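The three access styles can be sketched with a small std-only model. `Latest` and `Packet` are hypothetical types made up for this illustration, not part of futures-signals; the sketch only mirrors how the trait bounds decide which accessor is available, in the same spirit as `signal()`, `signal_cloned()` and `signal_ref()`.

```rust
/// Hypothetical holder for the most recent value, used only to illustrate
/// the trait bounds; it is not the futures-signals `Broadcaster`.
struct Latest<T> {
    value: T,
}

impl<T> Latest<T> {
    /// Always available: the caller only borrows the value.
    fn with_ref<R>(&self, f: impl FnOnce(&T) -> R) -> R {
        f(&self.value)
    }
}

impl<T: Clone> Latest<T> {
    /// Available only when `T: Clone`.
    fn get_cloned(&self) -> T {
        self.value.clone()
    }
}

impl<T: Copy> Latest<T> {
    /// Available only when `T: Copy`.
    fn get(&self) -> T {
        self.value
    }
}

/// A message type that is neither `Copy` nor `Clone`.
struct Packet {
    payload: Vec<u8>,
}
```

With this model, `Latest { value: 7_u32 }.get()` compiles because `u32` is Copy, a `Latest<String>` needs `get_cloned()`, and a `Latest<Packet>` can only be inspected through `with_ref(|packet| packet.payload.len())`. Calling `get()` on the last two is rejected with an unsatisfied trait bound, much like the error reported above.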

All of this is only needed because you're using Broadcaster. If you don't use Broadcaster, it becomes much simpler and faster:

let future = from_stream(read).for_each(move |message| {
    if let Some(message) = message {
        // ...
    }
    async {}
});

If you want to continue to use Broadcaster, then I suggest doing something like this:

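let broadcaster = Broadcaster::new(from_stream(read).map(|message| {
    // `message` is an Option<Result<Message, Error>>: discard errors, then convert to String.
    message.and_then(|message| message.ok()).map(|message| message.to_string())
}));

This converts the message into an Option<String>, so now you can use broadcaster.signal_cloned(), because String (and therefore Option<String>) implements Clone.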
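The conversion step on its own can be sketched with std-only stand-ins, assuming the item type is `Option<Result<Message, Error>>` as the error message above indicates. `Message`, `ConnectionError` and `to_cloneable` are hypothetical names for this illustration and are not the tokio-tungstenite types.

```rust
use std::fmt;

/// Hypothetical stand-in for a WebSocket message; deliberately not Clone.
struct Message(String);

impl fmt::Display for Message {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

/// Hypothetical stand-in for a connection error; also not Clone.
struct ConnectionError;

/// Turns the non-Clone item into an `Option<String>`, which is Clone.
/// Errors and the "no value yet" case both become `None`.
fn to_cloneable(item: Option<Result<Message, ConnectionError>>) -> Option<String> {
    item.and_then(|result| result.ok())
        .map(|message| message.to_string())
}
```

Silently discarding errors is a design choice made for this sketch; if the error matters, it could instead be turned into a String as well so that the whole `Result` becomes Clone.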

@Pauan
Owner

Pauan commented Jun 17, 2022

> I'm trying to find the differences and similarities with rxjs to gain understanding faster.

RxJS is closer to Streams than it is to Signals. In RxJS they combine the concept of Streams and Signals together, which causes tons of bugs and problems. In Rust the two concepts are very cleanly separated:

  • A Stream is an ordered sequence of 0 or more values over time.

  • A Stream will emit every value, without missing anything.

  • A Stream can be empty.

  • A Signal is a single value which changes over time.

  • A Signal automatically drops intermediate values, it only keeps the most recent value.

  • A Signal can never be empty, it must always have a value.
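The properties listed above can be modelled synchronously with std types. `QueueModel` and `SlotModel` are hypothetical names for this sketch; they are not the real `Stream` or `Signal` traits and leave out everything asynchronous.

```rust
use std::collections::VecDeque;

/// Stream-like: an ordered queue that may be empty and keeps every value.
struct QueueModel<T> {
    items: VecDeque<T>,
}

impl<T> QueueModel<T> {
    fn new() -> Self {
        QueueModel { items: VecDeque::new() }
    }

    fn emit(&mut self, value: T) {
        self.items.push_back(value);
    }

    /// Returns the next value in order, or None when the queue is empty.
    fn next(&mut self) -> Option<T> {
        self.items.pop_front()
    }
}

/// Signal-like: a single slot that always holds exactly one value.
struct SlotModel<T> {
    current: T,
}

impl<T> SlotModel<T> {
    /// An initial value is required, so the slot is never empty.
    fn new(initial: T) -> Self {
        SlotModel { current: initial }
    }

    /// Overwrites the previous value; intermediate values are lost.
    fn set(&mut self, value: T) {
        self.current = value;
    }

    fn get(&self) -> &T {
        &self.current
    }
}
```

If the values 1, 2 and 3 are sent to both models before anything is read, the queue hands back 1, 2, 3 in order and then reports that it is empty, while the slot only ever reports 3.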

You can think of it as being like the difference between having a mutable Vec and a mutable variable:

  • A Vec can be empty, but a variable must always have a value.
  • A Vec can contain multiple values, but a variable only contains one value (the most recent value).
  • You can merge multiple Vecs together in various ways (chain, zip, concat), but you can't do that with a variable.
  • You can combine two variables together easily just by using them (e.g. a + b) but you can't do that with a Vec.

In this analogy, a Stream is like an asynchronous Vec, whereas a Signal is like an asynchronous variable. They have very different behavior and APIs because they do different things and serve different purposes. Signals do not replace Streams; they complement Streams, because both are useful.
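The synchronous side of the analogy can be written out directly. The function names `chain_and_zip` and `combine` are made up for this sketch, which only covers the plain Vec and variable cases, not the asynchronous Stream and Signal APIs.

```rust
/// Vec-like merging: join two sequences end to end, or pair them up.
fn chain_and_zip(a: &[i32], b: &[i32]) -> (Vec<i32>, Vec<(i32, i32)>) {
    let chained = a.iter().chain(b.iter()).copied().collect();
    let zipped = a.iter().copied().zip(b.iter().copied()).collect();
    (chained, zipped)
}

/// Variable-like combining: just use the two current values directly.
fn combine(a: i32, b: i32) -> i32 {
    a + b
}
```

Chaining `[1, 2]` with `[3, 4]` gives `[1, 2, 3, 4]` and zipping them gives `[(1, 3), (2, 4)]`, whereas two plain variables are combined with an ordinary expression such as `combine(2, 3)`.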

@nothingIsSomething
Author

It's all very clear to me now! Thank you so much! :)
