Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Connect with session_expiry_interval for MQTT v5 #853

Open
xiaocq2001 opened this issue Apr 30, 2024 · 3 comments
Open

Connect with session_expiry_interval for MQTT v5 #853

xiaocq2001 opened this issue Apr 30, 2024 · 3 comments

Comments

@xiaocq2001
Copy link
Contributor

Expected Behavior

When connecting to broker, session_expiry_interval should be supported.

Current Behavior

There is no API to set session_expiry_interval for CONNECT request.

Failure Information (for bugs)

Context

  • Operating System: Ubuntu 22.04
  • Toolchain version: default

Failure Logs

Please include any relevant log snippets or files here.

While running example async_manual_acks_v5, the broker does not restore the session and re-send publishes because session_expiry_interval value 0xFFFFFFFF is not offered on connection:

Event = Incoming(ConnAck(ConnAck { session_present: true, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(20), max_qos: None, retain_available: None, max_packet_size: None, assigned_client_identifier: None, topic_alias_max: Some(10), reason_string: None, user_properties: [], wildcard_subscription_available: None, subscription_identifiers_available: None, shared_subscription_available: None, server_keep_alive: None, response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
 DEBUG rumqttc::v5::state > Subscribe. Topics = [Filter { path: "hello/world", qos: AtLeastOnce, nolocal: false, preserve_retain: false, retain_forward_rule: OnEverySubscribe }], Pkid = 1
Event = Outgoing(Subscribe(1))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 2, Payload Size = 1
Event = Outgoing(Publish(2))
 DEBUG rumqttc::v5::state > SubAck Pkid = 1, QoS = AtLeastOnce
Event = Incoming(SubAck(SubAck { pkid: 1, return_codes: [Success(AtLeastOnce)], properties: None }))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 21, payload: b"\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 2, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 3, Payload Size = 2
Event = Outgoing(Publish(3))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 22, payload: b"\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 3, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 4, Payload Size = 3
Event = Outgoing(Publish(4))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 23, payload: b"\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 4, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 5, Payload Size = 4
Event = Outgoing(Publish(5))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 24, payload: b"\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 5, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 6, Payload Size = 5
Event = Outgoing(Publish(6))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 25, payload: b"\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 6, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 996.130476ms, last outgoing request before 996.385573ms
Event = Outgoing(PingReq)
Event = Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 7, Payload Size = 6
Event = Outgoing(Publish(7))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 26, payload: b"\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 7, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 8, Payload Size = 7
Event = Outgoing(Publish(8))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 27, payload: b"\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 8, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 9, Payload Size = 8
Event = Outgoing(Publish(9))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 28, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 9, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 10, Payload Size = 9
Event = Outgoing(Publish(10))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 29, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 10, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Publish. Topic = hello/world, Pkid = 11, Payload Size = 10
Event = Outgoing(Publish(11))
Event = Incoming(Publish(Publish { dup: false, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 30, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Event = Incoming(PubAck(PubAck { pkid: 11, reason: Success, properties: None }))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 987.076374ms, last outgoing request before 987.813679ms
Event = Outgoing(PingReq)
Event = Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Disconnect with NormalDisconnection
Event = Outgoing(Disconnect)
Error = MqttState(ConnectionAborted)
Incoming(ConnAck(ConnAck { session_present: false, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(20), max_qos: None, retain_available: None, max_packet_size: None, assigned_client_identifier: None, topic_alias_max: Some(10), reason_string: None, user_properties: [], wildcard_subscription_available: None, subscription_identifiers_available: None, shared_subscription_available: None, server_keep_alive: None, response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.002052179s, last outgoing request before 5.020963659s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.005117266s, last outgoing request before 5.005405529s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.000096725s, last outgoing request before 5.00081033s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.001096962s, last outgoing request before 5.001506118s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
@xiaocq2001
Copy link
Contributor Author

xiaocq2001 commented Apr 30, 2024

I created a PR to fix the issue, please check #854

Now broker resending the publishes:

 DEBUG rumqttc::v5::state > Disconnect with NormalDisconnection
Event = Outgoing(Disconnect)
Error = MqttState(ConnectionAborted)
Incoming(ConnAck(ConnAck { session_present: true, code: Success, properties: Some(ConnAckProperties { session_expiry_interval: None, receive_max: Some(20), max_qos: None, retain_available: None, max_packet_size: None, assigned_client_identifier: None, topic_alias_max: Some(10), reason_string: None, user_properties: [], wildcard_subscription_available: None, subscription_identifiers_available: None, shared_subscription_available: None, server_keep_alive: None, response_information: None, server_reference: None, authentication_method: None, authentication_data: None }) }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 1, payload: b"\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 2, payload: b"\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 3, payload: b"\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 4, payload: b"\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 5, payload: b"\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 6, payload: b"\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 7, payload: b"\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 8, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 9, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Incoming(Publish(Publish { dup: true, qos: AtLeastOnce, retain: false, topic: b"hello/world", pkid: 10, payload: b"\x01\x01\x01\x01\x01\x01\x01\x01\x01\x01", properties: None }))
Outgoing(PubAck(1))
Outgoing(PubAck(2))
Outgoing(PubAck(3))
Outgoing(PubAck(4))
Outgoing(PubAck(5))
Outgoing(PubAck(6))
Outgoing(PubAck(7))
Outgoing(PubAck(8))
Outgoing(PubAck(9))
Outgoing(PubAck(10))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.001324026s, last outgoing request before 5.001175946s
Outgoing(PingReq)
Incoming(PingResp(PingResp))
 DEBUG rumqttc::v5::state > Pingreq, last incoming packet before 5.000821551s, last outgoing request before 5.001400427s
Outgoing(PingReq)
Incoming(PingResp(PingResp))

@swanandx
Copy link
Member

you can use set_connect_properties to set all kind of v5 connection properties, eg.

let mut properties = ConnectProperties::new();
properties.session_expiry_interval = Some(u32::MAX);
mqttoptions.set_connect_properties(properties);

@xiaocq2001
Copy link
Contributor Author

xiaocq2001 commented May 7, 2024

you can use set_connect_properties to set all kind of v5 connection properties, eg.

let mut properties = ConnectProperties::new();
properties.session_expiry_interval = Some(u32::MAX);
mqttoptions.set_connect_properties(properties);

Ah currently ConnectProperties is private, that's why I think we need an additional API, like we did for receive_maximum, max_packet_size and other properties.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants