Large messages require multiple Connection::process() calls to be received. #364

Toqozz opened this issue Dec 4, 2021 · 10 comments

Toqozz (Contributor) commented Dec 4, 2021

This is a really weird issue.

Using the following main loop (blocking):

fn main() {
    let (sender, receiver) = channel();
    let (mut cr, c) = init_bus(sender);

    // cr.serve(&c); equivalent
    c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
        cr.handle_message(msg, conn).unwrap();
        true
    }));

    loop {
        dbg!("loop");

        let _result = c.process(std::time::Duration::from_millis(0));
        if let Ok(notif) = receiver.try_recv() {
            dbg!(notif);
        }

        std::thread::sleep(std::time::Duration::from_millis(1000));
    }
}

For large messages (on org.freedesktop.Notifications), this needs multiple loop iterations to actually receive anything. If I only sleep for e.g. 1ms, then messages are received very quickly.

I'm guessing that messages are chunked in some way and Connection::process() needs to be called multiple times to receive the whole message. I can see that if I remove the sleep and set the process() timeout to 1000ms, it runs many loops as soon as I send a message, so it seems to know that it's trying to process an incomplete message.

Perhaps Connection::process() only processes one message "chunk" at a time? That would make sense. This behaviour would be fine if I could somehow see that messages are pending, so that I can spin and process them all. I looked at the Result of c.process(), but there seems to be no indication that messages are waiting. I suspect that the correct way to get this information is somehow through Channel, but I tried a few things and couldn't get a result.

I suspect I could also solve this by just running SyncConnection::process() in a separate thread, but I'm not very familiar with how the Sync traits interact. Is there an equivalent of the following, but using a SyncConnection?:

c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
    cr.handle_message(msg, conn).unwrap();
    true
}));

Here's a gist for full repro: https://gist.github.com/Toqozz/4c91deb2a38c09a7804d7c6e4c706d0a

You can test by running notify-send "<summary>" where <summary> is really large (>~ 300 bytes).

diwic (Owner) commented Dec 6, 2021

Hmm...interesting issue. Does something like this help:

c.channel().set_watch_enabled(true);
loop {
    loop {
        c.process(std::time::Duration::from_millis(0)).unwrap();
        if !c.channel().watch().read { break; }
    }
    /* process incoming messages, sleep etc */
}

Toqozz changed the title from "Frequency of calling Connection::process() affects message receiving." to "Large messages require multiple Connection::process() calls to be received." on Dec 7, 2021

Toqozz (Contributor, Author) commented Dec 7, 2021

This fails because Channel::set_watch_enabled(true) takes &mut self but Connection::channel() returns &self. I'm not sure how to get a mutable reference to channel without manually creating the whole connection, which doesn't seem feasible.

error[E0596]: cannot borrow data in a `&` reference as mutable
  --> src/main.rs:93:5
   |
93 |     c.channel().set_watch_enabled(true);
   |     ^^^^^^^^^^^ cannot borrow as mutable

I've sort of worked around this by having a global connection variable and doing conn.process() in a separate thread, but I feel like this isn't safe, even though it's only an immutable reference.

static mut DBUS_CONN: Option<Connection> = None;

pub fn init_bus() {
    let c = Connection::new_session().expect("Failed to get a session bus.");
    let reply = c
        .request_name("org.freedesktop.Notifications", false, true, false)
        .expect("Failed to register name.");

    // etc...

    unsafe {
        DBUS_CONN = Some(c);
    }
}

pub fn get_conn() -> &'static Connection {
    unsafe {
        assert!(DBUS_CONN.is_some());
        DBUS_CONN.as_ref().unwrap()
    }
}

// later, from thread 2
pub fn process_dbus() {
    let conn = get_conn();
    loop {
        conn.process(Duration::from_millis(1000)).unwrap();
    }
}

// later, from thread 1
pub fn send_message() {
    let conn = get_conn();
    conn.send(<some message>);
}

diwic (Owner) commented Dec 7, 2021

I'm not sure how to get a mutable reference to channel without manually creating the whole connection, which doesn't seem feasible.

let mut ch = Channel::get_private(BusType::Session).unwrap();
ch.set_watch_enabled(true);
let conn: Connection = ch.into();

I've sort of worked around this by having a global connection variable and doing conn.process() in a separate thread, but I feel like this isn't safe, even though it's only an immutable reference.

You can do this without unsafe with SyncConnection, it's one of the things SyncConnection is for. Just store the SyncConnection in an Arc and share it between threads.
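
Something like this, for example (a rough, untested sketch using the blocking SyncConnection API; the names are just for illustration):

use std::sync::Arc;
use std::time::Duration;
use dbus::blocking::SyncConnection;

fn main() -> Result<(), dbus::Error> {
    let c = Arc::new(SyncConnection::new_session()?);

    // Thread 2: drive the connection.
    let c2 = Arc::clone(&c);
    std::thread::spawn(move || loop {
        c2.process(Duration::from_millis(1000)).unwrap();
    });

    // Thread 1: keep using `c` concurrently, e.g. to send messages.
    loop {
        std::thread::sleep(Duration::from_secs(1));
    }
}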

Nevertheless I'm not sure whether I should try to fix this somehow (and if so I'm not sure how) or document it as part of the limitations of the libdbus api. It's definitely not something you would expect...

Toqozz (Contributor, Author) commented Dec 7, 2021

c.channel().set_watch_enabled(true);
loop {
    loop {
        c.process(std::time::Duration::from_millis(0)).unwrap();
        if !c.channel().watch().read { break; }
    }
    /* process incoming messages, sleep etc */
}

It looks like c.channel().watch().read is true regardless of whether a large message is being "processed" or not, so that's a no-go here.

You can do this without unsafe with SyncConnection, it's one of the things SyncConnection is for. Just store the SyncConnection in an Arc and share it between threads.

Yeah, I mentioned this previously:

I suspect I could also solve this by just running SyncConnection::process() in a separate thread, but I'm not very familiar with how the Sync traits interact. Is there an equivalent of the following, but using a SyncConnection?:

c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
   cr.handle_message(msg, conn).unwrap();
   true
}))

I get a super obtuse Rust error when I try the above (where c is a SyncConnection):

error[E0277]: `(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)` cannot be shared between threads safely
  --> src/main.rs:87:66
   |
87 |       c.start_receive(dbus::message::MatchRule::new_method_call(), Box::new(move |msg, conn| {
   |  __________________________________________________________________^
88 | |         cr.handle_message(msg, conn).unwrap();
89 | |         true
90 | |     }));
   | |______^ `(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)` cannot be shared between threads safely
   |
   = help: the trait `Sync` is not implemented for `(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)`
   = note: required because of the requirements on the impl of `Sync` for `Unique<(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)>`
   = note: required because it appears within the type `Box<(dyn for<'r> FnMut(dbus_crossroads::Context, &'r mut Crossroads) -> Option<dbus_crossroads::Context> + Send + 'static)>`
   = note: required because it appears within the type `dbus_crossroads::ifacedesc::CallbackDbg`
   = note: required because it appears within the type `Option<dbus_crossroads::ifacedesc::CallbackDbg>`
   = note: required because it appears within the type `MethodDesc`
   = note: required because it appears within the type `(Member<'static>, MethodDesc)`
   = note: required because of the requirements on the impl of `Sync` for `hashbrown::raw::RawTable<(Member<'static>, MethodDesc)>`
   = note: required because it appears within the type `hashbrown::map::HashMap<Member<'static>, MethodDesc, RandomState>`
   = note: required because it appears within the type `HashMap<Member<'static>, MethodDesc>`
   = note: required because it appears within the type `dbus_crossroads::ifacedesc::IfaceDesc`
   = note: required because of the requirements on the impl of `Sync` for `Unique<dbus_crossroads::ifacedesc::IfaceDesc>`
   = note: required because it appears within the type `alloc::raw_vec::RawVec<dbus_crossroads::ifacedesc::IfaceDesc>`
   = note: required because it appears within the type `Vec<dbus_crossroads::ifacedesc::IfaceDesc>`
   = note: required because it appears within the type `dbus_crossroads::ifacedesc::Registry`
   = note: required because it appears within the type `Crossroads`
   = note: required because it appears within the type `[closure@src/main.rs:87:75: 90:6]`
   = note: required for the cast to the object type `dyn for<'r> FnMut(Message, &'r SyncConnection) -> bool + Send + Sync`

I can do it by putting cr in a Mutex, but this seems to defeat the purpose of having a SyncConnection in the first place.

I apologise for my ignorance here; my knowledge in this area is seriously lacking.

diwic (Owner) commented Dec 8, 2021

I can do it by putting cr in a Mutex

Yep, there is no SyncCrossroads so that's what you need to do.
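
Rough sketch of what that could look like (untested; serve_sync is just an illustrative helper, and cr is the Crossroads instance you already build):

use std::sync::{Arc, Mutex};
use dbus::blocking::SyncConnection;
use dbus::message::MatchRule;
use dbus_crossroads::Crossroads;

fn serve_sync(c: &SyncConnection, cr: Crossroads) {
    let cr = Arc::new(Mutex::new(cr));
    c.start_receive(MatchRule::new_method_call(), Box::new(move |msg, conn| {
        // The Mutex is what makes the boxed callback Sync, which
        // SyncConnection::start_receive requires.
        cr.lock().unwrap().handle_message(msg, conn).unwrap();
        true
    }));
}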

Let's try something else. What if you had:

c.process(std::time::Duration::from_millis(1))

...instead of from_millis(0), and then sleep for a second between invocations, would that speed up the processing? If so, would it be a suitable workaround for you?

diwic (Owner) commented Dec 8, 2021

Hmm...in dbus-tokio, we have some kind of strange loop which calls libc::recv. Maybe that's something that's needed here as well?

https://github.com/diwic/dbus-rs/blob/master/dbus-tokio/src/connection.rs#L170
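
As far as I remember, the idea there is roughly to peek at the connection's socket to see whether the kernel still has unread data, and keep calling process() while it does. A rough, untested sketch of that kind of check (assuming fd is taken from c.channel().watch().fd):

use std::os::unix::io::RawFd;

// Peek one byte without consuming it; returns true if the socket still has
// data that libdbus hasn't pulled in yet.
fn socket_has_pending_data(fd: RawFd) -> bool {
    let mut buf = [0u8; 1];
    let r = unsafe {
        libc::recv(fd, buf.as_mut_ptr() as *mut libc::c_void, 1,
                   libc::MSG_DONTWAIT | libc::MSG_PEEK)
    };
    r > 0
}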

Toqozz (Contributor, Author) commented Dec 8, 2021

Let's try something else. What if you had:

c.process(std::time::Duration::from_millis(1))

...instead of from_millis(0), and then sleep for a second between invocations, would that speed up the processing? If so, would it be a suitable workaround for you?

The issue happens whether it's c.process(Duration::from_millis(0)) or c.process(Duration::from_millis(1000)). It seems like c.process() only processes a single "chunk" on each invocation, so this would still have the issue.

Hmm...in dbus-tokio, we have some kind of strange loop which calls libc::recv. Maybe that's something that's needed here as well?

https://github.com/diwic/dbus-rs/blob/master/dbus-tokio/src/connection.rs#L170

The issue linked in the comment above that code (#254) looks extremely similar to this one... so it seems like that's it.

diwic (Owner) commented Dec 10, 2021

So if process(1000) doesn't receive a message > 2048 bytes immediately, that seems even weirder; I'd call it a relatively serious bug in libdbus if that's the case, and it should be reported and discussed there. Dbus-rs has been around for a really long time and libdbus for far longer, so can you double-check before we go upstream with it?

Toqozz (Contributor, Author) commented Dec 10, 2021

I can confirm this is the case. I don't think it's a timing issue though; it seems like it could be by design (though I can't see any mention of the behavior in the D-Bus documentation). I think c.process() (and maybe dbus_connection_pop_message()) just always receives the first fixed-size chunk no matter the timeout.

loop {
    dbg!("loop");
    c.process(Duration::from_millis(1000)).unwrap();
    if let Ok(notif) = receiver.try_recv() {
        dbg!(notif);
    }

    std::thread::sleep(std::time::Duration::from_millis(100))
}

[video attachment: vid.mp4]

(my top terminal breaks displaying the message because it is so large)

diwic (Owner) commented Dec 11, 2021

Filed https://gitlab.freedesktop.org/dbus/dbus/-/issues/364 - let's see what they say.
