From 94979df9005f91bdb601e6112ac1efe4a983df24 Mon Sep 17 00:00:00 2001 From: henil Date: Tue, 13 Dec 2022 21:23:02 +0530 Subject: [PATCH] Stop printing error in loop --- rumqttc/examples/async_manual_acks.rs | 20 ++++++++++++++++++-- rumqttc/examples/async_manual_acks_v5.rs | 15 ++++++++++++--- rumqttc/examples/asyncpubsub.rs | 10 +++++++++- rumqttc/examples/asyncpubsub_v5.rs | 15 +++++++++++---- rumqttc/examples/syncpubsub.rs | 11 ++++++++++- rumqttc/examples/syncpubsub_v5.rs | 14 ++++++++++++-- rumqttc/examples/syncrecv_v5.rs | 2 +- rumqttc/examples/tls.rs | 1 + rumqttc/examples/websocket.rs | 9 +++++++-- 9 files changed, 81 insertions(+), 16 deletions(-) diff --git a/rumqttc/examples/async_manual_acks.rs b/rumqttc/examples/async_manual_acks.rs index def85b61a..5ee53b9f9 100644 --- a/rumqttc/examples/async_manual_acks.rs +++ b/rumqttc/examples/async_manual_acks.rs @@ -36,7 +36,15 @@ async fn main() -> Result<(), Box> { loop { // get subscribed messages without acking let event = eventloop.poll().await; - println!("{:?}", event); + match &event { + Ok(notif) => { + println!("Event = {:?}", notif); + } + Err(error) => { + println!("Error = {:?}", error); + return Ok(()); + } + } if let Err(_err) = event { // break loop on disconnection break; @@ -49,7 +57,15 @@ async fn main() -> Result<(), Box> { loop { // previously published messages should be republished after reconnection. let event = eventloop.poll().await; - println!("{:?}", event); + match &event { + Ok(notif) => { + println!("Event = {:?}", notif); + } + Err(error) => { + println!("Error = {:?}", error); + return Ok(()); + } + } if let Ok(Event::Incoming(Incoming::Publish(publish))) = event { // this time we will ack incoming publishes. diff --git a/rumqttc/examples/async_manual_acks_v5.rs b/rumqttc/examples/async_manual_acks_v5.rs index f7efdcb9c..8fba78f5c 100644 --- a/rumqttc/examples/async_manual_acks_v5.rs +++ b/rumqttc/examples/async_manual_acks_v5.rs @@ -8,7 +8,7 @@ use std::error::Error; use std::time::Duration; fn create_conn() -> (AsyncClient, EventLoop) { - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); mqttoptions .set_keep_alive(Duration::from_secs(5)) .set_manual_acks(true) @@ -38,8 +38,17 @@ async fn main() -> Result<(), Box> { }); // get subscribed messages without acking - while let Ok(event) = eventloop.poll().await { - println!("{:?}", event); + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {:?}", v); + } + Err(e) => { + println!("Error = {:?}", e); + break; + } + } } // create new broker connection diff --git a/rumqttc/examples/asyncpubsub.rs b/rumqttc/examples/asyncpubsub.rs index 4ec2cd983..3de6253e9 100644 --- a/rumqttc/examples/asyncpubsub.rs +++ b/rumqttc/examples/asyncpubsub.rs @@ -20,7 +20,15 @@ async fn main() -> Result<(), Box> { loop { let event = eventloop.poll().await; - println!("{:?}", event.unwrap()); + match &event { + Ok(v) => { + println!("Event = {:?}", v); + } + Err(e) => { + println!("Error = {:?}", e); + return Ok(()); + } + } } } diff --git a/rumqttc/examples/asyncpubsub_v5.rs b/rumqttc/examples/asyncpubsub_v5.rs index 2652b778f..76d246969 100644 --- a/rumqttc/examples/asyncpubsub_v5.rs +++ b/rumqttc/examples/asyncpubsub_v5.rs @@ -19,11 +19,18 @@ async fn main() -> Result<(), Box> { time::sleep(Duration::from_secs(3)).await; }); - while let Ok(event) = eventloop.poll().await { - println!("{:?}", event); + loop { + let event = eventloop.poll().await; + match &event { + Ok(v) => { + println!("Event = {:?}", v); + } + Err(e) => { + println!("Error = {:?}", e); + return Ok(()); + } + } } - - Ok(()) } async fn requests(client: AsyncClient) { diff --git a/rumqttc/examples/syncpubsub.rs b/rumqttc/examples/syncpubsub.rs index 2c61d0c2b..255e22c9e 100644 --- a/rumqttc/examples/syncpubsub.rs +++ b/rumqttc/examples/syncpubsub.rs @@ -15,13 +15,22 @@ fn main() { thread::spawn(move || publish(client)); for (i, notification) in connection.iter().enumerate() { - println!("{}. Notification = {:?}", i, notification); + match notification { + Ok(notif) => { + println!("{}. Notification = {:?}", i, notif); + } + Err(error) => { + println!("{}. Notification = {:?}", i, error); + return; + } + } } println!("Done with the stream!!"); } fn publish(mut client: Client) { + thread::sleep(Duration::from_secs(1)); client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap(); for i in 0..10_usize { let payload = vec![1; i]; diff --git a/rumqttc/examples/syncpubsub_v5.rs b/rumqttc/examples/syncpubsub_v5.rs index 862341e2d..3af309148 100644 --- a/rumqttc/examples/syncpubsub_v5.rs +++ b/rumqttc/examples/syncpubsub_v5.rs @@ -1,4 +1,5 @@ use rumqttc::v5::mqttbytes::{LastWill, QoS}; +use rumqttc::v5::ConnectionError; use rumqttc::v5::{Client, MqttOptions}; use std::thread; use std::time::Duration; @@ -6,7 +7,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false); mqttoptions .set_keep_alive(Duration::from_secs(5)) @@ -16,6 +17,15 @@ fn main() { thread::spawn(move || publish(client)); for (i, notification) in connection.iter().enumerate() { + match notification { + Err(ConnectionError::Io(error)) + if error.kind() == std::io::ErrorKind::ConnectionRefused => + { + println!("Failed to connect to the server. Make sure correct client is configured properly!\nError: {:?}", error); + return; + } + _ => {} + } println!("{}. Notification = {:?}", i, notification); } @@ -29,7 +39,7 @@ fn publish(client: Client) { let topic = format!("hello/{}/world", i); let qos = QoS::AtLeastOnce; - client.publish(topic, qos, true, payload).unwrap(); + let _ = client.publish(topic, qos, true, payload); } thread::sleep(Duration::from_secs(1)); diff --git a/rumqttc/examples/syncrecv_v5.rs b/rumqttc/examples/syncrecv_v5.rs index acd6a5f9f..6510ec99c 100644 --- a/rumqttc/examples/syncrecv_v5.rs +++ b/rumqttc/examples/syncrecv_v5.rs @@ -6,7 +6,7 @@ use std::time::Duration; fn main() { pretty_env_logger::init(); - let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883); + let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884); let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false); mqttoptions .set_keep_alive(Duration::from_secs(5)) diff --git a/rumqttc/examples/tls.rs b/rumqttc/examples/tls.rs index 2bb0daeab..65f88bdd2 100644 --- a/rumqttc/examples/tls.rs +++ b/rumqttc/examples/tls.rs @@ -40,6 +40,7 @@ async fn main() -> Result<(), Box> { Ok(Event::Outgoing(o)) => println!("Outgoing = {:?}", o), Err(e) => { println!("Error = {:?}", e); + return Ok(()); } } } diff --git a/rumqttc/examples/websocket.rs b/rumqttc/examples/websocket.rs index 712427b3c..b9233a109 100644 --- a/rumqttc/examples/websocket.rs +++ b/rumqttc/examples/websocket.rs @@ -28,8 +28,13 @@ async fn main() -> Result<(), Box> { loop { let event = eventloop.poll().await; match event { - Ok(ev) => println!("{:?}", ev), - Err(err) => println!("{:?}", err), + Ok(notif) => { + println!("Event = {:?}", notif); + } + Err(err) => { + println!("Error = {:?}", err); + return Ok(()); + } } } }