-
Notifications
You must be signed in to change notification settings - Fork 228
/
async_manual_acks.rs
90 lines (77 loc) · 2.5 KB
/
async_manual_acks.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
use tokio::{task, time};
use rumqttc::{self, AsyncClient, Event, EventLoop, Incoming, MqttOptions, QoS};
use std::error::Error;
use std::time::Duration;
fn create_conn() -> (AsyncClient, EventLoop) {
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
.set_manual_acks(true)
.set_clean_session(false);
AsyncClient::new(mqttoptions, 10)
}
#[tokio::main(worker_threads = 1)]
async fn main() -> Result<(), Box<dyn Error>> {
pretty_env_logger::init();
// create mqtt connection with clean_session = false and manual_acks = true
let (client, mut eventloop) = create_conn();
// subscribe example topic
client
.subscribe("hello/world", QoS::AtLeastOnce)
.await
.unwrap();
task::spawn(async move {
// send some messages to example topic and disconnect
requests(client.clone()).await;
client.disconnect().await.unwrap();
});
loop {
// get subscribed messages without acking
let event = eventloop.poll().await;
match &event {
Ok(notif) => {
println!("Event = {:?}", notif);
}
Err(error) => {
println!("Error = {:?}", error);
return Ok(());
}
}
if let Err(_err) = event {
// break loop on disconnection
break;
}
}
// create new broker connection
let (client, mut eventloop) = create_conn();
loop {
// previously published messages should be republished after reconnection.
let event = eventloop.poll().await;
match &event {
Ok(notif) => {
println!("Event = {:?}", notif);
}
Err(error) => {
println!("Error = {:?}", error);
return Ok(());
}
}
if let Ok(Event::Incoming(Incoming::Publish(publish))) = event {
// this time we will ack incoming publishes.
// Its important not to block eventloop as this can cause deadlock.
let c = client.clone();
tokio::spawn(async move {
c.ack(&publish).await.unwrap();
});
}
}
}
async fn requests(client: AsyncClient) {
for i in 1..=10 {
client
.publish("hello/world", QoS::AtLeastOnce, false, vec![1; i])
.await
.unwrap();
time::sleep(Duration::from_secs(1)).await;
}
}