Skip to content

Commit

Permalink
mm: Individually Start Bots / Live Updates (#2738)
Browse files Browse the repository at this point in the history
* mm: Individually Start Bots / Live Updates

This diff adds the ability to start/stop bots individually and also to
perform live updates to bot settings and the amount of funds controlled
by bots.

All balance fields are removed from the BotConfig. These are now specified
when a bot is started.

Bots and exchange adaptors are now dex.Connectors. To perform an update,
the bot is first paused, the updates are made, and then the bot is resumed
again. A botConnectionMaster that wraps a dex.ConnectionMaster is added to
facilitate pausing and resuming of bots.

To perform a balance update, we need to know the exact amount that is
available in the wallets and on the CEX that is not currently reserved
by a running bot. To do this, we first check the available amounts
according to the wallet/cex, then we sync the state of all pending trades,
deposits, and withdrawals, and then we recheck the available amounts. If
the first check is the same as the last, we know nothing has changed and
we have the correct amounts, so we can proceed. In order for this to work
properly, the `WalletTransaction` function of wallets must return
`Confirmed == true` if and only if the any incoming funds from that
transaction are part of the available balance.

The priceOracle is also refactored. Instead of having a “synced”
priceOracle used for markets on which a bot is running, and an “unsynced”
one for any other markets, there is now only one priceOracle.
`startAutoSyncingMarket` and `stopAutoSyncingMarket` are called whenever
bots are started / stopped.

A testing program is added that tracks the available balances in
wallets/cexes that are unused by any bots. This amount should not change
unless the user start, stops, or updates a bot. If there are any unexpected
changes, this means there is a bug in the balance tracking.
  • Loading branch information
martonp committed May 16, 2024
1 parent 2541717 commit d0c76e9
Show file tree
Hide file tree
Showing 48 changed files with 4,606 additions and 3,413 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ client/asset/eth/cmd/getgas/getgas
client/asset/eth/cmd/deploy/deploy
client/cmd/dexc-desktop/pkg/installers
client/cmd/testbinance/testbinance
client/cmd/mmbaltracker/mmbaltracker
server/noderelay/cmd/sourcenode/sourcenode
tatanka/cmd/demo/demo
server/cmd/validatemarkets
Expand Down
6 changes: 5 additions & 1 deletion client/asset/dcr/dcr.go
Original file line number Diff line number Diff line change
Expand Up @@ -1816,7 +1816,7 @@ func (dcr *ExchangeWallet) SingleLotRedeemFees(_ uint32, feeSuggestion uint64) (
return 0, err
}

dcr.log.Infof("SingleLotRedeemFees: worst case = %d, feeSuggestion = %d", preRedeem.Estimate.RealisticWorstCase, feeSuggestion)
dcr.log.Tracef("SingleLotRedeemFees: worst case = %d, feeSuggestion = %d", preRedeem.Estimate.RealisticWorstCase, feeSuggestion)

return preRedeem.Estimate.RealisticWorstCase, nil
}
Expand Down Expand Up @@ -6421,6 +6421,10 @@ func (dcr *ExchangeWallet) WalletTransaction(ctx context.Context, txID string) (
return nil, asset.CoinNotFoundError
}

// If the wallet knows about the transaction, it will be part of the
// available balance, so we always return Confirmed = true.
txs[0].Confirmed = true

return txs[0], nil
}

Expand Down
17 changes: 14 additions & 3 deletions client/asset/eth/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -4837,7 +4837,8 @@ func (w *ETHWallet) getReceivingTransaction(ctx context.Context, txID string) (*
// WalletTransaction returns a transaction that either the wallet has made or
// one in which the wallet has received funds.
func (w *ETHWallet) WalletTransaction(ctx context.Context, txID string) (*asset.WalletTransaction, error) {
txID = common.HexToHash(txID).String()
txHash := common.HexToHash(txID)
txID = txHash.String()
txs, err := w.TxHistory(1, &txID, false)
if errors.Is(err, asset.CoinNotFoundError) {
return w.getReceivingTransaction(ctx, txID)
Expand All @@ -4849,7 +4850,12 @@ func (w *ETHWallet) WalletTransaction(ctx context.Context, txID string) (*asset.
return nil, asset.CoinNotFoundError
}

return txs[0], err
tx := txs[0]
if tx.BlockNumber > 0 {
tx.Confirmed = true
}

return tx, nil
}

func (w *TokenWallet) getReceivingTransaction(ctx context.Context, txID string) (*asset.WalletTransaction, error) {
Expand Down Expand Up @@ -4908,7 +4914,12 @@ func (w *TokenWallet) WalletTransaction(ctx context.Context, txID string) (*asse
return nil, asset.CoinNotFoundError
}

return txs[0], err
tx := txs[0]
if tx.BlockNumber > 0 {
tx.Confirmed = true
}

return tx, nil
}

// providersFile reads a file located at ~/dextest/credentials.json.
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/dexc-desktop/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ func mainCore() error {
if cfg.Experimental {
// TODO: on shutdown, stop market making and wait for trades to be
// canceled.
marketMaker, err = mm.NewMarketMaker(appCtx, clientCore, cfg.MMConfig.EventLogDBPath, cfg.BotConfigPath, logMaker.Logger("MM"))
marketMaker, err = mm.NewMarketMaker(clientCore, cfg.MMConfig.EventLogDBPath, cfg.BotConfigPath, logMaker.Logger("MM"))
if err != nil {
return fmt.Errorf("error creating market maker: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/dexc-desktop/app_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ func mainCore() error {
if cfg.Experimental {
// TODO: on shutdown, stop market making and wait for trades to be
// canceled.
marketMaker, err = mm.NewMarketMaker(appCtx, clientCore, cfg.MMConfig.EventLogDBPath, cfg.MMConfig.BotConfigPath, logMaker.Logger("MM"))
marketMaker, err = mm.NewMarketMaker(clientCore, cfg.MMConfig.EventLogDBPath, cfg.MMConfig.BotConfigPath, logMaker.Logger("MM"))
if err != nil {
return fmt.Errorf("error creating market maker: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion client/cmd/dexc/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ func runCore(cfg *app.Config) error {
if cfg.Experimental {
// TODO: on shutdown, stop market making and wait for trades to be
// canceled.
marketMaker, err = mm.NewMarketMaker(appCtx, clientCore, cfg.MMConfig.EventLogDBPath, cfg.MMConfig.BotConfigPath, logMaker.Logger("MM"))
marketMaker, err = mm.NewMarketMaker(clientCore, cfg.MMConfig.EventLogDBPath, cfg.MMConfig.BotConfigPath, logMaker.Logger("MM"))
if err != nil {
return fmt.Errorf("error creating market maker: %w", err)
}
Expand Down
1 change: 1 addition & 0 deletions client/cmd/dexcctl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ var promptPasswords = map[string][]string{
"startmarketmaking": {"App password:"},
"multitrade": {"App password:"},
"purchasetickets": {"App password:"},
"startmmbot": {"App password:"},
}

// optionalTextFiles is a map of routes to arg index for routes that should read
Expand Down
194 changes: 194 additions & 0 deletions client/cmd/mmbaltracker/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
package main

/*
* Starts a process that repeatedly calls the mmavailablebalances command to
* to check for changes in the available balances for market making on the
* specified markets. Whenever there is a diff, it is logged. This is used
* to check for bugs in the balance tracking logic. If there is a diff without
* a bot being started, stopped, or updated, and the wallet is not handling
* bonds, then there is a bug.
*/

import (
"context"
"encoding/json"
"fmt"
"os"
"os/exec"
"os/signal"
"strconv"
"strings"
"time"

"decred.org/dcrdex/client/mm"
"decred.org/dcrdex/dex"
)

var (
log = dex.StdOutLogger("BALTRACKER", dex.LevelDebug)
)

func printUsage() {
fmt.Println("Usage: mmbaltracker <configpath> <market>")
fmt.Println(" <configpath> is the path to the market making configuration file.")
fmt.Println(" <market> is a market in the form <host>-<baseassetid>-<quoteassetid>. You can specify multiple markets.")
}

func parseMkt(mkt string) (*mm.MarketWithHost, error) {
parts := strings.Split(mkt, "-")
if len(parts) != 3 {
return nil, fmt.Errorf("invalid market format")
}

host := parts[0]
baseID, err := strconv.Atoi(parts[1])
if err != nil {
return nil, fmt.Errorf("invalid base asset ID")
}

quoteID, err := strconv.Atoi(parts[2])
if err != nil {
return nil, fmt.Errorf("invalid quote asset ID")
}

return &mm.MarketWithHost{
Host: host,
BaseID: uint32(baseID),
QuoteID: uint32(quoteID),
}, nil
}

type balances struct {
DEXBalances map[uint32]uint64 `json:"dexBalances"`
CEXBalances map[uint32]uint64 `json:"cexBalances"`
}

func getAvailableBalances(mkt *mm.MarketWithHost, configPath string) (bals *balances, err error) {
cmd := exec.Command("dexcctl", "mmavailablebalances", configPath,
mkt.Host, strconv.Itoa(int(mkt.BaseID)), strconv.Itoa(int(mkt.QuoteID)))
out, err := cmd.Output()
if err != nil {
return nil, fmt.Errorf("error getting available balances: %v", err)
}

bals = new(balances)
err = json.Unmarshal(out, bals)
if err != nil {
return nil, fmt.Errorf("error unmarshalling available balances: %v", err)
}

return bals, nil
}

func main() {
if len(os.Args) < 3 {
printUsage()
os.Exit(1)
}

configPath := os.Args[1]

currBalances := make(map[mm.MarketWithHost]*balances, len(os.Args)-2)
for i := 2; i < len(os.Args); i++ {
mkt, err := parseMkt(os.Args[i])
if err != nil {
log.Errorf("Error parsing market: %v\n", err)
os.Exit(1)
}

currBalances[*mkt], err = getAvailableBalances(mkt, configPath)
if err != nil {
log.Errorf("Error getting initial balances: %v\n", err)
os.Exit(1)
}
}

log.Infof("Initial Balances:")
for mkt, bals := range currBalances {
log.Infof("Market: %s-%d-%d", mkt.Host, mkt.BaseID, mkt.QuoteID)
log.Infof(" DEX Balances:")
for assetID, bal := range bals.DEXBalances {
log.Infof(" %d: %d", assetID, bal)
}
log.Infof(" CEX Balances:")
for assetID, bal := range bals.CEXBalances {
log.Infof(" %d: %d", assetID, bal)
}
}

type diff struct {
assetID uint32
oldBal uint64
newBal uint64
}

checkForDiffs := func(mkt *mm.MarketWithHost) {
newBals, err := getAvailableBalances(mkt, configPath)
if err != nil {
log.Errorf("Error getting balances: %v\n", err)
return
}

dexDiffs := make([]*diff, 0)
cexDiffs := make([]*diff, 0)

for assetID, newBal := range newBals.DEXBalances {
oldBal := currBalances[*mkt].DEXBalances[assetID]
if oldBal != newBal {
dexDiffs = append(dexDiffs, &diff{assetID, oldBal, newBal})
}
currBalances[*mkt].DEXBalances[assetID] = newBal
}
for assetID, newBal := range newBals.CEXBalances {
oldBal := currBalances[*mkt].CEXBalances[assetID]
if oldBal != newBal {
cexDiffs = append(cexDiffs, &diff{assetID, oldBal, newBal})
}
currBalances[*mkt].CEXBalances[assetID] = newBal
}

logStr := ""

if len(dexDiffs) > 0 || len(cexDiffs) > 0 {
logStr += "================================================\n"
logStr += fmt.Sprintf("\nDiffs on Market: %s-%d-%d", mkt.Host, mkt.BaseID, mkt.QuoteID)
if len(dexDiffs) > 0 {
logStr += "\n DEX diffs:"
for _, d := range dexDiffs {
logStr += fmt.Sprintf("\n %s: %d -> %d (%d)", dex.BipIDSymbol(d.assetID), d.oldBal, d.newBal, int64(d.newBal)-int64(d.oldBal))
}
}

if len(cexDiffs) > 0 {
logStr += "\n CEX diffs:"
for _, d := range cexDiffs {
logStr += fmt.Sprintf("\n %s: %d -> %d (%d)", dex.BipIDSymbol(d.assetID), d.oldBal, d.newBal, int64(d.newBal)-int64(d.oldBal))
}
}
logStr += "\n\n"
log.Infof(logStr)
}
}

ctx, cancel := context.WithCancel(context.Background())

sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, os.Interrupt)
go func() {
<-sigChan
cancel()
}()

timer := time.NewTicker(time.Second * 2)
for {
select {
case <-timer.C:
for mkt := range currBalances {
checkForDiffs(&mkt)
}
case <-ctx.Done():
log.Infof("Exiting...")
os.Exit(0)
}
}
}
8 changes: 4 additions & 4 deletions client/cmd/testbinance/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -847,10 +847,10 @@ func (f *fakeBinance) handleGetOrder(w http.ResponseWriter, r *http.Request) {
Symbol: ord.slug,
// OrderID: ,
ClientOrderID: tradeID,
Price: strconv.FormatFloat(ord.rate, 'f', 9, 64),
OrigQty: strconv.FormatFloat(ord.qty, 'f', 9, 64),
ExecutedQty: "0",
CumulativeQuoteQty: "0",
Price: ord.rate,
OrigQty: ord.qty,
ExecutedQty: 0,
CumulativeQuoteQty: 0,
Status: status,
TimeInForce: "GTC",
}
Expand Down

0 comments on commit d0c76e9

Please sign in to comment.