From 1039c26e31446052dea608127681cc92b6473f7a Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 09:57:25 -0500 Subject: [PATCH 1/7] updates --- store/streaming/file/service.go | 113 ++++++++++++++++++++------------ 1 file changed, 72 insertions(+), 41 deletions(-) diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index e98e678352cb..2ad706518df0 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -22,7 +22,8 @@ import ( var _ baseapp.StreamingService = &StreamingService{} -// StreamingService is a concrete implementation of StreamingService that writes state changes out to files +// StreamingService is a concrete implementation of StreamingService that writes +// state changes out to files. type StreamingService struct { storeListeners []*types.MemoryListener // a series of KVStore listeners for each KVStore filePrefix string // optional prefix for each of the generated files @@ -32,32 +33,45 @@ type StreamingService struct { currentBlockNumber int64 blockMetadata types.BlockMetadata - // if write the metadata file, otherwise only data file is outputted. + + // outputMetadata, if true, writes additional metadata to file per block outputMetadata bool - // if true, when commit failed it will panic and stop the consensus state machine to ensure the - // eventual consistency of the output, otherwise the error is ignored and have the risk of lossing data. + + // stopNodeOnErr, if true, will panic and stop the node during ABCI Commit + // to ensure eventual consistency of the output, otherwise, any errors are + // logged and ignored which could yield data loss in streamed output. stopNodeOnErr bool - // if true, the file.Sync() is called to make sure the data is persisted onto disk, otherwise it risks lossing data when system crash. + + // fsync, if true, will execute file Sync to make sure the data is persisted + // onto disk, otherwise there is a risk of data loss during any crash. fsync bool } -// NewStreamingService creates a new StreamingService for the provided writeDir, (optional) filePrefix, and storeKeys -func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey, c codec.BinaryCodec, logger log.Logger, outputMetadata bool, stopNodeOnErr bool, fsync bool) (*StreamingService, error) { +func NewStreamingService( + writeDir, filePrefix string, + storeKeys []types.StoreKey, + c codec.BinaryCodec, + logger log.Logger, + outputMetadata, stopNodeOnErr, fsync bool, +) (*StreamingService, error) { // sort storeKeys for deterministic output sort.SliceStable(storeKeys, func(i, j int) bool { return storeKeys[i].Name() < storeKeys[j].Name() }) + // NOTE: We use the same listener for each store. listeners := make([]*types.MemoryListener, len(storeKeys)) - // in this case, we are using the same listener for each Store for i, key := range storeKeys { listeners[i] = types.NewMemoryListener(key) } - // check that the writeDir exists and is writable so that we can catch the error here at initialization if it is not - // we don't open a dstFile until we receive our first ABCI message + + // Check that the writeDir exists and is writable so that we can catch the + // error here at initialization. If it is not we don't open a dstFile until we + // receive our first ABCI message. if err := isDirWriteable(writeDir); err != nil { return nil, err } + return &StreamingService{ storeListeners: listeners, filePrefix: filePrefix, @@ -70,65 +84,74 @@ func NewStreamingService(writeDir, filePrefix string, storeKeys []types.StoreKey }, nil } -// Listeners satisfies the baseapp.StreamingService interface -// It returns the StreamingService's underlying WriteListeners -// Use for registering the underlying WriteListeners with the BaseApp +// Listeners satisfies the StreamingService interface. It returns the +// StreamingService's underlying WriteListeners. Use for registering the +// underlying WriteListeners with the BaseApp. func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListener { listeners := make(map[types.StoreKey][]types.WriteListener, len(fss.storeListeners)) for _, listener := range fss.storeListeners { listeners[listener.StoreKey()] = []types.WriteListener{listener} } + return listeners } -// ListenBeginBlock satisfies the baseapp.ABCIListener interface -// It writes the received BeginBlock request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) (rerr error) { +// ListenBeginBlock satisfies the ABCIListener interface. It sets the received +// BeginBlock request, response and the current block number. Note, these are +// not written to file until ListenCommit is executed, after which it will be +// reset again on the next block. +func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { fss.blockMetadata.RequestBeginBlock = &req fss.blockMetadata.ResponseBeginBlock = &res fss.currentBlockNumber = req.Header.Height return nil } -// ListenDeliverTx satisfies the baseapp.ABCIListener interface -// It writes the received DeliverTx request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) (rerr error) { +// ListenDeliverTx satisfies the ABCIListener interface. It appends the received +// DeliverTx request and response to a list of DeliverTxs objects. Note, these +// are not written to file until ListenCommit is executed, after which it will be +// reset again on the next block. +func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{ Request: &req, Response: &res, }) + return nil } -// ListenEndBlock satisfies the baseapp.ABCIListener interface -// It writes the received EndBlock request and response and the resulting state changes -// out to a file as described in the above the naming schema -func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) (rerr error) { +// ListenEndBlock satisfies the ABCIListener interface. It sets the received +// EndBlock request, response and the current block number. Note, these are +// not written to file until ListenCommit is executed, after which it will be +// reset again on the next block. +func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { fss.blockMetadata.RequestEndBlock = &req fss.blockMetadata.ResponseEndBlock = &res return nil } -// ListenEndBlock satisfies the baseapp.ABCIListener interface +// ListenCommit satisfies the ABCIListener interface. It is executed during the +// ABCI Commit request and is responsible for writing all staged data to files. +// It will only return a non-nil error when stopNodeOnErr is set. func (fss *StreamingService) ListenCommit(ctx context.Context, res abci.ResponseCommit) error { - err := fss.doListenCommit(ctx, res) - if err != nil { - fss.logger.Error("Commit listening hook failed", "height", fss.currentBlockNumber, "err", err) + if err := fss.doListenCommit(ctx, res); err != nil { + fss.logger.Error("Listen commit failed", "height", fss.currentBlockNumber, "err", err) if fss.stopNodeOnErr { return err } } + return nil } func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.ResponseCommit) (err error) { fss.blockMetadata.ResponseCommit = &res - // write to target files, the file size is written at the beginning, which can be used to detect completeness. + // Write to target files, the file size is written at the beginning, which can + // be used to detect completeness. metaFileName := fmt.Sprintf("block-%d-meta", fss.currentBlockNumber) dataFileName := fmt.Sprintf("block-%d-data", fss.currentBlockNumber) + if fss.filePrefix != "" { metaFileName = fmt.Sprintf("%s-%s", fss.filePrefix, metaFileName) dataFileName = fmt.Sprintf("%s-%s", fss.filePrefix, dataFileName) @@ -139,6 +162,7 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon if err != nil { return err } + if err := writeLengthPrefixedFile(path.Join(fss.writeDir, metaFileName), bz, fss.fsync); err != nil { return err } @@ -148,42 +172,44 @@ func (fss *StreamingService) doListenCommit(ctx context.Context, res abci.Respon if err := fss.writeBlockData(&buf); err != nil { return err } + return writeLengthPrefixedFile(path.Join(fss.writeDir, dataFileName), buf.Bytes(), fss.fsync) } func (fss *StreamingService) writeBlockData(writer io.Writer) error { for _, listener := range fss.storeListeners { cache := listener.PopStateCache() + for i := range cache { bz, err := fss.codec.MarshalLengthPrefixed(&cache[i]) if err != nil { return err } - if _, err = writer.Write(bz); err != nil { + + if _, err := writer.Write(bz); err != nil { return err } } } - return nil -} -// Stream satisfies the baseapp.StreamingService interface -func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil } -// Close satisfies the io.Closer interface, which satisfies the baseapp.StreamingService interface -func (fss *StreamingService) Close() error { - return nil -} +// Stream satisfies the StreamingService interface. It performs a no-op. +func (fss *StreamingService) Stream(wg *sync.WaitGroup) error { return nil } + +// Close satisfies the StreamingService interface. It performs a no-op. +func (fss *StreamingService) Close() error { return nil } // isDirWriteable checks if dir is writable by writing and removing a file -// to dir. It returns nil if dir is writable. +// to dir. It returns nil if dir is writable. We have to do this as there is no +// platform-independent way of determining if a directory is writeable. func isDirWriteable(dir string) error { f := path.Join(dir, ".touch") if err := os.WriteFile(f, []byte(""), 0o600); err != nil { return err } + return os.Remove(f) } @@ -193,25 +219,30 @@ func writeLengthPrefixedFile(path string, data []byte, fsync bool) (err error) { if err != nil { return sdkerrors.Wrapf(err, "open file failed: %s", path) } + defer func() { // avoid overriding the real error with file close error if err1 := f.Close(); err1 != nil && err == nil { err = sdkerrors.Wrapf(err, "close file failed: %s", path) } }() + _, err = f.Write(sdk.Uint64ToBigEndian(uint64(len(data)))) if err != nil { return sdkerrors.Wrapf(err, "write length prefix failed: %s", path) } + _, err = f.Write(data) if err != nil { return sdkerrors.Wrapf(err, "write block data failed: %s", path) } + if fsync { err = f.Sync() if err != nil { return sdkerrors.Wrapf(err, "fsync failed: %s", path) } } - return + + return err } From 4bde206462997d8bd16e5b14ff92ff23fc90e990 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 10:08:55 -0500 Subject: [PATCH 2/7] updates --- store/streaming/file/service_test.go | 44 ++++++++++++++++++++++------ 1 file changed, 35 insertions(+), 9 deletions(-) diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go index 21ebf508f6a6..9eb0a555f8df 100644 --- a/store/streaming/file/service_test.go +++ b/store/streaming/file/service_test.go @@ -16,13 +16,13 @@ import ( tmproto "github.com/tendermint/tendermint/proto/tendermint/types" "github.com/cosmos/cosmos-sdk/codec" - codecTypes "github.com/cosmos/cosmos-sdk/codec/types" + codectypes "github.com/cosmos/cosmos-sdk/codec/types" "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" ) var ( - interfaceRegistry = codecTypes.NewInterfaceRegistry() + interfaceRegistry = codectypes.NewInterfaceRegistry() testMarshaller = codec.NewProtoCodec(interfaceRegistry) testStreamingService *StreamingService testListener1, testListener2 types.WriteListener @@ -115,20 +115,23 @@ func TestFileStreamingService(t *testing.T) { if os.Getenv("CI") != "" { t.Skip("Skipping TestFileStreamingService in CI environment") } - err := os.Mkdir(testDir, 0o700) - require.Nil(t, err) + + require.Nil(t, os.Mkdir(testDir, 0o700)) defer os.RemoveAll(testDir) testKeys := []types.StoreKey{mockStoreKey1, mockStoreKey2} - testStreamingService, err = NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, log.NewNopLogger(), true, false, false) + testStreamingService, err := NewStreamingService(testDir, testPrefix, testKeys, testMarshaller, log.NewNopLogger(), true, false, false) require.Nil(t, err) require.IsType(t, &StreamingService{}, testStreamingService) require.Equal(t, testPrefix, testStreamingService.filePrefix) require.Equal(t, testDir, testStreamingService.writeDir) require.Equal(t, testMarshaller, testStreamingService.codec) + testListener1 = testStreamingService.storeListeners[0] testListener2 = testStreamingService.storeListeners[1] + wg := new(sync.WaitGroup) + testStreamingService.Stream(wg) testListenBlock(t) testStreamingService.Close() @@ -136,7 +139,10 @@ func TestFileStreamingService(t *testing.T) { } func testListenBlock(t *testing.T) { - var expectKVPairsStore1, expectKVPairsStore2 [][]byte + var ( + expectKVPairsStore1 [][]byte + expectKVPairsStore2 [][]byte + ) // write state changes testListener1.OnWrite(mockStoreKey1, mockKey1, mockValue1, false) @@ -151,6 +157,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err := testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey2, @@ -158,6 +165,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err := testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey3, @@ -165,6 +173,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1, expectedKVPair3) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2) @@ -185,6 +194,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey2, @@ -192,6 +202,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -199,6 +210,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair1) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair2, expectedKVPair3) @@ -219,6 +231,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, @@ -226,6 +239,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -233,6 +247,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectKVPairsStore1 = append(expectKVPairsStore1, expectedKVPair2) expectKVPairsStore2 = append(expectKVPairsStore2, expectedKVPair1, expectedKVPair3) @@ -253,6 +268,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair2, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey1.Name(), Key: mockKey2, @@ -260,6 +276,7 @@ func testListenBlock(t *testing.T) { Delete: false, }) require.Nil(t, err) + expectedKVPair3, err = testMarshaller.Marshal(&types.StoreKVPair{ StoreKey: mockStoreKey2.Name(), Key: mockKey3, @@ -314,37 +331,46 @@ func readInFile(name string) ([]byte, error) { if err != nil { return nil, err } + size := sdk.BigEndianToUint64(bz[:8]) if len(bz) != int(size)+8 { return nil, errors.New("incomplete file ") } + return bz[8:], nil } -// segmentBytes returns all of the protobuf messages contained in the byte array as an array of byte arrays -// The messages have their length prefix removed +// segmentBytes returns all of the protobuf messages contained in the byte array +// as an array of byte arrays. The messages have their length prefix removed. func segmentBytes(bz []byte) ([][]byte, error) { var err error + segments := make([][]byte, 0) for len(bz) > 0 { var segment []byte + segment, bz, err = getHeadSegment(bz) if err != nil { return nil, err } + segments = append(segments, segment) } + return segments, nil } -// getHeadSegment returns the bytes for the leading protobuf object in the byte array (removing the length prefix) and returns the remainder of the byte array +// getHeadSegment returns the bytes for the leading protobuf object in the byte +// array (removing the length prefix) and returns the remainder of the byte array. func getHeadSegment(bz []byte) ([]byte, []byte, error) { size, prefixSize := binary.Uvarint(bz) if prefixSize < 0 { return nil, nil, fmt.Errorf("invalid number of bytes read from length-prefixed encoding: %d", prefixSize) } + if size > uint64(len(bz)-prefixSize) { return nil, nil, fmt.Errorf("not enough bytes to read; want: %v, got: %v", size, len(bz)-prefixSize) } + return bz[prefixSize:(uint64(prefixSize) + size)], bz[uint64(prefixSize)+size:], nil } From a7471bda422d72276aed577de7342fd11d684465 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 10:24:07 -0500 Subject: [PATCH 3/7] updates --- store/types/listening.go | 35 ++++++++++++++++++++++------------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/store/types/listening.go b/store/types/listening.go index 5f21689449fd..d345e75678c3 100644 --- a/store/types/listening.go +++ b/store/types/listening.go @@ -6,7 +6,7 @@ import ( "github.com/cosmos/cosmos-sdk/codec" ) -// WriteListener interface for streaming data out from a listenkv.Store +// WriteListener interface for streaming data out from a KVStore type WriteListener interface { // if value is nil then it was deleted // storeKey indicates the source KVStore, to facilitate using the same WriteListener across separate KVStores @@ -14,14 +14,16 @@ type WriteListener interface { OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error } -// StoreKVPairWriteListener is used to configure listening to a KVStore by writing out length-prefixed -// protobuf encoded StoreKVPairs to an underlying io.Writer +// StoreKVPairWriteListener is used to configure listening to a KVStore by +// writing out length-prefixed Protobuf encoded StoreKVPairs to an underlying +// io.Writer object. type StoreKVPairWriteListener struct { writer io.Writer marshaller codec.BinaryCodec } -// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a provdied io.Writer and codec.BinaryCodec +// NewStoreKVPairWriteListener wraps creates a StoreKVPairWriteListener with a +// provided io.Writer and codec.BinaryCodec. func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairWriteListener { return &StoreKVPairWriteListener{ writer: w, @@ -29,20 +31,25 @@ func NewStoreKVPairWriteListener(w io.Writer, m codec.BinaryCodec) *StoreKVPairW } } -// OnWrite satisfies the WriteListener interface by writing length-prefixed protobuf encoded StoreKVPairs +// OnWrite satisfies the WriteListener interface by writing length-prefixed +// Protobuf encoded StoreKVPairs. func (wl *StoreKVPairWriteListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { - kvPair := new(StoreKVPair) - kvPair.StoreKey = storeKey.Name() - kvPair.Delete = delete - kvPair.Key = key - kvPair.Value = value + kvPair := &StoreKVPair{ + StoreKey: storeKey.Name(), + Key: key, + Value: value, + Delete: delete, + } + by, err := wl.marshaller.MarshalLengthPrefixed(kvPair) if err != nil { return err } + if _, err := wl.writer.Write(by); err != nil { return err } + return nil } @@ -57,7 +64,7 @@ func NewMemoryListener(key StoreKey) *MemoryListener { return &MemoryListener{key: key} } -// OnWrite implements WriteListener interface +// OnWrite implements WriteListener interface. func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, delete bool) error { fl.stateCache = append(fl.stateCache, StoreKVPair{ StoreKey: storeKey.Name(), @@ -65,17 +72,19 @@ func (fl *MemoryListener) OnWrite(storeKey StoreKey, key []byte, value []byte, d Key: key, Value: value, }) + return nil } -// PopStateCache returns the current state caches and set to nil +// PopStateCache returns the current state caches and set to nil. func (fl *MemoryListener) PopStateCache() []StoreKVPair { res := fl.stateCache fl.stateCache = nil + return res } -// StoreKey returns the storeKey it listens to +// StoreKey returns the storeKey it listens to. func (fl *MemoryListener) StoreKey() StoreKey { return fl.key } From 33dbc92e4b89b4500be2c2690fbc18a5f251dab2 Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 10:43:09 -0500 Subject: [PATCH 4/7] updates --- store/streaming/file/service.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index 2ad706518df0..64a861e3f761 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -98,8 +98,8 @@ func (fss *StreamingService) Listeners() map[types.StoreKey][]types.WriteListene // ListenBeginBlock satisfies the ABCIListener interface. It sets the received // BeginBlock request, response and the current block number. Note, these are -// not written to file until ListenCommit is executed, after which it will be -// reset again on the next block. +// not written to file until ListenCommit is executed and outputMetadata is set, +// after which it will be reset again on the next block. func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.RequestBeginBlock, res abci.ResponseBeginBlock) error { fss.blockMetadata.RequestBeginBlock = &req fss.blockMetadata.ResponseBeginBlock = &res @@ -109,8 +109,8 @@ func (fss *StreamingService) ListenBeginBlock(ctx context.Context, req abci.Requ // ListenDeliverTx satisfies the ABCIListener interface. It appends the received // DeliverTx request and response to a list of DeliverTxs objects. Note, these -// are not written to file until ListenCommit is executed, after which it will be -// reset again on the next block. +// are not written to file until ListenCommit is executed and outputMetadata is +// set, after which it will be reset again on the next block. func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.RequestDeliverTx, res abci.ResponseDeliverTx) error { fss.blockMetadata.DeliverTxs = append(fss.blockMetadata.DeliverTxs, &types.BlockMetadata_DeliverTx{ Request: &req, @@ -122,8 +122,8 @@ func (fss *StreamingService) ListenDeliverTx(ctx context.Context, req abci.Reque // ListenEndBlock satisfies the ABCIListener interface. It sets the received // EndBlock request, response and the current block number. Note, these are -// not written to file until ListenCommit is executed, after which it will be -// reset again on the next block. +// not written to file until ListenCommit is executed and outputMetadata is set, +// after which it will be reset again on the next block. func (fss *StreamingService) ListenEndBlock(ctx context.Context, req abci.RequestEndBlock, res abci.ResponseEndBlock) error { fss.blockMetadata.RequestEndBlock = &req fss.blockMetadata.ResponseEndBlock = &res From 31520121153aab3f4d1e3417c1846d1e71d5976b Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 10:50:30 -0500 Subject: [PATCH 5/7] updates --- server/config/toml.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/server/config/toml.go b/server/config/toml.go index 915b4fd73e13..346385596e7f 100644 --- a/server/config/toml.go +++ b/server/config/toml.go @@ -212,11 +212,14 @@ streamers = [{{ range .Store.Streamers }}{{ printf "%q, " . }}{{end}}] keys = [{{ range .Streamers.File.Keys }}{{ printf "%q, " . }}{{end}}] write_dir = "{{ .Streamers.File.WriteDir }}" prefix = "{{ .Streamers.File.Prefix }}" + # output-metadata specifies if output the metadata file which includes the abci request/responses # during processing the block. output-metadata = "{{ .Streamers.File.OutputMetadata }}" + # stop-node-on-error specifies if propagate the file streamer errors to consensus state machine. stop-node-on-error = "{{ .Streamers.File.StopNodeOnError }}" + # fsync specifies if call fsync after writing the files. fsync = "{{ .Streamers.File.Fsync }}" @@ -229,7 +232,6 @@ fsync = "{{ .Streamers.File.Fsync }}" # Setting max_txs to negative 1 (-1) will disable transactions from being inserted into the mempool. # Setting max_txs to a positive number (> 0) will limit the number of transactions in the mempool, by the specified amount. max-txs = "{{ .Mempool.MaxTxs }}" - ` var configTemplate *template.Template From f3a6f47fb21f6771e7e92d97b1099cee7857405f Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 15:50:41 -0500 Subject: [PATCH 6/7] updates --- server/config/config.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/config/config.go b/server/config/config.go index b4fb4e707e60..62585065addf 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -295,10 +295,10 @@ func DefaultConfig() *Config { Streamers: StreamersConfig{ File: FileStreamerConfig{ Keys: []string{"*"}, - WriteDir: "data/file_streamer", + WriteDir: "", OutputMetadata: true, StopNodeOnError: true, - // NOTICE: the default config don't protect the streamer data integrity + // NOTICE: The default config doesn't protect the streamer data integrity // in face of system crash. Fsync: false, }, From f4403ef903654b715d95895c440c9ae3e7d479dc Mon Sep 17 00:00:00 2001 From: Aleksandr Bezobchuk Date: Fri, 9 Dec 2022 15:53:36 -0500 Subject: [PATCH 7/7] updates --- store/streaming/file/service.go | 3 +-- store/streaming/file/service_test.go | 10 ---------- 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/store/streaming/file/service.go b/store/streaming/file/service.go index 282a08fd2e40..5fc615bc7245 100644 --- a/store/streaming/file/service.go +++ b/store/streaming/file/service.go @@ -10,11 +10,10 @@ import ( "sort" "sync" + "cosmossdk.io/errors" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/libs/log" - "cosmossdk.io/errors" - "github.com/cosmos/cosmos-sdk/baseapp" "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" diff --git a/store/streaming/file/service_test.go b/store/streaming/file/service_test.go index f584e03a6c30..4f0787202171 100644 --- a/store/streaming/file/service_test.go +++ b/store/streaming/file/service_test.go @@ -15,22 +15,12 @@ import ( "github.com/tendermint/tendermint/libs/log" tmproto "github.com/tendermint/tendermint/proto/tendermint/types" -<<<<<<< HEAD - "github.com/cosmos/cosmos-sdk/codec" - codectypes "github.com/cosmos/cosmos-sdk/codec/types" -======= ->>>>>>> main "github.com/cosmos/cosmos-sdk/store/types" sdk "github.com/cosmos/cosmos-sdk/types" ) var ( -<<<<<<< HEAD - interfaceRegistry = codectypes.NewInterfaceRegistry() - testMarshaller = codec.NewProtoCodec(interfaceRegistry) -======= testMarshaller = types.NewTestCodec() ->>>>>>> main testStreamingService *StreamingService testListener1, testListener2 types.WriteListener emptyContext = context.TODO()