Skip to content
This repository has been archived by the owner on Sep 25, 2023. It is now read-only.

Commit

Permalink
Merge pull request hyperledger-archives#83 from blockchaintp/fix/no-p…
Browse files Browse the repository at this point in the history
…anic-channels

fix: Do not panic on channel send failures
  • Loading branch information
vaporos committed May 24, 2023
2 parents 1c75dc3 + b76f7ed commit f781db6
Showing 1 changed file with 20 additions and 8 deletions.
28 changes: 20 additions & 8 deletions src/messaging/zmq_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ impl MessageSender for ZmqMessageSender {

match sender.send(SocketCommand::Send(msg)) {
Ok(_) => Ok(future),
Err(_) => Err(SendError::UnknownError),
Err(e) => {
log::error!("Unable to send message: {:?}", e);
Err(SendError::UnknownError)
}
}
} else {
Err(SendError::DisconnectedError)
Expand All @@ -141,7 +144,10 @@ impl MessageSender for ZmqMessageSender {

match sender.send(SocketCommand::Send(msg)) {
Ok(_) => Ok(()),
Err(_) => Err(SendError::UnknownError),
Err(e) => {
log::error!("Unable to send message: {:?}", e);
Err(SendError::UnknownError)
}
}
} else {
Err(SendError::DisconnectedError)
Expand All @@ -152,7 +158,9 @@ impl MessageSender for ZmqMessageSender {
if let Some(ref sender) = self.outbound_sender.take() {
match sender.send(SocketCommand::Shutdown) {
Ok(_) => (),
Err(_) => info!("Sender has already closed."),
Err(_) => {
info!("Sender has already closed.")
}
}
}
}
Expand All @@ -176,23 +184,27 @@ impl InboundRouter {
Ok(message) => {
let mut expected_replies = self.expected_replies.lock().unwrap();
match expected_replies.remove(message.get_correlation_id()) {
Some(sender) => sender.send(Ok(message)).expect("Unable to route reply"),
Some(sender) => sender
.send(Ok(message))
.map_err(|e| log::warn!("Unable to route reply: {:?}", e))
.ok(),
None => self
.inbound_tx
.send(Ok(message))
.expect("Unable to route new message"),
}
.map_err(|e| log::warn!("Unable to route new message: {:?}", e))
.ok(),
};
}
Err(ReceiveError::DisconnectedError) => {
let mut expected_replies = self.expected_replies.lock().unwrap();
for (_, sender) in expected_replies.iter_mut() {
sender
.send(Err(ReceiveError::DisconnectedError))
.unwrap_or_else(|err| error!("Failed to send disconnect reply: {}", err));
.unwrap_or_else(|err| warn!("Failed to send disconnect reply: {}", err));
}
self.inbound_tx
.send(Err(ReceiveError::DisconnectedError))
.unwrap_or_else(|err| error!("Failed to send disconnect: {}", err));
.unwrap_or_else(|err| warn!("Failed to send disconnect: {}", err));
}
Err(err) => error!("Error: {}", err),
}
Expand Down

0 comments on commit f781db6

Please sign in to comment.