diff --git a/rumqttc/examples/async_manual_acks.rs b/rumqttc/examples/async_manual_acks.rs index def85b61a..5ee53b9f9 100644 --- a/rumqttc/examples/async_manual_acks.rs +++ b/rumqttc/examples/async_manual_acks.rs @@ -36,7 +36,15 @@ async fn main() -> Result<(), Box> { loop { // get subscribed messages without acking let event = eventloop.poll().await; - println!("{:?}", event); + match &event { + Ok(notif) => { + println!("Event = {:?}", notif); + } + Err(error) => { + println!("Error = {:?}", error); + return Ok(()); + } + } if let Err(_err) = event { // break loop on disconnection break; @@ -49,7 +57,15 @@ async fn main() -> Result<(), Box> { loop { // previously published messages should be republished after reconnection. let event = eventloop.poll().await; - println!("{:?}", event); + match &event { + Ok(notif) => { + println!("Event = {:?}", notif); + } + Err(error) => { + println!("Error = {:?}", error); + return Ok(()); + } + } if let Ok(Event::Incoming(Incoming::Publish(publish))) = event { // this time we will ack incoming publishes. diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index f7efdcb9c..767da472a 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -41,6 +41,18 @@ async fn main() -> Result<(), Box> { while let Ok(event) = eventloop.poll().await { println!("{:?}", event); } + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {:?}", v); + } + Err(e) => { + println!("Error = {:?}", e); + break; + } + } + } // create new broker connection let (client, mut eventloop) = create_conn(); diff --git a/rumqttc/examples/asyncpubsub.rs b/rumqttc/examples/asyncpubsub.rs index 4ec2cd983..3de6253e9 100644 --- a/rumqttc/examples/asyncpubsub.rs +++ b/rumqttc/examples/asyncpubsub.rs @@ -20,7 +20,15 @@ async fn main() -> Result<(), Box> { loop { let event = eventloop.poll().await; - println!("{:?}", event.unwrap()); + match &event { + Ok(v) => { + println!("Event = {:?}", v); + } + Err(e) => { + println!("Error = {:?}", e); + return Ok(()); + } + } } } diff --git a/rumqttc/examples/asyncpubsub_v5.rs b/rumqttc/examples/asyncpubsub_v5.rs index 2652b778f..76d246969 100644 --- a/rumqttc/examples/asyncpubsub_v5.rs +++ b/rumqttc/examples/asyncpubsub_v5.rs @@ -19,11 +19,18 @@ async fn main() -> Result<(), Box> { time::sleep(Duration::from_secs(3)).await; }); - while let Ok(event) = eventloop.poll().await { - println!("{:?}", event); + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {:?}", v); + } + Err(e) => { + println!("Error = {:?}", e); + return Ok(()); + } + } } - - Ok(()) } async fn requests(client: AsyncClient) { diff --git a/rumqttc/examples/syncpubsub.rs b/rumqttc/examples/syncpubsub.rs index 2c61d0c2b..255e22c9e 100644 --- a/rumqttc/examples/syncpubsub.rs +++ b/rumqttc/examples/syncpubsub.rs @@ -15,13 +15,22 @@ fn main() { thread::spawn(move || publish(client)); for (i, notification) in connection.iter().enumerate() { - println!("{}. Notification = {:?}", i, notification); + match notification { + Ok(notif) => { + println!("{}. Notification = {:?}", i, notif); + } + Err(error) => { + println!("{}. Notification = {:?}", i, error); + return; + } + } } println!("Done with the stream!!"); } fn publish(mut client: Client) { + thread::sleep(Duration::from_secs(1)); client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap(); for i in 0..10_usize { let payload = vec![1; i]; diff --git a/rumqttc/examples/syncpubsub_v5.rs b/rumqttc/examples/syncpubsub_v5.rs index 862341e2d..3af309148 100644 --- a/rumqttc/examples/syncpubsub_v5.rs +++ b/rumqttc/examples/syncpubsub_v5.rs @@ -1,4 +1,5 @@ use rumqttc::v5::mqttbytes::{LastWill, QoS}; +use rumqttc::v5::ConnectionError; use rumqttc::v5::{Client, MqttOptions}; use std::thread; use std::time::Duration; @@ -6,7 +7,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false); mqttoptions .set_keep_alive(Duration::from_secs(5)) @@ -16,6 +17,15 @@ fn main() { thread::spawn(move || publish(client)); for (i, notification) in connection.iter().enumerate() { + match notification { + Err(ConnectionError::Io(error)) + if error.kind() == std::io::ErrorKind::ConnectionRefused => + { + println!("Failed to connect to the server. Make sure correct client is configured properly!\nError: {:?}", error); + return; + } + _ => {} + } println!("{}. Notification = {:?}", i, notification); } @@ -29,7 +39,7 @@ fn publish(client: Client) { let topic = format!("hello/{}/world", i); let qos = QoS::AtLeastOnce; - client.publish(topic, qos, true, payload).unwrap(); + let _ = client.publish(topic, qos, true, payload); } thread::sleep(Duration::from_secs(1)); diff --git a/rumqttc/examples/tls.rs b/rumqttc/examples/tls.rs index 2bb0daeab..65f88bdd2 100644 --- a/rumqttc/examples/tls.rs +++ b/rumqttc/examples/tls.rs @@ -40,6 +40,7 @@ async fn main() -> Result<(), Box> { Ok(Event::Outgoing(o)) => println!("Outgoing = {:?}", o), Err(e) => { println!("Error = {:?}", e); + return Ok(()); } } } diff --git a/rumqttc/examples/websocket.rs b/rumqttc/examples/websocket.rs index 712427b3c..b9233a109 100644 --- a/rumqttc/examples/websocket.rs +++ b/rumqttc/examples/websocket.rs @@ -28,8 +28,13 @@ async fn main() -> Result<(), Box> { loop { let event = eventloop.poll().await; match event { - Ok(ev) => println!("{:?}", ev), - Err(err) => println!("{:?}", err), + Ok(notif) => { + println!("Event = {:?}", notif); + } + Err(err) => { + println!("Error = {:?}", err); + return Ok(()); + } } } }