Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add HTLCHandlingFailed event #1403

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion fuzz/src/chanmon_consistency.rs
Expand Up @@ -148,7 +148,7 @@ impl chain::Watch<EnforcingSigner> for TestChainMonitor {
self.chain_monitor.update_channel(funding_txo, update)
}

fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
return self.chain_monitor.release_pending_monitor_events();
}
}
Expand Down Expand Up @@ -860,6 +860,7 @@ pub fn do_test<Out: Output>(data: &[u8], underlying_out: Out) {
events::Event::PendingHTLCsForwardable { .. } => {
nodes[$node].process_pending_htlc_forwards();
},
events::Event::HTLCHandlingFailed { .. } => {},
_ => if out.may_fail.load(atomic::Ordering::Acquire) {
return;
} else {
Expand Down
16 changes: 10 additions & 6 deletions lightning/src/chain/chainmonitor.rs
Expand Up @@ -43,6 +43,7 @@ use prelude::*;
use sync::{RwLock, RwLockReadGuard, Mutex, MutexGuard};
use core::ops::Deref;
use core::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use bitcoin::secp256k1::PublicKey;

#[derive(Clone, Copy, Hash, PartialEq, Eq)]
/// A specific update's ID stored in a `MonitorUpdateId`, separated out to make the contents
Expand Down Expand Up @@ -235,7 +236,7 @@ pub struct ChainMonitor<ChannelSigner: Sign, C: Deref, T: Deref, F: Deref, L: De
persister: P,
/// "User-provided" (ie persistence-completion/-failed) [`MonitorEvent`]s. These came directly
/// from the user and not from a [`ChannelMonitor`].
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>)>>,
pending_monitor_events: Mutex<Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>>,
/// The best block height seen, used as a proxy for the passage of time.
highest_chain_height: AtomicUsize,
}
Expand Down Expand Up @@ -299,7 +300,7 @@ where C::Target: chain::Filter,
log_trace!(self.logger, "Finished syncing Channel Monitor for channel {}", log_funding_info!(monitor)),
Err(ChannelMonitorUpdateErr::PermanentFailure) => {
monitor_state.channel_perm_failed.store(true, Ordering::Release);
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)]));
self.pending_monitor_events.lock().unwrap().push((*funding_outpoint, vec![MonitorEvent::UpdateFailed(*funding_outpoint)], monitor.get_counterparty_node_id()));
},
Err(ChannelMonitorUpdateErr::TemporaryFailure) => {
log_debug!(self.logger, "Channel Monitor sync for channel {} in progress, holding events until completion!", log_funding_info!(monitor));
Expand Down Expand Up @@ -458,7 +459,7 @@ where C::Target: chain::Filter,
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id: monitor_data.monitor.get_latest_update_id(),
}]));
}], monitor_data.monitor.get_counterparty_node_id()));
},
MonitorUpdateId { contents: UpdateOrigin::ChainSync(_) } => {
if !monitor_data.has_pending_chainsync_updates(&pending_monitor_updates) {
Expand All @@ -476,10 +477,12 @@ where C::Target: chain::Filter,
/// channel_monitor_updated once with the highest ID.
#[cfg(any(test, fuzzing))]
pub fn force_channel_monitor_updated(&self, funding_txo: OutPoint, monitor_update_id: u64) {
let monitors = self.monitors.read().unwrap();
let counterparty_node_id = monitors.get(&funding_txo).and_then(|m| m.monitor.get_counterparty_node_id());
self.pending_monitor_events.lock().unwrap().push((funding_txo, vec![MonitorEvent::UpdateCompleted {
funding_txo,
monitor_update_id,
}]));
}], counterparty_node_id));
}

#[cfg(any(test, fuzzing, feature = "_test_utils"))]
Expand Down Expand Up @@ -666,7 +669,7 @@ where C::Target: chain::Filter,
}
}

fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)> {
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)> {
let mut pending_monitor_events = self.pending_monitor_events.lock().unwrap().split_off(0);
for monitor_state in self.monitors.read().unwrap().values() {
let is_pending_monitor_update = monitor_state.has_pending_chainsync_updates(&monitor_state.pending_monitor_updates.lock().unwrap());
Expand Down Expand Up @@ -695,7 +698,8 @@ where C::Target: chain::Filter,
let monitor_events = monitor_state.monitor.get_and_clear_pending_monitor_events();
if monitor_events.len() > 0 {
let monitor_outpoint = monitor_state.monitor.get_funding_txo().0;
pending_monitor_events.push((monitor_outpoint, monitor_events));
let counterparty_node_id = monitor_state.monitor.get_counterparty_node_id();
pending_monitor_events.push((monitor_outpoint, monitor_events, counterparty_node_id));
}
}
}
Expand Down
4 changes: 4 additions & 0 deletions lightning/src/chain/channelmonitor.rs
Expand Up @@ -1209,6 +1209,10 @@ impl<Signer: Sign> ChannelMonitor<Signer> {
self.inner.lock().unwrap().get_cur_holder_commitment_number()
}

pub(crate) fn get_counterparty_node_id(&self) -> Option<PublicKey> {
self.inner.lock().unwrap().counterparty_node_id
}

/// Used by ChannelManager deserialization to broadcast the latest holder state if its copy of
/// the Channel was out-of-date. You may use it to get a broadcastable holder toxic tx in case of
/// fallen-behind, i.e when receiving a channel_reestablish with a proof that our counterparty side knows
Expand Down
3 changes: 2 additions & 1 deletion lightning/src/chain/mod.rs
Expand Up @@ -15,6 +15,7 @@ use bitcoin::blockdata::script::Script;
use bitcoin::blockdata::transaction::{Transaction, TxOut};
use bitcoin::hash_types::{BlockHash, Txid};
use bitcoin::network::constants::Network;
use bitcoin::secp256k1::PublicKey;

use chain::channelmonitor::{ChannelMonitor, ChannelMonitorUpdate, MonitorEvent};
use chain::keysinterface::Sign;
Expand Down Expand Up @@ -302,7 +303,7 @@ pub trait Watch<ChannelSigner: Sign> {
///
/// For details on asynchronous [`ChannelMonitor`] updating and returning
/// [`MonitorEvent::UpdateCompleted`] here, see [`ChannelMonitorUpdateErr::TemporaryFailure`].
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>)>;
fn release_pending_monitor_events(&self) -> Vec<(OutPoint, Vec<MonitorEvent>, Option<PublicKey>)>;
}

/// The `Filter` trait defines behavior for indicating chain activity of interest pertaining to
Expand Down
18 changes: 9 additions & 9 deletions lightning/src/ln/chanmon_update_fail_tests.rs
Expand Up @@ -26,7 +26,7 @@ use ln::msgs;
use ln::msgs::{ChannelMessageHandler, RoutingMessageHandler};
use util::config::UserConfig;
use util::enforcing_trait_impls::EnforcingSigner;
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider, PaymentPurpose, ClosureReason};
use util::events::{Event, MessageSendEvent, MessageSendEventsProvider, PaymentPurpose, ClosureReason, HTLCDestination};
use util::errors::APIError;
use util::ser::{ReadableArgs, Writeable};
use util::test_utils::TestBroadcaster;
Expand Down Expand Up @@ -832,7 +832,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {

// Fail the payment backwards, failing the monitor update on nodes[1]'s receipt of the RAA
nodes[2].node.fail_htlc_backwards(&payment_hash_1);
expect_pending_htlcs_forwardable!(nodes[2]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[2], vec![HTLCDestination::FailedPayment { payment_hash: payment_hash_1 }]);
check_added_monitors!(nodes[2], 1);

let updates = get_htlc_update_msgs!(nodes[2], nodes[1].node.get_our_node_id());
Expand Down Expand Up @@ -913,7 +913,7 @@ fn do_test_monitor_update_fail_raa(test_ignore_second_cs: bool) {
let (outpoint, latest_update, _) = nodes[1].chain_monitor.latest_monitor_update_id.lock().unwrap().get(&chan_2.2).unwrap().clone();
nodes[1].chain_monitor.chain_monitor.force_channel_monitor_updated(outpoint, latest_update);
check_added_monitors!(nodes[1], 0);
expect_pending_htlcs_forwardable!(nodes[1]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
check_added_monitors!(nodes[1], 1);

let mut events_3 = nodes[1].node.get_and_clear_pending_msg_events();
Expand Down Expand Up @@ -1690,14 +1690,14 @@ fn test_monitor_update_on_pending_forwards() {
let node_chanmgrs = create_node_chanmgrs(3, &node_cfgs, &[None, None, None]);
let mut nodes = create_network(3, &node_cfgs, &node_chanmgrs);
let chan_1 = create_announced_chan_between_nodes(&nodes, 0, 1, InitFeatures::known(), InitFeatures::known());
create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());
let chan_2 = create_announced_chan_between_nodes(&nodes, 1, 2, InitFeatures::known(), InitFeatures::known());

// Rebalance a bit so that we can send backwards from 3 to 1.
send_payment(&nodes[0], &[&nodes[1], &nodes[2]], 5000000);

let (_, payment_hash_1, _) = route_payment(&nodes[0], &[&nodes[1], &nodes[2]], 1000000);
nodes[2].node.fail_htlc_backwards(&payment_hash_1);
expect_pending_htlcs_forwardable!(nodes[2]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[2], vec![HTLCDestination::FailedPayment { payment_hash: payment_hash_1 }]);
check_added_monitors!(nodes[2], 1);

let cs_fail_update = get_htlc_update_msgs!(nodes[2], nodes[1].node.get_our_node_id());
Expand All @@ -1718,7 +1718,7 @@ fn test_monitor_update_on_pending_forwards() {
commitment_signed_dance!(nodes[1], nodes[2], payment_event.commitment_msg, false);

chanmon_cfgs[1].persister.set_update_ret(Err(ChannelMonitorUpdateErr::TemporaryFailure));
expect_pending_htlcs_forwardable!(nodes[1]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_2.2 }]);
check_added_monitors!(nodes[1], 1);
assert!(nodes[1].node.get_and_clear_pending_msg_events().is_empty());
nodes[1].logger.assert_log("lightning::ln::channelmanager".to_string(), "Failed to update ChannelMonitor".to_string(), 1);
Expand Down Expand Up @@ -2106,7 +2106,7 @@ fn test_fail_htlc_on_broadcast_after_claim() {
check_closed_broadcast!(nodes[1], true);
connect_blocks(&nodes[1], ANTI_REORG_DELAY - 1);
check_added_monitors!(nodes[1], 1);
expect_pending_htlcs_forwardable!(nodes[1]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_id_2 }]);

nodes[0].node.handle_update_fulfill_htlc(&nodes[1].node.get_our_node_id(), &bs_updates.update_fulfill_htlcs[0]);
expect_payment_sent_without_paths!(nodes[0], payment_preimage);
Expand Down Expand Up @@ -2469,7 +2469,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f
};
if second_fails {
nodes[2].node.fail_htlc_backwards(&payment_hash);
expect_pending_htlcs_forwardable!(nodes[2]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[2], vec![HTLCDestination::FailedPayment { payment_hash }]);
check_added_monitors!(nodes[2], 1);
get_htlc_update_msgs!(nodes[2], nodes[1].node.get_our_node_id());
} else {
Expand Down Expand Up @@ -2505,7 +2505,7 @@ fn do_test_reconnect_dup_htlc_claims(htlc_status: HTLCStatusAtDupClaim, second_f

if second_fails {
reconnect_nodes(&nodes[1], &nodes[2], (false, false), (0, 0), (0, 0), (1, 0), (0, 0), (0, 0), (false, false));
expect_pending_htlcs_forwardable!(nodes[1]);
expect_pending_htlcs_forwardable_and_htlc_handling_failed!(nodes[1], vec![HTLCDestination::NextHopChannel { node_id: Some(nodes[2].node.get_our_node_id()), channel_id: chan_id_2 }]);
} else {
reconnect_nodes(&nodes[1], &nodes[2], (false, false), (0, 0), (1, 0), (0, 0), (0, 0), (0, 0), (false, false));
}
Expand Down
5 changes: 3 additions & 2 deletions lightning/src/ln/channel.rs
Expand Up @@ -5765,7 +5765,7 @@ impl<Signer: Sign> Channel<Signer> {
/// those explicitly stated to be allowed after shutdown completes, eg some simple getters).
/// Also returns the list of payment_hashes for channels which we can safely fail backwards
/// immediately (others we will have to allow to time out).
pub fn force_shutdown(&mut self, should_broadcast: bool) -> (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash)>) {
pub fn force_shutdown(&mut self, should_broadcast: bool) -> (Option<(OutPoint, ChannelMonitorUpdate)>, Vec<(HTLCSource, PaymentHash, PublicKey, [u8; 32])>) {
// Note that we MUST only generate a monitor update that indicates force-closure - we're
// called during initialization prior to the chain_monitor in the encompassing ChannelManager
// being fully configured in some cases. Thus, its likely any monitor events we generate will
Expand All @@ -5775,10 +5775,11 @@ impl<Signer: Sign> Channel<Signer> {
// We go ahead and "free" any holding cell HTLCs or HTLCs we haven't yet committed to and
// return them to fail the payment.
let mut dropped_outbound_htlcs = Vec::with_capacity(self.holding_cell_htlc_updates.len());
let counterparty_node_id = self.get_counterparty_node_id();
for htlc_update in self.holding_cell_htlc_updates.drain(..) {
match htlc_update {
HTLCUpdateAwaitingACK::AddHTLC { source, payment_hash, .. } => {
dropped_outbound_htlcs.push((source, payment_hash));
dropped_outbound_htlcs.push((source, payment_hash, counterparty_node_id, self.channel_id));
},
_ => {}
}
Expand Down