Skip to content

Commit

Permalink
Merge pull request #14127 from ahrtr/threshold_3.5
Browse files Browse the repository at this point in the history
[3.5] Restrict the max size of each WAL entry to the remaining size of the WAL file
  • Loading branch information
ahrtr committed Jun 17, 2022
2 parents 0be65da + 621cd7b commit 4443e14
Show file tree
Hide file tree
Showing 9 changed files with 229 additions and 74 deletions.
60 changes: 60 additions & 0 deletions client/pkg/fileutil/filereader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileutil

import (
"bufio"
"io"
"io/fs"
"os"
)

// FileReader is a wrapper of io.Reader. It also provides file info.
type FileReader interface {
io.Reader
FileInfo() (fs.FileInfo, error)
}

type fileReader struct {
*os.File
}

func NewFileReader(f *os.File) FileReader {
return &fileReader{f}
}

func (fr *fileReader) FileInfo() (fs.FileInfo, error) {
return fr.Stat()
}

// FileBufReader is a wrapper of bufio.Reader. It also provides file info.
type FileBufReader struct {
*bufio.Reader
fi fs.FileInfo
}

func NewFileBufReader(fr FileReader) *FileBufReader {
bufReader := bufio.NewReader(fr)
fi, err := fr.FileInfo()
if err != nil {
// This should never happen.
panic(err)
}
return &FileBufReader{bufReader, fi}
}

func (fbr *FileBufReader) FileInfo() fs.FileInfo {
return fbr.fi
}
44 changes: 44 additions & 0 deletions client/pkg/fileutil/filereader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2022 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package fileutil

import (
"os"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func TestFileBufReader(t *testing.T) {
f, err := os.CreateTemp(t.TempDir(), "wal")
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
fi, err := f.Stat()
if err != nil {
t.Errorf("Unexpected error: %v", err)
}

fbr := NewFileBufReader(NewFileReader(f))

if !strings.HasPrefix(fbr.FileInfo().Name(), "wal") {
t.Errorf("Unexpected file name: %s", fbr.FileInfo().Name())
}
assert.Equal(t, fi.Size(), fbr.FileInfo().Size())
assert.Equal(t, fi.IsDir(), fbr.FileInfo().IsDir())
assert.Equal(t, fi.Mode(), fbr.FileInfo().Mode())
assert.Equal(t, fi.ModTime(), fbr.FileInfo().ModTime())
}
1 change: 1 addition & 0 deletions client/pkg/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.16

require (
github.com/coreos/go-systemd/v22 v22.3.2
github.com/stretchr/testify v1.7.0
go.uber.org/zap v1.17.0
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57
)
1 change: 1 addition & 0 deletions client/pkg/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ go.uber.org/zap v1.17.0 h1:MTjgFu6ZLKvY6Pvaqk97GlxNBuMpV4Hy/3P6tRGlI2U=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57 h1:F5Gozwx4I1xtr/sr/8CFbb57iKi3297KFs0QDbGN60A=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.8 h1:obN1ZagJSUGI0Ek/LBmuj4SNLPfIny3KsKFopxRdj10=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
Expand Down
28 changes: 14 additions & 14 deletions server/wal/decoder.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,13 @@
package wal

import (
"bufio"
"encoding/binary"
"fmt"
"hash"
"io"
"sync"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/pkg/v3/crc"
"go.etcd.io/etcd/pkg/v3/pbutil"
"go.etcd.io/etcd/raft/v3/raftpb"
Expand All @@ -34,17 +35,17 @@ const frameSizeBytes = 8

type decoder struct {
mu sync.Mutex
brs []*bufio.Reader
brs []*fileutil.FileBufReader

// lastValidOff file offset following the last valid decoded record
lastValidOff int64
crc hash.Hash32
}

func newDecoder(r ...io.Reader) *decoder {
readers := make([]*bufio.Reader, len(r))
func newDecoder(r ...fileutil.FileReader) *decoder {
readers := make([]*fileutil.FileBufReader, len(r))
for i := range r {
readers[i] = bufio.NewReader(r[i])
readers[i] = fileutil.NewFileBufReader(r[i])
}
return &decoder{
brs: readers,
Expand All @@ -59,17 +60,13 @@ func (d *decoder) decode(rec *walpb.Record) error {
return d.decodeRecord(rec)
}

// raft max message size is set to 1 MB in etcd server
// assume projects set reasonable message size limit,
// thus entry size should never exceed 10 MB
const maxWALEntrySizeLimit = int64(10 * 1024 * 1024)

func (d *decoder) decodeRecord(rec *walpb.Record) error {
if len(d.brs) == 0 {
return io.EOF
}

l, err := readInt64(d.brs[0])
fileBufReader := d.brs[0]
l, err := readInt64(fileBufReader)
if err == io.EOF || (err == nil && l == 0) {
// hit end of file or preallocated space
d.brs = d.brs[1:]
Expand All @@ -84,12 +81,15 @@ func (d *decoder) decodeRecord(rec *walpb.Record) error {
}

recBytes, padBytes := decodeFrameSize(l)
if recBytes >= maxWALEntrySizeLimit-padBytes {
return ErrMaxWALEntrySizeLimitExceeded
// The length of current WAL entry must be less than the remaining file size.
maxEntryLimit := fileBufReader.FileInfo().Size() - d.lastValidOff - padBytes
if recBytes > maxEntryLimit {
return fmt.Errorf("wal: max entry size limit exceeded, recBytes: %d, fileSize(%d) - offset(%d) - padBytes(%d) = entryLimit(%d)",
recBytes, fileBufReader.FileInfo().Size(), d.lastValidOff, padBytes, maxEntryLimit)
}

data := make([]byte, recBytes+padBytes)
if _, err = io.ReadFull(d.brs[0], data); err != nil {
if _, err = io.ReadFull(fileBufReader, data); err != nil {
// ReadFull returns io.EOF only if no bytes were read
// the decoder should treat this as an ErrUnexpectedEOF instead.
if err == io.EOF {
Expand Down
32 changes: 26 additions & 6 deletions server/wal/record_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,11 @@ import (
"errors"
"hash/crc32"
"io"
"io/ioutil"
"os"
"reflect"
"testing"

"go.etcd.io/etcd/client/pkg/v3/fileutil"
"go.etcd.io/etcd/server/v3/wal/walpb"
)

Expand All @@ -43,8 +44,7 @@ func TestReadRecord(t *testing.T) {
}{
{infoRecord, &walpb.Record{Type: 1, Crc: crc32.Checksum(infoData, crcTable), Data: infoData}, nil},
{[]byte(""), &walpb.Record{}, io.EOF},
{infoRecord[:8], &walpb.Record{}, io.ErrUnexpectedEOF},
{infoRecord[:len(infoRecord)-len(infoData)-8], &walpb.Record{}, io.ErrUnexpectedEOF},
{infoRecord[:14], &walpb.Record{}, io.ErrUnexpectedEOF},
{infoRecord[:len(infoRecord)-len(infoData)], &walpb.Record{}, io.ErrUnexpectedEOF},
{infoRecord[:len(infoRecord)-8], &walpb.Record{}, io.ErrUnexpectedEOF},
{badInfoRecord, &walpb.Record{}, walpb.ErrCRCMismatch},
Expand All @@ -53,7 +53,11 @@ func TestReadRecord(t *testing.T) {
rec := &walpb.Record{}
for i, tt := range tests {
buf := bytes.NewBuffer(tt.data)
decoder := newDecoder(ioutil.NopCloser(buf))
f, err := createFileWithData(t, buf)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
decoder := newDecoder(fileutil.NewFileReader(f))
e := decoder.decode(rec)
if !reflect.DeepEqual(rec, tt.wr) {
t.Errorf("#%d: block = %v, want %v", i, rec, tt.wr)
Expand All @@ -73,8 +77,12 @@ func TestWriteRecord(t *testing.T) {
e := newEncoder(buf, 0, 0)
e.encode(&walpb.Record{Type: typ, Data: d})
e.flush()
decoder := newDecoder(ioutil.NopCloser(buf))
err := decoder.decode(b)
f, err := createFileWithData(t, buf)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
decoder := newDecoder(fileutil.NewFileReader(f))
err = decoder.decode(b)
if err != nil {
t.Errorf("err = %v, want nil", err)
}
Expand All @@ -85,3 +93,15 @@ func TestWriteRecord(t *testing.T) {
t.Errorf("data = %v, want %v", b.Data, d)
}
}

func createFileWithData(t *testing.T, bf *bytes.Buffer) (*os.File, error) {
f, err := os.CreateTemp(t.TempDir(), "wal")
if err != nil {
return nil, err
}
if _, err := f.Write(bf.Bytes()); err != nil {
return nil, err
}
f.Seek(0, 0)
return f, nil
}
2 changes: 1 addition & 1 deletion server/wal/repair.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func Repair(lg *zap.Logger, dirpath string) bool {
lg.Info("repairing", zap.String("path", f.Name()))

rec := &walpb.Record{}
decoder := newDecoder(f)
decoder := newDecoder(fileutil.NewFileReader(f.File))
for {
lastOffset := decoder.lastOffset()
err := decoder.decode(rec)
Expand Down
27 changes: 15 additions & 12 deletions server/wal/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,14 @@ var (
// so that tests can set a different segment size.
SegmentSizeBytes int64 = 64 * 1000 * 1000 // 64MB

ErrMetadataConflict = errors.New("wal: conflicting metadata found")
ErrFileNotFound = errors.New("wal: file not found")
ErrCRCMismatch = errors.New("wal: crc mismatch")
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
ErrMaxWALEntrySizeLimitExceeded = errors.New("wal: max entry size limit exceeded")
ErrDecoderNotFound = errors.New("wal: decoder not found")
crcTable = crc32.MakeTable(crc32.Castagnoli)
ErrMetadataConflict = errors.New("wal: conflicting metadata found")
ErrFileNotFound = errors.New("wal: file not found")
ErrCRCMismatch = errors.New("wal: crc mismatch")
ErrSnapshotMismatch = errors.New("wal: snapshot mismatch")
ErrSnapshotNotFound = errors.New("wal: snapshot not found")
ErrSliceOutOfRange = errors.New("wal: slice bounds out of range")
ErrDecoderNotFound = errors.New("wal: decoder not found")
crcTable = crc32.MakeTable(crc32.Castagnoli)
)

// WAL is a logical representation of the stable storage.
Expand Down Expand Up @@ -378,12 +377,13 @@ func selectWALFiles(lg *zap.Logger, dirpath string, snap walpb.Snapshot) ([]stri
return names, nameIndex, nil
}

func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]io.Reader, []*fileutil.LockedFile, func() error, error) {
func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int, write bool) ([]fileutil.FileReader, []*fileutil.LockedFile, func() error, error) {
rcs := make([]io.ReadCloser, 0)
rs := make([]io.Reader, 0)
rs := make([]fileutil.FileReader, 0)
ls := make([]*fileutil.LockedFile, 0)
for _, name := range names[nameIndex:] {
p := filepath.Join(dirpath, name)
var f *os.File
if write {
l, err := fileutil.TryLockFile(p, os.O_RDWR, fileutil.PrivateFileMode)
if err != nil {
Expand All @@ -392,6 +392,7 @@ func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int,
}
ls = append(ls, l)
rcs = append(rcs, l)
f = l.File
} else {
rf, err := os.OpenFile(p, os.O_RDONLY, fileutil.PrivateFileMode)
if err != nil {
Expand All @@ -400,8 +401,10 @@ func openWALFiles(lg *zap.Logger, dirpath string, names []string, nameIndex int,
}
ls = append(ls, nil)
rcs = append(rcs, rf)
f = rf
}
rs = append(rs, rcs[len(rcs)-1])
fileReader := fileutil.NewFileReader(f)
rs = append(rs, fileReader)
}

closer := func() error { return closeAll(lg, rcs...) }
Expand Down

0 comments on commit 4443e14

Please sign in to comment.