Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] funding: add publisher #6400

Draft
wants to merge 8 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
213 changes: 191 additions & 22 deletions funding/manager.go
Expand Up @@ -303,9 +303,8 @@ type Config struct {
// funds from on-chain transaction outputs into Lightning channels.
Wallet *lnwallet.LightningWallet

// PublishTransaction facilitates the process of broadcasting a
// transaction to the network.
PublishTransaction func(*wire.MsgTx, string) error
// PublisherCfg is the config required to initialise the Publisher.
PublisherCfg *PublisherCfg

// UpdateLabel updates the label that a transaction has in our wallet,
// overwriting any existing labels.
Expand Down Expand Up @@ -527,6 +526,12 @@ type Manager struct {
handleFundingLockedMtx sync.RWMutex
handleFundingLockedBarriers map[lnwire.ChannelID]struct{}

// publisher facilitates the process of first checking the sanity and
// standardness of a transaction, then possible testing its acceptance
// to the mempool if one is available, and then broadcasting it to the
// network.
publisher *Publisher

quit chan struct{}
wg sync.WaitGroup
}
Expand Down Expand Up @@ -580,6 +585,7 @@ func NewFundingManager(cfg Config) (*Manager, error) {
fundingRequests: make(chan *InitFundingMsg, msgBufferSize),
localDiscoverySignals: make(map[lnwire.ChannelID]chan struct{}),
handleFundingLockedBarriers: make(map[lnwire.ChannelID]struct{}),
publisher: NewPublisher(cfg.PublisherCfg),
quit: make(chan struct{}),
}, nil
}
Expand Down Expand Up @@ -658,15 +664,13 @@ func (f *Manager) start() error {
labels.LabelTypeChannelOpen, nil,
)

err = f.cfg.PublishTransaction(
err = f.publisher.CheckAndPublish(
channel.FundingTxn, label,
)
if err != nil {
log.Errorf("Unable to rebroadcast "+
"funding tx %x for "+
"ChannelPoint(%v): %v",
fundingTxBuf.Bytes(),
channel.FundingOutpoint, err)
if f.maybeAbortOnStartupTxPublishErr(
err, channel, fundingTxBuf.Bytes(),
) {
continue
}
}
}
Expand Down Expand Up @@ -778,6 +782,12 @@ func (f *Manager) failFundingFlow(peer lnpeer.Peer, tempChanID [32]byte,
ctx.err <- fundingErr
}

f.sendError(peer, tempChanID, fundingErr)
}

func (f *Manager) sendError(peer lnpeer.Peer, tempChanID [32]byte,
fundingErr error) {

// We only send the exact error if it is part of out whitelisted set of
// errors (lnwire.FundingError or lnwallet.ReservationError).
var msg lnwire.ErrorData
Expand Down Expand Up @@ -1049,6 +1059,49 @@ func (f *Manager) stateStep(channel *channeldb.OpenChannel,
return fmt.Errorf("undefined channelState: %v", channelState)
}

// failPendingChannel creates a close summary and closes the channel if it's
// initiated by us and it's in pending state.
func (f *Manager) failPendingChannel(pendingChan *channeldb.OpenChannel) error {
// Check the channel is pending.
if !pendingChan.IsPending {
return fmt.Errorf("cannot fail a non-pending channel: %v",
pendingChan.FundingOutpoint,
)
}

// Check we are the initiator.
if !pendingChan.IsInitiator {
return fmt.Errorf("cannot fail a remote pending channel: %v",
pendingChan.FundingOutpoint,
)
}

// Create the close summary.
localBalance := pendingChan.LocalCommitment.LocalBalance.ToSatoshis()
closeInfo := &channeldb.ChannelCloseSummary{
ChanPoint: pendingChan.FundingOutpoint,
ChainHash: pendingChan.ChainHash,
RemotePub: pendingChan.IdentityPub,
CloseType: channeldb.FundingCanceled,
Capacity: pendingChan.Capacity,
SettledBalance: localBalance,
RemoteCurrentRevocation: pendingChan.RemoteCurrentRevocation,
RemoteNextRevocation: pendingChan.RemoteNextRevocation,
LocalChanConfig: pendingChan.LocalChanCfg,
}

// Close the channel with us as the initiator because we are
// deciding to exit the funding flow due to an internal error.
if err := pendingChan.CloseChannel(
closeInfo, channeldb.ChanStatusLocalCloseInitiator,
); err != nil {
log.Errorf("Failed closing channel %v: %v",
pendingChan.FundingOutpoint, err)
return err
}
return nil
}

// advancePendingChannelState waits for a pending channel's funding tx to
// confirm, and marks it open in the database when that happens.
func (f *Manager) advancePendingChannelState(
Expand Down Expand Up @@ -2146,19 +2199,13 @@ func (f *Manager) handleFundingSigned(peer lnpeer.Peer,
labels.LabelTypeChannelOpen, nil,
)

err = f.cfg.PublishTransaction(fundingTx, label)
if err != nil {
log.Errorf("Unable to broadcast funding tx %x for "+
"ChannelPoint(%v): %v", fundingTxBuf.Bytes(),
completeChan.FundingOutpoint, err)
err = f.publisher.CheckAndPublish(fundingTx, label)
if f.maybeAbortOnTxPublishErr(err, resCtx, completeChan,
fundingTxBuf.Bytes()) {

// We failed to broadcast the funding transaction, but
// watch the channel regardless, in case the
// transaction made it to the network. We will retry
// broadcast at startup.
//
// TODO(halseth): retry more often? Handle with CPFP?
// Just delete from the DB?
log.Errorf("Aborted funding flow for ChannelPoint(%v)",
fundingPoint)
return
}
}

Expand Down Expand Up @@ -3689,3 +3736,125 @@ func (f *Manager) deleteChannelOpeningState(chanPoint *wire.OutPoint) error {
outpointBytes.Bytes(),
)
}
func (f *Manager) maybeAbortOnTxPublishErr(publishErr error,
resCtx *reservationWithCtx, ch *channeldb.OpenChannel,
fundingTxBytes []byte) bool {

if publishErr == nil {
return false
}

log.Errorf("Unable to broadcast funding tx %x for ChannelPoint(%v): %v",
fundingTxBytes, ch.FundingOutpoint, publishErr)

externallyFunded := resCtx.reservation.IsPsbt() ||
resCtx.reservation.IsCannedShim()

abort := func() bool {
if _, ok := publishErr.(*ErrSanity); ok {
return true
}

if externallyFunded {
return false
}

switch publishErr.(type) {
case *ErrStandardness, *ErrMempoolTestAccept:
return true

case *ErrPublish:
return false
}

return false
}()

if !abort {
// We failed to broadcast the funding transaction, but
// watch the channel regardless, in case the
// transaction made it to the network. We will retry
// broadcast at startup.
//
// TODO(halseth): retry more often? Handle with CPFP?
// Just delete from the DB?
return false
}

err := f.failPendingChannel(ch)
if err != nil {
log.Errorf("Unable to close channel for ChannelPoint(%v): %v",
fundingTxBytes, err)
}

err = resCtx.reservation.Cancel()
if err != nil {
log.Errorf("Unable to cancel reservation: %v", err)
}

return true
}

func (f *Manager) maybeAbortOnStartupTxPublishErr(err error,
ch *channeldb.OpenChannel, fundingTxBytes []byte) bool {

if err == nil {
return false
}

log.Errorf("Unable to broadcast funding tx %x for "+
"ChannelPoint(%v): %v", fundingTxBytes, ch.FundingOutpoint, err)

var abort bool

switch err.(type) {

// If the transaction failed the sanity check, it means that the tx
// does not pass the consensus rules, so we can immediately abort this
// funding flow.
case *ErrSanity:
abort = true

case *ErrMempoolTestAccept:
// On startup, we no longer have a lock held on the inputs to
// this transaction, so we might run into a double spend error
// here in which case we can fail the funding flow for this
// channel.
if err.(*ErrStandardness).err == lnwallet.ErrDoubleSpend {
abort = true
}

case *ErrPublish:
// See comment above for ErrMempoolTestAccept.
// TODO(elle): is this ok for neutrino backend?
if err.(*ErrStandardness).err == lnwallet.ErrDoubleSpend {
abort = true
}

default:
// For all other errors, we continue to monitor the channel
// since at startup we can't be sure if this transaction has
// been broadcast before (by us or externally).
// TODO(elle): should monitor for a while & occasionally attempt
// to rebroadcast. Since on startup, we dont have locks on the
// inputs for this tx, the inputs should eventually be double
// spent in which case we can safety abort.
abort = false
}

if abort {
err := f.failPendingChannel(ch)
if err != nil {
// If we get an error failing the pending channel,
// then rather dont abort so that we can retry next
// time.
log.Errorf("Unable to close channel for "+
"ChannelPoint(%v): %v", fundingTxBytes, err,
)

abort = false
}
}

return abort
}