rumqttc: Outgoing publications are sent one by one #810

Open
flxo opened this issue Feb 28, 2024 · 11 comments

Comments

@flxo
Contributor

flxo commented Feb 28, 2024

Expected Behavior

Outgoing publications are written to the network I/O in batches instead of one inefficient write per publish.

Current Behavior

The (v5) client code "flushes" the outgoing queue after each request from the application here. This leads to one write or sendto system call per publication, which is inefficient.

Proposal

Queue up to n requests from the request channel and flush once, similar to the processing of incoming packets, where up to 10 packets are handled in a batch.

I implemented a small PoC that loops over requests and processes up to n incoming or outgoing packets before flushing the I/O. We're currently evaluating the performance impact in our scenario, which has high-frequency publications from the client to the broker (a less common use case).

Let me know if this is a possible way to tackle the problem.
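
For illustration, here is a minimal sketch of the batching idea (not the actual PoC code; write_batch, BATCH_SIZE and the pre-serialized Vec<u8> requests are placeholders, and it assumes a flume request channel plus a buffered tokio writer):

use flume::Receiver;
use tokio::io::{AsyncWrite, AsyncWriteExt};

const BATCH_SIZE: usize = 10;

// Drain up to BATCH_SIZE queued requests into the (buffered) writer and flush
// once for the whole batch instead of once per publish.
async fn write_batch<W: AsyncWrite + Unpin>(
    requests: &Receiver<Vec<u8>>, // already-serialized packets, for illustration
    network: &mut W,
) -> std::io::Result<()> {
    // Wait for the first request, then opportunistically pick up the rest.
    let first = requests
        .recv_async()
        .await
        .map_err(|_| std::io::Error::from(std::io::ErrorKind::BrokenPipe))?;
    network.write_all(&first).await?;
    for _ in 1..BATCH_SIZE {
        match requests.try_recv() {
            Ok(packet) => network.write_all(&packet).await?,
            Err(_) => break, // channel empty (or closed): send what we have
        }
    }
    // A single flush for the whole batch.
    network.flush().await
}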

@swanandx
Member

Hey @flxo, so basically we want to buffer the outgoing packets and then flush / write them to the network at once to avoid multiple syscalls, correct?

  • How will it work out for keepalive messages? We have to send them as soon as the timeout is reached.
  • What happens when a message is lost before being written to the network?
    Let's say my batch size is 5 and I publish messages 1, 2, 3. As the batch size isn't reached, they aren't flushed, but with message 4 (or for some other reason) the client disconnects / crashes. Now the state/client thinks it has published the messages, but in reality it hasn't. (Or the same might happen if the broker disconnects, right?) Though this can be mitigated by using QoS 1/2 so we can republish those messages, losing messages is not good.
    [ we could have put them back in pending, but the state has already been updated and it would just cause a lack of consensus ]

note: I haven't dived deep into the PoC, but I think an easier solution might be:

  • modify next_request to return 0..=MAX_BATCH_SIZE requests.
  • handle those requests and then call flush, kinda like:
// Inside the event loop's select!: take a batch of requests, apply each to
// the state, then flush the network once for the whole batch.
o = Self::next_requests(..), if .. => match o {
    Ok(requests) => {
        for request in requests {
            self.state.handle_outgoing_packet(request)?;
        }
        match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
            Ok(inner) => inner?,
            Err(_) => return Err(ConnectionError::FlushTimeout),
        };
        Ok(self.state.events.pop_front().unwrap())
    }
    Err(_) => Err(ConnectionError::RequestsDone),
},

^ this may have other disadvantages which I haven't thought about though, haha.

btw, can you elaborate on the details of the use case? Batching publishes would mean we are okay with messages being published late, right?

Batching would be cool feature to have though 💯

Thank you so much!

@flxo
Contributor Author

flxo commented Feb 29, 2024

Thanks for your thoughts!

Hey @flxo, so basically we want to buffer the outgoing packets and then flush / write them to the network at once to avoid multiple syscalls, correct?

Yes.

  • How will it work out for keepalive messages? We have to send them as soon as the timeout is reached.

Currently the keep-alives are flushed when the loop breaks or finishes. We could easily use send instead of feed for the pings to ensure they're sent immediately.
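
For example, assuming the Framed-based PoC branch (LinesCodec only stands in for the MQTT codec here):

use futures::SinkExt;
use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LinesCodec};

// feed() only enqueues a frame into Framed's write buffer, while send()
// enqueues and flushes, so a ping hits the wire immediately.
async fn queue_and_ping(
    framed: &mut Framed<TcpStream, LinesCodec>,
) -> Result<(), Box<dyn std::error::Error>> {
    framed.feed("publish 1".to_string()).await?; // buffered, not flushed yet
    framed.feed("publish 2".to_string()).await?; // still buffered
    framed.send("pingreq".to_string()).await?; // flushes the buffered frames plus the ping
    Ok(())
}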

  • What happens when a message is lost before being written to the network?
    Let's say my batch size is 5 and I publish messages 1, 2, 3. As the batch size isn't reached, they aren't flushed, but with message 4 (or for some other reason) the client disconnects / crashes. Now the state/client thinks it has published the messages, but in reality it hasn't. (Or the same might happen if the broker disconnects, right?) Though this can be mitigated by using QoS 1/2 so we can republish those messages, losing messages is not good.
    [ we could have put them back in pending, but the state has already been updated and it would just cause a lack of consensus ]

Good catch. This is definitely missing in my hack. One option could be to retrieve the sending buffer from Framed and reuse it for the next connection. I've done that recently in a project where we had a similar issue.
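
Roughly like this (a hypothetical sketch, again assuming the Framed branch and LinesCodec as a stand-in codec):

use tokio::net::TcpStream;
use tokio_util::codec::{Framed, LinesCodec};

// Keep the codec and the unflushed write buffer from the dead connection, swap
// in the freshly established stream, and drop any half-read inbound bytes.
fn carry_over_write_buffer(
    old: Framed<TcpStream, LinesCodec>,
    new_stream: TcpStream,
) -> Framed<TcpStream, LinesCodec> {
    let mut parts = old.into_parts();
    parts.io = new_stream;
    parts.read_buf.clear();
    Framed::from_parts(parts)
}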

note: I haven't dived deep into the PoC, but I think an easier solution might be:

  • modify next_request to return 0..=MAX_BATCH_SIZE requests.
  • handle those requests and then call flush, kinda like:
// Inside the event loop's select!: take a batch of requests, apply each to
// the state, then flush the network once for the whole batch.
o = Self::next_requests(..), if .. => match o {
    Ok(requests) => {
        for request in requests {
            self.state.handle_outgoing_packet(request)?;
        }
        match time::timeout(network_timeout, network.flush(&mut self.state.write)).await {
            Ok(inner) => inner?,
            Err(_) => return Err(ConnectionError::FlushTimeout),
        };
        Ok(self.state.events.pop_front().unwrap())
    }
    Err(_) => Err(ConnectionError::RequestsDone),
},

^ this may have other disadvantages which I haven't thought about though, haha.

You cannot await a timeout in the select branch. This would stall the processing of incoming frames as well.
A less intrusive option could be to return a Vec of Requests from next_requests. This would use recv_async and try_recv like this (rough pseudocode):

async fn next_requests(..) -> Result<Vec<Request>, ...> {
    let mut result = Vec::with_capacity(10);
    // Block until at least one request is available...
    let next = requests_rx.recv_async().await?;
    result.push(next);
    // ...then drain whatever is already queued, up to the batch capacity.
    while result.len() < result.capacity() {
        match requests_rx.try_recv() {
            Ok(r) => result.push(r),
            Err(TryRecvError::Empty) => return Ok(result),
            Err(e) => return Err(e.into()),
        }
    }
    Ok(result)
}

btw, can you elaborate on the details of the use case? Batching publishes would mean we are okay with messages being published late, right?

Not really late. The tx buffer is filled as long as there are requests pending from the client (up to a certain limit). There's no timeout awaited in between; the flush happens right afterwards.

The use case is that we have a client with a high output rate. We're currently starving on the publish side because there's also inbound traffic, which is (currently) processed with up to 10 packets per poll iteration, while outgoing requests are handled one at a time.

Batching would be cool feature to have though 💯

Agree :-)

Thank you so much!

@swanandx
Member

A less intrusive option could be to return a Vec of Requests from next_requests. This would use recv_async and try_recv like this:

That is exactly what I meant when I said modify next_request to return 0..BATCH_SIZE requests.
I'm putting a range here as sometimes we would have < BATCH_SIZE elements.

^ just like the pseudocode you mentioned in your comment!

Can you also please confirm the perf impact (likely there won't be much with the above approach) and also whether it's actually helping in reducing syscalls?

thanks 😊

@flxo
Contributor Author

flxo commented Feb 29, 2024

A less intrusive option could be to return a Vec of Requests from next_requests. This would use recv_async and try_recv like this:

That is exactly what I meant when I said modify next_request to return 0..BATCH_SIZE requests. I'm putting a range here as sometimes we would have < BATCH_SIZE elements.

^ just like the pseudo code you mentioned in comment!

I refined the pseudocode here

Can you also please confirm the perf impact (likely there won't be much with the above approach) and also whether it's actually helping in reducing syscalls?

Can you specify what exactly you want? Profiling data, e.g. flame graphs?

The biggest advantage is to avoid starvation on the upstream path.

@flxo
Contributor Author

flxo commented Mar 1, 2024

Can you also please confirm the perf impact (likely there won't be much with the above approach) and also whether it's actually helping in reducing syscalls?

A colleague tested the patch in one of our products. It's an arm64 Linux system on a Cortex-A57 with a constant publication rate of ~800/s to a local broker (test setup). We see a reduction in the CPU usage of the client application of ~8.5%.

Is there a reproducible benchmark somewhere around here?

@swanandx
Member

swanandx commented Mar 1, 2024

We see a reduction in the CPU usage of the client application of ~8.5%.

niceee!

Is there a reproducible benchmark somewhere around here?

not that i'm aware of 😅

i had a doubt regarding the patch: why are we doing read_bulk for incoming packets? The readb function already reads packets in bulk, so I think we only need the handling of outgoing packets in batches, right?
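
For reference, the bulk-read idea is roughly this (a simplified, hypothetical sketch; decode_packet and the toy length-prefix framing stand in for the real mqttbytes parsing):

use bytes::{Buf, BytesMut};

const MAX_INCOMING_BATCH: usize = 10;

// Toy framing: one length byte followed by the payload; returns None while the
// buffer does not yet hold a complete packet.
fn decode_packet(buf: &mut BytesMut) -> Option<Vec<u8>> {
    let len = *buf.first()? as usize;
    if buf.len() < 1 + len {
        return None;
    }
    buf.advance(1);
    Some(buf.split_to(len).to_vec())
}

// After one network read has filled `buf`, decode as many complete packets as
// are already there (up to a cap) before yielding back to the event loop.
fn drain_packets(buf: &mut BytesMut) -> Vec<Vec<u8>> {
    let mut packets = Vec::new();
    while packets.len() < MAX_INCOMING_BATCH {
        match decode_packet(buf) {
            Some(packet) => packets.push(packet),
            None => break,
        }
    }
    packets
}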

@flxo
Contributor Author

flxo commented Mar 1, 2024

i had a doubt regarding the patch: why are we doing read_bulk for incoming packets? The readb function already reads packets in bulk, so I think we only need the handling of outgoing packets in batches, right?

I changed the inflow just to align it in style with the outflow. The current implementation processes the messages in readb, which is super confusing (and needs to be cancel safe ;-)). Oh, and I renamed read to read_bulk.

@swanandx
Member

swanandx commented Mar 1, 2024

and needs to be cancel safe

right, was readb not cancel safe? can you elaborate more?

I would say the readb approach was better (we should rename it to read_bulk though 😝), as we have to process the messages we receive right away, plus we avoid unnecessary allocations. wdyt?

thanks!

@flxo
Contributor Author

flxo commented Mar 4, 2024

and needs to be cancel safe

right, was readb not cancel safe? can you elaborate more?

Sorry, this was imprecise. readb is cancel safe, but since the full rx path is polled in a select branch, it has to be. Such mistakes happen fast (at least for me).

I would say the readb approach was better (we should rename it to read_bulk though 😝), as we have to process the messages we receive right away, plus we avoid unnecessary allocations. wdyt?

You're right regarding the allocations.

Shall I draft a PR with the update to the rx path only?

Just out of curiosity: there's the other branch that uses tokio_util::codec::Framed. It's missing the feature to retain the tx buffer over reconnects, but otherwise it is complete and passes the tests. The drawback is that it introduces some async into State and a dependency on tokio_util. It should be optimal from the allocation point of view because the packets are serialised directly into the buffer of Framed (yes, that's also the case today, but with a homebrewn implementation). Also, tx and rx are processed interleaved, and the Framed implementation is way easier to read than today's code. What's your opinion on that one?
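
To illustrate the allocation argument, a hypothetical codec sketch (MqttCodec and Packet are placeholders, not the real types):

use bytes::BytesMut;
use tokio_util::codec::Encoder;

struct MqttCodec;

struct Packet {
    payload: Vec<u8>,
}

impl Encoder<Packet> for MqttCodec {
    type Error = std::io::Error;

    // The packet is serialized straight into `dst`, which is the same BytesMut
    // that Framed later flushes to the socket, so no intermediate Vec is needed.
    fn encode(&mut self, item: Packet, dst: &mut BytesMut) -> Result<(), Self::Error> {
        dst.extend_from_slice(&item.payload);
        Ok(())
    }
}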

@swanandx
Member

swanandx commented Mar 4, 2024

the full rx path is polled in a select branch it has to be

still kinda went over me 😅 can you please explain wdym by full rx path and what mistakes?

Shall I draft a PR with the update to the rx path only?

let's keep this issue related to batching of outgoing packets only! also can you please open a PR for just that part?

we will have different PRs/issues/conversations for anything related to handling of incoming packets; this will make it easier to review and reason about, wdyt?

It should be optimal from the allocation point of view because the packets are serialised directly into the buffer of Framed

iirc, we are already re-using the buffer to read packets, so I think it will be the same as you mentioned (also due to how mqttbytes parses packets).

this might be biased (and due to the branch containing other refactors), but for readability I find the current code more maintainable / readable than with Framed. And if it's not bringing any additional benefits, I think sticking to the existing structure makes more sense, right 😄

thanks!

@flxo
Contributor Author

flxo commented Mar 4, 2024

Thanks for your answer.

the full rx path is polled in a select branch it has to be

still kinda went over me 😅 can you please explain wdym by full rx path and what mistakes?

When modifying the read-related code in Network, you always have to keep in mind that it must be cancel safe because it's all driven from the select! in event_loop. This is not obvious from the module itself. Anyway, the current impl is correct.
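
To make the pitfall concrete, here's a small, self-contained sketch (hypothetical names, not rumqttc code): whenever the keep-alive branch wins the select!, the in-flight read_frame future is dropped, and progress only survives because the partial frame lives in a buffer owned by the caller.

use tokio::sync::mpsc;
use tokio::time::{interval, sleep, Duration};

// Accumulate into the caller-owned buffer so a dropped future loses nothing.
async fn read_frame(buffer: &mut Vec<u8>, rx: &mut mpsc::Receiver<u8>) -> Option<Vec<u8>> {
    while buffer.len() < 4 {
        buffer.push(rx.recv().await?);
    }
    Some(std::mem::take(buffer))
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::channel(16);
    tokio::spawn(async move {
        for byte in 0u8..12 {
            let _ = tx.send(byte).await;
            sleep(Duration::from_millis(5)).await;
        }
    });

    let mut buffer = Vec::new();
    let mut keepalive = interval(Duration::from_millis(8));
    loop {
        tokio::select! {
            frame = read_frame(&mut buffer, &mut rx) => match frame {
                Some(frame) => println!("frame: {frame:?}"),
                None => break, // sender dropped, we are done
            },
            // When this branch wins, the read_frame future above is cancelled,
            // but the bytes it already collected are still in `buffer`.
            _ = keepalive.tick() => println!("keep-alive tick"),
        }
    }
}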

Shall I draft a PR with the update to the rx path only?

let's keep this issue related to batching of outgoing packets only! also can you please open a PR for just that part?

Sorry - my mistake - it should have been tx.

we will have different PRs/issues/conversations for anything related to handling of incoming packets; this will make it easier to review and reason about, wdyt?

Fully agree.

It should be optimal from the allocation point of view because the packets are serialised directly into the buffer of Framed

iirc, we are already re-using the buffer to read packets, so I think it will be the same as you mentioned (also due to how mqttbytes parses packets).

this might be biased (and due to the branch containing other refactors), but for readability I find the current code more maintainable / readable than with Framed. And if it's not bringing any additional benefits, I think sticking to the existing structure makes more sense, right 😄

Fine for me.

flxo pushed a commit to flxo/rumqtt that referenced this issue Mar 19, 2024
Collect up to BATCH_SIZE requests from the client channel before flushing the network.

Fixes bytebeamio#810
flxo pushed a commit to flxo/rumqtt that referenced this issue Mar 26, 2024
Collect up to BATCH_SIZE requests from the client channel before flushing the network.

Fixes bytebeamio#810