Skip to content

Commit

Permalink
Rename raw tx publisher to final tx publisher (#2119)
Browse files Browse the repository at this point in the history
This makes the distinction between final and replaceable txs more obvious.

Do note that this doesn't mean that final txs cannot be replaced.
Transaction replacement logic may happen outside of the tx publisher
components, which will change the txid and replace the existing attempts.
  • Loading branch information
t-bast committed Jan 4, 2022
1 parent 1fd6344 commit fda3818
Show file tree
Hide file tree
Showing 15 changed files with 265 additions and 265 deletions.
20 changes: 10 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/channel/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import fr.acinq.eclair.channel.Helpers.{Closing, Funding, Syncing, getRelayFees}
import fr.acinq.eclair.channel.Monitoring.Metrics.ProcessMessage
import fr.acinq.eclair.channel.Monitoring.{Metrics, Tags}
import fr.acinq.eclair.channel.publish.TxPublisher
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishRawTx, PublishReplaceableTx, PublishTx, SetChannelId}
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, PublishReplaceableTx, PublishTx, SetChannelId}
import fr.acinq.eclair.crypto.ShaChain
import fr.acinq.eclair.crypto.keymanager.ChannelKeyManager
import fr.acinq.eclair.db.DbEventHandler.ChannelEvent.EventType
Expand Down Expand Up @@ -1486,7 +1486,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
}
val revokedCommitPublished1 = d.revokedCommitPublished.map { rev =>
val (rev1, penaltyTxs) = Closing.claimRevokedHtlcTxOutputs(keyManager, d.commitments, rev, tx, nodeParams.onChainFeeConf.feeEstimator)
penaltyTxs.foreach(claimTx => txPublisher ! PublishRawTx(claimTx, claimTx.fee, None))
penaltyTxs.foreach(claimTx => txPublisher ! PublishFinalTx(claimTx, claimTx.fee, None))
penaltyTxs.foreach(claimTx => blockchain ! WatchOutputSpent(self, tx.txid, claimTx.input.outPoint.index.toInt, hints = Set(claimTx.tx.txid)))
rev1
}
Expand All @@ -1501,7 +1501,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
// If the tx is one of our HTLC txs, we now publish a 3rd-stage claim-htlc-tx that claims its output.
val (localCommitPublished1, claimHtlcTx_opt) = Closing.claimLocalCommitHtlcTxOutput(localCommitPublished, keyManager, d.commitments, tx, nodeParams.onChainFeeConf.feeEstimator, nodeParams.onChainFeeConf.feeTargets)
claimHtlcTx_opt.foreach(claimHtlcTx => {
txPublisher ! PublishRawTx(claimHtlcTx, claimHtlcTx.fee, None)
txPublisher ! PublishFinalTx(claimHtlcTx, claimHtlcTx.fee, None)
blockchain ! WatchTxConfirmed(self, claimHtlcTx.tx.txid, nodeParams.minDepthBlocks)
})
Closing.updateLocalCommitPublished(localCommitPublished1, tx)
Expand Down Expand Up @@ -2173,7 +2173,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
// if we are funder, we never give up
// we cannot correctly set the fee, but it was correctly set when we initially published the transaction
log.info(s"republishing the funding tx...")
txPublisher ! PublishRawTx(fundingTx, fundingTx.txIn.head.outPoint, "funding", 0 sat, None)
txPublisher ! PublishFinalTx(fundingTx, fundingTx.txIn.head.outPoint, "funding", 0 sat, None)
// we also check if the funding tx has been double-spent
checkDoubleSpent(fundingTx)
context.system.scheduler.scheduleOnce(1 day, blockchain.toClassic, GetTxWithMeta(self, txid))
Expand Down Expand Up @@ -2398,7 +2398,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
private def doPublish(closingTx: ClosingTx, isFunder: Boolean): Unit = {
// the funder pays the fee
val fee = if (isFunder) closingTx.fee else 0.sat
txPublisher ! PublishRawTx(closingTx, fee, None)
txPublisher ! PublishFinalTx(closingTx, fee, None)
blockchain ! WatchTxConfirmed(self, closingTx.tx.txid, nodeParams.minDepthBlocks)
}

Expand Down Expand Up @@ -2464,12 +2464,12 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
val isFunder = commitments.localParams.isFunder
val publishQueue = commitments.commitmentFormat match {
case Transactions.DefaultCommitmentFormat =>
val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishRawTx(tx, tx.fee, Some(commitTx.txid)))
List(PublishRawTx(commitTx, commitInput, "commit-tx", Closing.commitTxFee(commitments.commitInput, commitTx, isFunder), None)) ++ (claimMainDelayedOutputTx.map(tx => PublishRawTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx, tx.fee, None)))
val redeemableHtlcTxs = htlcTxs.values.flatten.map(tx => PublishFinalTx(tx, tx.fee, Some(commitTx.txid)))
List(PublishFinalTx(commitTx, commitInput, "commit-tx", Closing.commitTxFee(commitments.commitInput, commitTx, isFunder), None)) ++ (claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None)))
case _: Transactions.AnchorOutputsCommitmentFormat =>
val claimLocalAnchor = claimAnchorTxs.collect { case tx: Transactions.ClaimLocalAnchorOutputTx => PublishReplaceableTx(tx, commitments) }
val redeemableHtlcTxs = htlcTxs.values.collect { case Some(tx) => PublishReplaceableTx(tx, commitments) }
List(PublishRawTx(commitTx, commitInput, "commit-tx", Closing.commitTxFee(commitments.commitInput, commitTx, isFunder), None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishRawTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishRawTx(tx, tx.fee, None))
List(PublishFinalTx(commitTx, commitInput, "commit-tx", Closing.commitTxFee(commitments.commitInput, commitTx, isFunder), None)) ++ claimLocalAnchor ++ claimMainDelayedOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)) ++ redeemableHtlcTxs ++ claimHtlcDelayedTxs.map(tx => PublishFinalTx(tx, tx.fee, None))
}
publishIfNeeded(publishQueue, irrevocablySpent)

Expand Down Expand Up @@ -2538,7 +2538,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
private def doPublish(remoteCommitPublished: RemoteCommitPublished, commitments: Commitments): Unit = {
import remoteCommitPublished._

val publishQueue = claimMainOutputTx.map(tx => PublishRawTx(tx, tx.fee, None)).toSeq ++ claimHtlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitments))
val publishQueue = claimMainOutputTx.map(tx => PublishFinalTx(tx, tx.fee, None)).toSeq ++ claimHtlcTxs.values.flatten.map(tx => PublishReplaceableTx(tx, commitments))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down Expand Up @@ -2579,7 +2579,7 @@ class Channel(val nodeParams: NodeParams, val wallet: OnChainChannelFunder, remo
private def doPublish(revokedCommitPublished: RevokedCommitPublished): Unit = {
import revokedCommitPublished._

val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishRawTx(tx, tx.fee, None))
val publishQueue = (claimMainOutputTx ++ mainPenaltyTx ++ htlcPenaltyTxs ++ claimHtlcDelayedPenaltyTxs).map(tx => PublishFinalTx(tx, tx.fee, None))
publishIfNeeded(publishQueue, irrevocablySpent)

// we watch:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,14 @@ import scala.util.{Failure, Random, Success}
*/

/**
* This actor publishes a raw transaction without modifying it.
* This actor publishes a fully signed transaction without modifying it.
* It waits for confirmation or failure before reporting back to the requesting actor.
*/
object RawTxPublisher {
object FinalTxPublisher {

// @formatter:off
sealed trait Command
case class Publish(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishRawTx) extends Command
case class Publish(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishFinalTx) extends Command
private case object TimeLocksOk extends Command
private case object CheckParentTx extends Command
private case object ParentTxOk extends Command
Expand All @@ -54,22 +54,22 @@ object RawTxPublisher {
Behaviors.setup { context =>
Behaviors.withTimers { timers =>
Behaviors.withMdc(loggingInfo.mdc()) {
new RawTxPublisher(nodeParams, bitcoinClient, watcher, context, timers, loggingInfo).start()
new FinalTxPublisher(nodeParams, bitcoinClient, watcher, context, timers, loggingInfo).start()
}
}
}
}

}

private class RawTxPublisher(nodeParams: NodeParams,
bitcoinClient: BitcoinCoreClient,
watcher: ActorRef[ZmqWatcher.Command],
context: ActorContext[RawTxPublisher.Command],
timers: TimerScheduler[RawTxPublisher.Command],
loggingInfo: TxPublishLogContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {
private class FinalTxPublisher(nodeParams: NodeParams,
bitcoinClient: BitcoinCoreClient,
watcher: ActorRef[ZmqWatcher.Command],
context: ActorContext[FinalTxPublisher.Command],
timers: TimerScheduler[FinalTxPublisher.Command],
loggingInfo: TxPublishLogContext)(implicit ec: ExecutionContext = ExecutionContext.Implicits.global) {

import RawTxPublisher._
import FinalTxPublisher._

private val log = context.log

Expand All @@ -80,7 +80,7 @@ private class RawTxPublisher(nodeParams: NodeParams,
}
}

def checkTimeLocks(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishRawTx): Behavior[Command] = {
def checkTimeLocks(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishFinalTx): Behavior[Command] = {
val timeLocksChecker = context.spawn(TxTimeLocksMonitor(nodeParams, watcher, loggingInfo), "time-locks-monitor")
timeLocksChecker ! CheckTx(context.messageAdapter[TxTimeLocksMonitor.TimeLocksOk](_ => TimeLocksOk), cmd.tx, cmd.desc)
Behaviors.receiveMessagePartial {
Expand All @@ -91,7 +91,7 @@ private class RawTxPublisher(nodeParams: NodeParams,
}
}

def checkParentPublished(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishRawTx): Behavior[Command] = {
def checkParentPublished(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishFinalTx): Behavior[Command] = {
cmd.parentTx_opt match {
case Some(parentTxId) =>
context.self ! CheckParentTx
Expand All @@ -117,7 +117,7 @@ private class RawTxPublisher(nodeParams: NodeParams,
}
}

def publish(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishRawTx): Behavior[Command] = {
def publish(replyTo: ActorRef[TxPublisher.PublishTxResult], cmd: TxPublisher.PublishFinalTx): Behavior[Command] = {
val txMonitor = context.spawn(MempoolTxMonitor(nodeParams, bitcoinClient, loggingInfo), "mempool-tx-monitor")
txMonitor ! MempoolTxMonitor.Publish(context.messageAdapter[MempoolTxMonitor.TxResult](WrappedTxResult), cmd.tx, cmd.input, cmd.desc, cmd.fee)
Behaviors.receiveMessagePartial {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ object TxPublisher {
* NB: the parent tx should only be provided when it's being concurrently published, it's unnecessary when it is
* confirmed or when the tx has a relative delay.
*/
case class PublishRawTx(tx: Transaction, input: OutPoint, desc: String, fee: Satoshi, parentTx_opt: Option[ByteVector32]) extends PublishTx
object PublishRawTx {
def apply(txInfo: TransactionWithInputInfo, fee: Satoshi, parentTx_opt: Option[ByteVector32]): PublishRawTx = PublishRawTx(txInfo.tx, txInfo.input.outPoint, txInfo.desc, fee, parentTx_opt)
case class PublishFinalTx(tx: Transaction, input: OutPoint, desc: String, fee: Satoshi, parentTx_opt: Option[ByteVector32]) extends PublishTx
object PublishFinalTx {
def apply(txInfo: TransactionWithInputInfo, fee: Satoshi, parentTx_opt: Option[ByteVector32]): PublishFinalTx = PublishFinalTx(txInfo.tx, txInfo.input.outPoint, txInfo.desc, fee, parentTx_opt)
}
/** Publish an unsigned transaction that can be RBF-ed. */
case class PublishReplaceableTx(txInfo: ReplaceableTransactionWithInputInfo, commitments: Commitments) extends PublishTx {
Expand Down Expand Up @@ -132,15 +132,15 @@ object TxPublisher {

trait ChildFactory {
// @formatter:off
def spawnRawTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[RawTxPublisher.Command]
def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[FinalTxPublisher.Command]
def spawnReplaceableTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[ReplaceableTxPublisher.Command]
// @formatter:on
}

case class SimpleChildFactory(nodeParams: NodeParams, bitcoinClient: BitcoinCoreClient, watcher: ActorRef[ZmqWatcher.Command]) extends ChildFactory {
// @formatter:off
override def spawnRawTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[RawTxPublisher.Command] = {
context.spawn(RawTxPublisher(nodeParams, bitcoinClient, watcher, loggingInfo), s"raw-tx-${loggingInfo.id}")
override def spawnFinalTxPublisher(context: ActorContext[TxPublisher.Command], loggingInfo: TxPublishLogContext): ActorRef[FinalTxPublisher.Command] = {
context.spawn(FinalTxPublisher(nodeParams, bitcoinClient, watcher, loggingInfo), s"final-tx-${loggingInfo.id}")
}
override def spawnReplaceableTxPublisher(context: ActorContext[Command], loggingInfo: TxPublishLogContext): ActorRef[ReplaceableTxPublisher.Command] = {
context.spawn(ReplaceableTxPublisher(nodeParams, bitcoinClient, watcher, loggingInfo), s"replaceable-tx-${loggingInfo.id}")
Expand Down Expand Up @@ -172,16 +172,16 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact
def id: UUID
def cmd: PublishTx
}
private case class RawAttempt(id: UUID, cmd: PublishRawTx, actor: ActorRef[RawTxPublisher.Command]) extends PublishAttempt
private case class FinalAttempt(id: UUID, cmd: PublishFinalTx, actor: ActorRef[FinalTxPublisher.Command]) extends PublishAttempt
private case class ReplaceableAttempt(id: UUID, cmd: PublishReplaceableTx, feerate: FeeratePerKw, actor: ActorRef[ReplaceableTxPublisher.Command]) extends PublishAttempt
// @formatter:on

private def run(pending: Map[OutPoint, Seq[PublishAttempt]], retryNextBlock: Seq[PublishTx], channelInfo: ChannelLogContext): Behavior[Command] = {
Behaviors.receiveMessage {
case cmd: PublishRawTx =>
case cmd: PublishFinalTx =>
val attempts = pending.getOrElse(cmd.input, Seq.empty)
val alreadyPublished = attempts.exists {
case a: RawAttempt => a.cmd.tx.txid == cmd.tx.txid
case a: FinalAttempt => a.cmd.tx.txid == cmd.tx.txid
case _ => false
}
if (alreadyPublished) {
Expand All @@ -190,9 +190,9 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact
} else {
val publishId = UUID.randomUUID()
log.info("publishing {} txid={} spending {}:{} with id={} ({} other attempts)", cmd.desc, cmd.tx.txid, cmd.input.txid, cmd.input.index, publishId, attempts.length)
val actor = factory.spawnRawTxPublisher(context, TxPublishLogContext(publishId, channelInfo.remoteNodeId, channelInfo.channelId_opt))
actor ! RawTxPublisher.Publish(context.self, cmd)
run(pending + (cmd.input -> attempts.appended(RawAttempt(publishId, cmd, actor))), retryNextBlock, channelInfo)
val actor = factory.spawnFinalTxPublisher(context, TxPublishLogContext(publishId, channelInfo.remoteNodeId, channelInfo.channelId_opt))
actor ! FinalTxPublisher.Publish(context.self, cmd)
run(pending + (cmd.input -> attempts.appended(FinalAttempt(publishId, cmd, actor))), retryNextBlock, channelInfo)
}

case cmd: PublishReplaceableTx =>
Expand Down Expand Up @@ -268,7 +268,7 @@ private class TxPublisher(nodeParams: NodeParams, factory: TxPublisher.ChildFact
private def stopAttempts(attempts: Seq[PublishAttempt]): Unit = attempts.foreach(stopAttempt)

private def stopAttempt(attempt: PublishAttempt): Unit = attempt match {
case RawAttempt(_, _, actor) => actor ! RawTxPublisher.Stop
case FinalAttempt(_, _, actor) => actor ! FinalTxPublisher.Stop
case ReplaceableAttempt(_, _, _, actor) => actor ! ReplaceableTxPublisher.Stop
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import fr.acinq.eclair.balance.CheckBalance.{ClosingBalance, MainAndHtlcBalance,
import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{apply => _, _}
import fr.acinq.eclair.blockchain.bitcoind.rpc.BitcoinCoreClient
import fr.acinq.eclair.channel.Helpers.Closing.{CurrentRemoteClose, LocalClose}
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishRawTx, PublishReplaceableTx}
import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, PublishReplaceableTx}
import fr.acinq.eclair.channel.states.ChannelStateTestsBase
import fr.acinq.eclair.channel.{CLOSING, CMD_SIGN, DATA_CLOSING, DATA_NORMAL}
import fr.acinq.eclair.db.jdbc.JdbcUtils.ExtendedResultSet._
Expand Down Expand Up @@ -91,7 +91,7 @@ class CheckBalanceSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
assert(bobCommitTx.txOut.size == 6) // two main outputs and 4 pending htlcs
alice ! WatchFundingSpentTriggered(bobCommitTx)
// in response to that, alice publishes her claim txs
alice2blockchain.expectMsgType[PublishRawTx] // claim-main
alice2blockchain.expectMsgType[PublishFinalTx] // claim-main
val claimHtlcTxs = (1 to 3).map(_ => alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.tx)

val commitments = alice.stateData.asInstanceOf[DATA_CLOSING].commitments
Expand Down Expand Up @@ -139,7 +139,7 @@ class CheckBalanceSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
alice ! WatchFundingSpentTriggered(bobCommitTx)

// in response to that, alice publishes her claim txs
alice2blockchain.expectMsgType[PublishRawTx] // claim-main
alice2blockchain.expectMsgType[PublishFinalTx] // claim-main
val claimHtlcTxs = (1 to 2).map(_ => alice2blockchain.expectMsgType[PublishReplaceableTx].txInfo.tx)

val commitments = alice.stateData.asInstanceOf[DATA_CLOSING].commitments
Expand Down Expand Up @@ -179,7 +179,7 @@ class CheckBalanceSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
// alice publishes her commit tx
val aliceCommitTx = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.localCommit.commitTxAndRemoteSig.commitTx.tx
alice ! Error(ByteVector32.Zeroes, "oops")
assert(alice2blockchain.expectMsgType[PublishRawTx].tx.txid === aliceCommitTx.txid)
assert(alice2blockchain.expectMsgType[PublishFinalTx].tx.txid === aliceCommitTx.txid)
assert(aliceCommitTx.txOut.size == 6) // two main outputs and 4 pending htlcs
awaitCond(alice.stateName == CLOSING)
assert(alice.stateData.asInstanceOf[DATA_CLOSING].localCommitPublished.isDefined)
Expand All @@ -193,10 +193,10 @@ class CheckBalanceSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
htlcsUnpublished = htlca1.amountMsat.truncateToSatoshi + htlca3.amountMsat.truncateToSatoshi + htlcb1.amountMsat.truncateToSatoshi
))

alice2blockchain.expectMsgType[PublishRawTx] // claim-main
val htlcTx1 = alice2blockchain.expectMsgType[PublishRawTx].tx
val htlcTx2 = alice2blockchain.expectMsgType[PublishRawTx].tx
val htlcTx3 = alice2blockchain.expectMsgType[PublishRawTx].tx
alice2blockchain.expectMsgType[PublishFinalTx] // claim-main
val htlcTx1 = alice2blockchain.expectMsgType[PublishFinalTx].tx
val htlcTx2 = alice2blockchain.expectMsgType[PublishFinalTx].tx
val htlcTx3 = alice2blockchain.expectMsgType[PublishFinalTx].tx
alice2blockchain.expectMsgType[WatchTxConfirmed] // commit tx
alice2blockchain.expectMsgType[WatchTxConfirmed] // main-delayed
alice2blockchain.expectMsgType[WatchOutputSpent] // htlc 1
Expand All @@ -209,7 +209,7 @@ class CheckBalanceSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with
alice ! WatchOutputSpentTriggered(htlcTimeoutTx)
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === htlcTimeoutTx.txid)
alice ! WatchTxConfirmedTriggered(2701, 3, htlcTimeoutTx)
val claimHtlcDelayedTx = alice2blockchain.expectMsgType[PublishRawTx].tx
val claimHtlcDelayedTx = alice2blockchain.expectMsgType[PublishFinalTx].tx
assert(alice2blockchain.expectMsgType[WatchTxConfirmed].txId === claimHtlcDelayedTx.txid)
claimHtlcDelayedTx
}
Expand Down

0 comments on commit fda3818

Please sign in to comment.