Skip to content

Commit

Permalink
allows wallet time to process block filters
Browse files Browse the repository at this point in the history
When a wallet is able to provide block notifications, allow wallet notifications
to take precedence. Queue polled new tips for 10 seconds, canceling
their broadcast and sending the notification earlier if the wallet
notification comes during this window. If the wallet note doesn't
come by 10 seconds, send the queued notification and log a warning.

SPV tip updates are provided by the FilteredBlockConnected
notification, which can come after neutrino stores the block header(s).
Care is taken not to spam the caller with notifications, since they
are not limited by the polling frequency. The same mechanism for
limiting spam is used to solve the non-existent wallet error messages
for *Core though, so you get two for the price of one with that solution.
  • Loading branch information
buck54321 committed Oct 23, 2021
1 parent 10f299c commit f7e98c4
Show file tree
Hide file tree
Showing 6 changed files with 177 additions and 71 deletions.
159 changes: 112 additions & 47 deletions client/asset/btc/btc.go
Expand Up @@ -77,6 +77,7 @@ const (
var (
// blockTicker is the delay between calls to check for new blocks.
blockTicker = time.Second
walletBlockAllowance = time.Second * 10
conventionalConversionFactor = float64(dexbtc.UnitInfo.Conventional.ConversionFactor)
rpcOpts = []*asset.ConfigOption{
{
Expand Down Expand Up @@ -514,7 +515,7 @@ type ExchangeWallet struct {

type block struct {
height int64
hash string
hash chainhash.Hash
}

// findRedemptionReq represents a request to find a contract's redemption,
Expand Down Expand Up @@ -747,7 +748,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
return nil, err
}
// Initialize the best block.
h, err := btc.node.getBestBlockHash()
bestBlockHash, err := btc.node.getBestBlockHash()
if err != nil {
return nil, fmt.Errorf("error initializing best block for %s: %w", btc.symbol, err)
}
Expand All @@ -758,7 +759,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
}

btc.tipMtx.Lock()
btc.currentTip, err = btc.blockFromHash(h.String())
btc.currentTip, err = btc.blockFromHash(bestBlockHash)
btc.tipMtx.Unlock()
if err != nil {
return nil, fmt.Errorf("error parsing best block for %s: %w", btc.symbol, err)
Expand Down Expand Up @@ -1923,7 +1924,7 @@ func (btc *ExchangeWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time,
if err != nil {
return false, time.Time{}, fmt.Errorf("get best block hash error: %w", err)
}
bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash.String())
bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash)
if err != nil {
return false, time.Time{}, fmt.Errorf("get best block header error: %w", err)
}
Expand Down Expand Up @@ -2388,10 +2389,88 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes)
func (btc *ExchangeWallet) run(ctx context.Context) {
ticker := time.NewTicker(blockTicker)
defer ticker.Stop()

var walletBlock <-chan *block
if notifier, isNotifier := btc.node.(tipNotifier); isNotifier {
walletBlock = notifier.tipFeed()
}

// A polledBlock is a block found during polling, but whose broadcast has
// been queued in anticipation of a wallet notification.
type polledBlock struct {
*block
queue *time.Timer
}

// queuedBlock is the currently queued, polling-discovered block that will
// be broadcast after a timeout if the wallet doesn't send the matching
// notification.
var queuedBlock *polledBlock

// dequeuedBlock is where the queuedBlocks that time out will be sent for
// broadcast.
var dequeuedBlock chan *block

for {
select {

// Poll for the block. If the wallet offers tip reports, delay reporting
// the tip to give the wallet a moment to request and scan block data.
case <-ticker.C:
btc.checkForNewBlocks(ctx)
newTipHash, err := btc.node.getBestBlockHash()
if err != nil {
go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol))
return
}

// This method is called frequently. Don't hold write lock
// unless tip has changed.
btc.tipMtx.RLock()
sameTip := btc.currentTip.hash == *newTipHash
btc.tipMtx.RUnlock()
if sameTip {
continue
}

newTip, err := btc.blockFromHash(newTipHash)
if err != nil {
go btc.tipChange(fmt.Errorf("error setting new tip: %w", err))
}

// If the wallet is not offering tip reports, send this one right
// away.
if walletBlock == nil {
btc.reportNewTip(ctx, newTip)
} else {
// Queue it for reporting, but don't send it right away. Give the
// wallet a chance to provide their block update. SPV wallet may
// need more time after storing the block header to fetch and
// scan filters and issue the FilteredBlockConnected report.
if queuedBlock != nil {
queuedBlock.queue.Stop()
}
queuedBlock = &polledBlock{
block: newTip,
queue: time.AfterFunc(walletBlockAllowance, func() {
dequeuedBlock <- newTip
}),
}
}

// Tip reports from the wallet are always sent, and we'll clear any
// queued polled block that would appear to be superceded by this one.
case walletTip := <-walletBlock:
if queuedBlock != nil && walletTip.height >= queuedBlock.height {
queuedBlock.queue.Stop()
queuedBlock = nil
}
btc.reportNewTip(ctx, walletTip)

case dqBlock := <-dequeuedBlock:
btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+
"never reported: %d %s. This may indicate a problem with the wallet.", dqBlock.height, dqBlock.hash)
btc.reportNewTip(ctx, dqBlock)

case <-ctx.Done():
return
}
Expand All @@ -2417,10 +2496,18 @@ func (btc *ExchangeWallet) prepareRedemptionRequestsForBlockCheck() []*findRedem
return reqs
}

// checkForNewBlocks checks for new blocks. When a tip change is detected, the
// tipChange callback function is invoked and a goroutine is started to check
// if any contracts in the findRedemptionQueue are redeemed in the new blocks.
func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
// reportNewTip sets the currentTip. The tipChange callback function is invoked
// and a goroutine is started to check if any contracts in the
// findRedemptionQueue are redeemed in the new blocks.
func (btc *ExchangeWallet) reportNewTip(ctx context.Context, newTip *block) {
btc.tipMtx.Lock()
defer btc.tipMtx.Unlock()

prevTip := btc.currentTip
btc.currentTip = newTip
btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash)
go btc.tipChange(nil)

reqs := btc.prepareRedemptionRequestsForBlockCheck()
// Redemption search would be compromised if the starting point cannot
// be determined, as searching just the new tip might result in blocks
Expand All @@ -2432,41 +2519,12 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
}
}

newTipHash, err := btc.node.getBestBlockHash()
if err != nil {
go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol))
return
}

// This method is called frequently. Don't hold write lock
// unless tip has changed.
btc.tipMtx.RLock()
sameTip := btc.currentTip.hash == newTipHash.String()
btc.tipMtx.RUnlock()
if sameTip {
return
}

btc.tipMtx.Lock()
defer btc.tipMtx.Unlock()

newTip, err := btc.blockFromHash(newTipHash.String())
if err != nil {
go btc.tipChange(fmt.Errorf("error setting new tip: %w", err))
return
}

prevTip := btc.currentTip
btc.currentTip = newTip
btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash)
go btc.tipChange(nil)

var startPoint *block
// Check if the previous tip is still part of the mainchain (prevTip confs >= 0).
// Redemption search would typically resume from prevTipHeight + 1 unless the
// previous tip was re-orged out of the mainchain, in which case redemption
// search will resume from the mainchain ancestor of the previous tip.
prevTipHeader, err := btc.node.getBlockHeader(prevTip.hash)
prevTipHeader, err := btc.node.getBlockHeader(&prevTip.hash)
switch {
case err != nil:
// Redemption search cannot continue reliably without knowing if there
Expand All @@ -2479,7 +2537,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
// The previous tip is no longer part of the mainchain. Crawl blocks
// backwards until finding a mainchain block. Start with the block
// that is the immediate ancestor to the previous tip.
ancestorBlockHash := prevTipHeader.PreviousBlockHash
ancestorBlockHash, err := chainhash.NewHashFromStr(prevTipHeader.PreviousBlockHash)
if err != nil {
notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err)
return
}
for {
aBlock, err := btc.node.getBlockHeader(ancestorBlockHash)
if err != nil {
Expand All @@ -2488,7 +2550,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
}
if aBlock.Confirmations > -1 {
// Found the mainchain ancestor of previous tip.
startPoint = &block{height: aBlock.Height, hash: aBlock.Hash}
startPoint = &block{height: aBlock.Height, hash: *ancestorBlockHash}
btc.log.Debugf("reorg detected from height %d to %d", aBlock.Height, newTip.height)
break
}
Expand All @@ -2498,7 +2560,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
notifyFatalFindRedemptionError("no mainchain ancestor for orphaned block %s", prevTipHeader.Hash)
return
}
ancestorBlockHash = aBlock.PreviousBlockHash
ancestorBlockHash, err = chainhash.NewHashFromStr(aBlock.PreviousBlockHash)
if err != nil {
notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err)
return
}
}

case newTip.height-prevTipHeader.Height > 1:
Expand All @@ -2509,16 +2575,15 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
notifyFatalFindRedemptionError("getBlockHash error for height %d: %w", afterPrivTip, err)
return
}
startPoint = &block{hash: hashAfterPrevTip.String(), height: afterPrivTip}
startPoint = &block{hash: *hashAfterPrevTip, height: afterPrivTip}

default:
// Just 1 new block since last tip report, search the lone block.
startPoint = newTip
}

if len(reqs) > 0 {
startHash, _ := chainhash.NewHashFromStr(startPoint.hash)
go btc.tryRedemptionRequests(ctx, startHash, reqs)
go btc.tryRedemptionRequests(ctx, &startPoint.hash, reqs)
}
}

Expand Down Expand Up @@ -2591,12 +2656,12 @@ out:
return blockHeight, nil
}

func (btc *ExchangeWallet) blockFromHash(hash string) (*block, error) {
func (btc *ExchangeWallet) blockFromHash(hash *chainhash.Hash) (*block, error) {
blk, err := btc.node.getBlockHeader(hash)
if err != nil {
return nil, fmt.Errorf("getBlockHeader error for hash %s: %w", hash, err)
}
return &block{hash: hash, height: blk.Height}, nil
return &block{hash: *hash, height: blk.Height}, nil
}

// convertCoin converts the asset.Coin to an output.
Expand Down
11 changes: 8 additions & 3 deletions client/asset/btc/btc_test.go
Expand Up @@ -617,8 +617,10 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun
return nil, nil, nil, err
}
wallet.tipMtx.Lock()
wallet.currentTip = &block{height: data.GetBestBlockHeight(),
hash: bestHash.String()}
wallet.currentTip = &block{
height: data.GetBestBlockHeight(),
hash: *bestHash,
}
wallet.tipMtx.Unlock()
go wallet.run(walletCtx)

Expand Down Expand Up @@ -2058,7 +2060,10 @@ func testFindRedemption(t *testing.T, segwit bool, walletType string) {
node.getCFilterScripts[*redeemBlockHash] = [][]byte{pkScript}

// Update currentTip from "RPC". Normally run() would do this.
wallet.checkForNewBlocks(tCtx)
wallet.reportNewTip(tCtx, &block{
hash: *redeemBlockHash,
height: contractHeight + 2,
})

// Check find redemption result.
_, checkSecret, err := wallet.FindRedemption(tCtx, coinID)
Expand Down
10 changes: 5 additions & 5 deletions client/asset/btc/rpcclient.go
Expand Up @@ -221,7 +221,7 @@ func (wc *rpcClient) getBestBlockHeight() (int32, error) {
if err != nil {
return -1, err
}
header, err := wc.getBlockHeader(tipHash.String())
header, err := wc.getBlockHeader(tipHash)
if err != nil {
return -1, err
}
Expand Down Expand Up @@ -503,19 +503,19 @@ func (wc *rpcClient) swapConfirmations(txHash *chainhash.Hash, vout uint32, _ []
}

// getBlockHeader gets the block header for the specified block hash.
func (wc *rpcClient) getBlockHeader(blockHash string) (*blockHeader, error) {
func (wc *rpcClient) getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) {
blkHeader := new(blockHeader)
err := wc.call(methodGetBlockHeader,
anylist{blockHash, true}, blkHeader)
anylist{blockHash.String(), true}, blkHeader)
if err != nil {
return nil, err
}
return blkHeader, nil
}

// getBlockHeight gets the mainchain height for the specified block.
func (wc *rpcClient) getBlockHeight(h *chainhash.Hash) (int32, error) {
hdr, err := wc.getBlockHeader(h.String())
func (wc *rpcClient) getBlockHeight(blockHash *chainhash.Hash) (int32, error) {
hdr, err := wc.getBlockHeader(blockHash)
if err != nil {
return -1, err
}
Expand Down

0 comments on commit f7e98c4

Please sign in to comment.