diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index eb104c290e..43fcbc47be 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -77,6 +77,7 @@ const ( var ( // blockTicker is the delay between calls to check for new blocks. blockTicker = time.Second + walletBlockAllowance = time.Second * 10 conventionalConversionFactor = float64(dexbtc.UnitInfo.Conventional.ConversionFactor) rpcOpts = []*asset.ConfigOption{ { @@ -514,7 +515,7 @@ type ExchangeWallet struct { type block struct { height int64 - hash string + hash chainhash.Hash } // findRedemptionReq represents a request to find a contract's redemption, @@ -747,7 +748,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) return nil, err } // Initialize the best block. - h, err := btc.node.getBestBlockHash() + bestBlockHash, err := btc.node.getBestBlockHash() if err != nil { return nil, fmt.Errorf("error initializing best block for %s: %w", btc.symbol, err) } @@ -758,7 +759,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) } btc.tipMtx.Lock() - btc.currentTip, err = btc.blockFromHash(h.String()) + btc.currentTip, err = btc.blockFromHash(bestBlockHash) btc.tipMtx.Unlock() if err != nil { return nil, fmt.Errorf("error parsing best block for %s: %w", btc.symbol, err) @@ -1923,7 +1924,7 @@ func (btc *ExchangeWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, if err != nil { return false, time.Time{}, fmt.Errorf("get best block hash error: %w", err) } - bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash.String()) + bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash) if err != nil { return false, time.Time{}, fmt.Errorf("get best block header error: %w", err) } @@ -2388,10 +2389,88 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes) func (btc *ExchangeWallet) run(ctx context.Context) { ticker := time.NewTicker(blockTicker) defer ticker.Stop() + + var walletBlock <-chan *block + if notifier, isNotifier := btc.node.(tipNotifier); isNotifier { + walletBlock = notifier.tipFeed() + } + + // A polledBlock is a block found during polling, but whose broadcast has + // been queued in anticipation of a wallet notification. + type polledBlock struct { + *block + queue *time.Timer + } + + // queuedBlock is the currently queued, polling-discovered block that will + // be broadcast after a timeout if the wallet doesn't send the matching + // notification. + var queuedBlock *polledBlock + + // dequeuedBlock is where the queuedBlocks that time out will be sent for + // broadcast. + var dequeuedBlock chan *block + for { select { + + // Poll for the block. If the wallet offers tip reports, delay reporting + // the tip to give the wallet a moment to request and scan block data. case <-ticker.C: - btc.checkForNewBlocks(ctx) + newTipHash, err := btc.node.getBestBlockHash() + if err != nil { + go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol)) + return + } + + // This method is called frequently. Don't hold write lock + // unless tip has changed. + btc.tipMtx.RLock() + sameTip := btc.currentTip.hash == *newTipHash + btc.tipMtx.RUnlock() + if sameTip { + continue + } + + newTip, err := btc.blockFromHash(newTipHash) + if err != nil { + go btc.tipChange(fmt.Errorf("error setting new tip: %w", err)) + } + + // If the wallet is not offering tip reports, send this one right + // away. + if walletBlock == nil { + btc.reportNewTip(ctx, newTip) + } else { + // Queue it for reporting, but don't send it right away. Give the + // wallet a chance to provide their block update. SPV wallet may + // need more time after storing the block header to fetch and + // scan filters and issue the FilteredBlockConnected report. + if queuedBlock != nil { + queuedBlock.queue.Stop() + } + queuedBlock = &polledBlock{ + block: newTip, + queue: time.AfterFunc(walletBlockAllowance, func() { + dequeuedBlock <- newTip + }), + } + } + + // Tip reports from the wallet are always sent, and we'll clear any + // queued polled block that would appear to be superceded by this one. + case walletTip := <-walletBlock: + if queuedBlock != nil && walletTip.height >= queuedBlock.height { + queuedBlock.queue.Stop() + queuedBlock = nil + } + btc.reportNewTip(ctx, walletTip) + + case dqBlock := <-dequeuedBlock: + btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+ + "never reported: %d %s. This may indicate a problem with the wallet.", dqBlock.height, dqBlock.hash) + btc.reportNewTip(ctx, dqBlock) + case <-ctx.Done(): return } @@ -2417,10 +2496,18 @@ func (btc *ExchangeWallet) prepareRedemptionRequestsForBlockCheck() []*findRedem return reqs } -// checkForNewBlocks checks for new blocks. When a tip change is detected, the -// tipChange callback function is invoked and a goroutine is started to check -// if any contracts in the findRedemptionQueue are redeemed in the new blocks. -func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { +// reportNewTip sets the currentTip. The tipChange callback function is invoked +// and a goroutine is started to check if any contracts in the +// findRedemptionQueue are redeemed in the new blocks. +func (btc *ExchangeWallet) reportNewTip(ctx context.Context, newTip *block) { + btc.tipMtx.Lock() + defer btc.tipMtx.Unlock() + + prevTip := btc.currentTip + btc.currentTip = newTip + btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash) + go btc.tipChange(nil) + reqs := btc.prepareRedemptionRequestsForBlockCheck() // Redemption search would be compromised if the starting point cannot // be determined, as searching just the new tip might result in blocks @@ -2432,41 +2519,12 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { } } - newTipHash, err := btc.node.getBestBlockHash() - if err != nil { - go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol)) - return - } - - // This method is called frequently. Don't hold write lock - // unless tip has changed. - btc.tipMtx.RLock() - sameTip := btc.currentTip.hash == newTipHash.String() - btc.tipMtx.RUnlock() - if sameTip { - return - } - - btc.tipMtx.Lock() - defer btc.tipMtx.Unlock() - - newTip, err := btc.blockFromHash(newTipHash.String()) - if err != nil { - go btc.tipChange(fmt.Errorf("error setting new tip: %w", err)) - return - } - - prevTip := btc.currentTip - btc.currentTip = newTip - btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash) - go btc.tipChange(nil) - var startPoint *block // Check if the previous tip is still part of the mainchain (prevTip confs >= 0). // Redemption search would typically resume from prevTipHeight + 1 unless the // previous tip was re-orged out of the mainchain, in which case redemption // search will resume from the mainchain ancestor of the previous tip. - prevTipHeader, err := btc.node.getBlockHeader(prevTip.hash) + prevTipHeader, err := btc.node.getBlockHeader(&prevTip.hash) switch { case err != nil: // Redemption search cannot continue reliably without knowing if there @@ -2479,7 +2537,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { // The previous tip is no longer part of the mainchain. Crawl blocks // backwards until finding a mainchain block. Start with the block // that is the immediate ancestor to the previous tip. - ancestorBlockHash := prevTipHeader.PreviousBlockHash + ancestorBlockHash, err := chainhash.NewHashFromStr(prevTipHeader.PreviousBlockHash) + if err != nil { + notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err) + return + } for { aBlock, err := btc.node.getBlockHeader(ancestorBlockHash) if err != nil { @@ -2488,7 +2550,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { } if aBlock.Confirmations > -1 { // Found the mainchain ancestor of previous tip. - startPoint = &block{height: aBlock.Height, hash: aBlock.Hash} + startPoint = &block{height: aBlock.Height, hash: *ancestorBlockHash} btc.log.Debugf("reorg detected from height %d to %d", aBlock.Height, newTip.height) break } @@ -2498,7 +2560,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { notifyFatalFindRedemptionError("no mainchain ancestor for orphaned block %s", prevTipHeader.Hash) return } - ancestorBlockHash = aBlock.PreviousBlockHash + ancestorBlockHash, err = chainhash.NewHashFromStr(aBlock.PreviousBlockHash) + if err != nil { + notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err) + return + } } case newTip.height-prevTipHeader.Height > 1: @@ -2509,7 +2575,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { notifyFatalFindRedemptionError("getBlockHash error for height %d: %w", afterPrivTip, err) return } - startPoint = &block{hash: hashAfterPrevTip.String(), height: afterPrivTip} + startPoint = &block{hash: *hashAfterPrevTip, height: afterPrivTip} default: // Just 1 new block since last tip report, search the lone block. @@ -2517,8 +2583,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { } if len(reqs) > 0 { - startHash, _ := chainhash.NewHashFromStr(startPoint.hash) - go btc.tryRedemptionRequests(ctx, startHash, reqs) + go btc.tryRedemptionRequests(ctx, &startPoint.hash, reqs) } } @@ -2591,12 +2656,12 @@ out: return blockHeight, nil } -func (btc *ExchangeWallet) blockFromHash(hash string) (*block, error) { +func (btc *ExchangeWallet) blockFromHash(hash *chainhash.Hash) (*block, error) { blk, err := btc.node.getBlockHeader(hash) if err != nil { return nil, fmt.Errorf("getBlockHeader error for hash %s: %w", hash, err) } - return &block{hash: hash, height: blk.Height}, nil + return &block{hash: *hash, height: blk.Height}, nil } // convertCoin converts the asset.Coin to an output. diff --git a/client/asset/btc/btc_test.go b/client/asset/btc/btc_test.go index 9265870d50..f53c191b7b 100644 --- a/client/asset/btc/btc_test.go +++ b/client/asset/btc/btc_test.go @@ -617,8 +617,10 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun return nil, nil, nil, err } wallet.tipMtx.Lock() - wallet.currentTip = &block{height: data.GetBestBlockHeight(), - hash: bestHash.String()} + wallet.currentTip = &block{ + height: data.GetBestBlockHeight(), + hash: *bestHash, + } wallet.tipMtx.Unlock() go wallet.run(walletCtx) @@ -2058,7 +2060,10 @@ func testFindRedemption(t *testing.T, segwit bool, walletType string) { node.getCFilterScripts[*redeemBlockHash] = [][]byte{pkScript} // Update currentTip from "RPC". Normally run() would do this. - wallet.checkForNewBlocks(tCtx) + wallet.reportNewTip(tCtx, &block{ + hash: *redeemBlockHash, + height: contractHeight + 2, + }) // Check find redemption result. _, checkSecret, err := wallet.FindRedemption(tCtx, coinID) diff --git a/client/asset/btc/rpcclient.go b/client/asset/btc/rpcclient.go index 6cced815ae..f378dbf36b 100644 --- a/client/asset/btc/rpcclient.go +++ b/client/asset/btc/rpcclient.go @@ -221,7 +221,7 @@ func (wc *rpcClient) getBestBlockHeight() (int32, error) { if err != nil { return -1, err } - header, err := wc.getBlockHeader(tipHash.String()) + header, err := wc.getBlockHeader(tipHash) if err != nil { return -1, err } @@ -503,10 +503,10 @@ func (wc *rpcClient) swapConfirmations(txHash *chainhash.Hash, vout uint32, _ [] } // getBlockHeader gets the block header for the specified block hash. -func (wc *rpcClient) getBlockHeader(blockHash string) (*blockHeader, error) { +func (wc *rpcClient) getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) { blkHeader := new(blockHeader) err := wc.call(methodGetBlockHeader, - anylist{blockHash, true}, blkHeader) + anylist{blockHash.String(), true}, blkHeader) if err != nil { return nil, err } @@ -514,8 +514,8 @@ func (wc *rpcClient) getBlockHeader(blockHash string) (*blockHeader, error) { } // getBlockHeight gets the mainchain height for the specified block. -func (wc *rpcClient) getBlockHeight(h *chainhash.Hash) (int32, error) { - hdr, err := wc.getBlockHeader(h.String()) +func (wc *rpcClient) getBlockHeight(blockHash *chainhash.Hash) (int32, error) { + hdr, err := wc.getBlockHeader(blockHash) if err != nil { return -1, err } diff --git a/client/asset/btc/spv.go b/client/asset/btc/spv.go index 4a8c7afe2f..2ab9c4dfc3 100644 --- a/client/asset/btc/spv.go +++ b/client/asset/btc/spv.go @@ -283,9 +283,13 @@ type spvWallet struct { log dex.Logger loader *wallet.Loader + + tipChan chan *block + syncTarget int32 } var _ Wallet = (*spvWallet)(nil) +var _ tipNotifier = (*spvWallet)(nil) // loadSPVWallet loads an existing wallet. func loadSPVWallet(dbDir string, logger dex.Logger, connectPeers []string, chainParams *chaincfg.Params) *spvWallet { @@ -298,9 +302,16 @@ func loadSPVWallet(dbDir string, logger dex.Logger, connectPeers []string, chain checkpoints: make(map[outPoint]*scanCheckpoint), log: logger, connectPeers: connectPeers, + tipChan: make(chan *block, 1), } } +// tipFeed satisfies the tipNotifier interface, signaling that *spvWallet +// will take precedence in sending block notifications. +func (w *spvWallet) tipFeed() <-chan *block { + return w.tipChan +} + // storeTxBlock stores the block hash for the tx in the cache. func (w *spvWallet) storeTxBlock(txHash, blockHash chainhash.Hash) { w.txBlocksMtx.Lock() @@ -467,6 +478,8 @@ func (w *spvWallet) syncStatus() (*syncStatus, error) { synced = true } + atomic.StoreInt32(&w.syncTarget, target) + return &syncStatus{ Target: target, Height: currentHeight, @@ -834,11 +847,7 @@ func (w *spvWallet) walletUnlock(pw []byte) error { return w.Unlock(pw) } -func (w *spvWallet) getBlockHeader(hashStr string) (*blockHeader, error) { - blockHash, err := chainhash.NewHashFromStr(hashStr) - if err != nil { - return nil, err - } +func (w *spvWallet) getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) { hdr, err := w.cl.GetBlockHeader(blockHash) if err != nil { return nil, err @@ -912,14 +921,14 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { return err } - // Possible to subscribe to block notifications here with a NewRescan -> - // *Rescan supplied with a QuitChan-type RescanOption. - // Actually, should use btcwallet.Wallet.NtfnServer ? + notes := make(<-chan interface{}) + if w.chainClient != nil { + notes = w.chainClient.Notifications() + if err := w.chainClient.NotifyBlocks(); err != nil { + return fmt.Errorf("failed to subscribe to block notifications: %w", err) + } - // notes := make(<-chan interface{}) - // if w.chainClient != nil { - // notes = w.chainClient.Notifications() - // } + } // Nanny for the caches checkpoints and txBlocks caches. wg.Add(1) @@ -948,8 +957,31 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { } } w.checkpointMtx.Unlock() - // case note := <-notes: - // fmt.Printf("--Notification received: %T: %+v \n", note, note) + + case noteI := <-notes: + switch note := noteI.(type) { + case chain.FilteredBlockConnected: + blk := note.Block + + // Don't broadcast tip changes until the syncStatus has been + // requested and we have a syncTarget OR during initial sync + // except every 10k blocks. + syncTarget := atomic.LoadInt32(&w.syncTarget) + if syncTarget == 0 || (blk.Height < syncTarget && blk.Height%10_000 != 0) { + continue + } + + select { + case w.tipChan <- &block{ + hash: blk.Hash, + height: int64(blk.Height), + }: + + default: + w.log.Warnf("tip report channel was blocking") + } + } + case <-ctx.Done(): return } diff --git a/client/asset/btc/wallet.go b/client/asset/btc/wallet.go index fe7ccacda0..398b7572c3 100644 --- a/client/asset/btc/wallet.go +++ b/client/asset/btc/wallet.go @@ -37,10 +37,14 @@ type Wallet interface { locked() bool syncStatus() (*syncStatus, error) swapConfirmations(txHash *chainhash.Hash, vout uint32, contract []byte, startTime time.Time) (confs uint32, spent bool, err error) - getBlockHeader(blockHash string) (*blockHeader, error) + getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) ownsAddress(addr btcutil.Address) (bool, error) getWalletTransaction(txHash *chainhash.Hash) (*GetTransactionResult, error) searchBlockForRedemptions(ctx context.Context, reqs map[outPoint]*findRedemptionReq, blockHash chainhash.Hash) (discovered map[outPoint]*findRedemptionResult) findRedemptionsInMempool(ctx context.Context, reqs map[outPoint]*findRedemptionReq) (discovered map[outPoint]*findRedemptionResult) getBlock(h chainhash.Hash) (*wire.MsgBlock, error) } + +type tipNotifier interface { + tipFeed() <-chan *block +} diff --git a/client/core/bookie.go b/client/core/bookie.go index 2e00af8360..8f272ac209 100644 --- a/client/core/bookie.go +++ b/client/core/bookie.go @@ -935,7 +935,7 @@ func (dc *dexConnection) subPriceFeed() { var msgErr *msgjson.Error // Ignore old servers' errors. if !errors.As(err, &msgErr) || msgErr.Code != msgjson.UnknownMessageType { - dc.log.Errorf("unable to fetch market overview: %w", err) + dc.log.Errorf("unable to fetch market overview: %v", err) } return }