Skip to content

Commit

Permalink
Stop printing error in loop
Browse files Browse the repository at this point in the history
  • Loading branch information
henil committed Dec 13, 2022
1 parent e04652c commit b1c9361
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 12 deletions.
20 changes: 18 additions & 2 deletions rumqttc/examples/async_manual_acks.rs
Expand Up @@ -36,7 +36,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
// get subscribed messages without acking
let event = eventloop.poll().await;
println!("{:?}", event);
match &event {
Ok(notif) => {
println!("Event = {:?}", notif);
}
Err(error) => {
println!("Error = {:?}", error);
return Ok(());
}
}
if let Err(_err) = event {
// break loop on disconnection
break;
Expand All @@ -49,7 +57,15 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
// previously published messages should be republished after reconnection.
let event = eventloop.poll().await;
println!("{:?}", event);
match &event {
Ok(notif) => {
println!("Event = {:?}", notif);
}
Err(error) => {
println!("Error = {:?}", error);
return Ok(());
}
}

if let Ok(Event::Incoming(Incoming::Publish(publish))) = event {
// this time we will ack incoming publishes.
Expand Down
12 changes: 12 additions & 0 deletions rumqttc/examples/async_manual_acks_v5.rs
Expand Up @@ -41,6 +41,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
while let Ok(event) = eventloop.poll().await {
println!("{:?}", event);
}
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {:?}", v);
}
Err(e) => {
println!("Error = {:?}", e);
break;
}
}
}

// create new broker connection
let (client, mut eventloop) = create_conn();
Expand Down
10 changes: 9 additions & 1 deletion rumqttc/examples/asyncpubsub.rs
Expand Up @@ -20,7 +20,15 @@ async fn main() -> Result<(), Box<dyn Error>> {

loop {
let event = eventloop.poll().await;
println!("{:?}", event.unwrap());
match &event {
Ok(v) => {
println!("Event = {:?}", v);
}
Err(e) => {
println!("Error = {:?}", e);
return Ok(());
}
}
}
}

Expand Down
15 changes: 11 additions & 4 deletions rumqttc/examples/asyncpubsub_v5.rs
Expand Up @@ -19,11 +19,18 @@ async fn main() -> Result<(), Box<dyn Error>> {
time::sleep(Duration::from_secs(3)).await;
});

while let Ok(event) = eventloop.poll().await {
println!("{:?}", event);
loop {
let event = eventloop.poll().await;
match &event {
Ok(v) => {
println!("Event = {:?}", v);
}
Err(e) => {
println!("Error = {:?}", e);
return Ok(());
}
}
}

Ok(())
}

async fn requests(client: AsyncClient) {
Expand Down
11 changes: 10 additions & 1 deletion rumqttc/examples/syncpubsub.rs
Expand Up @@ -15,13 +15,22 @@ fn main() {
thread::spawn(move || publish(client));

for (i, notification) in connection.iter().enumerate() {
println!("{}. Notification = {:?}", i, notification);
match notification {
Ok(notif) => {
println!("{}. Notification = {:?}", i, notif);
}
Err(error) => {
println!("{}. Notification = {:?}", i, error);
return;
}
}
}

println!("Done with the stream!!");
}

fn publish(mut client: Client) {
thread::sleep(Duration::from_secs(1));
client.subscribe("hello/+/world", QoS::AtMostOnce).unwrap();
for i in 0..10_usize {
let payload = vec![1; i];
Expand Down
14 changes: 12 additions & 2 deletions rumqttc/examples/syncpubsub_v5.rs
@@ -1,12 +1,13 @@
use rumqttc::v5::mqttbytes::{LastWill, QoS};
use rumqttc::v5::ConnectionError;
use rumqttc::v5::{Client, MqttOptions};
use std::thread;
use std::time::Duration;

fn main() {
pretty_env_logger::init();

let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1883);
let mut mqttoptions = MqttOptions::new("test-1", "localhost", 1884);
let will = LastWill::new("hello/world", "good bye", QoS::AtMostOnce, false);
mqttoptions
.set_keep_alive(Duration::from_secs(5))
Expand All @@ -16,6 +17,15 @@ fn main() {
thread::spawn(move || publish(client));

for (i, notification) in connection.iter().enumerate() {
match notification {
Err(ConnectionError::Io(error))
if error.kind() == std::io::ErrorKind::ConnectionRefused =>
{
println!("Failed to connect to the server. Make sure correct client is configured properly!\nError: {:?}", error);
return;
}
_ => {}
}
println!("{}. Notification = {:?}", i, notification);
}

Expand All @@ -29,7 +39,7 @@ fn publish(client: Client) {
let topic = format!("hello/{}/world", i);
let qos = QoS::AtLeastOnce;

client.publish(topic, qos, true, payload).unwrap();
let _ = client.publish(topic, qos, true, payload);
}

thread::sleep(Duration::from_secs(1));
Expand Down
1 change: 1 addition & 0 deletions rumqttc/examples/tls.rs
Expand Up @@ -40,6 +40,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
Ok(Event::Outgoing(o)) => println!("Outgoing = {:?}", o),
Err(e) => {
println!("Error = {:?}", e);
return Ok(());
}
}
}
Expand Down
9 changes: 7 additions & 2 deletions rumqttc/examples/websocket.rs
Expand Up @@ -28,8 +28,13 @@ async fn main() -> Result<(), Box<dyn Error>> {
loop {
let event = eventloop.poll().await;
match event {
Ok(ev) => println!("{:?}", ev),
Err(err) => println!("{:?}", err),
Ok(notif) => {
println!("Event = {:?}", notif);
}
Err(err) => {
println!("Error = {:?}", err);
return Ok(());
}
}
}
}
Expand Down

0 comments on commit b1c9361

Please sign in to comment.