From 9caff81b18d4c67b5544f507b88af8a38e3c49a2 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Sat, 23 Oct 2021 14:58:26 -0500 Subject: [PATCH 1/8] allows wallet time to process block filters When a wallet is able to provide block notifications, allow wallet notifications to take precedence. Queue polled new tips for 10 seconds, canceling their broadcast and sending the notification earlier if the wallet notification comes during this window. If the wallet note doesn't come by 10 seconds, send the queued notification and log a warning. SPV tip updates are provided by the FilteredBlockConnected notification, which can come after neutrino stores the block header(s). Care is taken not to spam the caller with notifications, since they are not limited by the polling frequency. The same mechanism for limiting spam is used to solve the non-existent wallet error messages for *Core though, so you get two for the price of one with that solution. --- client/asset/btc/btc.go | 159 ++++++++++++++++++++++++---------- client/asset/btc/btc_test.go | 11 ++- client/asset/btc/rpcclient.go | 10 +-- client/asset/btc/spv.go | 60 ++++++++++--- client/asset/btc/wallet.go | 6 +- client/core/bookie.go | 2 +- 6 files changed, 177 insertions(+), 71 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index 8381f0048a..b29c6271b3 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -77,6 +77,7 @@ const ( var ( // blockTicker is the delay between calls to check for new blocks. blockTicker = time.Second + walletBlockAllowance = time.Second * 10 conventionalConversionFactor = float64(dexbtc.UnitInfo.Conventional.ConversionFactor) rpcOpts = []*asset.ConfigOption{ { @@ -521,7 +522,7 @@ type ExchangeWallet struct { type block struct { height int64 - hash string + hash chainhash.Hash } // findRedemptionReq represents a request to find a contract's redemption, @@ -754,7 +755,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) return nil, err } // Initialize the best block. - h, err := btc.node.getBestBlockHash() + bestBlockHash, err := btc.node.getBestBlockHash() if err != nil { return nil, fmt.Errorf("error initializing best block for %s: %w", btc.symbol, err) } @@ -765,7 +766,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) } btc.tipMtx.Lock() - btc.currentTip, err = btc.blockFromHash(h.String()) + btc.currentTip, err = btc.blockFromHash(bestBlockHash) btc.tipMtx.Unlock() if err != nil { return nil, fmt.Errorf("error parsing best block for %s: %w", btc.symbol, err) @@ -1930,7 +1931,7 @@ func (btc *ExchangeWallet) LocktimeExpired(contract dex.Bytes) (bool, time.Time, if err != nil { return false, time.Time{}, fmt.Errorf("get best block hash error: %w", err) } - bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash.String()) + bestBlockHeader, err := btc.node.getBlockHeader(bestBlockHash) if err != nil { return false, time.Time{}, fmt.Errorf("get best block header error: %w", err) } @@ -2395,10 +2396,88 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes) func (btc *ExchangeWallet) run(ctx context.Context) { ticker := time.NewTicker(blockTicker) defer ticker.Stop() + + var walletBlock <-chan *block + if notifier, isNotifier := btc.node.(tipNotifier); isNotifier { + walletBlock = notifier.tipFeed() + } + + // A polledBlock is a block found during polling, but whose broadcast has + // been queued in anticipation of a wallet notification. + type polledBlock struct { + *block + queue *time.Timer + } + + // queuedBlock is the currently queued, polling-discovered block that will + // be broadcast after a timeout if the wallet doesn't send the matching + // notification. + var queuedBlock *polledBlock + + // dequeuedBlock is where the queuedBlocks that time out will be sent for + // broadcast. + var dequeuedBlock chan *block + for { select { + + // Poll for the block. If the wallet offers tip reports, delay reporting + // the tip to give the wallet a moment to request and scan block data. case <-ticker.C: - btc.checkForNewBlocks(ctx) + newTipHash, err := btc.node.getBestBlockHash() + if err != nil { + go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol)) + return + } + + // This method is called frequently. Don't hold write lock + // unless tip has changed. + btc.tipMtx.RLock() + sameTip := btc.currentTip.hash == *newTipHash + btc.tipMtx.RUnlock() + if sameTip { + continue + } + + newTip, err := btc.blockFromHash(newTipHash) + if err != nil { + go btc.tipChange(fmt.Errorf("error setting new tip: %w", err)) + } + + // If the wallet is not offering tip reports, send this one right + // away. + if walletBlock == nil { + btc.reportNewTip(ctx, newTip) + } else { + // Queue it for reporting, but don't send it right away. Give the + // wallet a chance to provide their block update. SPV wallet may + // need more time after storing the block header to fetch and + // scan filters and issue the FilteredBlockConnected report. + if queuedBlock != nil { + queuedBlock.queue.Stop() + } + queuedBlock = &polledBlock{ + block: newTip, + queue: time.AfterFunc(walletBlockAllowance, func() { + dequeuedBlock <- newTip + }), + } + } + + // Tip reports from the wallet are always sent, and we'll clear any + // queued polled block that would appear to be superceded by this one. + case walletTip := <-walletBlock: + if queuedBlock != nil && walletTip.height >= queuedBlock.height { + queuedBlock.queue.Stop() + queuedBlock = nil + } + btc.reportNewTip(ctx, walletTip) + + case dqBlock := <-dequeuedBlock: + btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+ + "never reported: %d %s. This may indicate a problem with the wallet.", dqBlock.height, dqBlock.hash) + btc.reportNewTip(ctx, dqBlock) + case <-ctx.Done(): return } @@ -2424,10 +2503,18 @@ func (btc *ExchangeWallet) prepareRedemptionRequestsForBlockCheck() []*findRedem return reqs } -// checkForNewBlocks checks for new blocks. When a tip change is detected, the -// tipChange callback function is invoked and a goroutine is started to check -// if any contracts in the findRedemptionQueue are redeemed in the new blocks. -func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { +// reportNewTip sets the currentTip. The tipChange callback function is invoked +// and a goroutine is started to check if any contracts in the +// findRedemptionQueue are redeemed in the new blocks. +func (btc *ExchangeWallet) reportNewTip(ctx context.Context, newTip *block) { + btc.tipMtx.Lock() + defer btc.tipMtx.Unlock() + + prevTip := btc.currentTip + btc.currentTip = newTip + btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash) + go btc.tipChange(nil) + reqs := btc.prepareRedemptionRequestsForBlockCheck() // Redemption search would be compromised if the starting point cannot // be determined, as searching just the new tip might result in blocks @@ -2439,41 +2526,12 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { } } - newTipHash, err := btc.node.getBestBlockHash() - if err != nil { - go btc.tipChange(fmt.Errorf("failed to get best block hash from %s node", btc.symbol)) - return - } - - // This method is called frequently. Don't hold write lock - // unless tip has changed. - btc.tipMtx.RLock() - sameTip := btc.currentTip.hash == newTipHash.String() - btc.tipMtx.RUnlock() - if sameTip { - return - } - - btc.tipMtx.Lock() - defer btc.tipMtx.Unlock() - - newTip, err := btc.blockFromHash(newTipHash.String()) - if err != nil { - go btc.tipChange(fmt.Errorf("error setting new tip: %w", err)) - return - } - - prevTip := btc.currentTip - btc.currentTip = newTip - btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash) - go btc.tipChange(nil) - var startPoint *block // Check if the previous tip is still part of the mainchain (prevTip confs >= 0). // Redemption search would typically resume from prevTipHeight + 1 unless the // previous tip was re-orged out of the mainchain, in which case redemption // search will resume from the mainchain ancestor of the previous tip. - prevTipHeader, err := btc.node.getBlockHeader(prevTip.hash) + prevTipHeader, err := btc.node.getBlockHeader(&prevTip.hash) switch { case err != nil: // Redemption search cannot continue reliably without knowing if there @@ -2486,7 +2544,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { // The previous tip is no longer part of the mainchain. Crawl blocks // backwards until finding a mainchain block. Start with the block // that is the immediate ancestor to the previous tip. - ancestorBlockHash := prevTipHeader.PreviousBlockHash + ancestorBlockHash, err := chainhash.NewHashFromStr(prevTipHeader.PreviousBlockHash) + if err != nil { + notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err) + return + } for { aBlock, err := btc.node.getBlockHeader(ancestorBlockHash) if err != nil { @@ -2495,7 +2557,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { } if aBlock.Confirmations > -1 { // Found the mainchain ancestor of previous tip. - startPoint = &block{height: aBlock.Height, hash: aBlock.Hash} + startPoint = &block{height: aBlock.Height, hash: *ancestorBlockHash} btc.log.Debugf("reorg detected from height %d to %d", aBlock.Height, newTip.height) break } @@ -2505,7 +2567,11 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { notifyFatalFindRedemptionError("no mainchain ancestor for orphaned block %s", prevTipHeader.Hash) return } - ancestorBlockHash = aBlock.PreviousBlockHash + ancestorBlockHash, err = chainhash.NewHashFromStr(aBlock.PreviousBlockHash) + if err != nil { + notifyFatalFindRedemptionError("hash decode error for block %s: %w", prevTipHeader.PreviousBlockHash, err) + return + } } case newTip.height-prevTipHeader.Height > 1: @@ -2516,7 +2582,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { notifyFatalFindRedemptionError("getBlockHash error for height %d: %w", afterPrivTip, err) return } - startPoint = &block{hash: hashAfterPrevTip.String(), height: afterPrivTip} + startPoint = &block{hash: *hashAfterPrevTip, height: afterPrivTip} default: // Just 1 new block since last tip report, search the lone block. @@ -2524,8 +2590,7 @@ func (btc *ExchangeWallet) checkForNewBlocks(ctx context.Context) { } if len(reqs) > 0 { - startHash, _ := chainhash.NewHashFromStr(startPoint.hash) - go btc.tryRedemptionRequests(ctx, startHash, reqs) + go btc.tryRedemptionRequests(ctx, &startPoint.hash, reqs) } } @@ -2598,12 +2663,12 @@ out: return blockHeight, nil } -func (btc *ExchangeWallet) blockFromHash(hash string) (*block, error) { +func (btc *ExchangeWallet) blockFromHash(hash *chainhash.Hash) (*block, error) { blk, err := btc.node.getBlockHeader(hash) if err != nil { return nil, fmt.Errorf("getBlockHeader error for hash %s: %w", hash, err) } - return &block{hash: hash, height: blk.Height}, nil + return &block{hash: *hash, height: blk.Height}, nil } // convertCoin converts the asset.Coin to an output. diff --git a/client/asset/btc/btc_test.go b/client/asset/btc/btc_test.go index 9265870d50..f53c191b7b 100644 --- a/client/asset/btc/btc_test.go +++ b/client/asset/btc/btc_test.go @@ -617,8 +617,10 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun return nil, nil, nil, err } wallet.tipMtx.Lock() - wallet.currentTip = &block{height: data.GetBestBlockHeight(), - hash: bestHash.String()} + wallet.currentTip = &block{ + height: data.GetBestBlockHeight(), + hash: *bestHash, + } wallet.tipMtx.Unlock() go wallet.run(walletCtx) @@ -2058,7 +2060,10 @@ func testFindRedemption(t *testing.T, segwit bool, walletType string) { node.getCFilterScripts[*redeemBlockHash] = [][]byte{pkScript} // Update currentTip from "RPC". Normally run() would do this. - wallet.checkForNewBlocks(tCtx) + wallet.reportNewTip(tCtx, &block{ + hash: *redeemBlockHash, + height: contractHeight + 2, + }) // Check find redemption result. _, checkSecret, err := wallet.FindRedemption(tCtx, coinID) diff --git a/client/asset/btc/rpcclient.go b/client/asset/btc/rpcclient.go index 6cced815ae..f378dbf36b 100644 --- a/client/asset/btc/rpcclient.go +++ b/client/asset/btc/rpcclient.go @@ -221,7 +221,7 @@ func (wc *rpcClient) getBestBlockHeight() (int32, error) { if err != nil { return -1, err } - header, err := wc.getBlockHeader(tipHash.String()) + header, err := wc.getBlockHeader(tipHash) if err != nil { return -1, err } @@ -503,10 +503,10 @@ func (wc *rpcClient) swapConfirmations(txHash *chainhash.Hash, vout uint32, _ [] } // getBlockHeader gets the block header for the specified block hash. -func (wc *rpcClient) getBlockHeader(blockHash string) (*blockHeader, error) { +func (wc *rpcClient) getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) { blkHeader := new(blockHeader) err := wc.call(methodGetBlockHeader, - anylist{blockHash, true}, blkHeader) + anylist{blockHash.String(), true}, blkHeader) if err != nil { return nil, err } @@ -514,8 +514,8 @@ func (wc *rpcClient) getBlockHeader(blockHash string) (*blockHeader, error) { } // getBlockHeight gets the mainchain height for the specified block. -func (wc *rpcClient) getBlockHeight(h *chainhash.Hash) (int32, error) { - hdr, err := wc.getBlockHeader(h.String()) +func (wc *rpcClient) getBlockHeight(blockHash *chainhash.Hash) (int32, error) { + hdr, err := wc.getBlockHeader(blockHash) if err != nil { return -1, err } diff --git a/client/asset/btc/spv.go b/client/asset/btc/spv.go index d2ee5a1d9e..a4bec3cfba 100644 --- a/client/asset/btc/spv.go +++ b/client/asset/btc/spv.go @@ -284,9 +284,13 @@ type spvWallet struct { log dex.Logger loader *wallet.Loader + + tipChan chan *block + syncTarget int32 } var _ Wallet = (*spvWallet)(nil) +var _ tipNotifier = (*spvWallet)(nil) // loadSPVWallet loads an existing wallet. func loadSPVWallet(dbDir string, logger dex.Logger, connectPeers []string, chainParams *chaincfg.Params) *spvWallet { @@ -299,9 +303,16 @@ func loadSPVWallet(dbDir string, logger dex.Logger, connectPeers []string, chain checkpoints: make(map[outPoint]*scanCheckpoint), log: logger, connectPeers: connectPeers, + tipChan: make(chan *block, 1), } } +// tipFeed satisfies the tipNotifier interface, signaling that *spvWallet +// will take precedence in sending block notifications. +func (w *spvWallet) tipFeed() <-chan *block { + return w.tipChan +} + // storeTxBlock stores the block hash for the tx in the cache. func (w *spvWallet) storeTxBlock(txHash, blockHash chainhash.Hash) { w.txBlocksMtx.Lock() @@ -468,6 +479,8 @@ func (w *spvWallet) syncStatus() (*syncStatus, error) { synced = true } + atomic.StoreInt32(&w.syncTarget, target) + return &syncStatus{ Target: target, Height: currentHeight, @@ -835,11 +848,7 @@ func (w *spvWallet) walletUnlock(pw []byte) error { return w.Unlock(pw) } -func (w *spvWallet) getBlockHeader(hashStr string) (*blockHeader, error) { - blockHash, err := chainhash.NewHashFromStr(hashStr) - if err != nil { - return nil, err - } +func (w *spvWallet) getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) { hdr, err := w.cl.GetBlockHeader(blockHash) if err != nil { return nil, err @@ -913,14 +922,14 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { return err } - // Possible to subscribe to block notifications here with a NewRescan -> - // *Rescan supplied with a QuitChan-type RescanOption. - // Actually, should use btcwallet.Wallet.NtfnServer ? + notes := make(<-chan interface{}) + if w.chainClient != nil { + notes = w.chainClient.Notifications() + if err := w.chainClient.NotifyBlocks(); err != nil { + return fmt.Errorf("failed to subscribe to block notifications: %w", err) + } - // notes := make(<-chan interface{}) - // if w.chainClient != nil { - // notes = w.chainClient.Notifications() - // } + } // Nanny for the caches checkpoints and txBlocks caches. wg.Add(1) @@ -949,8 +958,31 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { } } w.checkpointMtx.Unlock() - // case note := <-notes: - // fmt.Printf("--Notification received: %T: %+v \n", note, note) + + case noteI := <-notes: + switch note := noteI.(type) { + case chain.FilteredBlockConnected: + blk := note.Block + + // Don't broadcast tip changes until the syncStatus has been + // requested and we have a syncTarget OR during initial sync + // except every 10k blocks. + syncTarget := atomic.LoadInt32(&w.syncTarget) + if syncTarget == 0 || (blk.Height < syncTarget && blk.Height%10_000 != 0) { + continue + } + + select { + case w.tipChan <- &block{ + hash: blk.Hash, + height: int64(blk.Height), + }: + + default: + w.log.Warnf("tip report channel was blocking") + } + } + case <-ctx.Done(): return } diff --git a/client/asset/btc/wallet.go b/client/asset/btc/wallet.go index fe7ccacda0..398b7572c3 100644 --- a/client/asset/btc/wallet.go +++ b/client/asset/btc/wallet.go @@ -37,10 +37,14 @@ type Wallet interface { locked() bool syncStatus() (*syncStatus, error) swapConfirmations(txHash *chainhash.Hash, vout uint32, contract []byte, startTime time.Time) (confs uint32, spent bool, err error) - getBlockHeader(blockHash string) (*blockHeader, error) + getBlockHeader(blockHash *chainhash.Hash) (*blockHeader, error) ownsAddress(addr btcutil.Address) (bool, error) getWalletTransaction(txHash *chainhash.Hash) (*GetTransactionResult, error) searchBlockForRedemptions(ctx context.Context, reqs map[outPoint]*findRedemptionReq, blockHash chainhash.Hash) (discovered map[outPoint]*findRedemptionResult) findRedemptionsInMempool(ctx context.Context, reqs map[outPoint]*findRedemptionReq) (discovered map[outPoint]*findRedemptionResult) getBlock(h chainhash.Hash) (*wire.MsgBlock, error) } + +type tipNotifier interface { + tipFeed() <-chan *block +} diff --git a/client/core/bookie.go b/client/core/bookie.go index 2e00af8360..8f272ac209 100644 --- a/client/core/bookie.go +++ b/client/core/bookie.go @@ -935,7 +935,7 @@ func (dc *dexConnection) subPriceFeed() { var msgErr *msgjson.Error // Ignore old servers' errors. if !errors.As(err, &msgErr) || msgErr.Code != msgjson.UnknownMessageType { - dc.log.Errorf("unable to fetch market overview: %w", err) + dc.log.Errorf("unable to fetch market overview: %v", err) } return } From aae7613cb950c3a607f27cf985c7a886221348d3 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Tue, 26 Oct 2021 06:00:47 -0500 Subject: [PATCH 2/8] add tests --- client/asset/btc/btc.go | 17 ++++++--- client/asset/btc/btc_test.go | 13 +++++-- client/asset/btc/spv_test.go | 71 ++++++++++++++++++++++++++++++++++++ 3 files changed, 92 insertions(+), 9 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index b29c6271b3..f0bf26b856 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -775,7 +775,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) wg.Add(1) go func() { defer wg.Done() - btc.run(ctx) + btc.watchBlocks(ctx) btc.shutdown() }() return &wg, nil @@ -2391,9 +2391,9 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes) return uint32(tx.Confirmations), nil } -// run pings for new blocks and runs the tipChange callback function when the -// block changes. -func (btc *ExchangeWallet) run(ctx context.Context) { +// watchBlocks pings for new blocks and runs the tipChange callback function +// when the block changes. +func (btc *ExchangeWallet) watchBlocks(ctx context.Context) { ticker := time.NewTicker(blockTicker) defer ticker.Stop() @@ -2416,7 +2416,7 @@ func (btc *ExchangeWallet) run(ctx context.Context) { // dequeuedBlock is where the queuedBlocks that time out will be sent for // broadcast. - var dequeuedBlock chan *block + dequeuedBlock := make(chan *block, 1) for { select { @@ -2430,6 +2430,10 @@ func (btc *ExchangeWallet) run(ctx context.Context) { return } + if queuedBlock != nil && *newTipHash == queuedBlock.block.hash { + continue + } + // This method is called frequently. Don't hold write lock // unless tip has changed. btc.tipMtx.RLock() @@ -2475,7 +2479,8 @@ func (btc *ExchangeWallet) run(ctx context.Context) { case dqBlock := <-dequeuedBlock: btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+ - "never reported: %d %s. This may indicate a problem with the wallet.", dqBlock.height, dqBlock.hash) + "never reported: %d %s. If you see this message repeatedly, it may indicate "+ + "an issue with the wallet.", dqBlock.height, dqBlock.hash) btc.reportNewTip(ctx, dqBlock) case <-ctx.Done(): diff --git a/client/asset/btc/btc_test.go b/client/asset/btc/btc_test.go index f53c191b7b..f3a461592c 100644 --- a/client/asset/btc/btc_test.go +++ b/client/asset/btc/btc_test.go @@ -167,6 +167,7 @@ type testData struct { signTxErr error listUnspent []*ListUnspentResult listUnspentErr error + tipChanged chan struct{} // spv fetchInputInfoTx *wire.MsgTx @@ -195,6 +196,7 @@ func newTestData() *testData { getCFilterScripts: make(map[chainhash.Hash][][]byte), confsErr: WalletTransactionNotFound, checkpoints: make(map[outPoint]*scanCheckpoint), + tipChanged: make(chan struct{}, 1), } } @@ -567,7 +569,12 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun data := newTestData() walletCfg := &asset.WalletConfig{ - TipChange: func(error) {}, + TipChange: func(error) { + select { + case data.tipChanged <- struct{}{}: + default: + } + }, } walletCtx, shutdown := context.WithCancel(tCtx) cfg := &BTCCloneCFG{ @@ -596,6 +603,7 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun chainParams: &chaincfg.MainNetParams, wallet: &tBtcWallet{data}, cl: neutrinoClient, + tipChan: make(chan *block), chainClient: nil, acctNum: 0, txBlocks: data.dbBlockForTx, @@ -622,7 +630,7 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun hash: *bestHash, } wallet.tipMtx.Unlock() - go wallet.run(walletCtx) + go wallet.watchBlocks(walletCtx) return wallet, data, shutdown, nil } @@ -2966,5 +2974,4 @@ func testTryRedemptionRequests(t *testing.T, segwit bool, walletType string) { } } } - } diff --git a/client/asset/btc/spv_test.go b/client/asset/btc/spv_test.go index fbee237db2..9773cfed90 100644 --- a/client/asset/btc/spv_test.go +++ b/client/asset/btc/spv_test.go @@ -699,3 +699,74 @@ func TestSendWithSubtract(t *testing.T) { t.Fatalf("test passed with fees > available error") } } + +func TestTryBlocksWithNotifier(t *testing.T) { + defaultWalletBlockAllowance := walletBlockAllowance + defaultBlockTicker := blockTicker + + walletBlockAllowance = 30 * time.Millisecond + blockTicker = 5 * time.Millisecond + + defer func() { + walletBlockAllowance = defaultWalletBlockAllowance + blockTicker = defaultBlockTicker + }() + + wallet, node, shutdown, _ := tNewWallet(true, walletTypeSPV) + defer shutdown() + + spv := wallet.node.(*spvWallet) + + getNote := func(timeout time.Duration) bool { + select { + case <-node.tipChanged: + return true + case <-time.After(timeout): + return false + } + } + + if getNote(walletBlockAllowance * 2) { + t.Fatalf("got a first block") + } + + var tipHeight int64 + addBlock := func() *block { + tipHeight++ + h, _ := node.addRawTx(tipHeight, dummyTx()) + return &block{tipHeight, *h} + } + + addBlock() + + // It should not come through on the block tick, since it will be cached. + if getNote(blockTicker * 2) { + t.Fatalf("got block that should've been cached") + } + + // But it will come through after the blockAllowance, printing a warning. + if !getNote(walletBlockAllowance * 2) { + t.Fatal("block didn't time out") + } + + // On the other hand, a wallet block should come through immediately. Not + // even waiting on the block tick. + spv.tipChan <- addBlock() + if !getNote(blockTicker / 2) { + t.Fatal("wallet block wasn't sent through") + } + + // If we do the same thing but make sure that a polled block is queued + // first, we should still see the block right away, and the queued block + // should be canceled. + blk := addBlock() + time.Sleep(blockTicker * 2) + spv.tipChan <- blk + if !getNote(blockTicker / 2) { + t.Fatal("wallet block wasn't sent through with polled block queued") + } + + if getNote(walletBlockAllowance * 2) { + t.Fatal("queued polled block that should have been canceled came through") + } +} From b190518466cfbaf4a3ab019b1c05aa98d01eb8ad Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Tue, 26 Oct 2021 06:16:11 -0500 Subject: [PATCH 3/8] fix raciness --- client/asset/btc/btc.go | 8 ++++---- client/asset/btc/btc_test.go | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index f0bf26b856..0e21f149de 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -775,7 +775,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) wg.Add(1) go func() { defer wg.Done() - btc.watchBlocks(ctx) + btc.watchBlocks(ctx, blockTicker, walletBlockAllowance) btc.shutdown() }() return &wg, nil @@ -2393,8 +2393,8 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes) // watchBlocks pings for new blocks and runs the tipChange callback function // when the block changes. -func (btc *ExchangeWallet) watchBlocks(ctx context.Context) { - ticker := time.NewTicker(blockTicker) +func (btc *ExchangeWallet) watchBlocks(ctx context.Context, blockPeriod, walletAllowance time.Duration) { + ticker := time.NewTicker(blockPeriod) defer ticker.Stop() var walletBlock <-chan *block @@ -2462,7 +2462,7 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context) { } queuedBlock = &polledBlock{ block: newTip, - queue: time.AfterFunc(walletBlockAllowance, func() { + queue: time.AfterFunc(walletAllowance, func() { dequeuedBlock <- newTip }), } diff --git a/client/asset/btc/btc_test.go b/client/asset/btc/btc_test.go index f3a461592c..67ab7a1477 100644 --- a/client/asset/btc/btc_test.go +++ b/client/asset/btc/btc_test.go @@ -630,7 +630,7 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun hash: *bestHash, } wallet.tipMtx.Unlock() - go wallet.watchBlocks(walletCtx) + go wallet.watchBlocks(walletCtx, blockTicker, walletBlockAllowance) return wallet, data, shutdown, nil } From 9a10cd610ee7421348cae98417a51fae22c99fdd Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Thu, 28 Oct 2021 10:55:53 -0500 Subject: [PATCH 4/8] chappjc review followup --- client/asset/btc/btc.go | 36 +++++++++++++++++++----------------- client/asset/btc/btc_test.go | 26 ++++++++++++++++++-------- client/asset/btc/spv.go | 5 ++--- client/asset/btc/spv_test.go | 33 +++++++++++++++++++++++++++++++-- go.mod | 2 +- go.sum | 3 ++- 6 files changed, 73 insertions(+), 32 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index 0e21f149de..61a19ae236 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -775,7 +775,7 @@ func (btc *ExchangeWallet) Connect(ctx context.Context) (*sync.WaitGroup, error) wg.Add(1) go func() { defer wg.Done() - btc.watchBlocks(ctx, blockTicker, walletBlockAllowance) + btc.watchBlocks(ctx) btc.shutdown() }() return &wg, nil @@ -2393,8 +2393,8 @@ func (btc *ExchangeWallet) RegFeeConfirmations(_ context.Context, id dex.Bytes) // watchBlocks pings for new blocks and runs the tipChange callback function // when the block changes. -func (btc *ExchangeWallet) watchBlocks(ctx context.Context, blockPeriod, walletAllowance time.Duration) { - ticker := time.NewTicker(blockPeriod) +func (btc *ExchangeWallet) watchBlocks(ctx context.Context) { + ticker := time.NewTicker(blockTicker) defer ticker.Stop() var walletBlock <-chan *block @@ -2414,10 +2414,6 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context, blockPeriod, walletA // notification. var queuedBlock *polledBlock - // dequeuedBlock is where the queuedBlocks that time out will be sent for - // broadcast. - dequeuedBlock := make(chan *block, 1) - for { select { @@ -2434,8 +2430,6 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context, blockPeriod, walletA continue } - // This method is called frequently. Don't hold write lock - // unless tip has changed. btc.tipMtx.RLock() sameTip := btc.currentTip.hash == *newTipHash btc.tipMtx.RUnlock() @@ -2460,10 +2454,21 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context, blockPeriod, walletA if queuedBlock != nil { queuedBlock.queue.Stop() } + blockAllowance := walletBlockAllowance + syncStatus, err := btc.node.syncStatus() + + if err != nil { + btc.log.Errorf("Error retreiving sync status before queuing polled block: %v", err) + } else if syncStatus.Syncing { + blockAllowance *= 10 + } queuedBlock = &polledBlock{ block: newTip, - queue: time.AfterFunc(walletAllowance, func() { - dequeuedBlock <- newTip + queue: time.AfterFunc(blockAllowance, func() { + btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+ + "never reported: %d %s. If you see this message repeatedly, it may indicate "+ + "an issue with the wallet.", newTip.height, newTip.hash) + btc.reportNewTip(ctx, newTip) }), } } @@ -2477,12 +2482,6 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context, blockPeriod, walletA } btc.reportNewTip(ctx, walletTip) - case dqBlock := <-dequeuedBlock: - btc.log.Warnf("Reporting a block found in polling that the wallet apparently "+ - "never reported: %d %s. If you see this message repeatedly, it may indicate "+ - "an issue with the wallet.", dqBlock.height, dqBlock.hash) - btc.reportNewTip(ctx, dqBlock) - case <-ctx.Done(): return } @@ -2516,6 +2515,9 @@ func (btc *ExchangeWallet) reportNewTip(ctx context.Context, newTip *block) { defer btc.tipMtx.Unlock() prevTip := btc.currentTip + if prevTip.hash == newTip.hash { + return // already reported + } btc.currentTip = newTip btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash) go btc.tipChange(nil) diff --git a/client/asset/btc/btc_test.go b/client/asset/btc/btc_test.go index 67ab7a1477..e360ddacc0 100644 --- a/client/asset/btc/btc_test.go +++ b/client/asset/btc/btc_test.go @@ -133,10 +133,11 @@ type testData struct { signFunc func(*wire.MsgTx) signMsgFunc func([]json.RawMessage) (json.RawMessage, error) - blockchainMtx sync.RWMutex - verboseBlocks map[string]*msgBlockWithHeight - dbBlockForTx map[chainhash.Hash]*hashEntry - mainchain map[int64]*chainhash.Hash + blockchainMtx sync.RWMutex + verboseBlocks map[string]*msgBlockWithHeight + dbBlockForTx map[chainhash.Hash]*hashEntry + mainchain map[int64]*chainhash.Hash + getBlockchainInfo *getBlockchainInfoResult getBestBlockHashErr error mempoolTxs map[chainhash.Hash]*wire.MsgTx @@ -157,7 +158,6 @@ type testData struct { getTransaction *GetTransactionResult getTransactionErr error - getBlockchainInfo *getBlockchainInfoResult getBlockchainInfoErr error unlockErr error lockErr error @@ -412,6 +412,8 @@ func (c *tRawRequester) RawRequest(_ context.Context, method string, params []js case methodGetTransaction: return encodeOrError(c.getTransaction, c.getTransactionErr) case methodGetBlockchainInfo: + c.blockchainMtx.RLock() + defer c.blockchainMtx.RUnlock() return encodeOrError(c.getBlockchainInfo, c.getBlockchainInfoErr) case methodLock: return nil, c.lockErr @@ -630,9 +632,17 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun hash: *bestHash, } wallet.tipMtx.Unlock() - go wallet.watchBlocks(walletCtx, blockTicker, walletBlockAllowance) - - return wallet, data, shutdown, nil + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + wallet.watchBlocks(walletCtx) + }() + shutdownAndWait := func() { + shutdown() + wg.Wait() + } + return wallet, data, shutdownAndWait, nil } func mustMarshal(thing interface{}) []byte { diff --git a/client/asset/btc/spv.go b/client/asset/btc/spv.go index a4bec3cfba..0a1ea52609 100644 --- a/client/asset/btc/spv.go +++ b/client/asset/btc/spv.go @@ -303,7 +303,7 @@ func loadSPVWallet(dbDir string, logger dex.Logger, connectPeers []string, chain checkpoints: make(map[outPoint]*scanCheckpoint), log: logger, connectPeers: connectPeers, - tipChan: make(chan *block, 1), + tipChan: make(chan *block, 8), } } @@ -925,10 +925,9 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { notes := make(<-chan interface{}) if w.chainClient != nil { notes = w.chainClient.Notifications() - if err := w.chainClient.NotifyBlocks(); err != nil { + if err = w.chainClient.NotifyBlocks(); err != nil { return fmt.Errorf("failed to subscribe to block notifications: %w", err) } - } // Nanny for the caches checkpoints and txBlocks caches. diff --git a/client/asset/btc/spv_test.go b/client/asset/btc/spv_test.go index 9773cfed90..e99db8faac 100644 --- a/client/asset/btc/spv_test.go +++ b/client/asset/btc/spv_test.go @@ -190,6 +190,8 @@ func (c *tBtcWallet) Stop() {} func (c *tBtcWallet) WaitForShutdown() {} func (c *tBtcWallet) ChainSynced() bool { + c.blockchainMtx.RLock() + defer c.blockchainMtx.RUnlock() if c.getBlockchainInfo == nil { return false } @@ -294,6 +296,8 @@ func (c *tNeutrinoClient) BestBlock() (*headerfs.BlockStamp, error) { } func (c *tNeutrinoClient) Peers() []*neutrino.ServerPeer { + c.blockchainMtx.RLock() + defer c.blockchainMtx.RUnlock() peer := &neutrino.ServerPeer{Peer: &peer.Peer{}} if c.getBlockchainInfo != nil { peer.UpdateLastBlockHeight(int32(c.getBlockchainInfo.Headers)) @@ -737,6 +741,12 @@ func TestTryBlocksWithNotifier(t *testing.T) { return &block{tipHeight, *h} } + // Start with no blocks so that we're not synced. + node.getBlockchainInfo = &getBlockchainInfoResult{ + Headers: 2, + Blocks: 0, + } + addBlock() // It should not come through on the block tick, since it will be cached. @@ -744,9 +754,28 @@ func TestTryBlocksWithNotifier(t *testing.T) { t.Fatalf("got block that should've been cached") } - // But it will come through after the blockAllowance, printing a warning. + // And it won't come through after a sigle block allowance, because we're + // not synced. + if getNote(walletBlockAllowance * 2) { + t.Fatal("block didn't wait for the syncing mode allowance") + } + + // But it will come through after the sync timeout = 10 * normal timeout. + if !getNote(walletBlockAllowance * 9) { + t.Fatal("block didn't time out in syncing mode") + } + + // But if we're synced, it should come through after the normal block + // allowance. + addBlock() + node.blockchainMtx.Lock() + node.getBlockchainInfo = &getBlockchainInfoResult{ + Headers: tipHeight, + Blocks: tipHeight, + } + node.blockchainMtx.Unlock() if !getNote(walletBlockAllowance * 2) { - t.Fatal("block didn't time out") + t.Fatal("block didn't time out in normal mode") } // On the other hand, a wallet block should come through immediately. Not diff --git a/go.mod b/go.mod index 85a311faf2..ba4489cd12 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.16 require ( decred.org/dcrwallet/v2 v2.0.0-20210913145543-714c2f555f04 - github.com/btcsuite/btcd v0.22.0-beta.0.20210803133449-f5a1fb9965e4 + github.com/btcsuite/btcd v0.22.0-beta.0.20211026140004-31791ba4dc6e github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f github.com/btcsuite/btcutil v1.0.3-0.20210527170813-e2ba6805a890 // note: hoists btcd's own require of btcutil github.com/btcsuite/btcutil/psbt v1.0.3-0.20201208143702-a53e38424cce diff --git a/go.sum b/go.sum index a4b28568d2..b86a0ba2ba 100644 --- a/go.sum +++ b/go.sum @@ -71,8 +71,9 @@ github.com/btcsuite/btcd v0.0.0-20190824003749-130ea5bddde3/go.mod h1:3J08xEfcug github.com/btcsuite/btcd v0.20.1-beta/go.mod h1:wVuoA8VJLEcwgqHBwHmzLRazpKxTv13Px/pDuV7OomQ= github.com/btcsuite/btcd v0.21.0-beta.0.20201208033208-6bd4c64a54fa/go.mod h1:Sv4JPQ3/M+teHz9Bo5jBpkNcP0x6r7rdihlNL/7tTAs= github.com/btcsuite/btcd v0.21.0-beta.0.20210426180113-7eba688b65e5/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= -github.com/btcsuite/btcd v0.22.0-beta.0.20210803133449-f5a1fb9965e4 h1:EmyLrldY44jDVa3dQ2iscj1S6ExuVJhRzCZBOXo93r0= github.com/btcsuite/btcd v0.22.0-beta.0.20210803133449-f5a1fb9965e4/go.mod h1:9n5ntfhhHQBIhUvlhDvD3Qg6fRUj4jkN0VB8L8svzOA= +github.com/btcsuite/btcd v0.22.0-beta.0.20211026140004-31791ba4dc6e h1:d0NkvbJGQThTkhypOdtf5Orxe2+dUHLB86pQ4EXwrTo= +github.com/btcsuite/btcd v0.22.0-beta.0.20211026140004-31791ba4dc6e/go.mod h1:3PH+KbvLFfzBTCevQenPiDedjGQGt6aa70dVjJDWGTA= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f h1:bAs4lUbRJpnnkd9VhRV3jjAVU7DJVjMaK+IsvSeZvFo= github.com/btcsuite/btclog v0.0.0-20170628155309-84c8d2346e9f/go.mod h1:TdznJufoqS23FtqVCzL0ZqgP5MqXbb4fg/WgDys70nA= github.com/btcsuite/btcutil v0.0.0-20190425235716-9e5f4b9a998d/go.mod h1:+5NJ2+qvTyV9exUAL/rxXi3DcLg2Ts+ymUAY5y4NvMg= From 9c1d997b0803124383f198d6852e9deabe9991fd Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Thu, 28 Oct 2021 12:45:08 -0500 Subject: [PATCH 5/8] allow late wallet blocks through --- client/asset/btc/btc.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index 61a19ae236..9243dce04a 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -2477,7 +2477,9 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context) { // queued polled block that would appear to be superceded by this one. case walletTip := <-walletBlock: if queuedBlock != nil && walletTip.height >= queuedBlock.height { - queuedBlock.queue.Stop() + if !queuedBlock.queue.Stop() && walletTip.hash == queuedBlock.hash { + continue + } queuedBlock = nil } btc.reportNewTip(ctx, walletTip) @@ -2515,9 +2517,6 @@ func (btc *ExchangeWallet) reportNewTip(ctx context.Context, newTip *block) { defer btc.tipMtx.Unlock() prevTip := btc.currentTip - if prevTip.hash == newTip.hash { - return // already reported - } btc.currentTip = newTip btc.log.Debugf("tip change: %d (%s) => %d (%s)", prevTip.height, prevTip.hash, newTip.height, newTip.hash) go btc.tipChange(nil) From 7bebed5b3fc943f61733c0c685be7b4a3f01e8a6 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Thu, 28 Oct 2021 15:45:41 -0500 Subject: [PATCH 6/8] use NotificationServer --- client/asset/btc/spv.go | 33 ++++++++++++++------------------- client/asset/btc/spv_test.go | 4 ++++ 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/client/asset/btc/spv.go b/client/asset/btc/spv.go index 0a1ea52609..06a139f08e 100644 --- a/client/asset/btc/spv.go +++ b/client/asset/btc/spv.go @@ -98,6 +98,7 @@ type btcWallet interface { walletTransaction(txHash *chainhash.Hash) (*wtxmgr.TxDetails, error) syncedTo() waddrmgr.BlockStamp signTransaction(*wire.MsgTx) error + txNotifications() wallet.TransactionNotificationsClient } var _ btcWallet = (*walletExtender)(nil) @@ -922,19 +923,14 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { return err } - notes := make(<-chan interface{}) - if w.chainClient != nil { - notes = w.chainClient.Notifications() - if err = w.chainClient.NotifyBlocks(); err != nil { - return fmt.Errorf("failed to subscribe to block notifications: %w", err) - } - } + txNotes := w.wallet.txNotifications() // Nanny for the caches checkpoints and txBlocks caches. wg.Add(1) go func() { defer wg.Done() defer w.stop() + defer txNotes.Done() ticker := time.NewTicker(time.Minute * 20) defer ticker.Stop() @@ -958,25 +954,19 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { } w.checkpointMtx.Unlock() - case noteI := <-notes: - switch note := noteI.(type) { - case chain.FilteredBlockConnected: - blk := note.Block - - // Don't broadcast tip changes until the syncStatus has been - // requested and we have a syncTarget OR during initial sync - // except every 10k blocks. + case note := <-txNotes.C: + if len(note.AttachedBlocks) > 0 { + lastBlock := note.AttachedBlocks[len(note.AttachedBlocks)-1] syncTarget := atomic.LoadInt32(&w.syncTarget) - if syncTarget == 0 || (blk.Height < syncTarget && blk.Height%10_000 != 0) { + if syncTarget == 0 || (lastBlock.Height < syncTarget && lastBlock.Height%10_000 != 0) { continue } select { case w.tipChan <- &block{ - hash: blk.Hash, - height: int64(blk.Height), + hash: *lastBlock.Hash, + height: int64(lastBlock.Height), }: - default: w.log.Warnf("tip report channel was blocking") } @@ -1681,6 +1671,11 @@ func (w *walletExtender) signTransaction(tx *wire.MsgTx) error { }) } +// txNotifications gives access to the NotificationServer's tx notifications. +func (w *walletExtender) txNotifications() wallet.TransactionNotificationsClient { + return w.NtfnServer.TransactionNotifications() +} + // secretSource is used to locate keys and redemption scripts while signing a // transaction. secretSource satisfies the txauthor.SecretsSource interface. type secretSource struct { diff --git a/client/asset/btc/spv_test.go b/client/asset/btc/spv_test.go index e99db8faac..714d74cd78 100644 --- a/client/asset/btc/spv_test.go +++ b/client/asset/btc/spv_test.go @@ -267,6 +267,10 @@ func (c *tBtcWallet) signTransaction(tx *wire.MsgTx) error { return nil } +func (c *tBtcWallet) txNotifications() wallet.TransactionNotificationsClient { + return wallet.TransactionNotificationsClient{} +} + type tNeutrinoClient struct { *testData } From 85696093d3a577ab08be26cbf94deb6e3d205abc Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Thu, 28 Oct 2021 16:38:49 -0500 Subject: [PATCH 7/8] send tip after first syncStatus request --- client/asset/btc/btc.go | 1 - client/asset/btc/btc_test.go | 2 +- client/asset/btc/spv.go | 7 ++++++- client/asset/btc/spv_test.go | 4 ++++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/client/asset/btc/btc.go b/client/asset/btc/btc.go index 9243dce04a..552adf283f 100644 --- a/client/asset/btc/btc.go +++ b/client/asset/btc/btc.go @@ -2456,7 +2456,6 @@ func (btc *ExchangeWallet) watchBlocks(ctx context.Context) { } blockAllowance := walletBlockAllowance syncStatus, err := btc.node.syncStatus() - if err != nil { btc.log.Errorf("Error retreiving sync status before queuing polled block: %v", err) } else if syncStatus.Syncing { diff --git a/client/asset/btc/btc_test.go b/client/asset/btc/btc_test.go index e360ddacc0..91faaf710e 100644 --- a/client/asset/btc/btc_test.go +++ b/client/asset/btc/btc_test.go @@ -605,7 +605,7 @@ func tNewWallet(segwit bool, walletType string) (*ExchangeWallet, *testData, fun chainParams: &chaincfg.MainNetParams, wallet: &tBtcWallet{data}, cl: neutrinoClient, - tipChan: make(chan *block), + tipChan: make(chan *block, 1), chainClient: nil, acctNum: 0, txBlocks: data.dbBlockForTx, diff --git a/client/asset/btc/spv.go b/client/asset/btc/spv.go index 06a139f08e..f7247c8d27 100644 --- a/client/asset/btc/spv.go +++ b/client/asset/btc/spv.go @@ -480,7 +480,12 @@ func (w *spvWallet) syncStatus() (*syncStatus, error) { synced = true } - atomic.StoreInt32(&w.syncTarget, target) + if atomic.SwapInt32(&w.syncTarget, target) == 0 && target > 0 { + w.tipChan <- &block{ + hash: blk.Hash, + height: int64(blk.Height), + } + } return &syncStatus{ Target: target, diff --git a/client/asset/btc/spv_test.go b/client/asset/btc/spv_test.go index 714d74cd78..dd69e4caab 100644 --- a/client/asset/btc/spv_test.go +++ b/client/asset/btc/spv_test.go @@ -9,6 +9,7 @@ package btc import ( "errors" "fmt" + "sync/atomic" "testing" "time" @@ -753,6 +754,9 @@ func TestTryBlocksWithNotifier(t *testing.T) { addBlock() + // Prime the sync target to avoid the syncStatus tip send here. + atomic.StoreInt32(&spv.syncTarget, 1) + // It should not come through on the block tick, since it will be cached. if getNote(blockTicker * 2) { t.Fatalf("got block that should've been cached") From a631acc3851a10682debdd0a0609081de33d1200 Mon Sep 17 00:00:00 2001 From: Brian Stafford Date: Sat, 30 Oct 2021 11:56:32 -0500 Subject: [PATCH 8/8] add transactions log --- client/asset/btc/spv.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/client/asset/btc/spv.go b/client/asset/btc/spv.go index f7247c8d27..f9d363f5e2 100644 --- a/client/asset/btc/spv.go +++ b/client/asset/btc/spv.go @@ -963,6 +963,13 @@ func (w *spvWallet) connect(ctx context.Context, wg *sync.WaitGroup) error { if len(note.AttachedBlocks) > 0 { lastBlock := note.AttachedBlocks[len(note.AttachedBlocks)-1] syncTarget := atomic.LoadInt32(&w.syncTarget) + + for ib := range note.AttachedBlocks { + for _, nt := range note.AttachedBlocks[ib].Transactions { + w.log.Debugf("Block %d contains wallet transaction %v", note.AttachedBlocks[ib].Height, nt.Hash) + } + } + if syncTarget == 0 || (lastBlock.Height < syncTarget && lastBlock.Height%10_000 != 0) { continue }