Skip to content

Commit

Permalink
Introduce Retry Invoice Flow
Browse files Browse the repository at this point in the history
Description:

- Add functionality to handle retrying the sending of invoice_request
  messages on new reply_paths that are still awaiting invoices.

Changes:

1. Introduced invoice_request as an optional field in the `PendingOutboundPayments::AwaitingInvoice`
   variant to accommodate instances without invoice requests.
2. Refactored logic from `pay_for_offer` to create invoice request messages into a separate function
   for reuse with retry message flow.
3. Implemented `retry_tick_occurred` function in ChannelManager to handle generating invoice request
   messages for AwaitingInvoice payments and enqueueing them.
4. Added `retry_tick_occurred` to ln_background_processor with a timer duration of 5 seconds for
   timely retries without overwhelming the system with too many onion_messages.
  • Loading branch information
shaavan committed Apr 22, 2024
1 parent bc4a5ea commit fe09ff9
Show file tree
Hide file tree
Showing 4 changed files with 98 additions and 38 deletions.
24 changes: 20 additions & 4 deletions lightning-background-processor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,9 @@ use alloc::vec::Vec;
/// * Monitoring whether the [`ChannelManager`] needs to be re-persisted to disk, and if so,
/// writing it to disk/backups by invoking the callback given to it at startup.
/// [`ChannelManager`] persistence should be done in the background.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChainMonitor::rebroadcast_pending_claims`]
/// and [`PeerManager::timer_tick_occurred`] at the appropriate intervals.
/// * Calling [`ChannelManager::timer_tick_occurred`], [`ChannelManager::retry_tick_occurred`]
/// [`ChainMonitor::rebroadcast_pending_claims`] and [`PeerManager::timer_tick_occurred`]
/// at the appropriate intervals.
/// * Calling [`NetworkGraph::remove_stale_channels_and_tracking`] (if a [`GossipSync`] with a
/// [`NetworkGraph`] is provided to [`BackgroundProcessor::start`]).
///
Expand All @@ -81,6 +82,7 @@ use alloc::vec::Vec;
///
/// [`ChannelManager`]: lightning::ln::channelmanager::ChannelManager
/// [`ChannelManager::timer_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::timer_tick_occurred
/// [`ChannelManager::retry_tick_occurred`]: lightning::ln::channelmanager::ChannelManager::retry_tick_occurred
/// [`ChannelMonitor`]: lightning::chain::channelmonitor::ChannelMonitor
/// [`Event`]: lightning::events::Event
/// [`PeerManager::timer_tick_occurred`]: lightning::ln::peer_handler::PeerManager::timer_tick_occurred
Expand All @@ -97,6 +99,11 @@ const FRESHNESS_TIMER: u64 = 60;
#[cfg(test)]
const FRESHNESS_TIMER: u64 = 1;

#[cfg(not(test))]
const RETRY_TIMER: u64 = 5;
#[cfg(test)]
const RETRY_TIMER: u64 = 1;

#[cfg(all(not(test), not(debug_assertions)))]
const PING_TIMER: u64 = 10;
/// Signature operations take a lot longer without compiler optimisations.
Expand Down Expand Up @@ -134,7 +141,7 @@ const REBROADCAST_TIMER: u64 = 1;
/// core::cmp::min is not currently const, so we define a trivial (and equivalent) replacement
const fn min_u64(a: u64, b: u64) -> u64 { if a < b { a } else { b } }
#[cfg(feature = "futures")]
const FASTEST_TIMER: u64 = min_u64(min_u64(FRESHNESS_TIMER, PING_TIMER),
const FASTEST_TIMER: u64 = min_u64(min_u64(RETRY_TIMER, min_u64(FRESHNESS_TIMER, PING_TIMER)),
min_u64(SCORER_PERSIST_TIMER, min_u64(FIRST_NETWORK_PRUNE_TIMER, REBROADCAST_TIMER)));

/// Either [`P2PGossipSync`] or [`RapidGossipSync`].
Expand Down Expand Up @@ -291,6 +298,7 @@ macro_rules! define_run_body {
$chain_monitor.rebroadcast_pending_claims();

let mut last_freshness_call = $get_timer(FRESHNESS_TIMER);
let mut last_retry_call = $get_timer(RETRY_TIMER);
let mut last_onion_message_handler_call = $get_timer(ONION_MESSAGE_HANDLER_TIMER);
let mut last_ping_call = $get_timer(PING_TIMER);
let mut last_prune_call = $get_timer(FIRST_NETWORK_PRUNE_TIMER);
Expand Down Expand Up @@ -346,6 +354,11 @@ macro_rules! define_run_body {
$channel_manager.get_cm().timer_tick_occurred();
last_freshness_call = $get_timer(FRESHNESS_TIMER);
}
if $timer_elapsed(&mut last_retry_call, RETRY_TIMER) {
log_trace!($logger, "Calling ChannelManager's retry_tick_occurred");
$channel_manager.get_cm().retry_tick_occurred();
last_retry_call = $get_timer(RETRY_TIMER);
}
if $timer_elapsed(&mut last_onion_message_handler_call, ONION_MESSAGE_HANDLER_TIMER) {
log_trace!($logger, "Calling OnionMessageHandler's timer_tick_occurred");
$peer_manager.onion_message_handler().timer_tick_occurred();
Expand Down Expand Up @@ -1444,6 +1457,7 @@ mod tests {
// - `ChainMonitor::rebroadcast_pending_claims` is called every `REBROADCAST_TIMER`,
// - `PeerManager::timer_tick_occurred` is called every `PING_TIMER`, and
// - `OnionMessageHandler::timer_tick_occurred` is called every `ONION_MESSAGE_HANDLER_TIMER`.
// - `ChannelManager::retry_tick_occurred` is called every `RETRY_TIMER`.
let (_, nodes) = create_nodes(1, "test_timer_tick_called");
let data_dir = nodes[0].kv_store.get_data_dir();
let persister = Arc::new(Persister::new(data_dir));
Expand All @@ -1455,10 +1469,12 @@ mod tests {
let desired_log_2 = "Calling PeerManager's timer_tick_occurred".to_string();
let desired_log_3 = "Rebroadcasting monitor's pending claims".to_string();
let desired_log_4 = "Calling OnionMessageHandler's timer_tick_occurred".to_string();
let desired_log_5 = "Calling ChannelManager's retry_tick_occurred".to_string();
if log_entries.get(&("lightning_background_processor", desired_log_1)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_2)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_3)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() {
log_entries.get(&("lightning_background_processor", desired_log_4)).is_some() &&
log_entries.get(&("lightning_background_processor", desired_log_5)).is_some() {
break
}
}
Expand Down
42 changes: 19 additions & 23 deletions lightning/src/ln/channelmanager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ use crate::util::string::UntrustedString;
use crate::util::ser::{BigSize, FixedLengthReader, Readable, ReadableArgs, MaybeReadable, Writeable, Writer, VecWriter};
use crate::util::logger::{Level, Logger, WithContext};
use crate::util::errors::APIError;
use super::onion_utils::construct_invoice_request_message;

#[cfg(not(c_bindings))]
use {
crate::offers::offer::DerivedMetadata,
Expand Down Expand Up @@ -6010,6 +6012,20 @@ where
});
}

pub fn retry_tick_occurred(&self) {
let invoice_requests = self.pending_outbound_payments.get_invoice_request_awaiting_invoice();

if invoice_requests.is_empty() { return; }

if let Ok(reply_path) = self.create_blinded_path() {
let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();

for invoice_request in invoice_requests {
pending_offers_messages.extend(construct_invoice_request_message(invoice_request, reply_path.clone()));
}
}
}

/// Indicates that the preimage for payment_hash is unknown or the received amount is incorrect
/// after a PaymentClaimable event, failing the HTLC back to its origin and freeing resources
/// along the path (including in our own channel on which we received it).
Expand Down Expand Up @@ -8623,7 +8639,7 @@ macro_rules! create_refund_builder { ($self: ident, $builder: ty) => {
let expiration = StaleExpiration::AbsoluteTimeout(absolute_expiry);
$self.pending_outbound_payments
.add_new_awaiting_invoice(
payment_id, expiration, retry_strategy, max_total_routing_fee_msat,
payment_id, expiration, retry_strategy, max_total_routing_fee_msat, None
)
.map_err(|_| Bolt12SemanticError::DuplicatePaymentId)?;

Expand Down Expand Up @@ -8740,32 +8756,12 @@ where
let expiration = StaleExpiration::TimerTicks(1);
self.pending_outbound_payments
.add_new_awaiting_invoice(
payment_id, expiration, retry_strategy, max_total_routing_fee_msat
payment_id, expiration, retry_strategy, max_total_routing_fee_msat, Some(invoice_request)
)
.map_err(|_| Bolt12SemanticError::DuplicatePaymentId)?;

let mut pending_offers_messages = self.pending_offers_messages.lock().unwrap();
if offer.paths().is_empty() {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request),
Destination::Node(offer.signing_pubkey()),
Some(reply_path),
);
pending_offers_messages.push(message);
} else {
// Send as many invoice requests as there are paths in the offer (with an upper bound).
// Using only one path could result in a failure if the path no longer exists. But only
// one invoice for a given payment id will be paid, even if more than one is received.
const REQUEST_LIMIT: usize = 10;
for path in offer.paths().into_iter().take(REQUEST_LIMIT) {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request.clone()),
Destination::BlindedPath(path.clone()),
Some(reply_path.clone()),
);
pending_offers_messages.push(message);
}
}
pending_offers_messages.extend(construct_invoice_request_message(invoice_request, reply_path));

Ok(())
}
Expand Down
30 changes: 30 additions & 0 deletions lightning/src/ln/onion_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,16 @@
// You may not use this file except in accordance with one or both of these
// licenses.

use crate::blinded_path::BlindedPath;
use crate::crypto::chacha20::ChaCha20;
use crate::crypto::streams::ChaChaReader;
use crate::ln::channelmanager::{HTLCSource, RecipientOnionFields};
use crate::ln::msgs;
use crate::ln::wire::Encode;
use crate::ln::{PaymentHash, PaymentPreimage};
use crate::offers::invoice_request::InvoiceRequest;
use crate::onion_message::messenger::{new_pending_onion_message, Destination, PendingOnionMessage};
use crate::onion_message::offers::OffersMessage;
use crate::routing::gossip::NetworkUpdate;
use crate::routing::router::{BlindedTail, Path, RouteHop};
use crate::sign::NodeSigner;
Expand Down Expand Up @@ -1235,6 +1239,32 @@ fn decode_next_hop<T, R: ReadableArgs<T>, N: NextPacketBytes>(
}
}

pub fn construct_invoice_request_message(invoice_request: InvoiceRequest, reply_path: BlindedPath) -> Vec<PendingOnionMessage<OffersMessage>> {
let mut messages = vec![];
if invoice_request.paths().is_empty() {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request),
Destination::Node(invoice_request.signing_pubkey()),
Some(reply_path),
);
messages.push(message);
} else {
// Send as many invoice requests as there are paths in the offer (with an upper bound).
// Using only one path could result in a failure if the path no longer exists. But only
// one invoice for a given payment id will be paid, even if more than one is received.
const REQUEST_LIMIT: usize = 10;
for path in invoice_request.paths().into_iter().take(REQUEST_LIMIT) {
let message = new_pending_onion_message(
OffersMessage::InvoiceRequest(invoice_request.clone()),
Destination::BlindedPath(path.clone()),
Some(reply_path.clone()),
);
messages.push(message);
}
}
messages
}

#[cfg(test)]
mod tests {
use crate::io;
Expand Down
40 changes: 29 additions & 11 deletions lightning/src/ln/outbound_payment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use bitcoin::hashes::Hash;
use bitcoin::hashes::sha256::Hash as Sha256;
use bitcoin::secp256k1::{self, Secp256k1, SecretKey};

use crate::offers::invoice_request::InvoiceRequest;
use crate::sign::{EntropySource, NodeSigner, Recipient};
use crate::events::{self, PaymentFailureReason};
use crate::ln::{PaymentHash, PaymentPreimage, PaymentSecret};
Expand Down Expand Up @@ -50,6 +51,7 @@ pub(crate) enum PendingOutboundPayment {
expiration: StaleExpiration,
retry_strategy: Retry,
max_total_routing_fee_msat: Option<u64>,
invoice_request: Option<InvoiceRequest>,
},
InvoiceReceived {
payment_hash: PaymentHash,
Expand Down Expand Up @@ -1291,7 +1293,7 @@ impl OutboundPayments {

pub(super) fn add_new_awaiting_invoice(
&self, payment_id: PaymentId, expiration: StaleExpiration, retry_strategy: Retry,
max_total_routing_fee_msat: Option<u64>
max_total_routing_fee_msat: Option<u64>, invoice_request: Option<InvoiceRequest>
) -> Result<(), ()> {
let mut pending_outbounds = self.pending_outbound_payments.lock().unwrap();
match pending_outbounds.entry(payment_id) {
Expand All @@ -1301,6 +1303,7 @@ impl OutboundPayments {
expiration,
retry_strategy,
max_total_routing_fee_msat,
invoice_request,
});

Ok(())
Expand Down Expand Up @@ -1766,6 +1769,20 @@ impl OutboundPayments {
pub fn clear_pending_payments(&self) {
self.pending_outbound_payments.lock().unwrap().clear()
}

pub fn get_invoice_request_awaiting_invoice(&self) -> Vec<InvoiceRequest> {
let pending_outbound_payments = self.pending_outbound_payments.lock().unwrap();

pending_outbound_payments.iter().filter_map(
|(_, payment)| {
if let PendingOutboundPayment::AwaitingInvoice { invoice_request, .. } = payment {
invoice_request.clone()
} else {
None
}
}
).collect()
}
}

/// Returns whether a payment with the given [`PaymentHash`] and [`PaymentId`] is, in fact, a
Expand Down Expand Up @@ -1821,6 +1838,7 @@ impl_writeable_tlv_based_enum_upgradable!(PendingOutboundPayment,
(0, expiration, required),
(2, retry_strategy, required),
(4, max_total_routing_fee_msat, option),
(6, invoice_request, option),
},
(7, InvoiceReceived) => {
(0, payment_hash, required),
Expand Down Expand Up @@ -2058,7 +2076,7 @@ mod tests {
assert!(!outbound_payments.has_pending_payments());
assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());
Expand All @@ -2084,14 +2102,14 @@ mod tests {

assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());

assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_err()
);
}
Expand All @@ -2107,7 +2125,7 @@ mod tests {
assert!(!outbound_payments.has_pending_payments());
assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());
Expand All @@ -2133,14 +2151,14 @@ mod tests {

assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());

assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_err()
);
}
Expand All @@ -2155,7 +2173,7 @@ mod tests {
assert!(!outbound_payments.has_pending_payments());
assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());
Expand Down Expand Up @@ -2188,7 +2206,7 @@ mod tests {

assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), None
payment_id, expiration, Retry::Attempts(0), None, None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());
Expand Down Expand Up @@ -2250,7 +2268,7 @@ mod tests {
assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0),
Some(invoice.amount_msats() / 100 + 50_000)
Some(invoice.amount_msats() / 100 + 50_000), None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());
Expand Down Expand Up @@ -2347,7 +2365,7 @@ mod tests {

assert!(
outbound_payments.add_new_awaiting_invoice(
payment_id, expiration, Retry::Attempts(0), Some(1234)
payment_id, expiration, Retry::Attempts(0), Some(1234), None
).is_ok()
);
assert!(outbound_payments.has_pending_payments());
Expand Down

0 comments on commit fe09ff9

Please sign in to comment.