From aeeec0b39625be045f1772b682565c53a7051735 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Mon, 28 Nov 2022 10:43:49 -0500 Subject: [PATCH 1/2] refactor: cleanup store/streaming/constructor.go #14044 (cherry picked from commit c6189bb630772c1e76aaf3d400bdd1602dc1a148) # Conflicts: # store/streaming/constructor.go # types/utils.go --- store/streaming/constructor.go | 65 +++++++++++++++++++++++----------- types/utils.go | 28 +++++++++++++++ 2 files changed, 73 insertions(+), 20 deletions(-) diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index 20cd9fe1ff8b..a8ebcc57a832 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -13,6 +13,7 @@ import ( serverTypes "github.com/cosmos/cosmos-sdk/server/types" "github.com/cosmos/cosmos-sdk/store/streaming/file" "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/spf13/cast" ) @@ -26,11 +27,11 @@ type ServiceType int const ( Unknown ServiceType = iota File - // add more in the future ) // Streaming option keys const ( +<<<<<<< HEAD OptStreamersFilePrefix = "streamers.file.prefix" OptStreamersFileWriteDir = "streamers.file.write_dir" OptStreamersFileOutputMetadata = "streamers.file.output-metadata" @@ -38,6 +39,11 @@ const ( OptStreamersFileFsync = "streamers.file.fsync" OptStoreStreamers = "store.streamers" +======= + OptStreamersFilePrefix = "streamers.file.prefix" + OptStreamersFileWriteDir = "streamers.file.write_dir" + OptStoreStreamers = "store.streamers" +>>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) ) // ServiceTypeFromString returns the streaming.ServiceType corresponding to the @@ -46,6 +52,7 @@ func ServiceTypeFromString(name string) ServiceType { switch strings.ToLower(name) { case "file", "f": return File + default: return Unknown } @@ -56,25 +63,30 @@ func (sst ServiceType) String() string { switch sst { case File: return "file" + default: return "unknown" } } -// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors +// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to +// streaming.ServiceConstructors types. var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ File: NewFileStreamingService, } -// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name +// NewServiceConstructor returns the streaming.ServiceConstructor corresponding +// to the provided name. func NewServiceConstructor(name string) (ServiceConstructor, error) { ssType := ServiceTypeFromString(name) if ssType == Unknown { return nil, fmt.Errorf("unrecognized streaming service name %s", name) } + if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil { return constructor, nil } + return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) } @@ -85,6 +97,7 @@ func NewFileStreamingService( keys []types.StoreKey, marshaller codec.BinaryCodec, ) (baseapp.StreamingService, error) { +<<<<<<< HEAD homePath := cast.ToString(opts.Get(flags.FlagHome)) filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix)) fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir)) @@ -105,6 +118,12 @@ func NewFileStreamingService( } return file.NewStreamingService(fileDir, filePrefix, keys, marshaller, outputMetadata, stopNodeOnErr, fsync) +======= + filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix)) + fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir)) + + return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) +>>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) } // LoadStreamingServices is a function for loading StreamingServices onto the @@ -119,14 +138,19 @@ func LoadStreamingServices( ) ([]baseapp.StreamingService, *sync.WaitGroup, error) { // waitgroup and quit channel for optional shutdown coordination of the streaming service(s) wg := new(sync.WaitGroup) + // configure state listening capabilities using AppOptions - streamers := cast.ToStringSlice(appOpts.Get("store.streamers")) + streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers)) activeStreamers := make([]baseapp.StreamingService, 0, len(streamers)) + for _, streamerName := range streamers { + var exposeStoreKeys []types.StoreKey + // get the store keys allowed to be exposed for this streaming service exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName))) - var exposeStoreKeys []types.StoreKey - if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys + + // if list contains '*', expose all store keys + if sdk.SliceContains(exposeKeyStrs, "*") { exposeStoreKeys = make([]types.StoreKey, 0, len(keys)) for _, storeKey := range keys { exposeStoreKeys = append(exposeStoreKeys, storeKey) @@ -139,16 +163,19 @@ func LoadStreamingServices( } } } - if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything + + if len(exposeStoreKeys) == 0 { continue } - // get the constructor for this streamer name + constructor, err := NewServiceConstructor(streamerName) if err != nil { - // close any services we may have already spun up before hitting the error on this one + // Close any services we may have already spun up before hitting the error + // on this one. for _, activeStreamer := range activeStreamers { activeStreamer.Close() } + return nil, nil, err } @@ -156,28 +183,26 @@ func LoadStreamingServices( // StoreKeys we want to expose. streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) if err != nil { - // close any services we may have already spun up before hitting the error on this one + // Close any services we may have already spun up before hitting the error + // on this one. for _, activeStreamer := range activeStreamers { activeStreamer.Close() } + return nil, nil, err } + // register the streaming service with the BaseApp bApp.SetStreamingService(streamingService) + // kick off the background streaming service loop streamingService.Stream(wg) + // add to the list of active streamers activeStreamers = append(activeStreamers, streamingService) } - // if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything - return activeStreamers, wg, nil -} -func exposeAll(list []string) bool { - for _, ele := range list { - if ele == "*" { - return true - } - } - return false + // If there are no active streamers, activeStreamers is empty (len == 0) and + // the waitGroup is not waiting on anything. + return activeStreamers, wg, nil } diff --git a/types/utils.go b/types/utils.go index 50074f2b052d..6182b482ddaa 100644 --- a/types/utils.go +++ b/types/utils.go @@ -6,8 +6,14 @@ import ( "fmt" "time" +<<<<<<< HEAD "github.com/cosmos/cosmos-sdk/types/kv" dbm "github.com/tendermint/tm-db" +======= + log "github.com/tendermint/tendermint/libs/log" + + "github.com/cosmos/cosmos-sdk/types/kv" +>>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) ) var ( @@ -134,3 +140,25 @@ func ParseLengthPrefixedBytes(key []byte, startIndex int, sliceLength int) ([]by return byteSlice, endIndex } +<<<<<<< HEAD +======= + +// LogDeferred logs an error in a deferred function call if the returned error is non-nil. +func LogDeferred(logger log.Logger, f func() error) { + if err := f(); err != nil { + logger.Error(err.Error()) + } +} + +// SliceContains implements a generic function for checking if a slice contains +// a certain value. +func SliceContains[T comparable](elements []T, v T) bool { + for _, s := range elements { + if v == s { + return true + } + } + + return false +} +>>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) From 0d9ff09d675d662e57df4f471dac2d8fcc690efd Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 10:57:30 -0500 Subject: [PATCH 2/2] updates --- store/streaming/constructor.go | 15 +-------------- types/utils.go | 15 --------------- 2 files changed, 1 insertion(+), 29 deletions(-) diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go index a8ebcc57a832..02b5f34f472c 100644 --- a/store/streaming/constructor.go +++ b/store/streaming/constructor.go @@ -31,7 +31,6 @@ const ( // Streaming option keys const ( -<<<<<<< HEAD OptStreamersFilePrefix = "streamers.file.prefix" OptStreamersFileWriteDir = "streamers.file.write_dir" OptStreamersFileOutputMetadata = "streamers.file.output-metadata" @@ -39,11 +38,6 @@ const ( OptStreamersFileFsync = "streamers.file.fsync" OptStoreStreamers = "store.streamers" -======= - OptStreamersFilePrefix = "streamers.file.prefix" - OptStreamersFileWriteDir = "streamers.file.write_dir" - OptStoreStreamers = "store.streamers" ->>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) ) // ServiceTypeFromString returns the streaming.ServiceType corresponding to the @@ -97,7 +91,6 @@ func NewFileStreamingService( keys []types.StoreKey, marshaller codec.BinaryCodec, ) (baseapp.StreamingService, error) { -<<<<<<< HEAD homePath := cast.ToString(opts.Get(flags.FlagHome)) filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix)) fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir)) @@ -110,7 +103,7 @@ func NewFileStreamingService( fileDir = path.Join(homePath, fileDir) } - // try to create output directory if not exists. + // try to create output directory if it does not exist if _, err := os.Stat(fileDir); os.IsNotExist(err) { if err = os.MkdirAll(fileDir, os.ModePerm); err != nil { return nil, err @@ -118,12 +111,6 @@ func NewFileStreamingService( } return file.NewStreamingService(fileDir, filePrefix, keys, marshaller, outputMetadata, stopNodeOnErr, fsync) -======= - filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix)) - fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir)) - - return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) ->>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) } // LoadStreamingServices is a function for loading StreamingServices onto the diff --git a/types/utils.go b/types/utils.go index 6182b482ddaa..05bc948d5525 100644 --- a/types/utils.go +++ b/types/utils.go @@ -6,14 +6,9 @@ import ( "fmt" "time" -<<<<<<< HEAD - "github.com/cosmos/cosmos-sdk/types/kv" dbm "github.com/tendermint/tm-db" -======= - log "github.com/tendermint/tendermint/libs/log" "github.com/cosmos/cosmos-sdk/types/kv" ->>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044) ) var ( @@ -140,15 +135,6 @@ func ParseLengthPrefixedBytes(key []byte, startIndex int, sliceLength int) ([]by return byteSlice, endIndex } -<<<<<<< HEAD -======= - -// LogDeferred logs an error in a deferred function call if the returned error is non-nil. -func LogDeferred(logger log.Logger, f func() error) { - if err := f(); err != nil { - logger.Error(err.Error()) - } -} // SliceContains implements a generic function for checking if a slice contains // a certain value. @@ -161,4 +147,3 @@ func SliceContains[T comparable](elements []T, v T) bool { return false } ->>>>>>> c6189bb63 (refactor: cleanup store/streaming/constructor.go #14044)