Skip to content

Commit

Permalink
Merge #321
Browse files Browse the repository at this point in the history
321: Properly handle concurrent messages to and from peers r=thomaseizinger a=thomaseizinger

Previously, we were forwarding incoming messages from peers to all
swaps that were currently running. That is obviously wrong. The new
design scopes an `EventLoopHandle` to a specific PeerId to avoid
this problem.

Co-authored-by: Thomas Eizinger <thomas@eizinger.io>
  • Loading branch information
abraham-nixon and thomaseizinger committed Mar 17, 2021
2 parents f15af2e + b53a191 commit e8ae93b
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 222 deletions.
29 changes: 4 additions & 25 deletions swap/src/database/alice.rs
Expand Up @@ -4,7 +4,6 @@ use crate::monero::monero_private_key;
use crate::protocol::alice;
use crate::protocol::alice::AliceState;
use ::bitcoin::hashes::core::fmt::Display;
use libp2p::PeerId;
use monero_rpc::wallet::BlockHeight;
use serde::{Deserialize, Serialize};

Expand All @@ -15,13 +14,9 @@ use serde::{Deserialize, Serialize};
pub enum Alice {
Started {
state3: alice::State3,
#[serde(with = "crate::serde_peer_id")]
bob_peer_id: PeerId,
},
BtcLocked {
state3: alice::State3,
#[serde(with = "crate::serde_peer_id")]
bob_peer_id: PeerId,
},
XmrLocked {
monero_wallet_restore_blockheight: BlockHeight,
Expand Down Expand Up @@ -64,19 +59,11 @@ pub enum AliceEndState {
impl From<&AliceState> for Alice {
fn from(alice_state: &AliceState) -> Self {
match alice_state {
AliceState::Started {
state3,
bob_peer_id,
} => Alice::Started {
AliceState::Started { state3 } => Alice::Started {
state3: state3.as_ref().clone(),
bob_peer_id: *bob_peer_id,
},
AliceState::BtcLocked {
state3,
bob_peer_id,
} => Alice::BtcLocked {
AliceState::BtcLocked { state3 } => Alice::BtcLocked {
state3: state3.as_ref().clone(),
bob_peer_id: *bob_peer_id,
},
AliceState::XmrLocked {
monero_wallet_restore_blockheight,
Expand Down Expand Up @@ -137,18 +124,10 @@ impl From<&AliceState> for Alice {
impl From<Alice> for AliceState {
fn from(db_state: Alice) -> Self {
match db_state {
Alice::Started {
state3,
bob_peer_id,
} => AliceState::Started {
bob_peer_id,
Alice::Started { state3 } => AliceState::Started {
state3: Box::new(state3),
},
Alice::BtcLocked {
state3,
bob_peer_id,
} => AliceState::BtcLocked {
bob_peer_id,
Alice::BtcLocked { state3 } => AliceState::BtcLocked {
state3: Box::new(state3),
},
Alice::XmrLocked {
Expand Down
1 change: 0 additions & 1 deletion swap/src/lib.rs
Expand Up @@ -30,4 +30,3 @@ pub mod seed;
pub mod trace;

mod monero_ext;
mod serde_peer_id;
75 changes: 41 additions & 34 deletions swap/src/protocol/alice/behaviour.rs
Expand Up @@ -28,13 +28,17 @@ pub enum OutEvent {
bob_peer_id: PeerId,
state3: Box<State3>,
},
TransferProofAcknowledged,
TransferProofAcknowledged(PeerId),
EncryptedSignature {
msg: Box<EncryptedSignature>,
channel: ResponseChannel<()>,
peer: PeerId,
},
ResponseSent, // Same variant is used for all messages as no processing is done
Failure(Error),
Failure {
peer: PeerId,
error: Error,
},
}

impl From<peer_tracker::OutEvent> for OutEvent {
Expand All @@ -61,23 +65,20 @@ impl From<spot_price::OutEvent> for OutEvent {
} => OutEvent::SpotPriceRequested { msg, channel, peer },
spot_price::OutEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!(
"Alice is only meant to hand out spot prices, not receive them"
)),
peer,
} => OutEvent::Failure {
error: anyhow!("Alice is only meant to hand out spot prices, not receive them"),
peer,
},
spot_price::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!(
"spot_price protocol with peer {} failed due to {:?}",
spot_price::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("spot_price protocol failed due to {:?}", error),
peer,
error
)),
spot_price::OutEvent::OutboundFailure { peer, error, .. } => {
OutEvent::Failure(anyhow!(
"spot_price protocol with peer {} failed due to {:?}",
peer,
error
))
}
},
spot_price::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("spot_price protocol failed due to {:?}", error),
peer,
},
}
}
}
Expand All @@ -91,21 +92,20 @@ impl From<quote::OutEvent> for OutEvent {
} => OutEvent::QuoteRequested { channel, peer },
quote::OutEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!(
"Alice is only meant to hand out quotes, not receive them"
)),
peer,
} => OutEvent::Failure {
error: anyhow!("Alice is only meant to hand out quotes, not receive them"),
peer,
},
quote::OutEvent::ResponseSent { .. } => OutEvent::ResponseSent,
quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!(
"quote protocol with peer {} failed due to {:?}",
quote::OutEvent::InboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("quote protocol failed due to {:?}", error),
peer,
error
)),
quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure(anyhow!(
"quote protocol with peer {} failed due to {:?}",
},
quote::OutEvent::OutboundFailure { peer, error, .. } => OutEvent::Failure {
error: anyhow!("quote protocol failed due to {:?}", error),
peer,
error
)),
},
}
}
}
Expand All @@ -121,7 +121,7 @@ impl From<execution_setup::OutEvent> for OutEvent {
bob_peer_id,
state3: Box::new(state3),
},
Failure(err) => OutEvent::Failure(err),
Failure { peer, error } => OutEvent::Failure { peer, error },
}
}
}
Expand All @@ -130,8 +130,11 @@ impl From<transfer_proof::OutEvent> for OutEvent {
fn from(event: transfer_proof::OutEvent) -> Self {
use crate::protocol::alice::transfer_proof::OutEvent::*;
match event {
Acknowledged => OutEvent::TransferProofAcknowledged,
Failure(err) => OutEvent::Failure(err.context("Failure with Transfer Proof")),
Acknowledged(peer) => OutEvent::TransferProofAcknowledged(peer),
Failure { peer, error } => OutEvent::Failure {
peer,
error: error.context("Failure with Transfer Proof"),
},
}
}
}
Expand All @@ -140,12 +143,16 @@ impl From<encrypted_signature::OutEvent> for OutEvent {
fn from(event: encrypted_signature::OutEvent) -> Self {
use crate::protocol::alice::encrypted_signature::OutEvent::*;
match event {
MsgReceived { msg, channel } => OutEvent::EncryptedSignature {
MsgReceived { msg, channel, peer } => OutEvent::EncryptedSignature {
msg: Box::new(msg),
channel,
peer,
},
AckSent => OutEvent::ResponseSent,
Failure(err) => OutEvent::Failure(err.context("Failure with Encrypted Signature")),
Failure { peer, error } => OutEvent::Failure {
peer,
error: error.context("Failure with Encrypted Signature"),
},
}
}
}
Expand Down
30 changes: 20 additions & 10 deletions swap/src/protocol/alice/encrypted_signature.rs
Expand Up @@ -5,7 +5,7 @@ use libp2p::request_response::{
ProtocolSupport, RequestResponse, RequestResponseConfig, RequestResponseEvent,
RequestResponseMessage, ResponseChannel,
};
use libp2p::NetworkBehaviour;
use libp2p::{NetworkBehaviour, PeerId};
use std::time::Duration;
use tracing::debug;

Expand All @@ -14,9 +14,13 @@ pub enum OutEvent {
MsgReceived {
msg: EncryptedSignature,
channel: ResponseChannel<()>,
peer: PeerId,
},
AckSent,
Failure(Error),
Failure {
peer: PeerId,
error: Error,
},
}

/// A `NetworkBehaviour` that represents receiving the Bitcoin encrypted
Expand Down Expand Up @@ -67,18 +71,24 @@ impl From<RequestResponseEvent<EncryptedSignature, ()>> for OutEvent {
OutEvent::MsgReceived {
msg: request,
channel,
peer,
}
}
RequestResponseEvent::Message {
message: RequestResponseMessage::Response { .. },
..
} => OutEvent::Failure(anyhow!("Alice should not get a Response")),
RequestResponseEvent::InboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Inbound failure: {:?}", error))
}
RequestResponseEvent::OutboundFailure { error, .. } => {
OutEvent::Failure(anyhow!("Outbound failure: {:?}", error))
}
peer,
} => OutEvent::Failure {
peer,
error: anyhow!("Alice should not get a Response"),
},
RequestResponseEvent::InboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Inbound failure: {:?}", error),
},
RequestResponseEvent::OutboundFailure { error, peer, .. } => OutEvent::Failure {
peer,
error: anyhow!("Outbound failure: {:?}", error),
},
RequestResponseEvent::ResponseSent { .. } => OutEvent::AckSent,
}
}
Expand Down

0 comments on commit e8ae93b

Please sign in to comment.