diff --git a/CHANGELOG.md b/CHANGELOG.md index 7d686fdd31b2..0c5c9204af69 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -37,6 +37,10 @@ Ref: https://keepachangelog.com/en/1.0.0/ ## [Unreleased] +### Improvements + +* (store) [\#8664](https://github.com/cosmos/cosmos-sdk/pull/8664) Implementation of ADR-038 file StreamingService + ### Reverts * revert f02c26ce51b948366a26817ff75c2f4aae15ed02: populate ctx.ConsensusParams for begin blockers diff --git a/baseapp/abci.go b/baseapp/abci.go index 604acecc9050..1c78fc892b3b 100644 --- a/baseapp/abci.go +++ b/baseapp/abci.go @@ -195,6 +195,14 @@ func (app *BaseApp) BeginBlock(req abci.RequestBeginBlock) (res abci.ResponseBeg } // set the signed validators for addition to context in deliverTx app.voteInfos = req.LastCommitInfo.GetVotes() + + // call the hooks with the BeginBlock messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenBeginBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("BeginBlock listening hook failed", "height", req.Header.Height, "err", err) + } + } + return res } @@ -215,6 +223,13 @@ func (app *BaseApp) EndBlock(req abci.RequestEndBlock) (res abci.ResponseEndBloc res.ConsensusParamUpdates = cp } + // call the streaming service hooks with the EndBlock messages + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenEndBlock(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("EndBlock listening hook failed", "height", req.Height, "err", err) + } + } + return res } @@ -259,12 +274,20 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx { // Otherwise, the ResponseDeliverTx will contain releveant error information. // Regardless of tx execution outcome, the ResponseDeliverTx will contain relevant // gas execution context. -func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx { +func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) (res abci.ResponseDeliverTx) { defer telemetry.MeasureSince(time.Now(), "abci", "deliver_tx") gInfo := sdk.GasInfo{} resultStr := "successful" + defer func() { + for _, streamingListener := range app.abciListeners { + if err := streamingListener.ListenDeliverTx(app.deliverState.ctx, req, res); err != nil { + app.logger.Error("DeliverTx listening hook failed", "err", err) + } + } + }() + defer func() { telemetry.IncrCounter(1, "tx", "count") telemetry.IncrCounter(1, "tx", resultStr) diff --git a/baseapp/baseapp.go b/baseapp/baseapp.go index e6b2609bd559..3f47241da0e5 100644 --- a/baseapp/baseapp.go +++ b/baseapp/baseapp.go @@ -133,6 +133,10 @@ type BaseApp struct { // nolint: maligned // indexEvents defines the set of events in the form {eventType}.{attributeKey}, // which informs Tendermint what to index. If empty, all events will be indexed. indexEvents map[string]struct{} + + // abciListeners for hooking into the ABCI message processing of the BaseApp + // and exposing the requests and responses to external consumers + abciListeners []ABCIListener } // NewBaseApp returns a reference to an initialized BaseApp. It accepts a diff --git a/baseapp/options.go b/baseapp/options.go index be9fbdc659a0..e45ac2f38226 100644 --- a/baseapp/options.go +++ b/baseapp/options.go @@ -237,3 +237,14 @@ func (app *BaseApp) SetInterfaceRegistry(registry types.InterfaceRegistry) { app.grpcQueryRouter.SetInterfaceRegistry(registry) app.msgServiceRouter.SetInterfaceRegistry(registry) } + +// SetStreamingService is used to set a streaming service into the BaseApp hooks and load the listeners into the multistore +func (app *BaseApp) SetStreamingService(s StreamingService) { + // add the listeners for each StoreKey + for key, lis := range s.Listeners() { + app.cms.AddListeners(key, lis) + } + // register the StreamingService within the BaseApp + // BaseApp will pass BeginBlock, DeliverTx, and EndBlock requests and responses to the streaming services to update their ABCI context + app.abciListeners = append(app.abciListeners, s) +} diff --git a/baseapp/streaming.go b/baseapp/streaming.go new file mode 100644 index 000000000000..39e0f1ca6e9b --- /dev/null +++ b/baseapp/streaming.go @@ -0,0 +1,33 @@ +package baseapp + +import ( + "io" + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + store "github.com/cosmos/cosmos-sdk/store/types" + "github.com/cosmos/cosmos-sdk/types" +) + +// ABCIListener interface used to hook into the ABCI message processing of the BaseApp +type ABCIListener interface { + // ListenBeginBlock updates the streaming service with the latest BeginBlock messages + ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error + // ListenEndBlock updates the steaming service with the latest EndBlock messages + ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error + // ListenDeliverTx updates the steaming service with the latest DeliverTx messages + ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error +} + +// StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks +type StreamingService interface { + // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file + Stream(wg *sync.WaitGroup) error + // Listeners returns the streaming service's listeners for the BaseApp to register + Listeners() map[store.StoreKey][]store.WriteListener + // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp + ABCIListener + // Closer interface + io.Closer +} diff --git a/docs/architecture/adr-038-state-listening.md b/docs/architecture/adr-038-state-listening.md index 9bc644dddb26..e6d321e8cfbd 100644 --- a/docs/architecture/adr-038-state-listening.md +++ b/docs/architecture/adr-038-state-listening.md @@ -32,7 +32,7 @@ In a new file, `store/types/listening.go`, we will create a `WriteListener` inte type WriteListener interface { // if value is nil then it was deleted // storeKey indicates the source KVStore, to facilitate using the the same WriteListener across separate KVStores - // set bool indicates if it was a set; true: set, false: delete + // delete bool indicates if it was a delete; true: delete, false: set OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error } ``` @@ -205,20 +205,30 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore { ### Exposing the data We will introduce a new `StreamingService` interface for exposing `WriteListener` data streams to external consumers. +In addition to streaming state changes as `StoreKVPair`s, the interface satisfies an `ABCIListener` interface that plugs into the BaseApp +and relays ABCI requests and responses so that the service can group the state changes with the ABCI requests that affected them and the ABCI responses they affected. ```go -// Hook interface used to hook into the ABCI message processing of the BaseApp -type Hook interface { - ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) // update the streaming service with the latest BeginBlock messages - ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) // update the steaming service with the latest EndBlock messages - ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) // update the steaming service with the latest DeliverTx messages +// ABCIListener interface used to hook into the ABCI message processing of the BaseApp +type ABCIListener interface { + // ListenBeginBlock updates the streaming service with the latest BeginBlock messages + ListenBeginBlock(ctx types.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error + // ListenEndBlock updates the steaming service with the latest EndBlock messages + ListenEndBlock(ctx types.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error + // ListenDeliverTx updates the steaming service with the latest DeliverTx messages + ListenDeliverTx(ctx types.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error } // StreamingService interface for registering WriteListeners with the BaseApp and updating the service with the ABCI messages using the hooks type StreamingService interface { - Stream(wg *sync.WaitGroup, quitChan <-chan struct{}) // streaming service loop, awaits kv pairs and writes them to some destination stream or file - Listeners() map[sdk.StoreKey][]storeTypes.WriteListener // returns the streaming service's listeners for the BaseApp to register - Hook + // Stream is the streaming service loop, awaits kv pairs and writes them to some destination stream or file + Stream(wg *sync.WaitGroup) error + // Listeners returns the streaming service's listeners for the BaseApp to register + Listeners() map[types.StoreKey][]store.WriteListener + // ABCIListener interface for hooking into the ABCI messages from inside the BaseApp + ABCIListener + // Closer interface + io.Closer } ``` @@ -228,18 +238,45 @@ We will introduce an implementation of `StreamingService` which writes state cha This service uses the same `StoreKVPairWriteListener` for every KVStore, writing all the KV pairs from every KVStore out to the same files, relying on the `StoreKey` field in the `StoreKVPair` protobuf message to later distinguish the source for each pair. -The file naming schema is as such: +Writing to a file is the simplest approach for streaming the data out to consumers. +This approach also provides the advantages of being persistent and durable, and the files can be read directly, +or an auxiliary streaming services can read from the files and serve the data over a remote interface. + +##### Encoding + +For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M +is the tx number in the block (i.e. 0, 1, 2...). +At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written. +At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written. +In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +##### Decoding -* After every `BeginBlock` request a new file is created with the name `block-{N}-begin`, where N is the block number. All -subsequent state changes are written out to this file until the first `DeliverTx` request is received. At the head of these files, - the length-prefixed protobuf encoded `BeginBlock` request is written, and the response is written at the tail. -* After every `DeliverTx` request a new file is created with the name `block-{N}-tx-{M}` where N is the block number and M -is the tx number in the block (i.e. 0, 1, 2...). All subsequent state changes are written out to this file until the next -`DeliverTx` request is received or an `EndBlock` request is received. At the head of these files, the length-prefixed protobuf - encoded `DeliverTx` request is written, and the response is written at the tail. -* After every `EndBlock` request a new file is created with the name `block-{N}-end`, where N is the block number. All -subsequent state changes are written out to this file until the next `BeginBlock` request is received. At the head of these files, - the length-prefixed protobuf encoded `EndBlock` request is written, and the response is written at the tail. +To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto +messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request, +the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into +the appropriate message type. + +The type of ABCI req/res, the block height, and the transaction index (where relevant) is known +from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message. + +##### Implementation example ```go // FileStreamingService is a concrete implementation of StreamingService that writes state changes out to a file @@ -357,10 +394,6 @@ func (fss *FileStreamingService) Stream(wg *sync.WaitGroup, quitChan <-chan stru } ``` -Writing to a file is the simplest approach for streaming the data out to consumers. -This approach also provides the advantages of being persistent and durable, and the files can be read directly, -or an auxiliary streaming services can read from the files and serve the data over a remote interface. - #### Auxiliary streaming service We will create a separate standalone process that reads and internally queues the state as it is written out to these files @@ -384,8 +417,8 @@ using the provided `AppOptions` and TOML configuration fields. We will add a new method to the `BaseApp` to enable the registration of `StreamingService`s: ```go -// RegisterStreamingService is used to register a streaming service with the BaseApp -func (app *BaseApp) RegisterHooks(s StreamingService) { +// SetStreamingService is used to register a streaming service with the BaseApp +func (app *BaseApp) SetStreamingService(s StreamingService) { // set the listeners for each StoreKey for key, lis := range s.Listeners() { app.cms.AddListeners(key, lis) @@ -474,68 +507,70 @@ Note: the actual namespace is TBD. [streamers] [streamers.file] keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] - writeDir = "path to the write directory" + write_dir = "path to the write directory" prefix = "optional prefix to prepend to the generated file names" ``` We will also provide a mapping of the TOML `store.streamers` "file" configuration option to a helper functions for constructing the specified streaming service. In the future, as other streaming services are added, their constructors will be added here as well. +Each configured streamer will receive the + ```go -// StreamingServiceConstructor is used to construct a streaming service -type StreamingServiceConstructor func(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) +// ServiceConstructor is used to construct a streaming service +type ServiceConstructor func(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) -// StreamingServiceType enum for specifying the type of StreamingService -type StreamingServiceType int +// ServiceType enum for specifying the type of StreamingService +type ServiceType int const ( - Unknown StreamingServiceType = iota - File - // add more in the future + Unknown ServiceType = iota + File + // add more in the future ) -// NewStreamingServiceType returns the StreamingServiceType corresponding to the provided name -func NewStreamingServiceType(name string) StreamingServiceType { - switch strings.ToLower(name) { - case "file", "f": - return File - default: - return Unknown - } -} - -// String returns the string name of a StreamingServiceType -func (sst StreamingServiceType) String() string { - switch sst { - case File: - return "file" - default: - return "" - } -} - -// StreamingServiceConstructorLookupTable is a mapping of StreamingServiceTypes to StreamingServiceConstructors -var StreamingServiceConstructorLookupTable = map[StreamingServiceType]StreamingServiceConstructor{ - File: FileStreamingConstructor, -} - -// NewStreamingServiceConstructor returns the StreamingServiceConstructor corresponding to the provided name -func NewStreamingServiceConstructor(name string) (StreamingServiceConstructor, error) { - ssType := NewStreamingServiceType(name) - if ssType == Unknown { - return nil, fmt.Errorf("unrecognized streaming service name %s", name) - } - if constructor, ok := StreamingServiceConstructorLookupTable[ssType]; ok { - return constructor, nil - } - return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) -} - -// FileStreamingConstructor is the StreamingServiceConstructor function for creating a FileStreamingService -func FileStreamingConstructor(opts servertypes.AppOptions, keys []sdk.StoreKey) (StreamingService, error) { - filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) - fileDir := cast.ToString(opts.Get("streamers.file.writeDir")) - return streaming.NewFileStreamingService(fileDir, filePrefix, keys), nil +// NewStreamingServiceType returns the streaming.ServiceType corresponding to the provided name +func NewStreamingServiceType(name string) ServiceType { + switch strings.ToLower(name) { + case "file", "f": + return File + default: + return Unknown + } +} + +// String returns the string name of a streaming.ServiceType +func (sst ServiceType) String() string { + switch sst { + case File: + return "file" + default: + return "" + } +} + +// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors +var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ + File: NewFileStreamingService, +} + +// ServiceTypeFromString returns the streaming.ServiceConstructor corresponding to the provided name +func ServiceTypeFromString(name string) (ServiceConstructor, error) { + ssType := NewStreamingServiceType(name) + if ssType == Unknown { + return nil, fmt.Errorf("unrecognized streaming service name %s", name) + } + if constructor, ok := ServiceConstructorLookupTable[ssType]; ok { + return constructor, nil + } + return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) +} + +// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService +func NewFileStreamingService(opts serverTypes.AppOptions, keys []sdk.StoreKey, marshaller codec.BinaryMarshaler) (sdk.StreamingService, error) { + filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) + fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) + return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) } ``` @@ -563,13 +598,24 @@ func NewSimApp( // configure state listening capabilities using AppOptions listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) for _, listenerName := range listeners { - // get the store keys allowed to be exposed for this streaming service/state listeners - exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", listenerName)) - exposeStoreKeys = make([]storeTypes.StoreKey, 0, len(exposeKeyStrs)) - for _, keyStr := range exposeKeyStrs { - if storeKey, ok := keys[keyStr]; ok { + // get the store keys allowed to be exposed for this streaming service + exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName))) + var exposeStoreKeys []sdk.StoreKey + if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys + exposeStoreKeys = make([]sdk.StoreKey, 0, len(keys)) + for _, storeKey := range keys { exposeStoreKeys = append(exposeStoreKeys, storeKey) } + } else { + exposeStoreKeys = make([]sdk.StoreKey, 0, len(exposeKeyStrs)) + for _, keyStr := range exposeKeyStrs { + if storeKey, ok := keys[keyStr]; ok { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } + } + if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything + continue } // get the constructor for this listener name constructor, err := baseapp.NewStreamingServiceConstructor(listenerName) @@ -577,7 +623,7 @@ func NewSimApp( tmos.Exit(err.Error()) // or continue? } // generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose - streamingService, err := constructor(appOpts, exposeStoreKeys) + streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) if err != nil { tmos.Exit(err.Error()) } @@ -585,7 +631,7 @@ func NewSimApp( bApp.RegisterStreamingService(streamingService) // waitgroup and quit channel for optional shutdown coordination of the streaming service wg := new(sync.WaitGroup) - quitChan := new(chan struct{})) + quitChan := make(chan struct{})) // kick off the background streaming service loop streamingService.Stream(wg, quitChan) // maybe this should be done from inside BaseApp instead? } diff --git a/docs/core/store.md b/docs/core/store.md index a469b77ebfd3..1dbe0553617f 100644 --- a/docs/core/store.md +++ b/docs/core/store.md @@ -222,6 +222,36 @@ When `Store.{Get, Set}()` is called, the store forwards the call to its parent, When `Store.Iterator()` is called, it does not simply prefix the `Store.prefix`, since it does not work as intended. In that case, some of the elements are traversed even they are not starting with the prefix. +### `ListenKv` Store + +`listenkv.Store` is a wrapper `KVStore` which provides state listening capabilities over the underlying `KVStore`. +It is applied automatically by the Cosmos SDK on any `KVStore` whose `StoreKey` is specified during state streaming configuration. +Additional information about state streaming configuration can be found in the [store/streaming/README.md](../../store/streaming/README.md). + ++++ https://github.com/cosmos/cosmos-sdk/blob/v0.44.1/store/listenkv/store.go#L11-L18 + +When `KVStore.Set` or `KVStore.Delete` methods are called, `listenkv.Store` automatically writes the operations to the set of `Store.listeners`. + +## New Store package (`store/v2`) + +The SDK is in the process of transitioning to use the types listed here as the default interface for state storage. At the time of writing, these cannot be used within an application and are not directly compatible with the `CommitMultiStore` and related types. + +### `BasicKVStore` interface + +An interface providing only the basic CRUD functionality (`Get`, `Set`, `Has`, and `Delete` methods), without iteration or caching. This is used to partially expose components of a larger store, such as a `flat.Store`. + +### Flat Store + +`flat.Store` is the new default persistent store, which internally decouples the concerns of state storage and commitment scheme. Values are stored directly in the backing key-value database (the "storage" bucket), while the value's hash is mapped in a separate store which is able to generate a cryptographic commitment (the "state commitment" bucket, implmented with `smt.Store`). + +This can optionally be constructed to use different backend databases for each bucket. + + + +### SMT Store + +A `BasicKVStore` which is used to partially expose functions of an underlying store (for instance, to allow access to the commitment store in `flat.Store`). + ## Next {hide} Learn about [encoding](./encoding.md) {hide} diff --git a/simapp/app.go b/simapp/app.go index 37cc41412a9b..8c8eb856a474 100644 --- a/simapp/app.go +++ b/simapp/app.go @@ -25,6 +25,7 @@ import ( "github.com/cosmos/cosmos-sdk/server/config" servertypes "github.com/cosmos/cosmos-sdk/server/types" simappparams "github.com/cosmos/cosmos-sdk/simapp/params" + "github.com/cosmos/cosmos-sdk/store/streaming" "github.com/cosmos/cosmos-sdk/testutil/testdata" sdk "github.com/cosmos/cosmos-sdk/types" "github.com/cosmos/cosmos-sdk/types/module" @@ -213,6 +214,12 @@ func NewSimApp( // not include this key. memKeys := sdk.NewMemoryStoreKeys(capabilitytypes.MemStoreKey, "testingkey") + // configure state listening capabilities using AppOptions + // we are doing nothing with the returned streamingServices and waitGroup in this case + if _, _, err := streaming.LoadStreamingServices(bApp, appOpts, appCodec, keys); err != nil { + tmos.Exit(err.Error()) + } + app := &SimApp{ BaseApp: bApp, legacyAmino: legacyAmino, diff --git a/store/cachemulti/store.go b/store/cachemulti/store.go index 05637a45ff16..b60ed0465392 100644 --- a/store/cachemulti/store.go +++ b/store/cachemulti/store.go @@ -8,6 +8,8 @@ import ( "github.com/cosmos/cosmos-sdk/store/cachekv" "github.com/cosmos/cosmos-sdk/store/dbadapter" + "github.com/cosmos/cosmos-sdk/store/listenkv" + "github.com/cosmos/cosmos-sdk/store/tracekv" "github.com/cosmos/cosmos-sdk/store/types" ) @@ -49,17 +51,13 @@ func NewFromKVStore( } for key, store := range stores { - var cacheWrapped types.CacheWrap if cms.TracingEnabled() { - cacheWrapped = store.CacheWrapWithTrace(cms.traceWriter, cms.traceContext) - } else { - cacheWrapped = store.CacheWrap() + store = tracekv.NewStore(store.(types.KVStore), cms.traceWriter, cms.traceContext) } if cms.ListeningEnabled(key) { - cms.stores[key] = cacheWrapped.CacheWrapWithListeners(key, cms.listeners[key]) - } else { - cms.stores[key] = cacheWrapped + store = listenkv.NewStore(store.(types.KVStore), key, listeners[key]) } + cms.stores[key] = cachekv.NewStore(store.(types.KVStore)) } return cms diff --git a/store/streaming/README.md b/store/streaming/README.md new file mode 100644 index 000000000000..819514aef796 --- /dev/null +++ b/store/streaming/README.md @@ -0,0 +1,67 @@ +# State Streaming Service +This package contains the constructors for the `StreamingService`s used to write state changes out from individual KVStores to a +file or stream, as described in [ADR-038](../../docs/architecture/adr-038-state-listening.md) and defined in [types/streaming.go](../../baseapp/streaming.go). +The child directories contain the implementations for specific output destinations. + +Currently, a `StreamingService` implementation that writes state changes out to files is supported, in the future support for additional +output destinations can be added. + +The `StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file: + +```toml +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", # name of the streaming service, used by constructor + ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" +``` + +`store.streamers` contains a list of the names of the `StreamingService` implementations to employ which are used by `ServiceTypeFromString` +to return the `ServiceConstructor` for that particular implementation: + + +```go +listeners := cast.ToStringSlice(appOpts.Get("store.streamers")) +for _, listenerName := range listeners { + constructor, err := ServiceTypeFromString(listenerName) + if err != nil { + // handle error + } +} +``` + +`streamers` contains a mapping of the specific `StreamingService` implementation name to the configuration parameters for that specific service. +`streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service and is required by every type of `StreamingService`. +In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. + +Additional configuration parameters are optional and specific to the implementation. +In the case of the file streaming service, `streamers.file.write_dir` contains the path to the +directory to write the files to, and `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions +with other App `StreamingService` output files. + +The `ServiceConstructor` accepts `AppOptions`, the store keys collected using `streamers.x.keys`, a `BinaryMarshaller` and +returns a `StreamingService` implementation. The `AppOptions` are passed in to provide access to any implementation specific configuration options, +e.g. in the case of the file streaming service the `streamers.file.write_dir` and `streamers.file.prefix`. + +```go +streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) +if err != nil { + // handler error +} +``` + +The returned `StreamingService` is loaded into the BaseApp using the BaseApp's `SetStreamingService` method. +The `Stream` method is called on the service to begin the streaming process. Depending on the implementation this process +may be synchronous or asynchronous with the message processing of the state machine. + +```go +bApp.SetStreamingService(streamingService) +wg := new(sync.WaitGroup) +quitChan := make(chan struct{}) +streamingService.Stream(wg, quitChan) +``` diff --git a/store/streaming/constructor.go b/store/streaming/constructor.go new file mode 100644 index 000000000000..e576f84b83d1 --- /dev/null +++ b/store/streaming/constructor.go @@ -0,0 +1,137 @@ +package streaming + +import ( + "fmt" + "strings" + "sync" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/codec" + serverTypes "github.com/cosmos/cosmos-sdk/server/types" + "github.com/cosmos/cosmos-sdk/store/streaming/file" + "github.com/cosmos/cosmos-sdk/store/types" + + "github.com/spf13/cast" +) + +// ServiceConstructor is used to construct a streaming service +type ServiceConstructor func(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) + +// ServiceType enum for specifying the type of StreamingService +type ServiceType int + +const ( + Unknown ServiceType = iota + File + // add more in the future +) + +// ServiceTypeFromString returns the streaming.ServiceType corresponding to the provided name +func ServiceTypeFromString(name string) ServiceType { + switch strings.ToLower(name) { + case "file", "f": + return File + default: + return Unknown + } +} + +// String returns the string name of a streaming.ServiceType +func (sst ServiceType) String() string { + switch sst { + case File: + return "file" + default: + return "unknown" + } +} + +// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to streaming.ServiceConstructors +var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{ + File: NewFileStreamingService, +} + +// NewServiceConstructor returns the streaming.ServiceConstructor corresponding to the provided name +func NewServiceConstructor(name string) (ServiceConstructor, error) { + ssType := ServiceTypeFromString(name) + if ssType == Unknown { + return nil, fmt.Errorf("unrecognized streaming service name %s", name) + } + if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil { + return constructor, nil + } + return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String()) +} + +// NewFileStreamingService is the streaming.ServiceConstructor function for creating a FileStreamingService +func NewFileStreamingService(opts serverTypes.AppOptions, keys []types.StoreKey, marshaller codec.BinaryCodec) (baseapp.StreamingService, error) { + filePrefix := cast.ToString(opts.Get("streamers.file.prefix")) + fileDir := cast.ToString(opts.Get("streamers.file.write_dir")) + return file.NewStreamingService(fileDir, filePrefix, keys, marshaller) +} + +// LoadStreamingServices is a function for loading StreamingServices onto the BaseApp using the provided AppOptions, codec, and keys +// It returns the WaitGroup and quit channel used to synchronize with the streaming services and any error that occurs during the setup +func LoadStreamingServices(bApp *baseapp.BaseApp, appOpts serverTypes.AppOptions, appCodec codec.BinaryCodec, keys map[string]*types.KVStoreKey) ([]baseapp.StreamingService, *sync.WaitGroup, error) { + // waitgroup and quit channel for optional shutdown coordination of the streaming service(s) + wg := new(sync.WaitGroup) + // configure state listening capabilities using AppOptions + streamers := cast.ToStringSlice(appOpts.Get("store.streamers")) + activeStreamers := make([]baseapp.StreamingService, 0, len(streamers)) + for _, streamerName := range streamers { + // get the store keys allowed to be exposed for this streaming service + exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName))) + var exposeStoreKeys []types.StoreKey + if exposeAll(exposeKeyStrs) { // if list contains `*`, expose all StoreKeys + exposeStoreKeys = make([]types.StoreKey, 0, len(keys)) + for _, storeKey := range keys { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } else { + exposeStoreKeys = make([]types.StoreKey, 0, len(exposeKeyStrs)) + for _, keyStr := range exposeKeyStrs { + if storeKey, ok := keys[keyStr]; ok { + exposeStoreKeys = append(exposeStoreKeys, storeKey) + } + } + } + if len(exposeStoreKeys) == 0 { // short circuit if we are not exposing anything + continue + } + // get the constructor for this streamer name + constructor, err := NewServiceConstructor(streamerName) + if err != nil { + // close any services we may have already spun up before hitting the error on this one + for _, activeStreamer := range activeStreamers { + activeStreamer.Close() + } + return nil, nil, err + } + // generate the streaming service using the constructor, appOptions, and the StoreKeys we want to expose + streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec) + if err != nil { + // close any services we may have already spun up before hitting the error on this one + for _, activeStreamer := range activeStreamers { + activeStreamer.Close() + } + return nil, nil, err + } + // register the streaming service with the BaseApp + bApp.SetStreamingService(streamingService) + // kick off the background streaming service loop + streamingService.Stream(wg) + // add to the list of active streamers + activeStreamers = append(activeStreamers, streamingService) + } + // if there are no active streamers, activeStreamers is empty (len == 0) and the waitGroup is not waiting on anything + return activeStreamers, wg, nil +} + +func exposeAll(list []string) bool { + for _, ele := range list { + if ele == "*" { + return true + } + } + return false +} diff --git a/store/streaming/constructor_test.go b/store/streaming/constructor_test.go new file mode 100644 index 000000000000..5f9d58016f68 --- /dev/null +++ b/store/streaming/constructor_test.go @@ -0,0 +1,43 @@ +package streaming + +import ( + "testing" + + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/store/streaming/file" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/stretchr/testify/require" +) + +type fakeOptions struct{} + +func (f *fakeOptions) Get(string) interface{} { return nil } + +var ( + mockOptions = new(fakeOptions) + mockKeys = []types.StoreKey{sdk.NewKVStoreKey("mockKey1"), sdk.NewKVStoreKey("mockKey2")} + interfaceRegistry = codecTypes.NewInterfaceRegistry() + testMarshaller = codec.NewProtoCodec(interfaceRegistry) +) + +func TestStreamingServiceConstructor(t *testing.T) { + _, err := NewServiceConstructor("unexpectedName") + require.NotNil(t, err) + + constructor, err := NewServiceConstructor("file") + require.Nil(t, err) + var expectedType ServiceConstructor + require.IsType(t, expectedType, constructor) + + serv, err := constructor(mockOptions, mockKeys, testMarshaller) + require.Nil(t, err) + require.IsType(t, &file.StreamingService{}, serv) + listeners := serv.Listeners() + for _, key := range mockKeys { + _, ok := listeners[key] + require.True(t, ok) + } +} diff --git a/store/streaming/file/README.md b/store/streaming/file/README.md new file mode 100644 index 000000000000..7243f15dd0bf --- /dev/null +++ b/store/streaming/file/README.md @@ -0,0 +1,64 @@ +# File Streaming Service +This pkg contains an implementation of the [StreamingService](../../../baseapp/streaming.go) that writes +the data stream out to files on the local filesystem. This process is performed synchronously with the message processing +of the state machine. + +## Configuration + +The `file.StreamingService` is configured from within an App using the `AppOptions` loaded from the app.toml file: + +```toml +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", # name of the streaming service, used by constructor + ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" +``` + +We turn the service on by adding its name, "file", to `store.streamers`- the list of streaming services for this App to employ. + +In `streamers.file` we include three configuration parameters for the file streaming service: +1. `streamers.x.keys` contains the list of `StoreKey` names for the KVStores to expose using this service. +In order to expose *all* KVStores, we can include `*` in this list. An empty list is equivalent to turning the service off. +2. `streamers.file.write_dir` contains the path to the directory to write the files to. +3. `streamers.file.prefix` contains an optional prefix to prepend to the output files to prevent potential collisions +with other App `StreamingService` output files. + +##### Encoding + +For each pair of `BeginBlock` requests and responses, a file is created and named `block-{N}-begin`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `BeginBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `BeginBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `BeginBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `DeliverTx` requests and responses, a file is created and named `block-{N}-tx-{M}` where N is the block number and M +is the tx number in the block (i.e. 0, 1, 2...). +At the head of this file the length-prefixed protobuf encoded `DeliverTx` request is written. +At the tail of this file the length-prefixed protobuf encoded `DeliverTx` response is written. +In between these two encoded messages, the state changes that occurred due to the `DeliverTx` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +For each pair of `EndBlock` requests and responses, a file is created and named `block-{N}-end`, where N is the block number. +At the head of this file the length-prefixed protobuf encoded `EndBlock` request is written. +At the tail of this file the length-prefixed protobuf encoded `EndBlock` response is written. +In between these two encoded messages, the state changes that occurred due to the `EndBlock` request are written chronologically as +a series of length-prefixed protobuf encoded `StoreKVPair`s representing `Set` and `Delete` operations within the KVStores the service +is configured to listen to. + +##### Decoding + +To decode the files written in the above format we read all the bytes from a given file into memory and segment them into proto +messages based on the length-prefixing of each message. Once segmented, it is known that the first message is the ABCI request, +the last message is the ABCI response, and that every message in between is a `StoreKVPair`. This enables us to decode each segment into +the appropriate message type. + +The type of ABCI req/res, the block height, and the transaction index (where relevant) is known +from the file name, and the KVStore each `StoreKVPair` originates from is known since the `StoreKey` is included as a field in the proto message. diff --git a/store/streaming/file/example_config.toml b/store/streaming/file/example_config.toml new file mode 100644 index 000000000000..8202bd8ef559 --- /dev/null +++ b/store/streaming/file/example_config.toml @@ -0,0 +1,10 @@ +[store] + streamers = [ # if len(streamers) > 0 we are streaming + "file", # name of the streaming service, used by constructor + ] + +[streamers] + [streamers.file] + keys = ["list", "of", "store", "keys", "we", "want", "to", "expose", "for", "this", "streaming", "service"] + write_dir = "path to the write directory" + prefix = "optional prefix to prepend to the generated file names" diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go new file mode 100644 index 000000000000..02feb403e99b --- /dev/null +++ b/store/streaming/file/service.go @@ -0,0 +1,279 @@ +package file + +import ( + "errors" + "fmt" + "io/ioutil" + "os" + "path" + "path/filepath" + "sync" + + abci "github.com/tendermint/tendermint/abci/types" + + "github.com/cosmos/cosmos-sdk/baseapp" + "github.com/cosmos/cosmos-sdk/codec" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" +) + +var _ baseapp.StreamingService = &StreamingService{} + +// StreamingService is a concrete implementation of StreamingService that writes state changes out to files +type StreamingService struct { + listeners map[types.StoreKey][]types.WriteListener // the listeners that will be initialized with BaseApp + srcChan <-chan []byte // the channel that all the WriteListeners write their data out to + filePrefix string // optional prefix for each of the generated files + writeDir string // directory to write files into + codec codec.BinaryCodec // marshaller used for re-marshalling the ABCI messages to write them out to the destination files + stateCache [][]byte // cache the protobuf binary encoded StoreKVPairs in the order they are received + stateCacheLock *sync.Mutex // mutex for the state cache + currentBlockNumber int64 // the current block number + currentTxIndex int64 // the index of the current tx + quitChan chan struct{} // channel to synchronize closure +} + +// IntermediateWriter is used so that we do not need to update the underlying io.Writer +// inside the StoreKVPairWriteListener everytime we begin writing to a new file +type IntermediateWriter struct { + outChan chan<- []byte +} + +// NewIntermediateWriter create an instance of an intermediateWriter that sends to the provided channel +func NewIntermediateWriter(outChan chan<- []byte) *IntermediateWriter { + return &IntermediateWriter{ + outChan: outChan, + } +} + +// Write satisfies io.Writer +func (iw *IntermediateWriter) Write(b []byte) (int, error) { + iw.outChan <- b + return len(b), nil +} + +// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys +func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec) (*StreamingService, error) { + listenChan := make(chan []byte) + iw := NewIntermediateWriter(listenChan) + listener := types.NewStoreKVPairWriteListener(iw, c) + listeners := make(map[types.StoreKey][]types.WriteListener, len(storeKeys)) + // in this case, we are using the same listener for each Store + for _, key := range storeKeys { + listeners[key] = append(listeners[key], listener) + } + // check that the writeDir exists and is writeable so that we can catch the error here at initialization if it is not + // we don't open a dstFile until we receive our first ABCI message + if err := isDirWriteable(writeDir); err != nil { + return nil, err + } + return &StreamingService{ + listeners: listeners, + srcChan: listenChan, + filePrefix: filePrefix, + writeDir: writeDir, + codec: c, + stateCache: make([][]byte, 0), + stateCacheLock: new(sync.Mutex), + }, nil +} + +// Listeners satisfies the baseapp.StreamingService interface +// It returns the StreamingService's underlying WriteListeners +// Use for registering the underlying WriteListeners with the BaseApp +func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { + return fss.listeners +} + +// ListenBeginBlock satisfies the baseapp.ABCIListener interface +// It writes the received BeginBlock request and response and the resulting state changes +// out to a file as described in the above the naming schema +func (fss *StreamingService) ListenBeginBlock(ctx sdk.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { + // generate the new file + dstFile, err := fss.openBeginBlockFile(req) + if err != nil { + return err + } + // write req to file + lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { + return err + } + // write all state changes cached for this stage to file + fss.stateCacheLock.Lock() + for _, stateChange := range fss.stateCache { + if _, err = dstFile.Write(stateChange); err != nil { + fss.stateCache = nil + fss.stateCacheLock.Unlock() + return err + } + } + // reset cache + fss.stateCache = nil + fss.stateCacheLock.Unlock() + // write res to file + lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + return err + } + // close file + return dstFile.Close() +} + +func (fss *StreamingService) openBeginBlockFile(req abci.RequestBeginBlock) (*os.File, error) { + fss.currentBlockNumber = req.GetHeader().Height + fss.currentTxIndex = 0 + fileName := fmt.Sprintf("block-%d-begin", fss.currentBlockNumber) + if fss.filePrefix != "" { + fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + } + return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600) +} + +// ListenDeliverTx satisfies the baseapp.ABCIListener interface +// It writes the received DeliverTx request and response and the resulting state changes +// out to a file as described in the above the naming schema +func (fss *StreamingService) ListenDeliverTx(ctx sdk.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { + // generate the new file + dstFile, err := fss.openDeliverTxFile() + if err != nil { + return err + } + // write req to file + lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { + return err + } + // write all state changes cached for this stage to file + fss.stateCacheLock.Lock() + for _, stateChange := range fss.stateCache { + if _, err = dstFile.Write(stateChange); err != nil { + fss.stateCache = nil + fss.stateCacheLock.Unlock() + return err + } + } + // reset cache + fss.stateCache = nil + fss.stateCacheLock.Unlock() + // write res to file + lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + return err + } + // close file + return dstFile.Close() +} + +func (fss *StreamingService) openDeliverTxFile() (*os.File, error) { + fileName := fmt.Sprintf("block-%d-tx-%d", fss.currentBlockNumber, fss.currentTxIndex) + if fss.filePrefix != "" { + fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + } + fss.currentTxIndex++ + return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600) +} + +// ListenEndBlock satisfies the baseapp.ABCIListener interface +// It writes the received EndBlock request and response and the resulting state changes +// out to a file as described in the above the naming schema +func (fss *StreamingService) ListenEndBlock(ctx sdk.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { + // generate the new file + dstFile, err := fss.openEndBlockFile() + if err != nil { + return err + } + // write req to file + lengthPrefixedReqBytes, err := fss.codec.MarshalLengthPrefixed(&req) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedReqBytes); err != nil { + return err + } + // write all state changes cached for this stage to file + fss.stateCacheLock.Lock() + for _, stateChange := range fss.stateCache { + if _, err = dstFile.Write(stateChange); err != nil { + fss.stateCache = nil + fss.stateCacheLock.Unlock() + return err + } + } + // reset cache + fss.stateCache = nil + fss.stateCacheLock.Unlock() + // write res to file + lengthPrefixedResBytes, err := fss.codec.MarshalLengthPrefixed(&res) + if err != nil { + return err + } + if _, err = dstFile.Write(lengthPrefixedResBytes); err != nil { + return err + } + // close file + return dstFile.Close() +} + +func (fss *StreamingService) openEndBlockFile() (*os.File, error) { + fileName := fmt.Sprintf("block-%d-end", fss.currentBlockNumber) + if fss.filePrefix != "" { + fileName = fmt.Sprintf("%s-%s", fss.filePrefix, fileName) + } + return os.OpenFile(filepath.Join(fss.writeDir, fileName), os.O_CREATE|os.O_WRONLY, 0600) +} + +// Stream satisfies the baseapp.StreamingService interface +// It spins up a goroutine select loop which awaits length-prefixed binary encoded KV pairs +// and caches them in the order they were received +// returns an error if it is called twice +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { + if fss.quitChan != nil { + return errors.New("`Stream` has already been called. The stream needs to be closed before it can be started again") + } + fss.quitChan = make(chan struct{}) + wg.Add(1) + go func() { + defer wg.Done() + for { + select { + case <-fss.quitChan: + fss.quitChan = nil + return + case by := <-fss.srcChan: + fss.stateCacheLock.Lock() + fss.stateCache = append(fss.stateCache, by) + fss.stateCacheLock.Unlock() + } + } + }() + return nil +} + +// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface +func (fss *StreamingService) Close() error { + close(fss.quitChan) + return nil +} + +// isDirWriteable checks if dir is writable by writing and removing a file +// to dir. It returns nil if dir is writable. +func isDirWriteable(dir string) error { + f := path.Join(dir, ".touch") + if err := ioutil.WriteFile(f, []byte(""), 0600); err != nil { + return err + } + return os.Remove(f) +} diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go new file mode 100644 index 000000000000..493f2297b08c --- /dev/null +++ b/store/streaming/file/service_test.go @@ -0,0 +1,401 @@ +package file + +import ( + "encoding/binary" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "sync" + "testing" + + "github.com/cosmos/cosmos-sdk/codec" + codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + "github.com/cosmos/cosmos-sdk/store/types" + sdk "github.com/cosmos/cosmos-sdk/types" + + "github.com/stretchr/testify/require" + abci "github.com/tendermint/tendermint/abci/types" + types1 "github.com/tendermint/tendermint/proto/tendermint/types" +) + +var ( + interfaceRegistry = codecTypes.NewInterfaceRegistry() + testMarshaller = codec.NewProtoCodec(interfaceRegistry) + testStreamingService *StreamingService + testListener1, testListener2 types.WriteListener + emptyContext = sdk.Context{} + + // test abci message types + mockHash = []byte{1, 2, 3, 4, 5, 6, 7, 8, 9} + testBeginBlockReq = abci.RequestBeginBlock{ + Header: types1.Header{ + Height: 1, + }, + ByzantineValidators: []abci.Evidence{}, + Hash: mockHash, + LastCommitInfo: abci.LastCommitInfo{ + Round: 1, + Votes: []abci.VoteInfo{}, + }, + } + testBeginBlockRes = abci.ResponseBeginBlock{ + Events: []abci.Event{ + { + Type: "testEventType1", + }, + { + Type: "testEventType2", + }, + }, + } + testEndBlockReq = abci.RequestEndBlock{ + Height: 1, + } + testEndBlockRes = abci.ResponseEndBlock{ + Events: []abci.Event{}, + ConsensusParamUpdates: &abci.ConsensusParams{}, + ValidatorUpdates: []abci.ValidatorUpdate{}, + } + mockTxBytes1 = []byte{9, 8, 7, 6, 5, 4, 3, 2, 1} + testDeliverTxReq1 = abci.RequestDeliverTx{ + Tx: mockTxBytes1, + } + mockTxBytes2 = []byte{8, 7, 6, 5, 4, 3, 2} + testDeliverTxReq2 = abci.RequestDeliverTx{ + Tx: mockTxBytes2, + } + mockTxResponseData1 = []byte{1, 3, 5, 7, 9} + testDeliverTxRes1 = abci.ResponseDeliverTx{ + Events: []abci.Event{}, + Code: 1, + Codespace: "mockCodeSpace", + Data: mockTxResponseData1, + GasUsed: 2, + GasWanted: 3, + Info: "mockInfo", + Log: "mockLog", + } + mockTxResponseData2 = []byte{1, 3, 5, 7, 9} + testDeliverTxRes2 = abci.ResponseDeliverTx{ + Events: []abci.Event{}, + Code: 1, + Codespace: "mockCodeSpace", + Data: mockTxResponseData2, + GasUsed: 2, + GasWanted: 3, + Info: "mockInfo", + Log: "mockLog", + } + + // mock store keys + mockStoreKey1 = sdk.NewKVStoreKey("mockStore1") + mockStoreKey2 = sdk.NewKVStoreKey("mockStore2") + + // file stuff + testPrefix = "testPrefix" + testDir = "./.test" + + // mock state changes + mockKey1 = []byte{1, 2, 3} + mockValue1 = []byte{3, 2, 1} + mockKey2 = []byte{2, 3, 4} + mockValue2 = []byte{4, 3, 2} + mockKey3 = []byte{3, 4, 5} + mockValue3 = []byte{5, 4, 3} +) + +func TestIntermediateWriter(t *testing.T) { + outChan := make(chan []byte, 0) + iw := NewIntermediateWriter(outChan) + require.IsType(t, &IntermediateWriter{}, iw) + testBytes := []byte{1, 2, 3, 4, 5} + var length int + var err error + waitChan := make(chan struct{}, 0) + go func() { + length, err = iw.Write(testBytes) + waitChan <- struct{}{} + }() + receivedBytes := <-outChan + <-waitChan + require.Equal(t, len(testBytes), length) + require.Equal(t, testBytes, receivedBytes) + require.Nil(t, err) +} + +func TestFileStreamingService(t *testing.T) { + if os.Getenv("CI") != "" { + t.Skip("Skipping TestFileStreamingService in CI environment") + } + err := os.Mkdir(testDir, 0700) + require.Nil(t, err) + defer os.RemoveAll(testDir) + + testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2} + testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller) + require.Nil(t, err) + require.IsType(t, &StreamingService{}, testStreamingService) + require.Equal(t, testPrefix, testStreamingService.filePrefix) + require.Equal(t, testDir, testStreamingService.writeDir) + require.Equal(t, testMarshaller, testStreamingService.codec) + testListener1 = testStreamingService.listeners[mockStoreKey1][0] + testListener2 = testStreamingService.listeners[mockStoreKey2][0] + wg := new(sync.WaitGroup) + testStreamingService.Stream(wg) + testListenBeginBlock(t) + testListenDeliverTx1(t) + testListenDeliverTx2(t) + testListenEndBlock(t) + testStreamingService.Close() + wg.Wait() +} + +func testListenBeginBlock(t *testing.T) { + expectedBeginBlockReqBytes, err := testMarshaller.Marshal(&testBeginBlockReq) + require.Nil(t, err) + expectedBeginBlockResBytes, err := testMarshaller.Marshal(&testBeginBlockRes) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey1, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenBeginBlock(emptyContext, testBeginBlockReq, testBeginBlockRes) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-begin", testPrefix, testBeginBlockReq.GetHeader().Height) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedBeginBlockReqBytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedBeginBlockResBytes, segments[4]) +} + +func testListenDeliverTx1(t *testing.T) { + expectedDeliverTxReq1Bytes, err := testMarshaller.Marshal(&testDeliverTxReq1) + require.Nil(t, err) + expectedDeliverTxRes1Bytes, err := testMarshaller.Marshal(&testDeliverTxRes1) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey2, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq1, testDeliverTxRes1) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 0) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedDeliverTxReq1Bytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedDeliverTxRes1Bytes, segments[4]) +} + +func testListenDeliverTx2(t *testing.T) { + expectedDeliverTxReq2Bytes, err := testMarshaller.Marshal(&testDeliverTxReq2) + require.Nil(t, err) + expectedDeliverTxRes2Bytes, err := testMarshaller.Marshal(&testDeliverTxRes2) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey2, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenDeliverTx(emptyContext, testDeliverTxReq2, testDeliverTxRes2) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-tx-%d", testPrefix, testBeginBlockReq.GetHeader().Height, 1) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedDeliverTxReq2Bytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedDeliverTxRes2Bytes, segments[4]) +} + +func testListenEndBlock(t *testing.T) { + expectedEndBlockReqBytes, err := testMarshaller.Marshal(&testEndBlockReq) + require.Nil(t, err) + expectedEndBlockResBytes, err := testMarshaller.Marshal(&testEndBlockRes) + require.Nil(t, err) + + // write state changes + testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) + testListener2.OnWrite(mockStoreKey1, mockKey2, mockValue2, false) + testListener1.OnWrite(mockStoreKey2, mockKey3, mockValue3, false) + + // expected KV pairs + expectedKVPair1, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey1, + Value: mockValue1, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey1.Name(), + Key: mockKey2, + Value: mockValue2, + Delete: false, + }) + require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ + StoreKey: mockStoreKey2.Name(), + Key: mockKey3, + Value: mockValue3, + Delete: false, + }) + require.Nil(t, err) + + // send the ABCI messages + err = testStreamingService.ListenEndBlock(emptyContext, testEndBlockReq, testEndBlockRes) + require.Nil(t, err) + + // load the file, checking that it was created with the expected name + fileName := fmt.Sprintf("%s-block-%d-end", testPrefix, testEndBlockReq.Height) + fileBytes, err := readInFile(fileName) + require.Nil(t, err) + + // segment the file into the separate gRPC messages and check the correctness of each + segments, err := segmentBytes(fileBytes) + require.Nil(t, err) + require.Equal(t, 5, len(segments)) + require.Equal(t, expectedEndBlockReqBytes, segments[0]) + require.Equal(t, expectedKVPair1, segments[1]) + require.Equal(t, expectedKVPair2, segments[2]) + require.Equal(t, expectedKVPair3, segments[3]) + require.Equal(t, expectedEndBlockResBytes, segments[4]) +} + +func readInFile(name string) ([]byte, error) { + path := filepath.Join(testDir, name) + return ioutil.ReadFile(path) +} + +// Returns all of the protobuf messages contained in the byte array as an array of byte arrays +// The messages have their length prefix removed +func segmentBytes(bz []byte) ([][]byte, error) { + var err error + segments := make([][]byte, 0) + for len(bz) > 0 { + var segment []byte + segment, bz, err = getHeadSegment(bz) + if err != nil { + return nil, err + } + segments = append(segments, segment) + } + return segments, nil +} + +// Returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array +func getHeadSegment(bz []byte) ([]byte, []byte, error) { + size, prefixSize := binary.Uvarint(bz) + if prefixSize < 0 { + return nil, nil, fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", prefixSize) + } + if size > uint64(len(bz)-prefixSize) { + return nil, nil, fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-prefixSize) + } + return bz[prefixSize:(uint64(prefixSize) + size)], bz[uint64(prefixSize)+size:], nil +}