Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

btc spv: give wallet precedence for block notifications #1250

Merged
merged 8 commits into from Oct 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
172 changes: 121 additions & 51 deletions client/asset/btc/btc.go
Expand Up @@ -77,6 +77,7 @@ const (
var (
// blockTicker is the delay between calls to check for new blocks.
blockTicker = time.Second
walletBlockAllowance = time.Second * 10
chappjc marked this conversation as resolved.
Show resolved Hide resolved
conventionalConversionFactor = float64(dexbtc.UnitInfo.Conventional.ConversionFactor)
rpcOpts = []*asset.ConfigOption{
{
Expand Down Expand Up @@ -521,7 +522,7 @@ type ExchangeWallet struct {

type block struct {
height int64
hash string
hash chainhash.Hash
chappjc marked this conversation as resolved.
Show resolved Hide resolved
}

// findRedemptionReq represents a request to find a contract's redemption,
Expand Down Expand Up @@ -754,7 +755,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
return nil, err
}
// Initialize the best block.
h, err := btc.node.getBestBlockHash()
bestBlockHash, err := btc.node.getBestBlockHash()
if err != nil {
return nil, fmt.Errorf("error initializing best block for %s: %w", btc.symbol, err)
}
Expand All @@ -765,7 +766,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
}

btc.tipMtx.Lock()
btc.currentTip, err = btc.blockFromHash(h.String())
btc.currentTip, err = btc.blockFromHash(bestBlockHash)
btc.tipMtx.Unlock()
if err != nil {
return nil, fmt.Errorf("error parsing best block for %s: %w", btc.symbol, err)
Expand All @@ -774,7 +775,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error)
wg.Add(1)
go func() {
defer wg.Done()
btc.run(ctx)
btc.watchBlocks(ctx)
btc.shutdown()
}()
return &wg, nil
Expand Down Expand Up @@ -1930,7 +1931,7 @@ func (btc *ExchangeWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time,
if err != nil {
return false, time.Time{}, fmt.Errorf("get best block hash error: %w", err)
}
bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash.String())
bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash)
if err != nil {
return false, time.Time{}, fmt.Errorf("get best block header error: %w", err)
}
Expand Down Expand Up @@ -2390,15 +2391,98 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes)
return uint32(tx.Confirmations), nil
}

// run pings for new blocks and runs the tipChange callback function when the
// block changes.
func (btc *ExchangeWallet) run(ctx context.Context) {
// watchBlocks pings for new blocks and runs the tipChange callback function
// when the block changes.
func (btc *ExchangeWallet) watchBlocks(ctx context.Context) {
ticker := time.NewTicker(blockTicker)
defer ticker.Stop()

var walletBlock <-chan *block
if notifier, isNotifier := btc.node.(tipNotifier); isNotifier {
walletBlock = notifier.tipFeed()
}

// A polledBlock is a block found during polling, but whose broadcast has
// been queued in anticipation of a wallet notification.
type polledBlock struct {
*block
queue *time.Timer
}

// queuedBlock is the currently queued, polling-discovered block that will
// be broadcast after a timeout if the wallet doesn't send the matching
// notification.
var queuedBlock *polledBlock

for {
select {

// Poll for the block. If the wallet offers tip reports, delay reporting
// the tip to give the wallet a moment to request and scan block data.
case <-ticker.C:
btc.checkForNewBlocks(ctx)
newTipHash, err := btc.node.getBestBlockHash()
if err != nil {
go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol))
return
}

if queuedBlock != nil && *newTipHash == queuedBlock.block.hash {
continue
}

btc.tipMtx.RLock()
sameTip := btc.currentTip.hash == *newTipHash
btc.tipMtx.RUnlock()
if sameTip {
continue
}

newTip, err := btc.blockFromHash(newTipHash)
if err != nil {
go btc.tipChange(fmt.Errorf("error setting new tip: %w", err))
}

// If the wallet is not offering tip reports, send this one right
// away.
if walletBlock == nil {
btc.reportNewTip(ctx, newTip)
} else {
Comment on lines +2447 to +2449
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like you could continue rather than using the else indention. No difference really though.

// Queue it for reporting, but don't send it right away. Give the
// wallet a chance to provide their block update. SPV wallet may
// need more time after storing the block header to fetch and
// scan filters and issue the FilteredBlockConnected report.
if queuedBlock != nil {
queuedBlock.queue.Stop()
}
blockAllowance := walletBlockAllowance
syncStatus, err := btc.node.syncStatus()
if err != nil {
btc.log.Errorf("Error retreiving sync status before queuing polled block: %v", err)
} else if syncStatus.Syncing {
blockAllowance *= 10
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with the a 10 second allowance, I was seeing warnings during testnet syncing.

}
queuedBlock = &polledBlock{
block: newTip,
queue: time.AfterFunc(blockAllowance, func() {
btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+
"never reported: %d %s. If you see this message repeatedly, it may indicate "+
"an issue with the wallet.", newTip.height, newTip.hash)
btc.reportNewTip(ctx, newTip)
}),
}
}

// Tip reports from the wallet are always sent, and we'll clear any
// queued polled block that would appear to be superceded by this one.
case walletTip := <-walletBlock:
if queuedBlock != nil && walletTip.height >= queuedBlock.height {
if !queuedBlock.queue.Stop() && walletTip.hash == queuedBlock.hash {
continue
}
queuedBlock = nil
}
btc.reportNewTip(ctx, walletTip)

case <-ctx.Done():
return
}
Expand All @@ -2424,10 +2508,18 @@ func (btc *ExchangeWallet) prepareRedemptionRequestsForBlockCheck() []*findRedem
return reqs
}

// checkForNewBlocks checks for new blocks. When a tip change is detected, the
// tipChange callback function is invoked and a goroutine is started to check
// if any contracts in the findRedemptionQueue are redeemed in the new blocks.
func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
// reportNewTip sets the currentTip. The tipChange callback function is invoked
// and a goroutine is started to check if any contracts in the
// findRedemptionQueue are redeemed in the new blocks.
func (btc *ExchangeWallet) reportNewTip(ctx context.Context, newTip *block) {
btc.tipMtx.Lock()
defer btc.tipMtx.Unlock()

prevTip := btc.currentTip
btc.currentTip = newTip
Comment on lines +2518 to +2519
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The sameTip check in run/watchBlocks previously was good enough, but given the logic race with Stopping the queuedBlock's timer, it's possible although unlikely that the same tip could be reported twice. So maybe we either check here like:

prevTip := btc.currentTip
if prevTip.hash == newTip.hash {
	return // already reported
}
btc.currentTip = newTip

or in watchBlocks like

	case walletTip := <-walletBlock:
		if queuedBlock != nil && walletTip.height >= queuedBlock.height {
			if !queuedBlock.queue.Stop() && walletTip.hash == queuedBlock.hash {
				continue
			}
			queuedBlock = nil
		}

I think it's clearer and just as good here in reportTip.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added the reportTip solution in the latest commit, but I think I'm going to go with your second one instead. If a wallet tip comes through after the queued block is sent, we'd still want to notify the user to trigger a balance check.

btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash)
go btc.tipChange(nil)

reqs := btc.prepareRedemptionRequestsForBlockCheck()
// Redemption search would be compromised if the starting point cannot
// be determined, as searching just the new tip might result in blocks
Expand All @@ -2439,41 +2531,12 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
}
}

newTipHash, err := btc.node.getBestBlockHash()
if err != nil {
go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol))
return
}

// This method is called frequently. Don't hold write lock
// unless tip has changed.
btc.tipMtx.RLock()
sameTip := btc.currentTip.hash == newTipHash.String()
btc.tipMtx.RUnlock()
if sameTip {
return
}

btc.tipMtx.Lock()
defer btc.tipMtx.Unlock()

newTip, err := btc.blockFromHash(newTipHash.String())
if err != nil {
go btc.tipChange(fmt.Errorf("error setting new tip: %w", err))
return
}

prevTip := btc.currentTip
btc.currentTip = newTip
btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash)
go btc.tipChange(nil)

var startPoint *block
// Check if the previous tip is still part of the mainchain (prevTip confs >= 0).
// Redemption search would typically resume from prevTipHeight + 1 unless the
// previous tip was re-orged out of the mainchain, in which case redemption
// search will resume from the mainchain ancestor of the previous tip.
prevTipHeader, err := btc.node.getBlockHeader(prevTip.hash)
prevTipHeader, err := btc.node.getBlockHeader(&prevTip.hash)
switch {
case err != nil:
// Redemption search cannot continue reliably without knowing if there
Expand All @@ -2486,7 +2549,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
// The previous tip is no longer part of the mainchain. Crawl blocks
// backwards until finding a mainchain block. Start with the block
// that is the immediate ancestor to the previous tip.
ancestorBlockHash := prevTipHeader.PreviousBlockHash
ancestorBlockHash, err := chainhash.NewHashFromStr(prevTipHeader.PreviousBlockHash)
if err != nil {
notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err)
return
}
for {
aBlock, err := btc.node.getBlockHeader(ancestorBlockHash)
if err != nil {
Expand All @@ -2495,7 +2562,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
}
if aBlock.Confirmations > -1 {
// Found the mainchain ancestor of previous tip.
startPoint = &block{height: aBlock.Height, hash: aBlock.Hash}
startPoint = &block{height: aBlock.Height, hash: *ancestorBlockHash}
btc.log.Debugf("reorg detected from height %d to %d", aBlock.Height, newTip.height)
break
}
Expand All @@ -2505,7 +2572,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
notifyFatalFindRedemptionError("no mainchain ancestor for orphaned block %s", prevTipHeader.Hash)
return
}
ancestorBlockHash = aBlock.PreviousBlockHash
ancestorBlockHash, err = chainhash.NewHashFromStr(aBlock.PreviousBlockHash)
if err != nil {
notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err)
return
}
}

case newTip.height-prevTipHeader.Height > 1:
Expand All @@ -2516,16 +2587,15 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) {
notifyFatalFindRedemptionError("getBlockHash error for height %d: %w", afterPrivTip, err)
return
}
startPoint = &block{hash: hashAfterPrevTip.String(), height: afterPrivTip}
startPoint = &block{hash: *hashAfterPrevTip, height: afterPrivTip}

default:
// Just 1 new block since last tip report, search the lone block.
startPoint = newTip
}

if len(reqs) > 0 {
startHash, _ := chainhash.NewHashFromStr(startPoint.hash)
go btc.tryRedemptionRequests(ctx, startHash, reqs)
go btc.tryRedemptionRequests(ctx, &startPoint.hash, reqs)
}
}

Expand Down Expand Up @@ -2598,12 +2668,12 @@ out:
return blockHeight, nil
}

func (btc *ExchangeWallet) blockFromHash(hash string) (*block, error) {
func (btc *ExchangeWallet) blockFromHash(hash *chainhash.Hash) (*block, error) {
blk, err := btc.node.getBlockHeader(hash)
if err != nil {
return nil, fmt.Errorf("getBlockHeader error for hash %s: %w", hash, err)
}
return &block{hash: hash, height: blk.Height}, nil
return &block{hash: *hash, height: blk.Height}, nil
}

// convertCoin converts the asset.Coin to an output.
Expand Down