Skip to content

Commit

Permalink
Add node test with tx-index settings
Browse files Browse the repository at this point in the history
  • Loading branch information
JayT106 committed May 6, 2021
1 parent 5aa0fb4 commit c91be22
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 8 deletions.
17 changes: 12 additions & 5 deletions node/node.go
Expand Up @@ -3,6 +3,7 @@ package node
import (
"bytes"
"context"
"database/sql"
"errors"
"fmt"
"math"
Expand All @@ -18,6 +19,7 @@ import (
"github.com/rs/cors"
dbm "github.com/tendermint/tm-db"

_ "github.com/lib/pq" // provide the psql db driver
abci "github.com/tendermint/tendermint/abci/types"
bcv0 "github.com/tendermint/tendermint/blockchain/v0"
bcv2 "github.com/tendermint/tendermint/blockchain/v2"
Expand Down Expand Up @@ -269,33 +271,38 @@ func createAndStartIndexerService(
) (*indexer.Service, []indexer.EventSink, error) {

eventSinks := []indexer.EventSink{}

var sqlDB *sql.DB
loop:
for _, db := range config.TxIndex.Indexer {
switch strings.ToLower(db) {
case string(indexer.NULL):
// when we see null in the config, the eventsinks will be reset with the nullEventSink.
eventSinks = append([]indexer.EventSink{}, nullSink.NewNullEventSink())
break loop
case string(indexer.KV):
store, err := dbProvider(&DBContext{"tx_index", config})
if err != nil {
return nil, nil, err
}

eventSinks = append(eventSinks, kvSink.NewKVEventSink(store))
case string(indexer.PSQL):
conn := config.TxIndex.PsqlConn
es, _, err := psqlsink.NewPSQLEventSink(conn)
if conn == "" {
return nil, nil, errors.New("the psql connection settings cannot be empty")
}

es, db, err := psqlsink.NewPSQLEventSink(conn)
if err != nil {
return nil, nil, err
}

eventSinks = append(eventSinks, es)
sqlDB = db
default:
return nil, nil, errors.New("unsupported event sink type")
}
}

indexerService := indexer.NewIndexerService(eventSinks, eventBus)
indexerService := indexer.NewIndexerService(eventSinks, eventBus, sqlDB)
indexerService.SetLogger(logger.With("module", "txindex"))

if err := indexerService.Start(); err != nil {
Expand Down
55 changes: 55 additions & 0 deletions node/node_test.go
Expand Up @@ -2,6 +2,7 @@ package node

import (
"context"
"errors"
"fmt"
"math"
"net"
Expand Down Expand Up @@ -29,6 +30,7 @@ import (
"github.com/tendermint/tendermint/privval"
"github.com/tendermint/tendermint/proxy"
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/state/indexer"
"github.com/tendermint/tendermint/store"
"github.com/tendermint/tendermint/types"
tmtime "github.com/tendermint/tendermint/types/time"
Expand Down Expand Up @@ -527,6 +529,59 @@ func TestNodeNewSeedNode(t *testing.T) {
assert.True(t, n.pexReactor.IsRunning())
}

func TestNodeSetEventSink(t *testing.T) {
config := cfg.ResetTestRoot("node_app_version_test")
defer os.RemoveAll(config.RootDir)

// create & start node
n, err := DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.KV, n.eventSinks[0].Type())

config.TxIndex.Indexer = []string{"null"}
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())

config.TxIndex.Indexer = []string{"null", "kv"}
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)

assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.NULL, n.eventSinks[0].Type())

config.TxIndex.Indexer = []string{"kvv"}
n, err = DefaultNewNode(config, log.TestingLogger())
assert.Nil(t, n)
assert.Equal(t, errors.New("unsupported event sink type"), err)

config.TxIndex.Indexer = []string{"psql"}
n, err = DefaultNewNode(config, log.TestingLogger())
assert.Nil(t, n)
assert.Equal(t, errors.New("the psql connection settings cannot be empty"), err)

config.TxIndex.Indexer = []string{"psql"}
config.TxIndex.PsqlConn = "test"
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 1, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
n.OnStop()

config.TxIndex.Indexer = []string{"psql", "kv"}
config.TxIndex.PsqlConn = "test"
n, err = DefaultNewNode(config, log.TestingLogger())
require.NoError(t, err)
assert.Equal(t, 2, len(n.eventSinks))
assert.Equal(t, indexer.PSQL, n.eventSinks[0].Type())
assert.Equal(t, indexer.KV, n.eventSinks[1].Type())
n.OnStop()
}

func state(nVals int, height int64) (sm.State, dbm.DB, []types.PrivValidator) {
privVals := make([]types.PrivValidator, nVals)
vals := make([]types.GenesisValidator, nVals)
Expand Down
13 changes: 11 additions & 2 deletions state/indexer/indexer_service.go
Expand Up @@ -2,6 +2,7 @@ package indexer

import (
"context"
"database/sql"

"github.com/tendermint/tendermint/libs/service"
"github.com/tendermint/tendermint/types"
Expand All @@ -20,15 +21,17 @@ type Service struct {

eventSinks []EventSink
eventBus *types.EventBus
sqlDB *sql.DB
}

// NewIndexerService returns a new service instance.
func NewIndexerService(
es []EventSink,
eventBus *types.EventBus,
sqlDB *sql.DB,
) *Service {

is := &Service{eventSinks: es, eventBus: eventBus}
is := &Service{eventSinks: es, eventBus: eventBus, sqlDB: sqlDB}
is.BaseService = *service.NewBaseService(nil, "IndexerService", is)
return is
}
Expand Down Expand Up @@ -92,9 +95,15 @@ func (is *Service) OnStart() error {
return nil
}

// OnStop implements service.Service by unsubscribing from all transactions.
// OnStop implements service.Service by unsubscribing from all transactions and
// close the sqlDB if the service is using the psqlEventSink
func (is *Service) OnStop() {
if is.eventBus.IsRunning() {
_ = is.eventBus.UnsubscribeAll(context.Background(), subscriber)
}

if is.sqlDB != nil {
err := is.sqlDB.Close()
is.Logger.Error("failed to close the sqlDB", "err", err)
}
}
2 changes: 1 addition & 1 deletion state/indexer/indexer_service_test.go
Expand Up @@ -122,7 +122,7 @@ func TestIndexerServiceIndexesBlocks(t *testing.T) {
store := db.NewMemDB()
eventSinks := []indexer.EventSink{kvsink.NewKVEventSink(store), pSink}

service := indexer.NewIndexerService(eventSinks, eventBus)
service := indexer.NewIndexerService(eventSinks, eventBus, nil)
service.SetLogger(tmlog.TestingLogger())
err = service.Start()
require.NoError(t, err)
Expand Down

0 comments on commit c91be22

Please sign in to comment.