-
Notifications
You must be signed in to change notification settings - Fork 49
/
reader.go
276 lines (238 loc) · 6.7 KB
/
reader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package reader
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"os"
"sync"
log "github.com/golang/glog"
"github.com/klauspost/compress/zstd"
)
// errNotInitialized is the error returned from Read() by a ReadSeeker that
// hasn't yet had Initialize() called.
//
// stylecheck is disabled because the error text starts with a capital letter,
// and changing the text would be an API change.
var errNotInitialized = errors.New("Not yet initialized") // nolint:stylecheck
// Initializable is an interface containing methods to initialize a ReadSeeker.
type Initializable interface {
	// IsInitialized reports whether Initialize has completed successfully,
	// i.e. whether Read calls may proceed.
	IsInitialized() bool
	// Initialize performs any pre-work (such as disk IO) required before
	// reading can start.
	Initialize() error
}
// ReadSeeker is an interface used to capture a file reader with seek functionality.
type ReadSeeker interface {
	io.Reader
	io.Closer
	Initializable
	// SeekOffset records an absolute offset (from the start of the stream)
	// to be applied at the next Initialize call.
	SeekOffset(offset int64) error
}
// fileSeeker is the file-backed implementation of ReadSeeker.
type fileSeeker struct {
	reader *bufio.Reader // buffered view over f; nil until Initialize
	f      *os.File      // lazily opened in Initialize; nil after Close
	path   string        // path passed to os.Open

	buffSize   int   // size for bufio.NewReaderSize
	seekOffset int64 // absolute offset applied at next Initialize

	initialized bool // true only between a successful Initialize and the next Close/SeekOffset
}
// NewFileReadSeeker wraps a buffered file reader with Seeking functionality.
// Notice that Seek calls un-set the reader and require Initialize calls. This
// is to avoid potentially unnecessary disk IO.
func NewFileReadSeeker(path string, buffsize int) ReadSeeker {
	// All remaining fields (f, reader, seekOffset, initialized) start at
	// their zero values; the file is opened lazily by Initialize.
	fs := &fileSeeker{
		path:     path,
		buffSize: buffsize,
	}
	return fs
}
// Close closes the reader. It still can be reopened with Initialize().
func (fio *fileSeeker) Close() error {
	fio.initialized = false

	var closeErr error
	if fio.f != nil {
		closeErr = fio.f.Close()
	}
	// Drop both handles so a later Initialize starts from scratch.
	fio.f, fio.reader = nil, nil
	return closeErr
}
// Read implements io.Reader. It fails with errNotInitialized until
// Initialize has been called.
func (fio *fileSeeker) Read(p []byte) (int, error) {
	if fio.IsInitialized() {
		return fio.reader.Read(p)
	}
	return 0, errNotInitialized
}
// SeekOffset is a simplified version of io.Seeker. It only supports offsets
// from the beginning of the file, and it errors lazily at the next
// Initialize: no disk IO happens here, the offset is merely recorded and the
// reader is invalidated.
func (fio *fileSeeker) SeekOffset(offset int64) error {
	fio.reader = nil
	fio.initialized = false
	fio.seekOffset = offset
	return nil
}
// IsInitialized indicates whether this reader is ready. If false, Read calls
// will fail.
func (fio *fileSeeker) IsInitialized() bool {
	return fio.initialized
}
// Initialize does the required IO pre-work for Read calls to function: it
// opens the file if needed, seeks to the offset recorded by SeekOffset, and
// (re)builds the buffered reader. It returns an error if the reader is
// already initialized, if the file cannot be opened, or if seeking fails.
func (fio *fileSeeker) Initialize() error {
	if fio.initialized {
		// Capitalized for parity with errNotInitialized; changing the text
		// would be an API change.
		return errors.New("Already initialized") // nolint:stylecheck
	}
	// The file handle survives Close-less SeekOffset calls, so only open it
	// when we don't already have one.
	if fio.f == nil {
		var err error
		fio.f, err = os.Open(fio.path)
		if err != nil {
			return err
		}
	}
	off, err := fio.f.Seek(fio.seekOffset, io.SeekStart)
	if err != nil {
		return err
	}
	if off != fio.seekOffset {
		// Fix: the message previously ended with a stray trailing comma.
		return fmt.Errorf("File seeking ended at %d. Expected %d", off, fio.seekOffset)
	}
	// Reuse the existing bufio.Reader's buffer when possible instead of
	// reallocating it on every re-initialization.
	if fio.reader == nil {
		fio.reader = bufio.NewReaderSize(fio.f, fio.buffSize)
	} else {
		fio.reader.Reset(fio.f)
	}
	fio.initialized = true
	return nil
}
// syncedBuffer is a bytes.Buffer guarded by a mutex.
//
// The zstd encoder lib will async write to the buffer, so we need
// to lock access to actually check for contents.
type syncedBuffer struct {
	mu  sync.Mutex    // guards buf
	buf *bytes.Buffer // holds compressed output produced by the encoder
}
// Read drains bytes from the underlying buffer while holding the lock.
func (sb *syncedBuffer) Read(p []byte) (int, error) {
	sb.mu.Lock()
	n, err := sb.buf.Read(p)
	sb.mu.Unlock()
	return n, err
}
// Write appends bytes to the underlying buffer while holding the lock.
func (sb *syncedBuffer) Write(p []byte) (int, error) {
	sb.mu.Lock()
	n, err := sb.buf.Write(p)
	sb.mu.Unlock()
	return n, err
}
// Len reports the number of unread bytes while holding the lock.
func (sb *syncedBuffer) Len() int {
	sb.mu.Lock()
	n := sb.buf.Len()
	sb.mu.Unlock()
	return n
}
// Reset discards all buffered bytes while holding the lock.
func (sb *syncedBuffer) Reset() {
	sb.mu.Lock()
	sb.buf.Reset()
	sb.mu.Unlock()
}
// compressedSeeker wraps a ReadSeeker and zstd-compresses its data on the fly.
type compressedSeeker struct {
	fs    ReadSeeker
	encdW *zstd.Encoder // set to nil by Read once the stream ends or errors
	// This keeps the compressed data
	buf *syncedBuffer
}
// encoderInit guards one-time construction of the shared encoder pool.
var encoderInit sync.Once

// encoders pools zstd encoders so Read/Seek cycles can reuse them instead of
// paying the construction cost each time. Populated by NewCompressedSeeker.
var encoders *sync.Pool
// NewCompressedFileSeeker creates a ReadSeeker based on a file path.
func NewCompressedFileSeeker(path string, buffsize int) (ReadSeeker, error) {
	fileRS := NewFileReadSeeker(path, buffsize)
	return NewCompressedSeeker(fileRS)
}
// NewCompressedSeeker wraps a ReadSeeker to compress its data on the fly.
// It returns an error when fs is itself a compressedSeeker, or when no
// encoder can be obtained from the pool.
func NewCompressedSeeker(fs ReadSeeker) (ReadSeeker, error) {
	// Refuse to stack compression layers on top of each other.
	if _, ok := fs.(*compressedSeeker); ok {
		return nil, errors.New("trying to double compress files")
	}
	// Lazily build the process-wide encoder pool exactly once.
	encoderInit.Do(func() {
		encoders = &sync.Pool{
			New: func() interface{} {
				// Concurrency of 1 keeps each pooled encoder single-threaded.
				e, err := zstd.NewWriter(nil, zstd.WithEncoderConcurrency(1))
				if err != nil {
					// A nil pool entry is surfaced below as a construction
					// error by the type assertion's nil check.
					log.Errorf("Error creating new encoder: %v", err)
					return nil
				}
				return e
			},
		}
	})
	buf := bytes.NewBuffer(nil)
	sb := &syncedBuffer{buf: buf}
	encdIntf := encoders.Get()
	encdW, ok := encdIntf.(*zstd.Encoder)
	if !ok || encdW == nil {
		return nil, errors.New("failed creating new encoder")
	}
	// Direct the encoder's compressed output into the synced buffer.
	encdW.Reset(sb)
	return &compressedSeeker{
		fs:    fs,
		encdW: encdW,
		buf:   sb,
	}, nil
}
// Read implements io.Reader, returning compressed bytes. It pulls
// uncompressed data from the wrapped ReadSeeker, feeds it through the zstd
// encoder into cfs.buf, and then serves the caller from that buffer.
func (cfs *compressedSeeker) Read(p []byte) (int, error) {
	if !cfs.IsInitialized() {
		return 0, errNotInitialized
	}
	var err error
	// Repeatedly encode chunks of input data until there's enough compressed
	// data to fill the output buffer. It can't be known ahead of time how much
	// uncompressed data will correspond to the desired amount of output
	// compressed data, hence the need for a loop.
	//
	// err will be nil until the loop encounters an error. cfs.encdW will be nil
	// when entering the loop if a previous Read call encountered an error or
	// reached an EOF, in which case there's no more data to encode.
	for cfs.buf.Len() < len(p) && err == nil && cfs.encdW != nil {
		var n int
		// Read is allowed to use the entirety of p as a scratchpad.
		n, err = cfs.fs.Read(p)
		// errW must be non-nil if written bytes != n.
		_, errW := cfs.encdW.Write(p[:n])
		// A write error outranks nil/EOF from the read side, but a real
		// read error is preserved.
		if errW != nil && (err == nil || err == io.EOF) {
			err = errW
		}
	}
	if err != nil {
		// When the buffer ends (EOF), or in case of an unexpected error,
		// compress remaining available bytes. The encoder requires a Close call
		// to finish writing compressed data smaller than zstd's window size.
		closeErr := cfs.encdW.Close()
		if err == io.EOF {
			err = closeErr
		}
		// Recycle the spent encoder; a nil encdW marks the stream as done
		// so later Reads only drain the buffer.
		encoders.Put(cfs.encdW)
		cfs.encdW = nil
	}
	// Serve whatever compressed data is available; a read error here is only
	// reported when no earlier error (or converted EOF) takes precedence.
	n, readErr := cfs.buf.Read(p)
	if err == nil {
		err = readErr
	}
	return n, err
}
// SeekOffset resets the compression stream and forwards the seek to the
// wrapped ReadSeeker. Any compressed data buffered so far is discarded, and
// the encoder is reset (or re-acquired from the pool if a previous Read
// already retired it).
func (cfs *compressedSeeker) SeekOffset(offset int64) error {
	if cfs.encdW == nil {
		// A previous Read hit EOF/an error and returned the encoder to the
		// pool; grab a fresh one.
		encdIntf := encoders.Get()
		var ok bool
		cfs.encdW, ok = encdIntf.(*zstd.Encoder)
		if !ok || cfs.encdW == nil {
			return errors.New("failed to get a new encoder")
		}
	} else if err := cfs.encdW.Close(); err != nil {
		// Close flushes pending frames into cfs.buf; on failure the encoder
		// state is unreliable, so retire it and report the error.
		encoders.Put(cfs.encdW)
		cfs.encdW = nil
		return err
	}
	// Discard all stale compressed output, including anything Close flushed
	// above, then point the encoder at the emptied buffer. (A second,
	// redundant Reset before the Close was removed: this one alone leaves
	// the buffer empty.)
	cfs.buf.Reset()
	cfs.encdW.Reset(cfs.buf)
	return cfs.fs.SeekOffset(offset)
}
// IsInitialized reports whether the wrapped ReadSeeker is ready for reads.
func (cfs *compressedSeeker) IsInitialized() bool { return cfs.fs.IsInitialized() }

// Initialize delegates the IO pre-work to the wrapped ReadSeeker.
func (cfs *compressedSeeker) Initialize() error { return cfs.fs.Initialize() }

// Close closes the wrapped ReadSeeker.
// No need for close to close the encoder - that's handled by Read.
func (cfs *compressedSeeker) Close() error { return cfs.fs.Close() }