Publish request completes after keep-alive ping message #750

Open
federico-cristofani opened this issue Nov 10, 2023 · 7 comments

@federico-cristofani

Hi all, I'm working on an evaluation of energy consumption for different IoT protocols, and I'm currently using the rumqtt implementation for MQTT tests.

Experiment setup

The setup of my experiment is the following:

  • An IoT device (Raspberry Pi 3B+) connected via a cellular network establishes a connection with the broker, publishes one message, and disconnects.
  • The same experiment is executed for different payload sizes (from 128 bytes to 512 KB), each one repeated 30 times.
  • The payload is a buffer of the given size initialized with the value 127 (see the sketch after this list).
  • The QoS is 1 (AtLeastOnce).
  • The transport protocol is TLS.
  • The broker code is the rumqttd/singlenode example from the repository.
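
For reference, the payload and QoS described above boil down to something like this (just a sketch; payload_size stands for each of the sizes listed above):

    // Sketch: build the publish payload as described in the setup above.
    // `payload_size` is a placeholder for the tested sizes (128 B .. 512 KB).
    let payload_size: usize = 64 * 1024;
    let message: Vec<u8> = vec![127u8; payload_size];
    let qos = rumqttc::QoS::AtLeastOnce;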

Problem description

During my testing, due to some strange results, I noticed that for some payload sizes (e.g. 64 KB) the PubAck expected by the client arrives only after a keep-alive ping is sent from the client to the server, along with the PingResp from the server. In my case the keep-alive period is set to 5 seconds, so the execution time of the requests is affected by a considerable overhead. Looking at the system calls generated by both the client and the broker, it seems that the delay is caused by the client, which sends the last part of the payload together with the ping message, maybe due to some buffering of packets.

I've analyzed the behavior in more detail and found that requests run smoothly up to a payload size of 45002 bytes. Above that size the behavior is the following:

  • From 45003 bytes to about 100 KB the problem occurs every time.
  • For larger payloads the problem occurs only intermittently.

I also encountered the same problem using the client on my PC, but in that case the payload size that triggers the problem is different. I have also tried changing the broker implementation, but the problem is still the same.

Note that the problem only occurs with TLS; with plain TCP it doesn't occur.

Question

In my opinion, the problem is on the client side and is related to some configuration of the TLS connection or to buffering somewhere in the code or network stack.

Have you ever encountered a similar problem? What could be the cause of such a delay?

@federico-cristofani federico-cristofani changed the title Publish request complete after keep-alive ping message Publish request completes after keep-alive ping message Nov 10, 2023
@swanandx
Member

Hey, thank you for the detailed explanation!

I was investigating it further and noticed the rustls docs have a max_fragment_size field, which by default sets a limit of 2**16 (64 KB). This might have something to do with the issue you are facing?
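
If you want to check that quickly, max_fragment_size is a plain public field on rustls' ClientConfig, so it can be overridden after building the config, roughly like this (just a sketch, assuming the rustls builder API and a root_cert_store built elsewhere; the 16 KB value is an arbitrary test value):

    // Sketch: lower rustls' max_fragment_size to see whether the ~64 KB
    // threshold moves with it. root_cert_store and options are assumed to
    // be built elsewhere in the client code.
    let mut tls_client_config = ClientConfig::builder()
        .with_safe_defaults()
        .with_root_certificates(root_cert_store)
        .with_no_client_auth();

    // Public Option<usize> field; None keeps the default limit.
    tls_client_config.max_fragment_size = Some(16 * 1024);

    options.set_transport(Transport::Tls(
        TlsConfiguration::Rustls(Arc::new(tls_client_config))));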

Can you please provide more info, like which versions of rumqttd / rumqttc you are using, the rumqttd.toml file (if applicable), and which features you have enabled? It would also be great if you could share a sample code snippet to recreate this behavior. This would help with further debugging.

buffering somewhere in the code or network stack

There is a chance the OS network config is involved, as the payload size that causes the issue is different on the RPi and on your PC. I'm not very experienced with this (just learned about TCP MSS/MTU while tackling this issue haha).

Thank you!

@federico-cristofani
Author

federico-cristofani commented Nov 10, 2023

Sure, thanks for your quick feedback.

I would rule out the broker configuration because with other client implementations the problem does not occur.

I have also noticed that the problem is strictly correlated with the physical network interface: I have just tried sending the request via WiFi on my RPi and the problem doesn't arise anymore, but on the PC it is still there.

From the system-call trace, it appears that a "writev" (which should be the system call that sends the data on the socket) fails due to resource unavailability, the task then waits on a futex, and when the ping message is ready it sends the final part of the request together with the ping message (lines 234 - 252 of strace_pi.log).

The rumqttc version is 0.23.0 and the rumqttd version is 0.18.0.
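
For completeness, the dependencies behind the snippet below are roughly the following (the versions of the helper crates are approximate; tokio-rustls and rustls-native-certs have to match the rustls version that rumqttc 0.23 links against):

    [dependencies]
    rumqttc = "0.23.0"
    tokio = { version = "1", features = ["full"] }
    tokio-rustls = "0.24"         # approximate; must match rumqttc's rustls
    rustls-native-certs = "0.6"   # approximate
    log = "0.4"
    env_logger = "0.10"
    rand = "0.8"
    getopts = "0.2"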

I attach the following files:

I'm reporting just the most important part of the code; the full code can be found in the attachments.

    mod custom_cert_verifier;
    
    use std::{time::Duration, sync::Arc, u16};
    use log::{info, error, debug, LevelFilter, warn};
    use env_logger;
    use rand::Rng;
    use tokio;
    use getopts::Options;
    use rumqttc::{MqttOptions, TlsConfiguration, Transport, QoS, AsyncClient, Incoming, Event, Outgoing};
    
    use rustls_native_certs::load_native_certs;
    use custom_cert_verifier::SkipServerVerification;
    use tokio_rustls::rustls::{ClientConfig, RootCertStore, Certificate};
        
    async fn async_request(/* request params ...*/) -> Result<(), String>{
    
        let mut options = MqttOptions::new(client_id, host, port);
    
        if transport == "tcp" {
            options.set_transport(Transport::Tcp);
        }
        else{
    
            // TLS client configuration
            let mut tls_client_config = if insecure {
    
                // Skip server certificate verification
                ClientConfig::builder()
                    .with_safe_defaults()
                    .with_custom_certificate_verifier(SkipServerVerification::new())
                    .with_no_client_auth()
            }
            else{
                // Load trusted certificates
                let mut root_cert_store = RootCertStore::empty();
                for cert in load_native_certs().expect("Could not load platform certs") {
                    root_cert_store.add(&Certificate(cert.0)).unwrap();
                }
                ClientConfig::builder()
                    .with_safe_defaults()
                    .with_root_certificates(root_cert_store)
                    .with_no_client_auth()
            };
    
            // TLS ALPN extension
            let mut alpn: Vec<Vec<u8>> = Vec::new();
            alpn.push("mqtt".as_bytes().to_vec());
            tls_client_config.alpn_protocols = alpn;
    
            // Set transport protocol
            options.set_transport(Transport::Tls(
                TlsConfiguration::Rustls(Arc::new(tls_client_config))));
    
        }
    
        options.set_keep_alive(Duration::from_secs(keep_alive));
        options.set_max_packet_size(MAX_INCOMING_SIZE, MAX_OUTGOING_SIZE);
        
        // Build client to handle connection
        let (client, mut event_loop) = AsyncClient::new(options, 10);
    
        // Publish/Subscribe
        if message_type == "publish" {
            client
                .publish(topic, qos, false, message)
                .await
                .expect("Publishing failed");
        }
        else {
            client
                .subscribe(topic, qos)
                .await
                .expect("Subscription failed");
        }
    
        loop {
    
            let event = event_loop.poll().await;
            match &event {
                Ok(event) => {
                    match event {
                        Event::Incoming(Incoming::PubAck(_message)) => {
                            info!("Published completed");
                            // Disconnect client
                            client.disconnect()
                                .await
                                .expect("Disconnection failed");
                        }
                        Event::Incoming(Incoming::Disconnect) => {
                            error!("Disconnected by broker");
                            return Err("Connection closed by broker".to_string());
                        }
                        Event::Outgoing(Outgoing::Disconnect) => {
                            debug!("Disconnection request sent");
                            break;
                        }
                        /* Other events ...*/
                        // Catch-all so the match stays exhaustive
                        _ => {}
                    }
                }
                Err(err) => {
                    error!("Connection error: {:?}", err);
                    return Err(err.to_string());
                }
            }
        }
        return Ok(());
    }
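
For context, async_request is driven from a small tokio main, roughly along these lines (a sketch, not the attached code; argument parsing with getopts is elided):

    #[tokio::main]
    async fn main() {
        // Logging setup (LevelFilter comes from the `log` import above).
        env_logger::builder().filter_level(LevelFilter::Info).init();

        // ... parse CLI options with getopts, then run the request ...
        if let Err(err) = async_request(/* request params ... */).await {
            error!("Request failed: {}", err);
        }
    }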

@swanandx
Member

I did try the code you shared, with only one change: since I don't have the custom_cert_verifier, I used my own valid certificates:

use rumqttc::{Key, TlsConfiguration, Transport};

let mut alpn: Vec<Vec<u8>> = Vec::new();
alpn.push("mqtt".as_bytes().to_vec());
let ca = include_str!("../../ca.cert.pem");
let client_cert = include_str!("../../device1.cert.pem");
let client_key = include_str!("../../device1.key.pem");
let transport = Transport::Tls(TlsConfiguration::Simple {
    ca: ca.into(),
    alpn: Some(alpn),
    client_auth: Some((client_cert.into(), Key::RSA(client_key.into()))),
});

options.set_transport(transport);

and for me, even when I'm running the broker and client locally, a payload size of 8323060 bytes reproduces the issue!

I will try to debug it further and update you

@swanandx
Member

Also, one interesting thing I noticed is that it's not about just any activity; it has to be a PingReq for some reason haha.

E.g.

  • publish that big message
  • subscribe to a topic
  • pingreq

Both the pub and the sub happen only after the PingReq!

@federico-cristofani
Author

You're right, I forgot to attach that file too!

I think your modification does not alter the problem, so you can continue with that code; in any case I will also share that file.

If I find out anything more I will inform you immediately.

Thanks for your help!

@swanandx
Member

swanandx commented Nov 10, 2023

BTW, can you please confirm whether the issue still exists with different brokers like HiveMQ or EMQX, etc.?

@federico-cristofani
Author

federico-cristofani commented Nov 10, 2023

I have carried out some tests using the EMQX broker and the same issue occurred.
The EMQX broker I used is the one from the public Docker image emqx/emqx, with the following configuration:

node {
  name = "emqx@your-hostname"
  cookie = "your-secret-cookie"
  data_dir = "data"
}

listeners.quic.default {
  enabled = true
  bind = "0.0.0.0:14567"
  keyfile = "etc/certs/key.pem"
  certfile = "etc/certs/cert.pem"
 
  ssl_options {
        cacertfile = "etc/certs/cacert.pem"
        certfile = "etc/certs/cert.pem"
        ciphers = []
        client_renegotiation = true
        depth = 10
        enable_crl_check = false
        fail_if_no_peer_cert = false
        gc_after_handshake = false
        handshake_timeout = 15s
        hibernate_after = 5s
        honor_cipher_order = true
        keyfile = "etc/certs/key.pem"
        log_level = notice
        ocsp {
          enable_ocsp_stapling = false
          refresh_http_timeout = 15s
          refresh_interval = 5m
        }
        reuse_sessions = true
        secure_renegotiate = true
        verify = verify_none
        versions = [tlsv1.3]

  }
}


listeners.ssl.default {
  enabled = true
  bind = "0.0.0.0:8883"
  ssl_options {
    keyfile = "etc/certs/key.pem"
    certfile = "etc/certs/cert.pem"
    cacertfile = "etc/certs/cacert.pem"
    # Peer verification not enabled
    verify = verify_none
  }
}

listeners.tcp.default {
  enabled = true
  bind = "0.0.0.0:1883"
}

listeners.ws.default {
  enabled = false
}

listeners.wss.default {
  enabled = false
}


log {
  file_handlers.default {
    level = debug
    file = "opt/emqx/log/emqx.log"
    # count = 10
    max_size = 50MB
    formatter = text
  }
  console_handler {
    level = debug
    formatter = text
  }
}
