-
Notifications
You must be signed in to change notification settings - Fork 3.4k
/
constructor.go
195 lines (161 loc) · 5.99 KB
/
constructor.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
package streaming
import (
"fmt"
"os"
"path"
"strings"
"sync"
"github.com/cosmos/cosmos-sdk/baseapp"
"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/codec"
serverTypes "github.com/cosmos/cosmos-sdk/server/types"
"github.com/cosmos/cosmos-sdk/store/streaming/file"
"github.com/cosmos/cosmos-sdk/store/types"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/spf13/cast"
)
// ServiceConstructor is used to construct a streaming service
type ServiceConstructor func(serverTypes.AppOptions, []types.StoreKey, codec.BinaryCodec) (baseapp.StreamingService, error)
// ServiceType enum for specifying the type of StreamingService
type ServiceType int
const (
Unknown ServiceType = iota
File
)
// Streaming option keys
const (
OptStreamersFilePrefix = "streamers.file.prefix"
OptStreamersFileWriteDir = "streamers.file.write_dir"
OptStreamersFileOutputMetadata = "streamers.file.output-metadata"
OptStreamersFileStopNodeOnError = "streamers.file.stop-node-on-error"
OptStreamersFileFsync = "streamers.file.fsync"
OptStoreStreamers = "store.streamers"
)
// ServiceTypeFromString returns the streaming.ServiceType corresponding to the
// provided name.
func ServiceTypeFromString(name string) ServiceType {
switch strings.ToLower(name) {
case "file", "f":
return File
default:
return Unknown
}
}
// String returns the string name of a streaming.ServiceType
func (sst ServiceType) String() string {
switch sst {
case File:
return "file"
default:
return "unknown"
}
}
// ServiceConstructorLookupTable is a mapping of streaming.ServiceTypes to
// streaming.ServiceConstructors types.
var ServiceConstructorLookupTable = map[ServiceType]ServiceConstructor{
File: NewFileStreamingService,
}
// NewServiceConstructor returns the streaming.ServiceConstructor corresponding
// to the provided name.
func NewServiceConstructor(name string) (ServiceConstructor, error) {
ssType := ServiceTypeFromString(name)
if ssType == Unknown {
return nil, fmt.Errorf("unrecognized streaming service name %s", name)
}
if constructor, ok := ServiceConstructorLookupTable[ssType]; ok && constructor != nil {
return constructor, nil
}
return nil, fmt.Errorf("streaming service constructor of type %s not found", ssType.String())
}
// NewFileStreamingService is the streaming.ServiceConstructor function for
// creating a FileStreamingService.
func NewFileStreamingService(
opts serverTypes.AppOptions,
keys []types.StoreKey,
marshaller codec.BinaryCodec,
) (baseapp.StreamingService, error) {
homePath := cast.ToString(opts.Get(flags.FlagHome))
filePrefix := cast.ToString(opts.Get(OptStreamersFilePrefix))
fileDir := cast.ToString(opts.Get(OptStreamersFileWriteDir))
outputMetadata := cast.ToBool(opts.Get(OptStreamersFileOutputMetadata))
stopNodeOnErr := cast.ToBool(opts.Get(OptStreamersFileStopNodeOnError))
fsync := cast.ToBool(opts.Get(OptStreamersFileFsync))
// relative path is based on node home directory.
if !path.IsAbs(fileDir) {
fileDir = path.Join(homePath, fileDir)
}
// try to create output directory if it does not exist
if _, err := os.Stat(fileDir); os.IsNotExist(err) {
if err = os.MkdirAll(fileDir, os.ModePerm); err != nil {
return nil, err
}
}
return file.NewStreamingService(fileDir, filePrefix, keys, marshaller, outputMetadata, stopNodeOnErr, fsync)
}
// LoadStreamingServices is a function for loading StreamingServices onto the
// BaseApp using the provided AppOptions, codec, and keys. It returns the
// WaitGroup and quit channel used to synchronize with the streaming services
// and any error that occurs during the setup.
func LoadStreamingServices(
bApp *baseapp.BaseApp,
appOpts serverTypes.AppOptions,
appCodec codec.BinaryCodec,
keys map[string]*types.KVStoreKey,
) ([]baseapp.StreamingService, *sync.WaitGroup, error) {
// waitgroup and quit channel for optional shutdown coordination of the streaming service(s)
wg := new(sync.WaitGroup)
// configure state listening capabilities using AppOptions
streamers := cast.ToStringSlice(appOpts.Get(OptStoreStreamers))
activeStreamers := make([]baseapp.StreamingService, 0, len(streamers))
for _, streamerName := range streamers {
var exposeStoreKeys []types.StoreKey
// get the store keys allowed to be exposed for this streaming service
exposeKeyStrs := cast.ToStringSlice(appOpts.Get(fmt.Sprintf("streamers.%s.keys", streamerName)))
// if list contains '*', expose all store keys
if sdk.SliceContains(exposeKeyStrs, "*") {
exposeStoreKeys = make([]types.StoreKey, 0, len(keys))
for _, storeKey := range keys {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
} else {
exposeStoreKeys = make([]types.StoreKey, 0, len(exposeKeyStrs))
for _, keyStr := range exposeKeyStrs {
if storeKey, ok := keys[keyStr]; ok {
exposeStoreKeys = append(exposeStoreKeys, storeKey)
}
}
}
if len(exposeStoreKeys) == 0 {
continue
}
constructor, err := NewServiceConstructor(streamerName)
if err != nil {
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}
return nil, nil, err
}
// Generate the streaming service using the constructor, appOptions, and the
// StoreKeys we want to expose.
streamingService, err := constructor(appOpts, exposeStoreKeys, appCodec)
if err != nil {
// Close any services we may have already spun up before hitting the error
// on this one.
for _, activeStreamer := range activeStreamers {
activeStreamer.Close()
}
return nil, nil, err
}
// register the streaming service with the BaseApp
bApp.SetStreamingService(streamingService)
// kick off the background streaming service loop
streamingService.Stream(wg)
// add to the list of active streamers
activeStreamers = append(activeStreamers, streamingService)
}
// If there are no active streamers, activeStreamers is empty (len == 0) and
// the waitGroup is not waiting on anything.
return activeStreamers, wg, nil
}