From c91be22ee76ebfa779e7ccc64a0ec8a34c4d90df Mon Sep 17 00:00:00 2001 From: jayt106 Date: Thu, 6 May 2021 14:58:40 -0400 Subject: [PATCH] Add node test with tx-index settings --- node/node.go | 17 ++++++--- node/node_test.go | 55 +++++++++++++++++++++++++++ state/indexer/indexer_service.go | 13 ++++++- state/indexer/indexer_service_test.go | 2 +- 4 files changed, 79 insertions(+), 8 deletions(-) diff --git a/node/node.go b/node/node.go index a1c792431f00..018b127eac16 100644 --- a/node/node.go +++ b/node/node.go @@ -3,6 +3,7 @@ package node import ( "bytes" "context" + "database/sql" "errors" "fmt" "math" @@ -18,6 +19,7 @@ import ( "github.com/rs/cors" dbm "github.com/tendermint/tm-db" + _ "github.com/lib/pq" // provide the psql db driver abci "github.com/tendermint/tendermint/abci/types" bcv0 "github.com/tendermint/tendermint/blockchain/v0" bcv2 "github.com/tendermint/tendermint/blockchain/v2" @@ -269,33 +271,38 @@ func createAndStartIndexerService( ) (*indexer.Service, []indexer.EventSink, error) { eventSinks := []indexer.EventSink{} - + var sqlDB *sql.DB +loop: for _, db := range config.TxIndex.Indexer { switch strings.ToLower(db) { case string(indexer.NULL): // when we see null in the config, the eventsinks will be reset with the nullEventSink. eventSinks = append([]indexer.EventSink{}, nullSink.NewNullEventSink()) + break loop case string(indexer.KV): store, err := dbProvider(&DBContext{"tx_index", config}) if err != nil { return nil, nil, err } - eventSinks = append(eventSinks, kvSink.NewKVEventSink(store)) case string(indexer.PSQL): conn := config.TxIndex.PsqlConn - es, _, err := psqlsink.NewPSQLEventSink(conn) + if conn == "" { + return nil, nil, errors.New("the psql connection settings cannot be empty") + } + + es, db, err := psqlsink.NewPSQLEventSink(conn) if err != nil { return nil, nil, err } - eventSinks = append(eventSinks, es) + sqlDB = db default: return nil, nil, errors.New("unsupported event sink type") } } - indexerService := indexer.NewIndexerService(eventSinks, eventBus) + indexerService := indexer.NewIndexerService(eventSinks, eventBus, sqlDB) indexerService.SetLogger(logger.With("module", "txindex")) if err := indexerService.Start(); err != nil { diff --git a/node/node_test.go b/node/node_test.go index 43bba009927a..ea40d358bb53 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -2,6 +2,7 @@ package node import ( "context" + "errors" "fmt" "math" "net" @@ -29,6 +30,7 @@ import ( "github.com/tendermint/tendermint/privval" "github.com/tendermint/tendermint/proxy" sm "github.com/tendermint/tendermint/state" + "github.com/tendermint/tendermint/state/indexer" "github.com/tendermint/tendermint/store" "github.com/tendermint/tendermint/types" tmtime "github.com/tendermint/tendermint/types/time" @@ -527,6 +529,59 @@ func TestNodeNewSeedNode(t *testing.T) { assert.True(t, n.pexReactor.IsRunning()) } +func TestNodeSetEventSink(t *testing.T) { + config := cfg.ResetTestRoot("node_app_version_test") + defer os.RemoveAll(config.RootDir) + + // create & start node + n, err := DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.KV, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"null"} + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"null", "kv"} + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.NULL, n.eventSinks[0].Type()) + + config.TxIndex.Indexer = []string{"kvv"} + n, err = DefaultNewNode(config, log.TestingLogger()) + assert.Nil(t, n) + assert.Equal(t, errors.New("unsupported event sink type"), err) + + config.TxIndex.Indexer = []string{"psql"} + n, err = DefaultNewNode(config, log.TestingLogger()) + assert.Nil(t, n) + assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err) + + config.TxIndex.Indexer = []string{"psql"} + config.TxIndex.PsqlConn = "test" + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.Equal(t, 1, len(n.eventSinks)) + assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) + n.OnStop() + + config.TxIndex.Indexer = []string{"psql", "kv"} + config.TxIndex.PsqlConn = "test" + n, err = DefaultNewNode(config, log.TestingLogger()) + require.NoError(t, err) + assert.Equal(t, 2, len(n.eventSinks)) + assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type()) + assert.Equal(t, indexer.KV, n.eventSinks[1].Type()) + n.OnStop() +} + func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) { privVals := make([]types.PrivValidator, nVals) vals := make([]types.GenesisValidator, nVals) diff --git a/state/indexer/indexer_service.go b/state/indexer/indexer_service.go index dd8ef5a54e2c..2c69860416c4 100644 --- a/state/indexer/indexer_service.go +++ b/state/indexer/indexer_service.go @@ -2,6 +2,7 @@ package indexer import ( "context" + "database/sql" "github.com/tendermint/tendermint/libs/service" "github.com/tendermint/tendermint/types" @@ -20,15 +21,17 @@ type Service struct { eventSinks []EventSink eventBus *types.EventBus + sqlDB *sql.DB } // NewIndexerService returns a new service instance. func NewIndexerService( es []EventSink, eventBus *types.EventBus, + sqlDB *sql.DB, ) *Service { - is := &Service{eventSinks: es, eventBus: eventBus} + is := &Service{eventSinks: es, eventBus: eventBus, sqlDB: sqlDB} is.BaseService = *service.NewBaseService(nil, "IndexerService", is) return is } @@ -92,9 +95,15 @@ func (is *Service) OnStart() error { return nil } -// OnStop implements service.Service by unsubscribing from all transactions. +// OnStop implements service.Service by unsubscribing from all transactions and +// close the sqlDB if the service is using the psqlEventSink func (is *Service) OnStop() { if is.eventBus.IsRunning() { _ = is.eventBus.UnsubscribeAll(context.Background(), subscriber) } + + if is.sqlDB != nil { + err := is.sqlDB.Close() + is.Logger.Error("failed to close the sqlDB", "err", err) + } } diff --git a/state/indexer/indexer_service_test.go b/state/indexer/indexer_service_test.go index afee7edf5b9d..6cf941962092 100644 --- a/state/indexer/indexer_service_test.go +++ b/state/indexer/indexer_service_test.go @@ -122,7 +122,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) { store := db.NewMemDB() eventSinks := []indexer.EventSink{kvsink.NewKVEventSink(store), pSink} - service := indexer.NewIndexerService(eventSinks, eventBus) + service := indexer.NewIndexerService(eventSinks, eventBus, nil) service.SetLogger(tmlog.TestingLogger()) err = service.Start() require.NoError(t, err)