From 059e4ae8b5a5e1a865bfaace7a48cd7e50889477 Mon Sep 17 00:00:00 2001 From: vincentygarcia Date: Wed, 17 Mar 2021 14:39:54 +1100 Subject: [PATCH 1/2] Properly handle concurrent messages to and from peers Previously, we were forwarding incoming messages from peers to all swaps that were currently running. That is obviously wrong. The new design scopes an `EventLoopHandle` to a specific PeerId to avoid this problem. --- swap/src/database/alice.rs | 29 +--- swap/src/lib.rs | 1 - swap/src/protocol/alice/behaviour.rs | 4 +- .../src/protocol/alice/encrypted_signature.rs | 4 +- swap/src/protocol/alice/event_loop.rs | 138 +++++++++++------- swap/src/protocol/alice/state.rs | 3 - swap/src/protocol/alice/steps.rs | 4 +- swap/src/protocol/alice/swap.rs | 23 +-- swap/src/serde_peer_id.rs | 49 ------- 9 files changed, 98 insertions(+), 157 deletions(-) delete mode 100644 swap/src/serde_peer_id.rs diff --git a/swap/src/database/alice.rs b/swap/src/database/alice.rs index cb6c30b6..b2424fba 100644 --- a/swap/src/database/alice.rs +++ b/swap/src/database/alice.rs @@ -4,7 +4,6 @@ use crate::monero::monero_private_key; use crate::protocol::alice; use crate::protocol::alice::AliceState; use ::bitcoin::hashes::core::fmt::Display; -use libp2p::PeerId; use monero_rpc::wallet::BlockHeight; use serde::{Deserialize, Serialize}; @@ -15,13 +14,9 @@ use serde::{Deserialize, Serialize}; pub enum Alice { Started { state3: alice::State3, - #[serde(with = "crate::serde_peer_id")] - bob_peer_id: PeerId, }, BtcLocked { state3: alice::State3, - #[serde(with = "crate::serde_peer_id")] - bob_peer_id: PeerId, }, XmrLocked { monero_wallet_restore_blockheight: BlockHeight, @@ -64,19 +59,11 @@ pub enum AliceEndState { impl From<&AliceState> for Alice { fn from(alice_state: &AliceState) -> Self { match alice_state { - AliceState::Started { - state3, - bob_peer_id, - } => Alice::Started { + AliceState::Started { state3 } => Alice::Started { state3: state3.as_ref().clone(), - bob_peer_id: *bob_peer_id, }, - AliceState::BtcLocked { - state3, - bob_peer_id, - } => Alice::BtcLocked { + AliceState::BtcLocked { state3 } => Alice::BtcLocked { state3: state3.as_ref().clone(), - bob_peer_id: *bob_peer_id, }, AliceState::XmrLocked { monero_wallet_restore_blockheight, @@ -137,18 +124,10 @@ impl From<&AliceState> for Alice { impl From for AliceState { fn from(db_state: Alice) -> Self { match db_state { - Alice::Started { - state3, - bob_peer_id, - } => AliceState::Started { - bob_peer_id, + Alice::Started { state3 } => AliceState::Started { state3: Box::new(state3), }, - Alice::BtcLocked { - state3, - bob_peer_id, - } => AliceState::BtcLocked { - bob_peer_id, + Alice::BtcLocked { state3 } => AliceState::BtcLocked { state3: Box::new(state3), }, Alice::XmrLocked { diff --git a/swap/src/lib.rs b/swap/src/lib.rs index b769dc96..eaab1fec 100644 --- a/swap/src/lib.rs +++ b/swap/src/lib.rs @@ -30,4 +30,3 @@ pub mod seed; pub mod trace; mod monero_ext; -mod serde_peer_id; diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index c3790392..97a20f1c 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -32,6 +32,7 @@ pub enum OutEvent { EncryptedSignature { msg: Box, channel: ResponseChannel<()>, + peer: PeerId, }, ResponseSent, // Same variant is used for all messages as no processing is done Failure(Error), @@ -140,9 +141,10 @@ impl From for OutEvent { fn from(event: encrypted_signature::OutEvent) -> Self { use crate::protocol::alice::encrypted_signature::OutEvent::*; match event { - MsgReceived { msg, channel } => OutEvent::EncryptedSignature { + MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature { msg: Box::new(msg), channel, + peer, }, AckSent => OutEvent::ResponseSent, Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")), diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs index dac19113..f7dee187 100644 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -5,7 +5,7 @@ use libp2p::request_response::{ ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent, RequestResponseMessage, ResponseChannel, }; -use libp2p::NetworkBehaviour; +use libp2p::{NetworkBehaviour, PeerId}; use std::time::Duration; use tracing::debug; @@ -14,6 +14,7 @@ pub enum OutEvent { MsgReceived { msg: EncryptedSignature, channel: ResponseChannel<()>, + peer: PeerId, }, AckSent, Failure(Error), @@ -67,6 +68,7 @@ impl From> for OutEvent { OutEvent::MsgReceived { msg: request, channel, + peer, } } RequestResponseEvent::Message { diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 84da64d9..41cbb910 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -9,13 +9,16 @@ use crate::protocol::bob::EncryptedSignature; use crate::seed::Seed; use crate::{bitcoin, kraken, monero}; use anyhow::{bail, Context, Result}; +use futures::future; +use futures::future::{BoxFuture, FutureExt}; +use futures::stream::{FuturesUnordered, StreamExt}; use libp2p::core::Multiaddr; -use libp2p::futures::FutureExt; use libp2p::{PeerId, Swarm}; use rand::rngs::OsRng; +use std::collections::HashMap; use std::convert::Infallible; use std::sync::Arc; -use tokio::sync::{broadcast, mpsc}; +use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, trace}; use uuid::Uuid; @@ -30,19 +33,19 @@ pub struct EventLoop { latest_rate: RS, max_buy: bitcoin::Amount, - recv_encrypted_signature: broadcast::Sender, - send_transfer_proof: mpsc::Receiver<(PeerId, TransferProof)>, - - // Only used to produce new handles - send_transfer_proof_sender: mpsc::Sender<(PeerId, TransferProof)>, + /// Stores a sender per peer for incoming [`EncryptedSignature`]s. + recv_encrypted_signature: HashMap>, + /// Stores a list of futures, waiting for transfer proof which will be sent + /// to the given peer. + send_transfer_proof: FuturesUnordered>>, swap_sender: mpsc::Sender, } #[derive(Debug)] pub struct EventLoopHandle { - recv_encrypted_signature: broadcast::Receiver, - send_transfer_proof: mpsc::Sender<(PeerId, TransferProof)>, + recv_encrypted_signature: Option>, + send_transfer_proof: Option>, } impl EventLoop @@ -74,8 +77,6 @@ where Swarm::listen_on(&mut swarm, listen_address.clone()) .with_context(|| format!("Address is not supported: {:#}", listen_address))?; - let recv_encrypted_signature = BroadcastChannels::default(); - let send_transfer_proof = MpscChannels::default(); let swap_channel = MpscChannels::default(); let event_loop = EventLoop { @@ -86,30 +87,26 @@ where monero_wallet, db, latest_rate, - recv_encrypted_signature: recv_encrypted_signature.sender, - send_transfer_proof: send_transfer_proof.receiver, - send_transfer_proof_sender: send_transfer_proof.sender, swap_sender: swap_channel.sender, max_buy, + recv_encrypted_signature: Default::default(), + send_transfer_proof: Default::default(), }; Ok((event_loop, swap_channel.receiver)) } - pub fn new_handle(&self) -> EventLoopHandle { - EventLoopHandle { - recv_encrypted_signature: self.recv_encrypted_signature.subscribe(), - send_transfer_proof: self.send_transfer_proof_sender.clone(), - } - } - pub fn peer_id(&self) -> PeerId { self.peer_id } pub async fn run(mut self) { + // ensure that the send_transfer_proof stream is NEVER empty, otherwise it will + // terminate forever. + self.send_transfer_proof.push(future::pending().boxed()); + loop { tokio::select! { - swarm_event = self.swarm.next().fuse() => { + swarm_event = self.swarm.next() => { match swarm_event { OutEvent::ConnectionEstablished(alice) => { debug!("Connection Established with {}", alice); @@ -164,9 +161,17 @@ where OutEvent::TransferProofAcknowledged => { trace!("Bob acknowledged transfer proof"); } - OutEvent::EncryptedSignature{ msg, channel } => { - let _ = self.recv_encrypted_signature.send(*msg); - // Send back empty response so that the request/response protocol completes. + OutEvent::EncryptedSignature{ msg, channel, peer } => { + match self.recv_encrypted_signature.remove(&peer) { + Some(sender) => { + // this failing just means the receiver is no longer interested ... + let _ = sender.send(*msg); + }, + None => { + tracing::warn!(%peer, "No sender for encrypted signature, maybe already handled?") + } + } + if let Err(error) = self.swarm.send_encrypted_signature_ack(channel) { error!("Failed to send Encrypted Signature ack: {:?}", error); } @@ -177,11 +182,19 @@ where } } }, - transfer_proof = self.send_transfer_proof.recv().fuse() => { - if let Some((bob_peer_id, msg)) = transfer_proof { - self.swarm.send_transfer_proof(bob_peer_id, msg); + next_transfer_proof = self.send_transfer_proof.next() => { + match next_transfer_proof { + Some(Ok((peer, transfer_proof))) => { + self.swarm.send_transfer_proof(peer, transfer_proof); + }, + Some(Err(_)) => { + tracing::debug!("A swap stopped without sending a transfer proof"); + } + None => { + unreachable!("stream of transfer proof receivers must never terminate") + } } - }, + } } } } @@ -230,11 +243,10 @@ where async fn handle_execution_setup_done(&mut self, bob_peer_id: PeerId, state3: State3) { let swap_id = Uuid::new_v4(); - let handle = self.new_handle(); + let handle = self.new_handle(bob_peer_id); let initial_state = AliceState::Started { state3: Box::new(state3), - bob_peer_id, }; let swap = Swap { @@ -251,6 +263,29 @@ where tracing::warn!(%swap_id, "Swap cannot be spawned: {}", error); } } + + /// Create a new [`EventLoopHandle`] that is scoped for communication with + /// the given peer. + fn new_handle(&mut self, peer: PeerId) -> EventLoopHandle { + let (send_transfer_proof_sender, send_transfer_proof_receiver) = oneshot::channel(); + let (recv_enc_sig_sender, recv_enc_sig_receiver) = oneshot::channel(); + + self.recv_encrypted_signature + .insert(peer, recv_enc_sig_sender); + self.send_transfer_proof.push( + async move { + let transfer_proof = send_transfer_proof_receiver.await?; + + Ok((peer, transfer_proof)) + } + .boxed(), + ); + + EventLoopHandle { + recv_encrypted_signature: Some(recv_enc_sig_receiver), + send_transfer_proof: Some(send_transfer_proof_sender), + } + } } pub trait LatestRate { @@ -277,13 +312,24 @@ impl LatestRate for kraken::RateUpdateStream { impl EventLoopHandle { pub async fn recv_encrypted_signature(&mut self) -> Result { - self.recv_encrypted_signature - .recv() - .await - .context("Failed to receive Bitcoin encrypted signature from Bob") + let signature = self + .recv_encrypted_signature + .take() + .context("Encrypted signature was already received")? + .await?; + + Ok(signature) } - pub async fn send_transfer_proof(&mut self, bob: PeerId, msg: TransferProof) -> Result<()> { - let _ = self.send_transfer_proof.send((bob, msg)).await?; + pub async fn send_transfer_proof(&mut self, msg: TransferProof) -> Result<()> { + if self + .send_transfer_proof + .take() + .context("Transfer proof was already sent")? + .send(msg) + .is_err() + { + bail!("Failed to send transfer proof, receiver no longer listening?") + } Ok(()) } @@ -308,21 +354,3 @@ impl Default for MpscChannels { MpscChannels { sender, receiver } } } - -#[allow(missing_debug_implementations)] -struct BroadcastChannels -where - T: Clone, -{ - sender: broadcast::Sender, -} - -impl Default for BroadcastChannels -where - T: Clone, -{ - fn default() -> Self { - let (sender, _receiver) = broadcast::channel(100); - BroadcastChannels { sender } - } -} diff --git a/swap/src/protocol/alice/state.rs b/swap/src/protocol/alice/state.rs index 263aee10..dfd2441e 100644 --- a/swap/src/protocol/alice/state.rs +++ b/swap/src/protocol/alice/state.rs @@ -7,7 +7,6 @@ use crate::protocol::bob::{Message0, Message2, Message4}; use crate::protocol::CROSS_CURVE_PROOF_SYSTEM; use crate::{bitcoin, monero}; use anyhow::{anyhow, bail, Context, Result}; -use libp2p::PeerId; use monero_rpc::wallet::BlockHeight; use rand::{CryptoRng, RngCore}; use serde::{Deserialize, Serialize}; @@ -17,11 +16,9 @@ use std::fmt; #[derive(Debug)] pub enum AliceState { Started { - bob_peer_id: PeerId, state3: Box, }, BtcLocked { - bob_peer_id: PeerId, state3: Box, }, XmrLocked { diff --git a/swap/src/protocol/alice/steps.rs b/swap/src/protocol/alice/steps.rs index 3f7b18e8..eec8a86b 100644 --- a/swap/src/protocol/alice/steps.rs +++ b/swap/src/protocol/alice/steps.rs @@ -7,10 +7,8 @@ use crate::protocol::alice::TransferProof; use crate::{bitcoin, monero}; use anyhow::{bail, Context, Result}; use futures::pin_mut; -use libp2p::PeerId; pub async fn lock_xmr( - bob_peer_id: PeerId, state3: alice::State3, event_loop_handle: &mut EventLoopHandle, monero_wallet: &monero::Wallet, @@ -30,7 +28,7 @@ pub async fn lock_xmr( // Otherwise Alice might publish the lock tx twice! event_loop_handle - .send_transfer_proof(bob_peer_id, TransferProof { + .send_transfer_proof(TransferProof { tx_lock_proof: transfer_proof, }) .await?; diff --git a/swap/src/protocol/alice/swap.rs b/swap/src/protocol/alice/swap.rs index 2d652416..33822dbb 100644 --- a/swap/src/protocol/alice/swap.rs +++ b/swap/src/protocol/alice/swap.rs @@ -76,10 +76,7 @@ async fn run_until_internal( Ok(state) } else { match state { - AliceState::Started { - state3, - bob_peer_id, - } => { + AliceState::Started { state3 } => { timeout( execution_params.bob_time_to_act, bitcoin_wallet @@ -94,10 +91,7 @@ async fn run_until_internal( }) .await?; - let state = AliceState::BtcLocked { - bob_peer_id, - state3, - }; + let state = AliceState::BtcLocked { state3 }; let db_state = (&state).into(); db.insert_latest_state(swap_id, database::Swap::Alice(db_state)) @@ -114,21 +108,12 @@ async fn run_until_internal( ) .await } - AliceState::BtcLocked { - bob_peer_id, - state3, - } => { + AliceState::BtcLocked { state3 } => { // Record the current monero wallet block height so we don't have to scan from // block 0 for scenarios where we create a refund wallet. let monero_wallet_restore_blockheight = monero_wallet.block_height().await?; - lock_xmr( - bob_peer_id, - *state3.clone(), - &mut event_loop_handle, - &monero_wallet, - ) - .await?; + lock_xmr(*state3.clone(), &mut event_loop_handle, &monero_wallet).await?; let state = AliceState::XmrLocked { state3, diff --git a/swap/src/serde_peer_id.rs b/swap/src/serde_peer_id.rs deleted file mode 100644 index 8dc9a3e9..00000000 --- a/swap/src/serde_peer_id.rs +++ /dev/null @@ -1,49 +0,0 @@ -//! A serde module that defines how we want to serialize PeerIds on the -//! HTTP-API. - -use libp2p::PeerId; -use serde::de::Error; -use serde::{Deserialize, Deserializer, Serializer}; - -pub fn serialize(peer_id: &PeerId, serializer: S) -> Result -where - S: Serializer, -{ - let string = peer_id.to_string(); - serializer.serialize_str(&string) -} - -#[allow(dead_code)] -pub fn deserialize<'de, D>(deserializer: D) -> Result -where - D: Deserializer<'de>, -{ - let string = String::deserialize(deserializer)?; - let peer_id = string.parse().map_err(D::Error::custom)?; - - Ok(peer_id) -} - -#[cfg(test)] -mod tests { - use super::*; - use serde::Serialize; - use spectral::prelude::*; - - #[derive(Serialize)] - struct SerializablePeerId(#[serde(with = "super")] PeerId); - - #[test] - fn maker_id_serializes_as_expected() { - let peer_id = SerializablePeerId( - "QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY" - .parse() - .unwrap(), - ); - - let got = serde_json::to_string(&peer_id).expect("failed to serialize peer id"); - - assert_that(&got) - .is_equal_to(r#""QmfUfpC2frwFvcDzpspnfZitHt5wct6n4kpG5jzgRdsxkY""#.to_string()); - } -} From b53a191398a5905322c02c946ac89cf51b2cfb82 Mon Sep 17 00:00:00 2001 From: vincentygarcia Date: Wed, 17 Mar 2021 14:42:21 +1100 Subject: [PATCH 2/2] Improve log messages by including PeerId --- swap/src/protocol/alice/behaviour.rs | 71 ++++++++++--------- .../src/protocol/alice/encrypted_signature.rs | 26 ++++--- swap/src/protocol/alice/event_loop.rs | 8 +-- swap/src/protocol/alice/execution_setup.rs | 4 +- swap/src/protocol/alice/transfer_proof.rs | 38 +++++----- 5 files changed, 82 insertions(+), 65 deletions(-) diff --git a/swap/src/protocol/alice/behaviour.rs b/swap/src/protocol/alice/behaviour.rs index 97a20f1c..5c621d7b 100644 --- a/swap/src/protocol/alice/behaviour.rs +++ b/swap/src/protocol/alice/behaviour.rs @@ -28,14 +28,17 @@ pub enum OutEvent { bob_peer_id: PeerId, state3: Box, }, - TransferProofAcknowledged, + TransferProofAcknowledged(PeerId), EncryptedSignature { msg: Box, channel: ResponseChannel<()>, peer: PeerId, }, ResponseSent, // Same variant is used for all messages as no processing is done - Failure(Error), + Failure { + peer: PeerId, + error: Error, + }, } impl From for OutEvent { @@ -62,23 +65,20 @@ impl From for OutEvent { } => OutEvent::SpotPriceRequested { msg, channel, peer }, spot_price::OutEvent::Message { message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Failure(anyhow!( - "Alice is only meant to hand out spot prices, not receive them" - )), + peer, + } => OutEvent::Failure { + error: anyhow!("Alice is only meant to hand out spot prices, not receive them"), + peer, + }, spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!( - "spot_price protocol with peer {} failed due to {:?}", + spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("spot_price protocol failed due to {:?}", error), peer, - error - )), - spot_price::OutEvent::OutboundFailure { peer, error, .. } => { - OutEvent::Failure(anyhow!( - "spot_price protocol with peer {} failed due to {:?}", - peer, - error - )) - } + }, + spot_price::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("spot_price protocol failed due to {:?}", error), + peer, + }, } } } @@ -92,21 +92,20 @@ impl From for OutEvent { } => OutEvent::QuoteRequested { channel, peer }, quote::OutEvent::Message { message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Failure(anyhow!( - "Alice is only meant to hand out quotes, not receive them" - )), + peer, + } => OutEvent::Failure { + error: anyhow!("Alice is only meant to hand out quotes, not receive them"), + peer, + }, quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent, - quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!( - "quote protocol with peer {} failed due to {:?}", + quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("quote protocol failed due to {:?}", error), peer, - error - )), - quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!( - "quote protocol with peer {} failed due to {:?}", + }, + quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure { + error: anyhow!("quote protocol failed due to {:?}", error), peer, - error - )), + }, } } } @@ -122,7 +121,7 @@ impl From for OutEvent { bob_peer_id, state3: Box::new(state3), }, - Failure(err) => OutEvent::Failure(err), + Failure { peer, error } => OutEvent::Failure { peer, error }, } } } @@ -131,8 +130,11 @@ impl From for OutEvent { fn from(event: transfer_proof::OutEvent) -> Self { use crate::protocol::alice::transfer_proof::OutEvent::*; match event { - Acknowledged => OutEvent::TransferProofAcknowledged, - Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")), + Acknowledged(peer) => OutEvent::TransferProofAcknowledged(peer), + Failure { peer, error } => OutEvent::Failure { + peer, + error: error.context("Failure with Transfer Proof"), + }, } } } @@ -147,7 +149,10 @@ impl From for OutEvent { peer, }, AckSent => OutEvent::ResponseSent, - Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")), + Failure { peer, error } => OutEvent::Failure { + peer, + error: error.context("Failure with Encrypted Signature"), + }, } } } diff --git a/swap/src/protocol/alice/encrypted_signature.rs b/swap/src/protocol/alice/encrypted_signature.rs index f7dee187..1cf1f14c 100644 --- a/swap/src/protocol/alice/encrypted_signature.rs +++ b/swap/src/protocol/alice/encrypted_signature.rs @@ -17,7 +17,10 @@ pub enum OutEvent { peer: PeerId, }, AckSent, - Failure(Error), + Failure { + peer: PeerId, + error: Error, + }, } /// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted @@ -73,14 +76,19 @@ impl From> for OutEvent { } RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Failure(anyhow!("Alice should not get a Response")), - RequestResponseEvent::InboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) - } - RequestResponseEvent::OutboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) - } + peer, + } => OutEvent::Failure { + peer, + error: anyhow!("Alice should not get a Response"), + }, + RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure { + peer, + error: anyhow!("Inbound failure: {:?}", error), + }, + RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure { + peer, + error: anyhow!("Outbound failure: {:?}", error), + }, RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent, } } diff --git a/swap/src/protocol/alice/event_loop.rs b/swap/src/protocol/alice/event_loop.rs index 41cbb910..aecae67f 100644 --- a/swap/src/protocol/alice/event_loop.rs +++ b/swap/src/protocol/alice/event_loop.rs @@ -158,8 +158,8 @@ where OutEvent::ExecutionSetupDone{bob_peer_id, state3} => { let _ = self.handle_execution_setup_done(bob_peer_id, *state3).await; } - OutEvent::TransferProofAcknowledged => { - trace!("Bob acknowledged transfer proof"); + OutEvent::TransferProofAcknowledged(peer) => { + trace!(%peer, "Bob acknowledged transfer proof"); } OutEvent::EncryptedSignature{ msg, channel, peer } => { match self.recv_encrypted_signature.remove(&peer) { @@ -177,8 +177,8 @@ where } } OutEvent::ResponseSent => {} - OutEvent::Failure(err) => { - error!("Communication error: {:#}", err); + OutEvent::Failure {peer, error} => { + error!(%peer, "Communication error: {:#}", error); } } }, diff --git a/swap/src/protocol/alice/execution_setup.rs b/swap/src/protocol/alice/execution_setup.rs index d5c50e38..b2ec5162 100644 --- a/swap/src/protocol/alice/execution_setup.rs +++ b/swap/src/protocol/alice/execution_setup.rs @@ -29,7 +29,7 @@ pub struct Message3 { #[derive(Debug)] pub enum OutEvent { Done { bob_peer_id: PeerId, state3: State3 }, - Failure(Error), + Failure { peer: PeerId, error: Error }, } impl From> for OutEvent { @@ -39,7 +39,7 @@ impl From> for OutEvent { bob_peer_id, state3, }, - BehaviourOutEvent::Inbound(_, Err(e)) => OutEvent::Failure(e), + BehaviourOutEvent::Inbound(peer, Err(e)) => OutEvent::Failure { peer, error: e }, BehaviourOutEvent::Outbound(..) => unreachable!("Alice only supports inbound"), } } diff --git a/swap/src/protocol/alice/transfer_proof.rs b/swap/src/protocol/alice/transfer_proof.rs index 9bf8ab50..2d03add8 100644 --- a/swap/src/protocol/alice/transfer_proof.rs +++ b/swap/src/protocol/alice/transfer_proof.rs @@ -16,8 +16,8 @@ pub struct TransferProof { #[derive(Debug)] pub enum OutEvent { - Acknowledged, - Failure(Error), + Acknowledged(PeerId), + Failure { peer: PeerId, error: Error }, } /// A `NetworkBehaviour` that represents sending the Monero transfer proof to @@ -56,23 +56,27 @@ impl From> for OutEvent { match event { RequestResponseEvent::Message { message: RequestResponseMessage::Request { .. }, - .. - } => OutEvent::Failure(anyhow!( - "Alice should never get a transfer proof request from Bob" - )), + peer, + } => OutEvent::Failure { + peer, + error: anyhow!("Alice should never get a transfer proof request from Bob"), + }, RequestResponseEvent::Message { message: RequestResponseMessage::Response { .. }, - .. - } => OutEvent::Acknowledged, - RequestResponseEvent::InboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Inbound failure: {:?}", error)) - } - RequestResponseEvent::OutboundFailure { error, .. } => { - OutEvent::Failure(anyhow!("Outbound failure: {:?}", error)) - } - RequestResponseEvent::ResponseSent { .. } => { - OutEvent::Failure(anyhow!("Alice should not send a response")) - } + peer, + } => OutEvent::Acknowledged(peer), + RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure { + peer, + error: anyhow!("Inbound failure: {:?}", error), + }, + RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure { + peer, + error: anyhow!("Outbound failure: {:?}", error), + }, + RequestResponseEvent::ResponseSent { peer, .. } => OutEvent::Failure { + peer, + error: anyhow!("Alice should not send a response"), + }, } } }