async monitor? #431

Open
acheronfail opened this issue Apr 23, 2023 · 4 comments

@acheronfail
Contributor

acheronfail commented Apr 23, 2023

Apologies if I've missed an example of this, or if there's some API for it but I've not found it.

I want to BecomeMonitor, but in an async environment. This monitor example is great and works, but it's not async.

This is how I've currently got it to work:

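use std::time::Duration;

use dbus::arg::Variant;
use dbus::channel::MatchingReceiver; // provides `start_receive`
use dbus::message::MatchRule;
use dbus::nonblock::{self, SyncConnection};
use dbus::Message;
use tokio::sync::mpsc;

#[tokio::main]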
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (resource, con) = dbus_tokio::connection::new_session_sync()?;
    tokio::spawn(async move {
        panic!("Lost connection to dbus: {}", resource.await);
    });

    let rule = MatchRule::new()
        .with_type(dbus::MessageType::MethodCall)
        .with_path("/org/freedesktop/Notifications")
        .with_interface("org.freedesktop.DBus.Properties")
        .with_member("Set");

    let dbus_proxy = nonblock::Proxy::new(
        "org.freedesktop.DBus",
        "/org/freedesktop/DBus",
        Duration::from_secs(5),
        con.clone(),
    );

    let _: () = dbus_proxy
        .method_call(
            "org.freedesktop.DBus.Monitoring",
            "BecomeMonitor",
            (vec![rule.match_str()], 0u32),
        )
        .await?;

    // HERE: is there an "async" way to stream response from a monitor?
    let (tx, mut rx) = mpsc::channel(8);
    con.start_receive(
        rule.clone(),
        Box::new(move |msg: Message, _con: &SyncConnection| {
            // gets called whenever dunstctl toggle is called...
            let (_, what, is_paused): (&str, &str, Variant<bool>) = msg.read3().unwrap();
            if what == "paused" {
                let tx = tx.clone();
                tokio::spawn(async move {
                    tx.send(is_paused.0).await.unwrap();
                });
            }

            true
        }),
    );

    // Print each update; the loop ends once every sender has been dropped.
    while let Some(paused) = rx.recv().await {
        dbg!(paused);
    }

    Ok(())
}

But the callback I pass to con.start_receive isn't async-friendly. I expected something along the lines of MsgMatch::stream for receiving messages from a monitor, but I can't get that working either (it works if I use eavesdrop(), but it panics if I use it after BecomeMonitor). 🤔

Is there an async way to stream responses from Connection::start_receive? Or an async alternative?
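
For reference, the callback does not have to spawn a task per message if the channel's send is not async. The sketch below shows that forwarding pattern using only the standard library's `std::sync::mpsc`. With tokio, `tokio::sync::mpsc::unbounded_channel` would play the same role, since `UnboundedSender::send` is a plain, non-async method. The names `forward_paused` and `collect_paused` and the sample events are made up for illustration and are not part of dbus-rs.

```rust
use std::sync::mpsc::{channel, Sender};

/// Hypothetical stand-in for the body of the `start_receive` callback:
/// forwards the new value when the changed property is "paused".
/// Returns `true`, mirroring the original callback's return value.
fn forward_paused(what: &str, is_paused: bool, tx: &Sender<bool>) -> bool {
    if what == "paused" {
        // `send` on a std channel does not block; an error only means
        // the receiver has been dropped, which is ignored here.
        let _ = tx.send(is_paused);
    }
    true
}

/// Feeds made-up property changes through the forwarder and drains the channel.
fn collect_paused(events: &[(&str, bool)]) -> Vec<bool> {
    let (tx, rx) = channel();
    for (what, value) in events {
        forward_paused(what, *value, &tx);
    }
    drop(tx); // close the channel so the iterator below terminates
    rx.iter().collect()
}

fn main() {
    let seen = collect_paused(&[("paused", true), ("volume", false), ("paused", false)]);
    println!("{:?}", seen);
}
```

This only removes the per-message `tokio::spawn`. It does not address whether a stream-based API can be used after BecomeMonitor.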

@diwic
Owner

diwic commented Apr 23, 2023

I'd say MsgMatch::stream is the way to go here, what issue do you have trying to use it?

Edit: Actually MsgMatch::msg_stream might be even better or at least easier to get working?

@acheronfail
Contributor Author

acheronfail commented Apr 23, 2023

It seems that no matter what I do, if I ask dbus to BecomeMonitor and then use MsgMatch::stream or MsgMatch::msg_stream afterwards, the connection dies with:

D-Bus error: Read/write failed (org.freedesktop.DBus.Error.Failed)

I've pushed a small repository here: https://github.com/acheronfail/dbus-tokio-monitor, you should be able to reproduce it with that.

And if I don't ask to BecomeMonitor, there is no error, but no events ever seem to come through either...

@diwic
Owner

diwic commented Apr 29, 2023

Hi,

Thanks for providing a repository. I suspect this is a bug inside the libdbus library, but I'm not sure. I spent some time looking into it, but I'm quite busy at the moment and haven't had time to nail it down; I might need to rebuild libdbus with debug prints or so...

Is this a thread safety problem, i.e., does it help if you use a single-threaded scheduler instead?

@acheronfail
Contributor Author

Thanks for your help so far!

Hmm, I used #[tokio::main(flavor = "current_thread")] for a single-threaded runtime, as described in tokio's documentation, but unfortunately the error still persists. :(

I also verified using std::thread::current().name and yes, it all ran on the same thread.

I'm also fairly busy for the next little while, but if I get the chance I'll try to look into this too (never built libdbus before...)
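
A same-thread check like the one mentioned above can be sketched with the standard library alone. This version compares `ThreadId` values rather than thread names, since `name()` returns an `Option` and unnamed threads would all look alike. The helpers `record_here` and `all_same_thread` are hypothetical and only illustrate the idea; in the real program the probe would be called from `main` and from inside the message callback.

```rust
use std::thread::{self, ThreadId};

/// Returns true when every recorded thread id is identical (or the list is empty).
fn all_same_thread(ids: &[ThreadId]) -> bool {
    ids.windows(2).all(|pair| pair[0] == pair[1])
}

/// Hypothetical probe: call this at each point of interest to record where it ran.
fn record_here(ids: &mut Vec<ThreadId>) {
    ids.push(thread::current().id());
}

fn main() {
    let mut ids = Vec::new();
    record_here(&mut ids);
    record_here(&mut ids);
    println!("same thread: {}", all_same_thread(&ids));

    // An id captured on a spawned thread differs from the caller's.
    let other = thread::spawn(|| thread::current().id()).join().unwrap();
    ids.push(other);
    println!("same thread after spawn: {}", all_same_thread(&ids));
}
```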
