From 476302947c540e5b60482f9315c6b0cf8b88253e Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 11 May 2022 08:08:50 -0500 Subject: [PATCH 1/7] Add encrypted diskqueue - encryption reader/writer - compression on frames - change to segmentReader/segmentWriter instead of *os.File - updated tests - update benchmark --- NOTICE.txt | 76 +++--- go.mod | 2 +- .../publisher/queue/diskqueue/acks_test.go | 26 +- .../queue/diskqueue/benchmark_test.go | 88 +++++-- libbeat/publisher/queue/diskqueue/config.go | 26 +- .../publisher/queue/diskqueue/core_loop.go | 11 +- .../queue/diskqueue/core_loop_test.go | 20 +- .../publisher/queue/diskqueue/encryption.go | 152 +++++++++++ .../queue/diskqueue/encryption_test.go | 57 ++++ libbeat/publisher/queue/diskqueue/queue.go | 4 +- .../publisher/queue/diskqueue/reader_loop.go | 11 +- libbeat/publisher/queue/diskqueue/segments.go | 245 +++++++++++++----- .../queue/diskqueue/segments_test.go | 149 +++++++++++ .../publisher/queue/diskqueue/serialize.go | 38 ++- .../queue/diskqueue/serialize_test.go | 10 +- .../publisher/queue/diskqueue/writer_loop.go | 14 +- 16 files changed, 757 insertions(+), 172 deletions(-) create mode 100644 libbeat/publisher/queue/diskqueue/encryption.go create mode 100644 libbeat/publisher/queue/diskqueue/encryption_test.go create mode 100644 libbeat/publisher/queue/diskqueue/segments_test.go diff --git a/NOTICE.txt b/NOTICE.txt index c3cdb78e7d0..23789b3c138 100644 --- a/NOTICE.txt +++ b/NOTICE.txt @@ -13745,6 +13745,44 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. +-------------------------------------------------------------------------------- +Dependency : github.com/pierrec/lz4 +Version: v2.6.0+incompatible +Licence type (autodetected): BSD-3-Clause +-------------------------------------------------------------------------------- + +Contents of probable licence file $GOMODCACHE/github.com/pierrec/lz4@v2.6.0+incompatible/LICENSE: + +Copyright (c) 2015, Pierre Curto +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +* Redistributions of source code must retain the above copyright notice, this + list of conditions and the following disclaimer. + +* Redistributions in binary form must reproduce the above copyright notice, + this list of conditions and the following disclaimer in the documentation + and/or other materials provided with the distribution. + +* Neither the name of xxHash nor the names of its + contributors may be used to endorse or promote products derived from + this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
+ + + -------------------------------------------------------------------------------- Dependency : github.com/pierrre/gotestcover Version: v0.0.0-20160517101806-924dca7d15f0 @@ -34505,44 +34543,6 @@ Contents of probable licence file $GOMODCACHE/github.com/oxtoacart/bpool@v0.0.0- limitations under the License. --------------------------------------------------------------------------------- -Dependency : github.com/pierrec/lz4 -Version: v2.6.0+incompatible -Licence type (autodetected): BSD-3-Clause --------------------------------------------------------------------------------- - -Contents of probable licence file $GOMODCACHE/github.com/pierrec/lz4@v2.6.0+incompatible/LICENSE: - -Copyright (c) 2015, Pierre Curto -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions are met: - -* Redistributions of source code must retain the above copyright notice, this - list of conditions and the following disclaimer. - -* Redistributions in binary form must reproduce the above copyright notice, - this list of conditions and the following disclaimer in the documentation - and/or other materials provided with the distribution. - -* Neither the name of xxHash nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE -FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL -DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR -SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER -CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, -OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE -OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
- - - -------------------------------------------------------------------------------- Dependency : github.com/power-devops/perfstat Version: v0.0.0-20210106213030-5aafc221ea8c diff --git a/go.mod b/go.mod index 38c79f2ad87..15e96f44687 100644 --- a/go.mod +++ b/go.mod @@ -163,6 +163,7 @@ require ( github.com/elastic/elastic-agent-autodiscover v0.1.1 github.com/elastic/elastic-agent-libs v0.2.5 github.com/elastic/elastic-agent-system-metrics v0.3.1 + github.com/pierrec/lz4 v2.6.0+incompatible github.com/shirou/gopsutil/v3 v3.21.12 go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0 go.elastic.co/apm/module/apmhttp/v2 v2.0.0 @@ -262,7 +263,6 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect - github.com/pierrec/lz4 v2.6.0+incompatible // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.11.0 // indirect github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e // indirect diff --git a/libbeat/publisher/queue/diskqueue/acks_test.go b/libbeat/publisher/queue/diskqueue/acks_test.go index bacdf6689d5..7a7f9e20ef5 100644 --- a/libbeat/publisher/queue/diskqueue/acks_test.go +++ b/libbeat/publisher/queue/diskqueue/acks_test.go @@ -66,7 +66,7 @@ func TestAddFrames(t *testing.T) { rf(0, 0, true, 100), }, frameID(1), - &queuePosition{0, segmentHeaderSize + 100, 1}, + &queuePosition{0, segmentHeaderSizeV1 + 100, 1}, nil, }, { @@ -85,7 +85,7 @@ func TestAddFrames(t *testing.T) { rf(0, 1, false, 75), }, frameID(2), - &queuePosition{0, segmentHeaderSize + 175, 2}, + &queuePosition{0, segmentHeaderSizeV1 + 175, 2}, nil, }, { @@ -94,7 +94,7 @@ func TestAddFrames(t *testing.T) { rf(0, 2, false, 100), }, frameID(5), - &queuePosition{1, segmentHeaderSize + 150, 2}, + &queuePosition{1, segmentHeaderSizeV1 + 150, 2}, // This time we crossed a boundary so we should get an ACK for segment // 0 on the notification channel. segmentIDRef(0), @@ -122,7 +122,7 @@ func TestAddFrames(t *testing.T) { rf(0, 0, true, 100), }, frameID(2), - &queuePosition{0, segmentHeaderSize + 150, 2}, + &queuePosition{0, segmentHeaderSizeV1 + 150, 2}, nil, }, { @@ -131,7 +131,7 @@ func TestAddFrames(t *testing.T) { rf(0, 2, false, 75), }, frameID(3), - &queuePosition{0, segmentHeaderSize + 225, 3}, + &queuePosition{0, segmentHeaderSizeV1 + 225, 3}, nil, }, { @@ -140,7 +140,7 @@ func TestAddFrames(t *testing.T) { rf(1, 3, false, 100), }, frameID(7), - &queuePosition{2, segmentHeaderSize + 100, 1}, + &queuePosition{2, segmentHeaderSizeV1 + 100, 1}, segmentIDRef(1), }, { @@ -149,7 +149,7 @@ func TestAddFrames(t *testing.T) { rf(2, 7, false, 100), }, frameID(9), - &queuePosition{2, segmentHeaderSize + 300, 3}, + &queuePosition{2, segmentHeaderSizeV1 + 300, 3}, nil, }, }, @@ -165,7 +165,7 @@ func TestAddFrames(t *testing.T) { rf(0, 0, true, 100), }, frameID(4), - &queuePosition{3, segmentHeaderSize + 100, 1}, + &queuePosition{3, segmentHeaderSizeV1 + 100, 1}, // We advanced from segment 0 to segment 3, so we expect // segmentID 2 on the ACK channel. segmentIDRef(2), @@ -182,7 +182,7 @@ func TestAddFrames(t *testing.T) { rf(10, 35, true, 100), }, frameID(36), - &queuePosition{10, segmentHeaderSize + 100, 1}, + &queuePosition{10, segmentHeaderSizeV1 + 100, 1}, // We advanced to segment 10, so we expect segmentID 9 on // the ACK channel. 
segmentIDRef(9),
@@ -218,7 +218,7 @@ func TestAddFrames(t *testing.T) {
 			[]*readFrame{
 				{
 					segment: &queueSegment{
-						schemaVersion: uint32Ref(0),
+						schemaVersion: 0,
 					},
 					bytesOnDisk: 100,
 				},
@@ -299,15 +299,11 @@ func (dqa *diskQueueACKs) assertACKedSegment(
 	}
 }
 
-func uint32Ref(v uint32) *uint32 {
-	return &v
-}
-
 // rf assembles a readFrame with the given parameters and a spoofed
 // queue segment, whose firstFrameID field is set to match the given frame
 // if "first" is true.
 func rf(seg segmentID, frame frameID, first bool, size uint64) *readFrame {
-	s := &queueSegment{id: seg}
+	s := &queueSegment{id: seg, schemaVersion: 1}
 	if first {
 		s.firstFrameID = frame
 	}
diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go
index 62d791f53b0..93c099b0c6d 100644
--- a/libbeat/publisher/queue/diskqueue/benchmark_test.go
+++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -64,13 +64,18 @@ func makeEvent() publisher.Event {
 // hold the queue. Location of the temporary directory is stored in
 // the queue settings. Call `cleanup` when done with the queue to
 // close the queue and remove the temp dir.
-func setup() (*diskQueue, queue.Producer) {
+func setup(encryption bool, compression bool) (*diskQueue, queue.Producer) {
 	dir, err := os.MkdirTemp("", "benchmark")
 	if err != nil {
 		panic(err)
 	}
 	s := DefaultSettings()
 	s.Path = dir
+	if encryption {
+		s.SchemaVersion = 2
+		s.EncryptionKey = []byte("testtesttesttest")
+		s.UseCompression = compression
+	}
 	q, err := NewQueue(logp.NewLogger("benchmark"), s)
 	if err != nil {
 		os.RemoveAll(dir)
@@ -83,18 +88,23 @@ func setup() (*diskQueue, queue.Producer) {
 //clean closes the queue and deletes the temporory directory that
 // holds the queue.
 func cleanup(q *diskQueue) {
-	q.Close()
+	err := q.Close()
 	os.RemoveAll(q.settings.directoryPath())
+	if err != nil {
+		panic(err)
+	}
 }
 
-//produceAndConsume does the interesting work. It generates events,
-// publishes them, consumes them, and ACKS them.
-func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
-	go func() {
-		for i := 0; i < num_events; i++ {
-			p.Publish(makeEvent())
+func publishEvents(p queue.Producer, num int) {
+	for i := 0; i < num; i++ {
+		ok := p.Publish(makeEvent())
+		if !ok {
+			panic("didn't publish")
 		}
-	}()
+	}
+}
+
+func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error {
 	var received int
 	for {
 		batch, err := q.Get(batch_size)
@@ -104,33 +114,65 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz
 		batch.ACK()
 		received = received + batch.Count()
 		if received == num_events {
-			break
+			return nil
 		}
 	}
-	return nil
+}
+
+//produceAndConsume generates and publishes events in a goroutine; in
+// the main goroutine it consumes and acks them. This interleaves
+// publish and consume.
+func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
+	go publishEvents(p, num_events)
+	return getAndAckEvents(q, num_events, batch_size)
+}
+
+//produceThenConsume generates and publishes events; once all events
+// are published, it consumes and acks them.
+func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error { + publishEvents(p, num_events) + return getAndAckEvents(q, num_events, batch_size) } //benchmarkQueue is a wrapper for produceAndConsume, it tries to limit // timers to just produceAndConsume -func benchmarkQueue(num_events int, batch_size int, b *testing.B) { - var err error - rand.Seed(1) - q, p := setup() +func benchmarkQueue(num_events int, batch_size int, encryption bool, compression bool, async bool, b *testing.B) { b.ResetTimer() + var err error + for n := 0; n < b.N; n++ { - if err = produceAndConsume(p, q, num_events, batch_size); err != nil { - break + b.StopTimer() + rand.Seed(1) + q, p := setup(encryption, compression) + b.StartTimer() + if async { + if err = produceAndConsume(p, q, num_events, batch_size); err != nil { + cleanup(q) + break + } + } else { + if err = produceThenConsume(p, q, num_events, batch_size); err != nil { + cleanup(q) + break + } } + cleanup(q) } - b.StopTimer() - cleanup(q) if err != nil { b.Errorf("Error producing/consuming events: %v", err) } } // Actual benchmark calls follow -func Benchmark1M_10(b *testing.B) { benchmarkQueue(1000000, 10, b) } -func Benchmark1M_100(b *testing.B) { benchmarkQueue(1000000, 100, b) } -func Benchmark1M_1k(b *testing.B) { benchmarkQueue(1000000, 1000, b) } -func Benchmark1M_10k(b *testing.B) { benchmarkQueue(1000000, 10000, b) } +func BenchmarkAsync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, true, b) } +func BenchmarkAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, true, b) } +func BenchmarkAsyncEnc1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, true, b) } +func BenchmarkAsyncEnc1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, true, b) } +func BenchmarkAsyncEncComp1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, true, b) } +func BenchmarkAsyncEncComp1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, true, true, b) } +func BenchmarkSync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, false, b) } +func BenchmarkSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, false, b) } +func BenchmarkSyncEnc1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, false, b) } +func BenchmarkSyncEnc1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, false, b) } +func BenchmarkSyncEncComp1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, false, b) } +func BenchmarkSyncEncComp1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, true, false, b) } diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index 009af69712b..804b937d552 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -29,6 +29,9 @@ import ( "github.com/elastic/elastic-agent-libs/paths" ) +//defaultSchemaVersion specifies which on disk schema (0,1,2) is the default. +const defaultSchemaVersion = 1 + // Settings contains the configuration fields to create a new disk queue // or open an existing one. type Settings struct { @@ -68,6 +71,16 @@ type Settings struct { // use exponential backoff up to the specified limit. RetryInterval time.Duration MaxRetryInterval time.Duration + + // Schema Version specifies which on-disk format, serialization, and encryption to use. + // 0, 1, or 2 are valid options + SchemaVersion uint32 + + // EncryptionKey is used to encrypt data if SchemaVersion 2 is used. 
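+	// It must be exactly KeySize (16) bytes long, i.e. an AES-128 key.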
+ EncryptionKey []byte + + // UseCompression controls compression if SchemaVersion 2 is used. + UseCompression bool } // userConfig holds the parameters for a disk queue that are configurable @@ -129,6 +142,8 @@ func DefaultSettings() Settings { RetryInterval: 1 * time.Second, MaxRetryInterval: 30 * time.Second, + + SchemaVersion: defaultSchemaVersion, } } @@ -192,7 +207,16 @@ func (settings Settings) segmentPath(segmentID segmentID) string { // maxValidFrameSize returns the size of the largest possible frame that // can be stored with the current queue settings. func (settings Settings) maxValidFrameSize() uint64 { - return settings.MaxSegmentSize - segmentHeaderSize + switch settings.SchemaVersion { + case 0: + return settings.MaxSegmentSize - segmentHeaderSizeV0 + case 1: + return settings.MaxSegmentSize - segmentHeaderSizeV1 + case 2: + return settings.MaxSegmentSize - segmentHeaderSizeV2 + default: + return uint64(0) + } } // Given a retry interval, nextRetryInterval returns the next higher level diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index fd7e710372a..2746bd07874 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -277,7 +277,7 @@ func (dq *diskQueue) handleShutdown() { dq.acks.lock.Lock() finalPosition := dq.acks.nextPosition // We won't be updating the position anymore, so we can close the file. - dq.acks.positionFile.Sync() + dq.acks.positionFile.Sync() //nolint:errcheck //No error recovery path dq.acks.positionFile.Close() dq.acks.lock.Unlock() @@ -436,7 +436,14 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) { dq.segments.nextID++ // Reset the on-disk size to its initial value, the file's header size // with no frame data. 
- newSegmentSize = segmentHeaderSize + switch dq.settings.SchemaVersion { + case 0: + newSegmentSize = segmentHeaderSizeV0 + case 1: + newSegmentSize = segmentHeaderSizeV1 + case 2: + newSegmentSize = segmentHeaderSizeV2 + } } dq.segments.writingSegmentSize = newSegmentSize diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go index 27f535b67c3..ab7ff5b33a5 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop_test.go +++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go @@ -122,7 +122,7 @@ func TestHandleProducerWriteRequest(t *testing.T) { } settings := DefaultSettings() - settings.MaxSegmentSize = 1000 + segmentHeaderSize + settings.MaxSegmentSize = 1000 + segmentHeaderSizeV1 settings.MaxBufferSize = 10000 for description, test := range testCases { dq := &diskQueue{ @@ -480,7 +480,7 @@ func TestMaybeReadPending(t *testing.T) { "read one full segment": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, byteCount: 1000}, + {id: 1, byteCount: 1000, schemaVersion: 1}, }, // The next read request should start with frame 5 nextReadFrameID: 5, @@ -525,10 +525,10 @@ func TestMaybeReadPending(t *testing.T) { "ignore writing segments if reading is available": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, byteCount: 1000}, + {id: 1, byteCount: 1000, schemaVersion: 1}, }, writing: []*queueSegment{ - {id: 2, byteCount: 1000}, + {id: 2, byteCount: 1000, schemaVersion: 1}, }, }, expectedRequest: &readerLoopRequest{ @@ -566,8 +566,8 @@ func TestMaybeReadPending(t *testing.T) { "skip the first reading segment if it's already been fully read": { segments: diskQueueSegments{ reading: []*queueSegment{ - {id: 1, byteCount: 1000}, - {id: 2, byteCount: 500}, + {id: 1, byteCount: 1000, schemaVersion: 1}, + {id: 2, byteCount: 500, schemaVersion: 1}, }, nextReadPosition: 1000, }, @@ -594,7 +594,7 @@ func TestMaybeReadPending(t *testing.T) { { id: 1, byteCount: 1000, - schemaVersion: makeUint32Ptr(0)}, + schemaVersion: 0}, }, // The next read request should start with frame 5 nextReadFrameID: 5, @@ -969,12 +969,8 @@ func makeWriteFrameWithSize(size int) *writeFrame { return &writeFrame{serialized: make([]byte, size-frameMetadataSize)} } -func makeUint32Ptr(value uint32) *uint32 { - return &value -} - func segmentWithSize(size int) *queueSegment { - if size < segmentHeaderSize { + if size < segmentHeaderSizeV1 { // Can't have a segment smaller than the segment header return nil } diff --git a/libbeat/publisher/queue/diskqueue/encryption.go b/libbeat/publisher/queue/diskqueue/encryption.go new file mode 100644 index 00000000000..c21c7a8fba2 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/encryption.go @@ -0,0 +1,152 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+	"bytes"
+	"crypto/aes"
+	"crypto/cipher"
+	"crypto/rand"
+	"fmt"
+	"io"
+)
+
+const (
+	// KeySize is the AES-128 key size in bytes (128 bits)
+	KeySize = 16
+)
+
+//EncryptionReader allows reading from an AES-128-CTR stream
+type EncryptionReader struct {
+	src    io.ReadCloser
+	stream cipher.Stream
+	block  cipher.Block
+	iv     []byte
+}
+
+//NewEncryptionReader returns a new AES-128-CTR decrypter
+func NewEncryptionReader(r io.ReadCloser, key []byte) (*EncryptionReader, error) {
+	if len(key) != KeySize {
+		return nil, fmt.Errorf("key must be %d bytes long", KeySize)
+	}
+
+	er := &EncryptionReader{}
+	er.src = r
+
+	// turn key into block & save
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return nil, err
+	}
+	er.block = block
+
+	// read IV from the io.ReadCloser
+	iv := make([]byte, aes.BlockSize)
+	if _, err := io.ReadFull(er.src, iv); err != nil {
+		return nil, err
+	}
+	er.iv = iv
+
+	// create Stream
+	er.stream = cipher.NewCTR(block, iv)
+
+	return er, nil
+}
+
+func (er *EncryptionReader) Read(buf []byte) (int, error) {
+	ciphertext := make([]byte, len(buf))
+	n, err := er.src.Read(ciphertext)
+	// Decrypt only the bytes actually read, even on a short read or
+	// error, so the keystream stays aligned with the ciphertext.
+	if n > 0 {
+		er.stream.XORKeyStream(buf[:n], ciphertext[:n])
+	}
+	return n, err
+}
+
+func (er *EncryptionReader) Close() error {
+	return er.src.Close()
+}
+
+//Reset sets up the stream again; it assumes the caller has already
+// positioned src back at the IV
+func (er *EncryptionReader) Reset() error {
+	iv := make([]byte, aes.BlockSize)
+	if _, err := io.ReadFull(er.src, iv); err != nil {
+		return err
+	}
+	if !bytes.Equal(iv, er.iv) {
+		return fmt.Errorf("IV mismatch on reset")
+	}
+
+	// create Stream
+	er.stream = cipher.NewCTR(er.block, iv)
+	return nil
+}
+
+//EncryptionWriter allows writing to an AES-128-CTR stream
+type EncryptionWriter struct {
+	dst    io.WriteCloser
+	stream cipher.Stream
+}
+
+//NewEncryptionWriter returns a new AES-128-CTR stream encryptor
+func NewEncryptionWriter(w io.WriteCloser, key []byte) (*EncryptionWriter, error) {
+	if len(key) != KeySize {
+		return nil, fmt.Errorf("key must be %d bytes long", KeySize)
+	}
+
+	ew := &EncryptionWriter{}
+
+	// turn key into block
+	block, err := aes.NewCipher(key)
+	if err != nil {
+		return nil, err
+	}
+
+	// create random IV
+	iv := make([]byte, aes.BlockSize)
+	if _, err := io.ReadFull(rand.Reader, iv); err != nil {
+		return nil, err
+	}
+
+	// create stream
+	stream := cipher.NewCTR(block, iv)
+
+	//write IV
+	n, err := w.Write(iv)
+	if err != nil {
+		return nil, err
+	}
+	if n != len(iv) {
+		return nil, io.ErrShortWrite
+	}
+
+	ew.dst = w
+	ew.stream = stream
+	return ew, nil
+}
+
+func (ew *EncryptionWriter) Write(buf []byte) (int, error) {
+	ciphertext := make([]byte, len(buf))
+	ew.stream.XORKeyStream(ciphertext, buf)
+	return ew.dst.Write(ciphertext)
+}
+
+func (ew *EncryptionWriter) Close() error {
+	return ew.dst.Close()
+}
diff --git a/libbeat/publisher/queue/diskqueue/encryption_test.go b/libbeat/publisher/queue/diskqueue/encryption_test.go
new file mode 100644
index 00000000000..5b08ad9344f
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/encryption_test.go
@@ -0,0 +1,57 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V.
licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEncryptionRoundTrip(t *testing.T) { + tests := map[string]struct { + plaintext []byte + }{ + "8 bits": {plaintext: []byte("a")}, + "128 bits": {plaintext: []byte("bbbbbbbbbbbbbbbb")}, + "136 bits": {plaintext: []byte("ccccccccccccccccc")}, + } + for name, tc := range tests { + pr, pw := io.Pipe() + src := bytes.NewReader(tc.plaintext) + var dst bytes.Buffer + key := []byte("kkkkkkkkkkkkkkkk") + + go func() { + //NewEncryptionWriter writes iv, so needs to be in go routine + ew, err := NewEncryptionWriter(pw, key) + assert.Nil(t, err, name) + _, err = io.Copy(ew, src) + assert.Nil(t, err, name) + ew.Close() + }() + + er, err := NewEncryptionReader(pr, key) + assert.Nil(t, err, name) + _, err = io.Copy(&dst, er) + assert.Nil(t, err, name) + assert.Equal(t, tc.plaintext, dst.Bytes(), name) + } +} diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index df6519edd01..64acae3ad96 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -267,10 +267,12 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig { } func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { + encoder := newEventEncoder() + encoder.SetCompression(dq.settings.UseCompression) return &diskQueueProducer{ queue: dq, config: cfg, - encoder: newEventEncoder(), + encoder: encoder, done: make(chan struct{}), } } diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index d6bb49494e0..53ae7a13a49 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -21,7 +21,6 @@ import ( "encoding/binary" "fmt" "io" - "os" ) // startPosition and endPosition are absolute byte offsets into the segment @@ -71,13 +70,15 @@ type readerLoop struct { } func newReaderLoop(settings Settings) *readerLoop { + decoder := newEventDecoder() + decoder.SetCompression(settings.UseCompression) return &readerLoop{ settings: settings, requestChan: make(chan readerLoopRequest, 1), responseChan: make(chan readerLoopResponse), output: make(chan *readFrame, settings.ReadAheadLimit), - decoder: newEventDecoder(), + decoder: decoder, } } @@ -111,7 +112,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } - targetLength := uint64(request.endPosition - request.startPosition) + targetLength := request.endPosition - request.startPosition for { remainingLength := targetLength - byteCount @@ -173,9 +174,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon // it does not exceed the given length bound. The returned frame leaves the // segment and frame IDs unset. // The returned error will be set if and only if the returned frame is nil. 
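+// Reads go through a *segmentReader rather than a raw file handle, so
+// any decryption required by the segment's schema version is applied
+// transparently before frame parsing.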
-func (rl *readerLoop) nextFrame( - handle *os.File, maxLength uint64, -) (*readFrame, error) { +func (rl *readerLoop) nextFrame(handle *segmentReader, maxLength uint64) (*readFrame, error) { // Ensure we are allowed to read the frame header. if maxLength < frameHeaderSize { return nil, fmt.Errorf( diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index a58381aa914..94835759000 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -19,6 +19,7 @@ package diskqueue import ( "encoding/binary" + "errors" "fmt" "io" "io/ioutil" @@ -91,14 +92,9 @@ type queueSegment struct { // A segment id is globally unique within its originating queue. id segmentID - // If this segment was loaded from a previous session, schemaVersion - // points to the file schema version that was read from its header. - // This is only used by queueSegment.headerSize(), which is used in - // maybeReadPending to calculate the position of the first data frame, - // and by queueSegment.shouldUseJSON(), which is used in the reader - // loop to detect old segments that used JSON encoding instead of - // the current CBOR. - schemaVersion *uint32 + // schemaVersion is used to determine on disk format, data serialization, + // and encryption + schemaVersion uint32 // The number of bytes occupied by this segment on-disk, as of the most // recent completed writerLoop request. @@ -125,7 +121,7 @@ type queueSegment struct { } type segmentHeader struct { - // The schema version for this segment file. Current schema version is 1. + // The schema version for this segment file version uint32 // If the segment file has been completely written, this field contains @@ -136,13 +132,14 @@ type segmentHeader struct { frameCount uint32 } -const currentSegmentVersion = 1 +// segmentHeaderSizeV0 schemaVersion 0 header size, uint32 for version +const segmentHeaderSizeV0 = 4 -// Segment headers are currently a 4-byte version plus a 4-byte frame count. -// In contexts where the segment may have been created by an earlier version, -// instead use (queueSegment).headerSize() which accounts for the schema -// version of the target segment. -const segmentHeaderSize = 8 +// segmentHeaderSizeV1 schemaVersion 1 header size, uint32 for version, uint32 for count +const segmentHeaderSizeV1 = 8 + +// segmentHeaderSizeV2 schemaVersion 2 header size uint32 for version +const segmentHeaderSizeV2 = 4 // Sort order: we store loaded segments in ascending order by their id. type bySegmentID []*queueSegment @@ -182,7 +179,7 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment, } segments = append(segments, &queueSegment{ id: segmentID(id), - schemaVersion: &header.version, + schemaVersion: header.version, frameCount: header.frameCount, byteCount: uint64(file.Size()), }) @@ -197,11 +194,16 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment, // been written to disk yet) of this segment file's header region. The // segment's first data frame begins immediately after the header. func (segment *queueSegment) headerSize() uint64 { - if segment.schemaVersion != nil && *segment.schemaVersion < 1 { - // Schema 0 had nothing except the 4-byte version. 
- return 4 + switch segment.schemaVersion { + case 0: + return segmentHeaderSizeV0 + case 1: + return segmentHeaderSizeV1 + case 2: + return segmentHeaderSizeV2 + default: + return uint64(0) } - return segmentHeaderSize } // The initial release of the disk queue used JSON to encode events @@ -209,46 +211,82 @@ func (segment *queueSegment) headerSize() uint64 { // with encoding multi-byte characters, and for lower encoding // overhead. func (segment *queueSegment) shouldUseJSON() bool { - return segment.schemaVersion != nil && *segment.schemaVersion == 0 + return segment.schemaVersion == 0 } // Should only be called from the reader loop. If successful, returns an open // file handle positioned at the beginning of the segment's data region. -func (segment *queueSegment) getReader( - queueSettings Settings, -) (*os.File, error) { +func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, error) { path := queueSettings.segmentPath(segment.id) file, err := os.Open(path) if err != nil { return nil, fmt.Errorf( "couldn't open segment %d: %w", segment.id, err) } - // We don't need the header contents here, we just want to advance past the - // header region, so discard the return value. - _, err = readSegmentHeader(file) + err = binary.Read(file, binary.LittleEndian, &segment.schemaVersion) if err != nil { file.Close() - return nil, fmt.Errorf("couldn't read segment header: %w", err) + return nil, fmt.Errorf("couldn't read segment version: %w", err) + } + + if segment.schemaVersion > 2 { + file.Close() + return nil, fmt.Errorf("unknown segment version %d: %w", segment.schemaVersion, err) + } + + if segment.schemaVersion == 1 { + err = binary.Read(file, binary.LittleEndian, &segment.frameCount) + if err != nil { + file.Close() + return nil, fmt.Errorf("couldn't read segment frame count: %w", err) + } + } + + sr := &segmentReader{} + sr.src = file + sr.version = segment.schemaVersion + + if sr.version != 2 { + return sr, nil } - return file, nil + sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey) + if err != nil { + sr.src.Close() + return nil, fmt.Errorf("couldn't create encryption reader: %w", err) + } + return sr, nil } // Should only be called from the writer loop. -func (segment *queueSegment) getWriter( - queueSettings Settings, -) (*os.File, error) { +func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, error) { path := queueSettings.segmentPath(segment.id) file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { return nil, err } - err = writeSegmentHeader(file, 0) + + sw := &segmentWriter{} + sw.dst = file + + segment.schemaVersion = queueSettings.SchemaVersion + sw.version = queueSettings.SchemaVersion + + if err := sw.WriteHeader(); err != nil { + return nil, err + } + + if sw.version != 2 { + return sw, nil + } + + sw.ew, err = NewEncryptionWriter(sw.dst, queueSettings.EncryptionKey) if err != nil { - return nil, fmt.Errorf("couldn't write segment header: %w", err) + sw.dst.Close() + return nil, fmt.Errorf("couldn't create encryption writer: %w", err) } - return file, nil + return sw, nil } // getWriterWithRetry tries to create a file handle for writing via @@ -257,7 +295,7 @@ func (segment *queueSegment) getWriter( // creating a queue segment from the writer loop. 
func (segment *queueSegment) getWriterWithRetry( queueSettings Settings, retry func(err error, firstTime bool) bool, -) (*os.File, error) { +) (*segmentWriter, error) { firstTime := true file, err := segment.getWriter(queueSettings) for err != nil && retry(err, firstTime) { @@ -309,7 +347,7 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) { err = binary.Read(reader, binary.LittleEndian, &frameLength) if err != nil { // EOF at a frame boundary means we successfully scanned all frames. - if err == io.EOF && header.frameCount > 0 { + if errors.Is(err, io.EOF) && header.frameCount > 0 { return header, nil } // All other errors mean we are done scanning, exit the loop. @@ -354,36 +392,20 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) { if err != nil { return nil, err } - if header.version > currentSegmentVersion { - return nil, fmt.Errorf("unrecognized schema version %d", header.version) - } - if header.version >= 1 { + switch header.version { + case 0: + case 1: err = binary.Read(in, binary.LittleEndian, &header.frameCount) if err != nil { return nil, err } + case 2: + default: + return nil, fmt.Errorf("unrecognized schema version %d", header.version) } return header, nil } -// writeSegmentHeader seeks to the beginning of the given file handle and -// writes a segment header with the current schema version, containing the -// given frameCount. -func writeSegmentHeader(out *os.File, frameCount uint32) error { - _, err := out.Seek(0, io.SeekStart) - if err != nil { - return err - } - - version := uint32(currentSegmentVersion) - err = binary.Write(out, binary.LittleEndian, version) - if err != nil { - return err - } - err = binary.Write(out, binary.LittleEndian, frameCount) - return err -} - // The number of bytes occupied by all the queue's segment files. This // should only be called from the core loop. func (segments *diskQueueSegments) sizeOnDisk() uint64 { @@ -402,3 +424,108 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 { } return total } + +type segmentReader struct { + src io.ReadSeekCloser + er *EncryptionReader + version uint32 +} + +func (r *segmentReader) Read(p []byte) (int, error) { + if r.version != 2 { + return r.src.Read(p) + } + return r.er.Read(p) +} + +func (r *segmentReader) Close() error { + if r.version != 2 { + return r.src.Close() + } + return r.er.Close() +} + +func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { + if r.version != 2 { + return r.src.Seek(offset, whence) + } + //can't seek before segment header + if (offset + int64(whence)) < segmentHeaderSizeV2 { + return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence) + } + if _, err := r.src.Seek(segmentHeaderSizeV2, io.SeekStart); err != nil { + return 0, err + } + if err := r.er.Reset(); err != nil { + return 0, err + } + written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSizeV2) + return written + segmentHeaderSizeV2, err +} + +type segmentWriter struct { + dst *os.File + ew *EncryptionWriter + version uint32 +} + +func (w *segmentWriter) Write(p []byte) (int, error) { + if w.version != 2 { + return w.dst.Write(p) + } + return w.ew.Write(p) +} + +func (w *segmentWriter) Close() error { + if w.version != 2 { + return w.dst.Close() + } + return w.ew.Close() +} + +func (w *segmentWriter) Seek(offset int64, whence int) (int64, error) { + if w.version != 2 { + return w.dst.Seek(offset, whence) + } + // Not something we can do with a stream, we can't re-write. 
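+	// In CTR mode the keystream position depends on every byte already
+	// written, so seeking backwards would mean re-encrypting the stream
+	// from the IV onward; an encrypted segment is effectively append-only.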
+ return 0, nil +} + +func (w *segmentWriter) Sync() error { + return w.dst.Sync() +} + +func (w *segmentWriter) WriteHeader() error { + if _, err := w.dst.Seek(0, io.SeekStart); err != nil { + return err + } + + if err := binary.Write(w.dst, binary.LittleEndian, w.version); err != nil { + return err + } + + // Version 0 & 2 don't have a count + if w.version == 0 || w.version == 2 { + return nil + } + + if err := binary.Write(w.dst, binary.LittleEndian, uint32(0)); err != nil { + return err + } + + return nil +} + +func (w *segmentWriter) UpdateCount(count uint32) error { + // Version 0 & 2 don't record count + if w.version != 1 { + return nil + } + + // Seek to count on disk + if _, err := w.dst.Seek(4, io.SeekStart); err != nil { + return err + } + + return binary.Write(w.dst, binary.LittleEndian, count) +} diff --git a/libbeat/publisher/queue/diskqueue/segments_test.go b/libbeat/publisher/queue/diskqueue/segments_test.go new file mode 100644 index 00000000000..13307520687 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/segments_test.go @@ -0,0 +1,149 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
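+
+// These tests round-trip data through a segmentWriter and a
+// segmentReader for each on-disk schema version, covering both the
+// plain (v0, v1) and encrypted (v2) paths.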
+ +package diskqueue + +import ( + "io" + "os" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestSchemasRoundTrip(t *testing.T) { + tests := map[string]struct { + id segmentID + schemaVersion uint32 + plaintext []byte + }{ + "version 0": { + id: 0, + schemaVersion: uint32(0), + plaintext: []byte("abc"), + }, + "version 1": { + id: 1, + schemaVersion: uint32(1), + plaintext: []byte("abc"), + }, + "version 2": { + id: 2, + schemaVersion: uint32(2), + plaintext: []byte("abc"), + }, + } + dir, err := os.MkdirTemp("", t.Name()) + assert.Nil(t, err) + defer os.RemoveAll(dir) + for name, tc := range tests { + dst := make([]byte, len(tc.plaintext)) + settings := DefaultSettings() + settings.Path = dir + settings.SchemaVersion = tc.schemaVersion + settings.EncryptionKey = []byte("keykeykeykeykeyk") + qs := &queueSegment{ + id: tc.id, + } + sw, err := qs.getWriter(settings) + assert.Nil(t, err, name) + + n, err := sw.Write(tc.plaintext) + assert.Nil(t, err, name) + assert.Equal(t, len(tc.plaintext), n, name) + + err = sw.Close() + assert.Nil(t, err, name) + + sr, err := qs.getReader(settings) + assert.Nil(t, err, name) + + n, err = sr.Read(dst) + assert.Nil(t, err, name) + + err = sr.Close() + assert.Nil(t, err, name) + assert.Equal(t, len(dst), n, name) + + //make sure we read back what we wrote + assert.Equal(t, tc.plaintext, dst, name) + + } +} + +func TestSeek(t *testing.T) { + tests := map[string]struct { + id segmentID + schemaVersion uint32 + headerSize int64 + plaintexts [][]byte + }{ + "version 0": { + id: 0, + schemaVersion: uint32(0), + headerSize: segmentHeaderSizeV0, + plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, + }, + "version 1": { + id: 1, + schemaVersion: uint32(1), + headerSize: segmentHeaderSizeV1, + plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, + }, + "version 2": { + id: 2, + schemaVersion: uint32(2), + headerSize: segmentHeaderSizeV2, + plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, + }, + } + dir, err := os.MkdirTemp("", t.Name()) + assert.Nil(t, err) + // defer os.RemoveAll(dir) + for name, tc := range tests { + settings := DefaultSettings() + settings.Path = dir + settings.SchemaVersion = tc.schemaVersion + settings.EncryptionKey = []byte("keykeykeykeykeyk") + qs := &queueSegment{ + id: tc.id, + } + sw, err := qs.getWriter(settings) + assert.Nil(t, err, name) + for _, plaintext := range tc.plaintexts { + n, err := sw.Write(plaintext) + assert.Nil(t, err, name) + assert.Equal(t, len(plaintext), n, name) + err = sw.Sync() + assert.Nil(t, err, name) + } + sw.Close() + sr, err := qs.getReader(settings) + assert.Nil(t, err, name) + //seek to second data piece + n, err := sr.Seek(tc.headerSize+int64(len(tc.plaintexts[0])), io.SeekStart) + assert.Nil(t, err, name) + assert.Equal(t, tc.headerSize+int64(len(tc.plaintexts[0])), n, name) + dst := make([]byte, len(tc.plaintexts[1])) + + _, err = sr.Read(dst) + assert.Nil(t, err, name) + assert.Equal(t, tc.plaintexts[1], dst, name) + + sw.Close() + } +} diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 3c75534cdce..05c55d32dd0 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -25,6 +25,8 @@ import ( "fmt" "time" + "github.com/pierrec/lz4" + "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/publisher" @@ -37,6 +39,8 @@ import ( type eventEncoder struct { buf bytes.Buffer folder 
*gotype.Iterator + // Use Compression + useCompression bool } type eventDecoder struct { @@ -50,6 +54,9 @@ type eventDecoder struct { useJSON bool unfolder *gotype.Unfolder + + // Use Compression + useCompression bool } type entry struct { @@ -108,13 +115,23 @@ func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { } // Copy the encoded bytes to a new array owned by the caller. - bytes := e.buf.Bytes() - result := make([]byte, len(bytes)) - copy(result, bytes) - + if e.useCompression { + result := make([]byte, lz4.CompressBlockBound(e.buf.Len())) + n, err := lz4.CompressBlock(e.buf.Bytes(), result, nil) + if err != nil { + return nil, err + } + return result[:n], nil + } + result := make([]byte, e.buf.Len()) + copy(result, e.buf.Bytes()) return result, nil } +func (e *eventEncoder) SetCompression(val bool) { + e.useCompression = val +} + func newEventDecoder() *eventDecoder { d := &eventDecoder{} d.reset() @@ -150,6 +167,15 @@ func (d *eventDecoder) Decode() (publisher.Event, error) { } defer d.unfolder.Reset() + if d.useCompression { + out := make([]byte, 10*len(d.buf)) + n, err := lz4.UncompressBlock(d.buf, out) + if err != nil { + return publisher.Event{}, err + } + d.buf = out[:n] + } + if d.useJSON { err = d.jsonParser.Parse(d.buf) } else { @@ -170,3 +196,7 @@ func (d *eventDecoder) Decode() (publisher.Event, error) { }, }, nil } + +func (d *eventDecoder) SetCompression(val bool) { + d.useCompression = val +} diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go index 64cb3e3342b..170e1b16cff 100644 --- a/libbeat/publisher/queue/diskqueue/serialize_test.go +++ b/libbeat/publisher/queue/diskqueue/serialize_test.go @@ -30,15 +30,20 @@ import ( // A test to make sure serialization works correctly on multi-byte characters. func TestSerialize(t *testing.T) { testCases := []struct { - name string - value string + name string + value string + useCompression bool }{ {name: "Ascii only", value: "{\"name\": \"Momotaro\"}"}, {name: "Multi-byte", value: "{\"name\": \"桃太郎\"}"}, + {name: "Compressed Ascii only", value: "{\"name\": \"Momotaro\"}", useCompression: true}, + {name: "Compressed Multi-byte", value: "{\"name\": \"桃太郎\"}", useCompression: true}, + {name: "Compressed high repeat", value: "{\"name\": \"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\"}", useCompression: true}, } for _, test := range testCases { encoder := newEventEncoder() + encoder.SetCompression(test.useCompression) event := publisher.Event{ Content: beat.Event{ Fields: mapstr.M{ @@ -53,6 +58,7 @@ func TestSerialize(t *testing.T) { // Use decoder to decode the serialized bytes. 
decoder := newEventDecoder() + decoder.SetCompression(test.useCompression) buf := decoder.Buffer(len(serialized)) copy(buf, serialized) decoded, err := decoder.Decode() diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go index b47f3a2baef..c0bfbcd83cb 100644 --- a/libbeat/publisher/queue/diskqueue/writer_loop.go +++ b/libbeat/publisher/queue/diskqueue/writer_loop.go @@ -19,7 +19,6 @@ package diskqueue import ( "encoding/binary" - "os" "time" "github.com/elastic/elastic-agent-libs/logp" @@ -88,7 +87,7 @@ type writerLoop struct { // The file handle corresponding to currentSegment. When currentSegment // changes, this handle is closed and a new one is created. - outputFile *os.File + outputFile *segmentWriter currentRetryInterval time.Duration } @@ -112,8 +111,8 @@ func (wl *writerLoop) run() { // The request channel is closed, we are done. If there is an active // segment file, finalize its frame count and close it. if wl.outputFile != nil { - writeSegmentHeader(wl.outputFile, wl.currentSegment.frameCount) - wl.outputFile.Sync() + wl.outputFile.UpdateCount(wl.currentSegment.frameCount) //nolint:errcheck //No error recovery path + wl.outputFile.Sync() //nolint:errcheck //No error recovery path wl.outputFile.Close() wl.outputFile = nil } @@ -156,9 +155,8 @@ outerLoop: if wl.outputFile != nil { // Update the header with the frame count (including the ones we // just wrote), try to sync to disk, then close the file. - writeSegmentHeader(wl.outputFile, - wl.currentSegment.frameCount+curSegmentResponse.framesWritten) - wl.outputFile.Sync() + wl.outputFile.UpdateCount(wl.currentSegment.frameCount + curSegmentResponse.framesWritten) //nolint:errcheck //No error recovery path + wl.outputFile.Sync() //nolint:errcheck //No error recovery path wl.outputFile.Close() wl.outputFile = nil // We are done with this segment, add the totals to the response and @@ -228,7 +226,7 @@ outerLoop: } } // Try to sync the written data to disk. - wl.outputFile.Sync() + wl.outputFile.Sync() //nolint:errcheck //No error recovery path // If the queue has an ACK listener, notify it the frames were written. if wl.settings.WriteToDiskListener != nil { From f2e2393cda5d150987400b9ab1bb681b4a8f3456 Mon Sep 17 00:00:00 2001 From: "Lee E. 
Hinman" Date: Fri, 3 Jun 2022 14:05:26 -0500 Subject: [PATCH 2/7] Add docs for on-disk structures --- .../publisher/queue/diskqueue/docs/Makefile | 9 +++ .../queue/diskqueue/docs/frameV0.pic | 9 +++ .../queue/diskqueue/docs/frameV0.svg | 11 +++ .../queue/diskqueue/docs/frameV1.pic | 9 +++ .../queue/diskqueue/docs/frameV1.svg | 11 +++ .../queue/diskqueue/docs/frameV2.pic | 5 ++ .../queue/diskqueue/docs/frameV2.svg | 11 +++ .../diskqueue/docs/on-disk-structures.md | 69 +++++++++++++++++++ .../queue/diskqueue/docs/schemaV0.pic | 9 +++ .../queue/diskqueue/docs/schemaV0.svg | 11 +++ .../queue/diskqueue/docs/schemaV1.pic | 11 +++ .../queue/diskqueue/docs/schemaV1.svg | 13 ++++ .../queue/diskqueue/docs/schemaV2.pic | 4 ++ .../queue/diskqueue/docs/schemaV2.svg | 9 +++ 14 files changed, 191 insertions(+) create mode 100644 libbeat/publisher/queue/diskqueue/docs/Makefile create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV0.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV0.svg create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV1.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV1.svg create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV2.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV2.svg create mode 100644 libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV0.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV0.svg create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV1.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV1.svg create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV2.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV2.svg diff --git a/libbeat/publisher/queue/diskqueue/docs/Makefile b/libbeat/publisher/queue/diskqueue/docs/Makefile new file mode 100644 index 00000000000..d9f55ffb377 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/Makefile @@ -0,0 +1,9 @@ +all : schemaV0.svg frameV0.svg schemaV1.svg frameV1.svg schemaV2.svg frameV2.svg + +.PHONY : clean + +%.svg : %.pic + pikchr --svg-only $< > $@ + +clean : + -rm *.svg diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV0.pic b/libbeat/publisher/queue/diskqueue/docs/frameV0.pic new file mode 100644 index 00000000000..0b2ddb7249f --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/frameV0.pic @@ -0,0 +1,9 @@ +boxht = 0.25 +down +box "size (uint32)" wid 4; +down +box "JSON serialized data" dashed wid 4 ht 2; +down +box "checksum (uint32)" wid 4; +down +box "size (uint32)" wid 4; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV0.svg b/libbeat/publisher/queue/diskqueue/docs/frameV0.svg new file mode 100644 index 00000000000..1696bb91a4a --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/frameV0.svg @@ -0,0 +1,11 @@ + + +size (uint32) + +JSON serialized data + +checksum (uint32) + +size (uint32) + + diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV1.pic b/libbeat/publisher/queue/diskqueue/docs/frameV1.pic new file mode 100644 index 00000000000..58e956b7ddd --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/frameV1.pic @@ -0,0 +1,9 @@ +boxht = 0.25 +down +box "size (uint32)" wid 4; +down +box "CBOR serialized data" dashed wid 4 ht 2; +down +box "checksum (uint32)" wid 4; +down +box "size (uint32)" wid 4; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV1.svg 
b/libbeat/publisher/queue/diskqueue/docs/frameV1.svg
new file mode 100644
index 00000000000..8317e4b93e8
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV1.svg
@@ -0,0 +1,11 @@
+
+
+size (uint32)
+
+CBOR serialized data
+
+checksum (uint32)
+
+size (uint32)
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV2.pic b/libbeat/publisher/queue/diskqueue/docs/frameV2.pic
new file mode 100644
index 00000000000..3e340d09071
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV2.pic
@@ -0,0 +1,5 @@
+boxht = 0.25
+SIZE1: box "size (uint32)" wid 4;
+DATA: box "LZ4 compressed CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
+CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw;
+SIZE2: box "size (uint32)" wid 4 with .nw at CHECKSUM.sw;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV2.svg b/libbeat/publisher/queue/diskqueue/docs/frameV2.svg
new file mode 100644
index 00000000000..8cf04362b72
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV2.svg
@@ -0,0 +1,11 @@
+
+
+size (uint32)
+
+LZ4 compressed CBOR serialized data
+
+checksum (uint32)
+
+size (uint32)
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
new file mode 100644
index 00000000000..67831d13ff6
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
@@ -0,0 +1,69 @@
+# Disk Queue On Disk Structures
+
+The disk queue is a directory on disk that contains files. Each
+file is called a segment. The name of the file is the segment id in
+base 10 with the ".seg" suffix. For example: "42.seg". Each segment
+contains multiple frames. Each frame contains one event.
+
+There are currently 3 versions of the disk queue. The current code
+base is able to write versions 1 and 2, and to read versions 0, 1,
+and 2.
+
+## Version 0
+
+In version 0, the segments are made up of a header, followed by
+frames. The header contains one field, the version number, which is
+an unsigned 32-bit integer in little-endian byte order.
+
+![Segment Schema Version 0](./schemaV0.svg)
+
+The frames for version 0 consist of a header, followed by the
+serialized event and a footer. The header contains one field, the
+size of the frame, which is an unsigned 32-bit integer in
+little-endian byte order. The serialization format is JSON. The
+footer contains two fields: a checksum, which is an unsigned 32-bit
+integer in little-endian format, followed by a repeat of the size
+from the header.
+
+![Frame Version 0](./frameV0.svg)
+
+## Version 1
+
+In version 1, the segments are made up of a header, followed by
+frames. The header contains two fields. The first field is the
+version number, which is an unsigned 32-bit integer in little-endian
+format. The second field is a count of the number of frames in the
+segment, which is an unsigned 32-bit integer in little-endian format.
+
+![Segment Schema Version 1](./schemaV1.svg)
+
+The frames for version 1 consist of a header, followed by the
+serialized event and a footer. The header contains one field, the
+size of the frame, which is an unsigned 32-bit integer in
+little-endian format. The serialization format is CBOR. The footer
+contains two fields: a checksum, which is an unsigned 32-bit integer
+in little-endian format, followed by a repeat of the size from the
+header.
+
+![Frame Version 1](./frameV1.svg)
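+
+As an aside, this layout can be read back with nothing but the Go
+standard library. The sketch below is illustrative only (`readFrame`
+is an invented helper, not the queue's actual reader); it assumes, as
+the queue's tests imply, that the size fields count the whole frame,
+including the 12 bytes of framing:
+
+```go
+package frameexample
+
+import (
+	"encoding/binary"
+	"fmt"
+	"io"
+)
+
+// readFrame reads one version 0/1 frame: a leading size, the
+// serialized event data, a checksum, and a trailing copy of the size.
+func readFrame(r io.Reader) ([]byte, error) {
+	var size uint32
+	if err := binary.Read(r, binary.LittleEndian, &size); err != nil {
+		return nil, err
+	}
+	if size < 12 {
+		return nil, fmt.Errorf("frame size %d too small", size)
+	}
+	data := make([]byte, size-12) // 12 = two size fields + checksum
+	if _, err := io.ReadFull(r, data); err != nil {
+		return nil, err
+	}
+	var checksum, trailer uint32
+	if err := binary.Read(r, binary.LittleEndian, &checksum); err != nil {
+		return nil, err
+	}
+	if err := binary.Read(r, binary.LittleEndian, &trailer); err != nil {
+		return nil, err
+	}
+	if trailer != size {
+		return nil, fmt.Errorf("corrupt frame: trailing size %d != %d", trailer, size)
+	}
+	// The caller is expected to verify the checksum and decode the
+	// JSON (version 0) or CBOR (version 1) payload.
+	return data, nil
+}
+```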
+
+## Version 2
+
+In version 2, encryption and compression are added on top of version
+1. The segments are made up of a header followed by an initialization
+vector, and then encrypted frames. The header consists of one field,
+the version number, which is an unsigned 32-bit integer in
+little-endian format. The initialization vector is 128 bits in
+length.
+
+![Segment Schema Version 2](./schemaV2.svg)
+
+The frames for version 2 consist of a header, followed by the
+compressed serialized event and a footer. The header contains one
+field, the size of the frame, which is an unsigned 32-bit integer in
+little-endian format. The compression is LZ4 with fast compression.
+The serialization format is CBOR. The footer contains two fields: a
+checksum, which is an unsigned 32-bit integer in little-endian
+format, followed by a repeat of the size from the header.
+
+![Frame Version 2](./frameV2.svg)
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV0.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV0.pic
new file mode 100644
index 00000000000..99cd63b0cc9
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV0.pic
@@ -0,0 +1,9 @@
+boxht = 0.25
+down;
+box "version (uint32)" wid 4;
+down;
+box "frame 0" wid 4 ht 2;
+down;
+box "..." dashed wid 4;
+down;
+box "frame n" wid 4 ht 2;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV0.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV0.svg
new file mode 100644
index 00000000000..de9885d9d9a
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV0.svg
@@ -0,0 +1,11 @@
+
+
+version (uint32)
+
+frame 0
+
+...
+
+frame n
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV1.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV1.pic
new file mode 100644
index 00000000000..34dd086d37f
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV1.pic
@@ -0,0 +1,11 @@
+boxht = 0.25
+down;
+box "version (uint32)" wid 4;
+down;
+box "count (uint32)" wid 4;
+down;
+box "frame 0" wid 4 ht 2;
+down;
+box "..." dashed wid 4;
+down;
+box "frame (count - 1)" wid 4 ht 2;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV1.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV1.svg
new file mode 100644
index 00000000000..0070fe6e5c9
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV1.svg
@@ -0,0 +1,13 @@
+
+
+version (uint32)
+
+count (uint32)
+
+frame 0
+
+...
+
+frame (count - 1)
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic
new file mode 100644
index 00000000000..006ae4bbf7b
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic
@@ -0,0 +1,4 @@
+boxht = 0.25
+VERSION: box "version (uint32)" wid 4;
+IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at VERSION.sw
+FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg
new file mode 100644
index 00000000000..33ec757bed3
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg
@@ -0,0 +1,9 @@
+
+
+version (uint32)
+
+initialization vector (128 bits)
+
+Encrypted Frames
+
+

From b2b50ba4559eeaa422dddc0f39ed75e757e92899 Mon Sep 17 00:00:00 2001
From: "Lee E. Hinman"
Hinman" Date: Mon, 6 Jun 2022 15:50:20 -0500 Subject: [PATCH 3/7] make encryption/compression it's own schema version --- .../queue/diskqueue/benchmark_test.go | 37 ++++--- libbeat/publisher/queue/diskqueue/config.go | 7 +- .../publisher/queue/diskqueue/core_loop.go | 2 + libbeat/publisher/queue/diskqueue/queue.go | 4 +- .../publisher/queue/diskqueue/reader_loop.go | 4 +- libbeat/publisher/queue/diskqueue/segments.go | 99 ++++++++++++------- .../queue/diskqueue/segments_test.go | 6 ++ .../publisher/queue/diskqueue/serialize.go | 8 -- .../queue/diskqueue/serialize_test.go | 4 +- 9 files changed, 99 insertions(+), 72 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 93c099b0c6d..659f37b7a49 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -64,18 +64,15 @@ func makeEvent() publisher.Event { // hold the queue. Location of the temporary directory is stored in // the queue settings. Call `cleanup` when done with the queue to // close the queue and remove the temp dir. -func setup(encryption bool, compression bool) (*diskQueue, queue.Producer) { +func setup(schemaVersion int) (*diskQueue, queue.Producer) { dir, err := os.MkdirTemp("", "benchmark") if err != nil { panic(err) } s := DefaultSettings() s.Path = dir - if encryption { - s.SchemaVersion = 2 - s.EncryptionKey = []byte("testtesttesttest") - s.UseCompression = compression - } + s.SchemaVersion = uint32(schemaVersion) + s.EncryptionKey = []byte("testtesttesttest") q, err := NewQueue(logp.NewLogger("benchmark"), s) if err != nil { os.RemoveAll(dir) @@ -136,14 +133,14 @@ func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_si //benchmarkQueue is a wrapper for produceAndConsume, it tries to limit // timers to just produceAndConsume -func benchmarkQueue(num_events int, batch_size int, encryption bool, compression bool, async bool, b *testing.B) { +func benchmarkQueue(num_events int, batch_size int, schemaVersion int, async bool, b *testing.B) { b.ResetTimer() var err error for n := 0; n < b.N; n++ { b.StopTimer() rand.Seed(1) - q, p := setup(encryption, compression) + q, p := setup(schemaVersion) b.StartTimer() if async { if err = produceAndConsume(p, q, num_events, batch_size); err != nil { @@ -164,15 +161,15 @@ func benchmarkQueue(num_events int, batch_size int, encryption bool, compression } // Actual benchmark calls follow -func BenchmarkAsync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, true, b) } -func BenchmarkAsync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, true, b) } -func BenchmarkAsyncEnc1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, true, b) } -func BenchmarkAsyncEnc1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, true, b) } -func BenchmarkAsyncEncComp1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, true, b) } -func BenchmarkAsyncEncComp1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, true, true, b) } -func BenchmarkSync1k(b *testing.B) { benchmarkQueue(1000, 10, false, false, false, b) } -func BenchmarkSync1M(b *testing.B) { benchmarkQueue(1000000, 1000, false, false, false, b) } -func BenchmarkSyncEnc1k(b *testing.B) { benchmarkQueue(1000, 10, true, false, false, b) } -func BenchmarkSyncEnc1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, false, false, b) } -func BenchmarkSyncEncComp1k(b *testing.B) { benchmarkQueue(1000, 10, true, true, false, b) } -func 
BenchmarkSyncEncComp1M(b *testing.B) { benchmarkQueue(1000000, 1000, true, true, false, b) } +func BenchmarkV1Async1k(b *testing.B) { benchmarkQueue(1000, 10, 1, true, b) } +func BenchmarkV1Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, true, b) } +func BenchmarkV2Async1k(b *testing.B) { benchmarkQueue(1000, 10, 2, true, b) } +func BenchmarkV2Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, true, b) } +func BenchmarkV3Async1k(b *testing.B) { benchmarkQueue(1000, 10, 3, true, b) } +func BenchmarkV3Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, true, b) } +func BenchmarkV1Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 1, false, b) } +func BenchmarkV1Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, false, b) } +func BenchmarkV2Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 2, false, b) } +func BenchmarkV2Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, false, b) } +func BenchmarkV3Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 3, false, b) } +func BenchmarkV3Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, false, b) } diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go index 804b937d552..c8fc42ba692 100644 --- a/libbeat/publisher/queue/diskqueue/config.go +++ b/libbeat/publisher/queue/diskqueue/config.go @@ -73,14 +73,11 @@ type Settings struct { MaxRetryInterval time.Duration // Schema Version specifies which on-disk format, serialization, and encryption to use. - // 0, 1, or 2 are valid options + // 0, 1, 2, or 3 are valid options SchemaVersion uint32 // EncryptionKey is used to encrypt data if SchemaVersion 2 is used. EncryptionKey []byte - - // UseCompression controls compression if SchemaVersion 2 is used. - UseCompression bool } // userConfig holds the parameters for a disk queue that are configurable @@ -214,6 +211,8 @@ func (settings Settings) maxValidFrameSize() uint64 { return settings.MaxSegmentSize - segmentHeaderSizeV1 case 2: return settings.MaxSegmentSize - segmentHeaderSizeV2 + case 3: + return settings.MaxSegmentSize - segmentHeaderSizeV3 default: return uint64(0) } diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go index 2746bd07874..7499d0247a7 100644 --- a/libbeat/publisher/queue/diskqueue/core_loop.go +++ b/libbeat/publisher/queue/diskqueue/core_loop.go @@ -443,6 +443,8 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) { newSegmentSize = segmentHeaderSizeV1 case 2: newSegmentSize = segmentHeaderSizeV2 + case 3: + newSegmentSize = segmentHeaderSizeV3 } } diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 64acae3ad96..559ee9ecbe0 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -268,7 +268,9 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig { func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { encoder := newEventEncoder() - encoder.SetCompression(dq.settings.UseCompression) + if dq.settings.SchemaVersion == 3 { + encoder.useCompression = true + } return &diskQueueProducer{ queue: dq, config: cfg, diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index 53ae7a13a49..b14aaa47692 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -71,7 +71,6 @@ type readerLoop struct { func newReaderLoop(settings Settings) *readerLoop { decoder := 
newEventDecoder() - decoder.SetCompression(settings.UseCompression) return &readerLoop{ settings: settings, @@ -107,6 +106,9 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } defer handle.Close() + if request.segment.schemaVersion == 3 { + rl.decoder.useCompression = true + } _, err = handle.Seek(int64(request.startPosition), io.SeekStart) if err != nil { return readerLoopResponse{err: err} diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 94835759000..3458153d80f 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -141,6 +141,12 @@ const segmentHeaderSizeV1 = 8 // segmentHeaderSizeV2 schemaVersion 2 header size uint32 for version const segmentHeaderSizeV2 = 4 +// segmentHeaderSizeV3 schemaVersion 3 header size uint32 for version +const segmentHeaderSizeV3 = 4 + +// maxSegmentVersion is the highest supported version +const maxSegmentVersion = 3 + // Sort order: we store loaded segments in ascending order by their id. type bySegmentID []*queueSegment @@ -201,6 +207,8 @@ func (segment *queueSegment) headerSize() uint64 { return segmentHeaderSizeV1 case 2: return segmentHeaderSizeV2 + case 3: + return segmentHeaderSizeV3 default: return uint64(0) } @@ -229,7 +237,7 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, return nil, fmt.Errorf("couldn't read segment version: %w", err) } - if segment.schemaVersion > 2 { + if segment.schemaVersion > maxSegmentVersion { file.Close() return nil, fmt.Errorf("unknown segment version %d: %w", segment.schemaVersion, err) } @@ -246,7 +254,7 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, sr.src = file sr.version = segment.schemaVersion - if sr.version != 2 { + if sr.version == 0 || sr.version == 1 { return sr, nil } @@ -276,7 +284,7 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, return nil, err } - if sw.version != 2 { + if sw.version == 0 || sw.version == 1 { return sw, nil } @@ -399,7 +407,7 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) { if err != nil { return nil, err } - case 2: + case 2, 3: default: return nil, fmt.Errorf("unrecognized schema version %d", header.version) } @@ -432,35 +440,47 @@ type segmentReader struct { } func (r *segmentReader) Read(p []byte) (int, error) { - if r.version != 2 { + switch r.version { + case 0, 1: return r.src.Read(p) + case 2, 3: + return r.er.Read(p) + default: + return 0, fmt.Errorf("unsupported schema version: %d", r.version) } - return r.er.Read(p) } func (r *segmentReader) Close() error { - if r.version != 2 { + switch r.version { + case 0, 1: return r.src.Close() + case 2, 3: + return r.er.Close() + default: + return fmt.Errorf("unsupported schema version: %d", r.version) } - return r.er.Close() } func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { - if r.version != 2 { + switch r.version { + case 0, 1: return r.src.Seek(offset, whence) + case 2, 3: + //can't seek before segment header + if (offset + int64(whence)) < segmentHeaderSizeV2 { + return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence) + } + if _, err := r.src.Seek(segmentHeaderSizeV2, io.SeekStart); err != nil { + return 0, err + } + if err := r.er.Reset(); err != nil { + return 0, err + } + written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSizeV2) + return written + 
segmentHeaderSizeV2, err + default: + return 0, fmt.Errorf("unsupported schema version: %d", r.version) } - //can't seek before segment header - if (offset + int64(whence)) < segmentHeaderSizeV2 { - return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence) - } - if _, err := r.src.Seek(segmentHeaderSizeV2, io.SeekStart); err != nil { - return 0, err - } - if err := r.er.Reset(); err != nil { - return 0, err - } - written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSizeV2) - return written + segmentHeaderSizeV2, err } type segmentWriter struct { @@ -470,25 +490,34 @@ type segmentWriter struct { } func (w *segmentWriter) Write(p []byte) (int, error) { - if w.version != 2 { + switch w.version { + case 0, 1: return w.dst.Write(p) + case 2, 3: + return w.ew.Write(p) + default: + return 0, fmt.Errorf("unsupported schema version: %d", w.version) } - return w.ew.Write(p) } func (w *segmentWriter) Close() error { - if w.version != 2 { + switch w.version { + case 0, 1: return w.dst.Close() + case 2, 3: + return w.ew.Close() + default: + return fmt.Errorf("unsupported schema version: %d", w.version) } - return w.ew.Close() } func (w *segmentWriter) Seek(offset int64, whence int) (int64, error) { - if w.version != 2 { + switch w.version { + case 0, 1: return w.dst.Seek(offset, whence) + default: + return 0, fmt.Errorf("unsupported schema version: %d", w.version) } - // Not something we can do with a stream, we can't re-write. - return 0, nil } func (w *segmentWriter) Sync() error { @@ -504,20 +533,18 @@ func (w *segmentWriter) WriteHeader() error { return err } - // Version 0 & 2 don't have a count - if w.version == 0 || w.version == 2 { - return nil - } - - if err := binary.Write(w.dst, binary.LittleEndian, uint32(0)); err != nil { - return err + // Version 1 has count + if w.version == 1 { + if err := binary.Write(w.dst, binary.LittleEndian, uint32(0)); err != nil { + return err + } } return nil } func (w *segmentWriter) UpdateCount(count uint32) error { - // Version 0 & 2 don't record count + // Only Version 1 stores count if w.version != 1 { return nil } diff --git a/libbeat/publisher/queue/diskqueue/segments_test.go b/libbeat/publisher/queue/diskqueue/segments_test.go index 13307520687..1e10594009a 100644 --- a/libbeat/publisher/queue/diskqueue/segments_test.go +++ b/libbeat/publisher/queue/diskqueue/segments_test.go @@ -110,6 +110,12 @@ func TestSeek(t *testing.T) { headerSize: segmentHeaderSizeV2, plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, }, + "version 3": { + id: 3, + schemaVersion: uint32(3), + headerSize: segmentHeaderSizeV3, + plaintexts: [][]byte{[]byte("abc"), []byte("defg")}, + }, } dir, err := os.MkdirTemp("", t.Name()) assert.Nil(t, err) diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 05c55d32dd0..53a0580d948 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -128,10 +128,6 @@ func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { return result, nil } -func (e *eventEncoder) SetCompression(val bool) { - e.useCompression = val -} - func newEventDecoder() *eventDecoder { d := &eventDecoder{} d.reset() @@ -196,7 +192,3 @@ func (d *eventDecoder) Decode() (publisher.Event, error) { }, }, nil } - -func (d *eventDecoder) SetCompression(val bool) { - d.useCompression = val -} diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go index
170e1b16cff..3cc80c6f963 100644 --- a/libbeat/publisher/queue/diskqueue/serialize_test.go +++ b/libbeat/publisher/queue/diskqueue/serialize_test.go @@ -43,7 +43,7 @@ func TestSerialize(t *testing.T) { for _, test := range testCases { encoder := newEventEncoder() - encoder.SetCompression(test.useCompression) + encoder.useCompression = test.useCompression event := publisher.Event{ Content: beat.Event{ Fields: mapstr.M{ @@ -58,7 +58,7 @@ func TestSerialize(t *testing.T) { // Use decoder to decode the serialized bytes. decoder := newEventDecoder() - decoder.SetCompression(test.useCompression) + decoder.useCompression = (test.useCompression) buf := decoder.Buffer(len(serialized)) copy(buf, serialized) decoded, err := decoder.Decode() From 979ad390fd152c1d4456e85be789600c140119b2 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 6 Jun 2022 18:51:35 -0500 Subject: [PATCH 4/7] writer_loop use bytes.Buffer to reduce Write syscalls --- .../publisher/queue/diskqueue/writer_loop.go | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go index c0bfbcd83cb..4dd8117aeb6 100644 --- a/libbeat/publisher/queue/diskqueue/writer_loop.go +++ b/libbeat/publisher/queue/diskqueue/writer_loop.go @@ -18,6 +18,7 @@ package diskqueue import ( + "bytes" "encoding/binary" "time" @@ -90,9 +91,13 @@ type writerLoop struct { outputFile *segmentWriter currentRetryInterval time.Duration + + // buffer Used to gather write information so there is only one write syscall + buffer *bytes.Buffer } func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop { + buffer := &bytes.Buffer{} return &writerLoop{ logger: logger, settings: settings, @@ -101,6 +106,7 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop { responseChan: make(chan writerLoopResponse), currentRetryInterval: settings.RetryInterval, + buffer: buffer, } } @@ -186,22 +192,27 @@ outerLoop: // The Write calls below all pass through retryWriter, so they can // only return an error if the write should be aborted. Thus, all we // need to do when we see an error is break out of the request loop. - err := binary.Write(retryWriter, binary.LittleEndian, frameSize) + err := binary.Write(wl.buffer, binary.LittleEndian, frameSize) if err != nil { break } - _, err = retryWriter.Write(frameRequest.frame.serialized) + _, err = wl.buffer.Write(frameRequest.frame.serialized) if err != nil { break } // Compute / write the frame's checksum checksum := computeChecksum(frameRequest.frame.serialized) - err = binary.Write(wl.outputFile, binary.LittleEndian, checksum) + err = binary.Write(wl.buffer, binary.LittleEndian, checksum) if err != nil { break } // Write the frame footer's (duplicate) length - err = binary.Write(wl.outputFile, binary.LittleEndian, frameSize) + err = binary.Write(wl.buffer, binary.LittleEndian, frameSize) + if err != nil { + break + } + _, err = retryWriter.Write(wl.buffer.Bytes()) + wl.buffer.Reset() if err != nil { break } From 3a69909e1bc940094aed294005f5f16d3354d296 Mon Sep 17 00:00:00 2001 From: "Lee E. 
Hinman" Date: Tue, 7 Jun 2022 09:54:36 -0500 Subject: [PATCH 5/7] add additional checks to encryption test --- libbeat/publisher/queue/diskqueue/encryption_test.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/libbeat/publisher/queue/diskqueue/encryption_test.go b/libbeat/publisher/queue/diskqueue/encryption_test.go index 5b08ad9344f..36998414780 100644 --- a/libbeat/publisher/queue/diskqueue/encryption_test.go +++ b/libbeat/publisher/queue/diskqueue/encryption_test.go @@ -19,6 +19,7 @@ package diskqueue import ( "bytes" + "crypto/aes" "io" "testing" @@ -37,6 +38,7 @@ func TestEncryptionRoundTrip(t *testing.T) { pr, pw := io.Pipe() src := bytes.NewReader(tc.plaintext) var dst bytes.Buffer + var teeBuf bytes.Buffer key := []byte("kkkkkkkkkkkkkkkk") go func() { @@ -48,10 +50,16 @@ func TestEncryptionRoundTrip(t *testing.T) { ew.Close() }() - er, err := NewEncryptionReader(pr, key) + tr := io.TeeReader(pr, &teeBuf) + er, err := NewEncryptionReader(io.NopCloser(tr), key) assert.Nil(t, err, name) _, err = io.Copy(&dst, er) assert.Nil(t, err, name) + // Check round trip worked assert.Equal(t, tc.plaintext, dst.Bytes(), name) + // Check that iv & cipher text were written + assert.Equal(t, len(tc.plaintext)+aes.BlockSize, teeBuf.Len(), name) + // Check that cipher text and plaintext don't match + assert.NotEqual(t, tc.plaintext, teeBuf.Bytes()[aes.BlockSize:], name) } } From 5b6dc958cd66d2928c13b976532e52e18ed7a662 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 7 Jun 2022 10:16:48 -0500 Subject: [PATCH 6/7] update docs for v3 --- .../publisher/queue/diskqueue/docs/Makefile | 2 +- .../queue/diskqueue/docs/frameV2.pic | 2 +- .../queue/diskqueue/docs/frameV2.svg | 2 +- .../queue/diskqueue/docs/frameV3.pic | 5 +++ .../queue/diskqueue/docs/frameV3.svg | 11 ++++++ .../diskqueue/docs/on-disk-structures.md | 34 +++++++++++++++++-- .../queue/diskqueue/docs/schemaV3.pic | 4 +++ .../queue/diskqueue/docs/schemaV3.svg | 9 +++++ 8 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV3.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/frameV3.svg create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV3.pic create mode 100644 libbeat/publisher/queue/diskqueue/docs/schemaV3.svg diff --git a/libbeat/publisher/queue/diskqueue/docs/Makefile b/libbeat/publisher/queue/diskqueue/docs/Makefile index d9f55ffb377..bf7a34dd926 100644 --- a/libbeat/publisher/queue/diskqueue/docs/Makefile +++ b/libbeat/publisher/queue/diskqueue/docs/Makefile @@ -1,4 +1,4 @@ -all : schemaV0.svg frameV0.svg schemaV1.svg frameV1.svg schemaV2.svg frameV2.svg +all : schemaV0.svg frameV0.svg schemaV1.svg frameV1.svg schemaV2.svg frameV2.svg schemaV3.svg frameV3.svg .PHONY : clean diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV2.pic b/libbeat/publisher/queue/diskqueue/docs/frameV2.pic index 3e340d09071..aac18259952 100644 --- a/libbeat/publisher/queue/diskqueue/docs/frameV2.pic +++ b/libbeat/publisher/queue/diskqueue/docs/frameV2.pic @@ -1,5 +1,5 @@ boxht = 0.25 SIZE1: box "size (uint32)" wid 4; -DATA: box "LZ4 compressed CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw; +DATA: box "CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw; CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw; SIZE2: box "size (uint32)" wid 4 with nw at CHECKSUM.sw; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV2.svg b/libbeat/publisher/queue/diskqueue/docs/frameV2.svg index 
8cf04362b72..8317e4b93e8 100644 --- a/libbeat/publisher/queue/diskqueue/docs/frameV2.svg +++ b/libbeat/publisher/queue/diskqueue/docs/frameV2.svg @@ -2,7 +2,7 @@ size (uint32) -LZ4 compressed CBOR serialized data +CBOR serialized data checksum (uint32) diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV3.pic b/libbeat/publisher/queue/diskqueue/docs/frameV3.pic new file mode 100644 index 00000000000..3e340d09071 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/frameV3.pic @@ -0,0 +1,5 @@ +boxht = 0.25 +SIZE1: box "size (uint32)" wid 4; +DATA: box "LZ4 compressed CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw; +CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw; +SIZE2: box "size (uint32)" wid 4 with nw at CHECKSUM.sw; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV3.svg b/libbeat/publisher/queue/diskqueue/docs/frameV3.svg new file mode 100644 index 00000000000..8cf04362b72 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/frameV3.svg @@ -0,0 +1,11 @@ + + +size (uint32) + +LZ4 compressed CBOR serialized data + +checksum (uint32) + +size (uint32) + + diff --git a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md index 67831d13ff6..55878906bbe 100644 --- a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md +++ b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md @@ -49,14 +49,42 @@ of the size from the header. ## Version 2 -In version 2, encryption & compression is added to version 1. The +In version 2, encryption is added to version 1. The segments are made of a header followed by an initialization vector, and then encrypted frames. The header consists of one field, the version number which is an unsigned 32-bit integer in little-endian -format. The initialization vector is 128-bits in length. +format. The initialization vector is 128-bits in length. The count +from version 1 was dropped for two reasons. First, if it were +outside the encrypted portion of the segment, it would be easy for +an attacker to modify. Second, adding it to the encrypted +segment in a meaningful way was problematic. The count is not known +until the last frame is written. With encryption you cannot seek to +the beginning of the segment and update the value. Adding the count +to the end is less useful because the entire segment would have to be +decrypted before it could be read. ![Segment Schema Version 2](./schemaV2.svg) +The frames for version 2 consist of a header, followed by the +serialized event and a footer. The header contains one field which is +the size of the frame, which is an unsigned 32-bit integer in +little-endian format. The serialization format is CBOR. The footer +contains 2 fields, the first of which is a checksum, which is an +unsigned 32-bit integer in little-endian format, followed by a repeat +of the size from the header. This is the same as version 1. + +![Frame Version 2](./frameV2.svg) + +## Version 3 + +In version 3, compression is added to version 2. The +segments are made of a header followed by an initialization vector, +and then compressed and encrypted frames. The header consists of one field, the +version number which is an unsigned 32-bit integer in little-endian +format. The initialization vector is 128-bits in length. + +![Segment Schema Version 3](./schemaV3.svg) + The frames for version 3 consist of a header, followed by the compressed serialized event and a footer.
The header contains one field which is the size of the frame, which is an unsigned 32-bit @@ -66,4 +94,4 @@ fields, the first of which is a checksum which is an unsigned 32-bit integer in little-endian format, followed by a repeat of the size from the header. -![Frame Version 2](./frameV2.svg) +![Frame Version 3](./frameV3.svg) diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV3.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV3.pic new file mode 100644 index 00000000000..006ae4bbf7b --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/schemaV3.pic @@ -0,0 +1,4 @@ +boxht = 0.25 +VERSION: box "version (uint32)" wid 4; +IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at VERSION.sw +FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw; \ No newline at end of file diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV3.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV3.svg new file mode 100644 index 00000000000..33ec757bed3 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/docs/schemaV3.svg @@ -0,0 +1,9 @@ + + +version (uint32) + +initialization vector (128 bits) + +Encrypted Frames + + From 49df4cf2555f3639fb6180b8e8f42ad304edf77d Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 14 Jun 2022 13:15:23 -0500 Subject: [PATCH 7/7] switch to stream compression with flush --- go.mod | 2 + go.sum | 2 + .../queue/diskqueue/benchmark_test.go | 90 +++++++--- .../publisher/queue/diskqueue/compression.go | 86 +++++++++ .../queue/diskqueue/compression_test.go | 166 ++++++++++++++++++ .../queue/diskqueue/enc_compress_test.go | 76 ++++++++ .../publisher/queue/diskqueue/encryption.go | 8 +- .../queue/diskqueue/encryption_test.go | 2 +- libbeat/publisher/queue/diskqueue/queue.go | 3 - .../publisher/queue/diskqueue/reader_loop.go | 4 +- libbeat/publisher/queue/diskqueue/segments.go | 81 +++++++-- .../queue/diskqueue/segments_test.go | 7 +- .../publisher/queue/diskqueue/serialize.go | 25 --- .../queue/diskqueue/serialize_test.go | 10 +- 14 files changed, 477 insertions(+), 85 deletions(-) create mode 100644 libbeat/publisher/queue/diskqueue/compression.go create mode 100644 libbeat/publisher/queue/diskqueue/compression_test.go create mode 100644 libbeat/publisher/queue/diskqueue/enc_compress_test.go diff --git a/go.mod b/go.mod index 15e96f44687..5741d32703c 100644 --- a/go.mod +++ b/go.mod @@ -263,6 +263,7 @@ require ( github.com/morikuni/aec v1.0.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.0.2 // indirect + github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 // indirect github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect github.com/prometheus/client_golang v1.11.0 // indirect github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e // indirect @@ -316,6 +317,7 @@ replace ( github.com/google/gopacket => github.com/elastic/gopacket v1.1.20-0.20211202005954-d412fca7f83a github.com/insomniacslk/dhcp => github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 // indirect github.com/tonistiigi/fifo => github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c + github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 => /Users/leehinman/src/pierrec_lz4 ) // Exclude this version because the version has an invalid checksum. 
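The lz4/v4 module added above provides the streaming LZ4 used from this patch onward. To show how the pieces fit together, here is a sketch of the version 3 write path; openV3Writer is a hypothetical helper, while NewEncryptionWriter, NewCompressionWriter, and WriteCloseSyncer are the names introduced in this series:

```go
import "os"

// Sketch: frame bytes -> LZ4 stream compression -> AES-128-CTR
// encryption -> segment file, mirroring what getWriter in segments.go
// sets up for schema version 3.
func openV3Writer(path string, key []byte) (*CompressionWriter, error) {
	f, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE, 0600)
	if err != nil {
		return nil, err
	}
	// The 4-byte version header is written separately, straight to the
	// file, so readers can learn the schema version without the key.
	ew, err := NewEncryptionWriter(f, key) // generates and writes the IV
	if err != nil {
		f.Close()
		return nil, err
	}
	return NewCompressionWriter(ew), nil // Sync flushes the LZ4 stream
}
```

Note the ordering: compression has to sit inside encryption, because AES-CTR output is effectively incompressible, so compressing after encrypting would gain nothing.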
diff --git a/go.sum b/go.sum index e8ac417dccc..7b96e7c228d 100644 --- a/go.sum +++ b/go.sum @@ -1401,6 +1401,8 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 h1:rfcsVKBrjqnG3+q599T+JFw+wG94uL/ehfyQtUdav6U= +github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0 h1:i5VIxp6QB8oWZ8IkK8zrDgeT6ORGIUeiN+61iETwJbI= github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0/go.mod h1:4xpMLz7RBWyB+ElzHu8Llua96TRCB3YwX+l5EP1wmHk= github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA= diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go index 659f37b7a49..86c07a97db5 100644 --- a/libbeat/publisher/queue/diskqueue/benchmark_test.go +++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go @@ -49,14 +49,47 @@ var ( ) //makeEvent creates a sample event, using a random message from msg above -func makeEvent() publisher.Event { - return publisher.Event{ - Content: beat.Event{ - Timestamp: eventTime, - Fields: mapstr.M{ - "message": msgs[rand.Intn(len(msgs))], +func makeEvent(kind string) publisher.Event { + switch kind { + // 81 bytes + case "small": + return publisher.Event{ + Content: beat.Event{ + Timestamp: eventTime, + Fields: mapstr.M{ + "message": msgs[3], + }, }, - }, + } + // 865 bytes + case "medium": + return publisher.Event{ + Content: beat.Event{ + Timestamp: eventTime, + Fields: mapstr.M{ + "message": msgs[5], + }, + }, + } + // 2324 bytes + case "large": + return publisher.Event{ + Content: beat.Event{ + Timestamp: eventTime, + Fields: mapstr.M{ + "message": msgs[2], + }, + }, + } + default: + return publisher.Event{ + Content: beat.Event{ + Timestamp: eventTime, + Fields: mapstr.M{ + "message": msgs[rand.Intn(len(msgs))], + }, + }, + } } } @@ -92,9 +125,9 @@ func cleanup(q *diskQueue) { } } -func publishEvents(p queue.Producer, num int) { +func publishEvents(p queue.Producer, num int, kind string) { for i := 0; i < num; i++ { - ok := p.Publish(makeEvent()) + ok := p.Publish(makeEvent(kind)) if !ok { panic("didn't publish") } @@ -119,21 +152,21 @@ func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error { //produceAndConsume generates and publishes events in a go routine, in // the main go routine it consumes and acks them. This interleaves // publish and consume. -func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error { - go publishEvents(p, num_events) +func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, kind string) error { + go publishEvents(p, num_events, kind) return getAndAckEvents(q, num_events, batch_size) } //produceThenConsume generates and publishes events, when all events // are published it consumes and acks them. 
-func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error { - publishEvents(p, num_events) +func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, kind string) error { + publishEvents(p, num_events, kind) return getAndAckEvents(q, num_events, batch_size) } //benchmarkQueue is a wrapper for produceAndConsume, it tries to limit // timers to just produceAndConsume -func benchmarkQueue(num_events int, batch_size int, schemaVersion int, async bool, b *testing.B) { +func benchmarkQueue(num_events int, batch_size int, schemaVersion int, async bool, kind string, b *testing.B) { b.ResetTimer() var err error @@ -143,12 +176,12 @@ func benchmarkQueue(num_events int, batch_size int, schemaVersion int, async boo q, p := setup(schemaVersion) b.StartTimer() if async { - if err = produceAndConsume(p, q, num_events, batch_size); err != nil { + if err = produceAndConsume(p, q, num_events, batch_size, kind); err != nil { cleanup(q) break } } else { - if err = produceThenConsume(p, q, num_events, batch_size); err != nil { + if err = produceThenConsume(p, q, num_events, batch_size, kind); err != nil { cleanup(q) break } @@ -161,15 +194,16 @@ func benchmarkQueue(num_events int, batch_size int, schemaVersion int, async boo } // Actual benchmark calls follow -func BenchmarkV1Async1k(b *testing.B) { benchmarkQueue(1000, 10, 1, true, b) } -func BenchmarkV1Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, true, b) } -func BenchmarkV2Async1k(b *testing.B) { benchmarkQueue(1000, 10, 2, true, b) } -func BenchmarkV2Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, true, b) } -func BenchmarkV3Async1k(b *testing.B) { benchmarkQueue(1000, 10, 3, true, b) } -func BenchmarkV3Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, true, b) } -func BenchmarkV1Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 1, false, b) } -func BenchmarkV1Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, false, b) } -func BenchmarkV2Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 2, false, b) } -func BenchmarkV2Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, false, b) } -func BenchmarkV3Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 3, false, b) } -func BenchmarkV3Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, false, b) } +func BenchmarkV1Async1k(b *testing.B) { benchmarkQueue(1000, 10, 1, true, "random", b) } +func BenchmarkV1Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, true, "random", b) } +func BenchmarkV2Async1k(b *testing.B) { benchmarkQueue(1000, 10, 2, true, "random", b) } +func BenchmarkV2Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, true, "random", b) } +func BenchmarkV3Async1k(b *testing.B) { benchmarkQueue(1000, 10, 3, true, "random", b) } +func BenchmarkV3Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, true, "random", b) } +func BenchmarkV1Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 1, false, "random", b) } +func BenchmarkV1Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, false, "random", b) } +func BenchmarkV2Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 2, false, "random", b) } +func BenchmarkV2Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, false, "random", b) } +func BenchmarkV3Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 3, false, "random", b) } +func BenchmarkV3Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, false, "random", b) } +func BenchmarkV1SyncSize(b *testing.B) { benchmarkQueue(100000, 1000, 1, false, "medium", b) } diff --git 
a/libbeat/publisher/queue/diskqueue/compression.go b/libbeat/publisher/queue/diskqueue/compression.go new file mode 100644 index 00000000000..9dedc8e9b63 --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/compression.go @@ -0,0 +1,86 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "io" + + lz4V4 "github.com/pierrec/lz4/v4" +) + +//CompressionReader allows reading a stream compressed with LZ4 +type CompressionReader struct { + src io.ReadCloser + pLZ4Reader *lz4V4.Reader +} + +//NewCompressionReader returns a new LZ4 frame decoder +func NewCompressionReader(r io.ReadCloser) *CompressionReader { + zr := lz4V4.NewReader(r) + return &CompressionReader{ + src: r, + pLZ4Reader: zr, + } +} + +func (r *CompressionReader) Read(buf []byte) (int, error) { + return r.pLZ4Reader.Read(buf) +} + +func (r *CompressionReader) Close() error { + return r.src.Close() +} + +//Reset sets up decompression again; assumes that the caller has already set +// the src to the correct position +func (r *CompressionReader) Reset() error { + r.pLZ4Reader.Reset(r.src) + return nil +} + +//CompressionWriter allows writing an LZ4 stream +type CompressionWriter struct { + dst WriteCloseSyncer + pLZ4Writer *lz4V4.Writer +} + +//NewCompressionWriter returns a new LZ4 frame encoder +func NewCompressionWriter(w WriteCloseSyncer) *CompressionWriter { + zw := lz4V4.NewWriter(w) + return &CompressionWriter{ + dst: w, + pLZ4Writer: zw, + } +} + +func (w *CompressionWriter) Write(p []byte) (int, error) { + return w.pLZ4Writer.Write(p) +} + +func (w *CompressionWriter) Close() error { + err := w.pLZ4Writer.Close() + if err != nil { + return err + } + return w.dst.Close() +} + +//Sync flushes any buffered LZ4 data, then syncs the underlying writer +func (w *CompressionWriter) Sync() error { + if err := w.pLZ4Writer.Flush(); err != nil { + return err + } + return w.dst.Sync() +} diff --git a/libbeat/publisher/queue/diskqueue/compression_test.go b/libbeat/publisher/queue/diskqueue/compression_test.go new file mode 100644 index 00000000000..5419317d71b --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/compression_test.go @@ -0,0 +1,166 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +package diskqueue + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func NopWriteCloseSyncer(w io.WriteCloser) WriteCloseSyncer { + return nopWriteCloseSyncer{w} +} + +type nopWriteCloseSyncer struct { + io.WriteCloser +} + +func (nopWriteCloseSyncer) Sync() error { return nil } + +type nopWriteCloser struct { + io.Writer +} + +func NopWriteCloser(w io.Writer) io.WriteCloser { + return nopWriteCloser{w} +} +func (nopWriteCloser) Close() error { return nil } + +func TestCompressionReader(t *testing.T) { + tests := map[string]struct { + plaintext []byte + compressed []byte + }{ + "abc 1.9.3 lz4": { + plaintext: []byte("abc"), + compressed: []byte{ + 0x04, 0x22, 0x4d, 0x18, + 0x64, 0x40, 0xa7, 0x04, + 0x00, 0x00, 0x80, 0x61, + 0x62, 0x63, 0x0a, 0x00, + 0x00, 0x00, 0x00, 0x6c, + 0x3e, 0x7b, 0x08, 0x00}, + }, + "abc pierrec lz4": { + plaintext: []byte("abc"), + compressed: []byte{ + 0x04, 0x22, 0x4d, 0x18, + 0x64, 0x70, 0xb9, 0x03, + 0x00, 0x00, 0x80, 0x61, + 0x62, 0x63, 0x00, 0x00, + 0x00, 0x00, 0xff, 0x53, + 0xd1, 0x32}, + }, + } + + for name, tc := range tests { + dst := make([]byte, len(tc.plaintext)) + src := bytes.NewReader(tc.compressed) + cr := NewCompressionReader(io.NopCloser(src)) + n, err := cr.Read(dst) + assert.Nil(t, err, name) + assert.Equal(t, len(tc.plaintext), n, name) + assert.Equal(t, tc.plaintext, dst, name) + } +} + +func TestCompressionWriter(t *testing.T) { + tests := map[string]struct { + plaintext []byte + compressed []byte + }{ + "abc pierrec lz4": { + plaintext: []byte("abc"), + compressed: []byte{ + 0x04, 0x22, 0x4d, 0x18, + 0x64, 0x70, 0xb9, 0x03, + 0x00, 0x00, 0x80, 0x61, + 0x62, 0x63, 0x00, 0x00, + 0x00, 0x00, 0xff, 0x53, + 0xd1, 0x32}, + }, + } + + for name, tc := range tests { + var dst bytes.Buffer + cw := NewCompressionWriter(NopWriteCloseSyncer(NopWriteCloser(&dst))) + n, err := cw.Write(tc.plaintext) + cw.Close() + assert.Nil(t, err, name) + assert.Equal(t, len(tc.plaintext), n, name) + assert.Equal(t, tc.compressed, dst.Bytes(), name) + } +} + +func TestCompressionRoundTrip(t *testing.T) { + tests := map[string]struct { + plaintext []byte + }{ + "no repeat": {plaintext: []byte("abcdefghijklmnopqrstuvwxzy01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ")}, + "256 repeat": {plaintext: []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")}, + } + for name, tc := range tests { + pr, pw := io.Pipe() + src := bytes.NewReader(tc.plaintext) + var dst bytes.Buffer + + go func() { + cw := NewCompressionWriter(NopWriteCloseSyncer(pw)) + _, err := io.Copy(cw, src) + assert.Nil(t, err, name) + cw.Close() + }() + + cr := NewCompressionReader(pr) + _, err := io.Copy(&dst, cr) + assert.Nil(t, err, name) + assert.Equal(t, tc.plaintext, dst.Bytes(), name) + } +} + +func TestCompressionSync(t *testing.T) { + tests := map[string]struct { + plaintext []byte + }{ + "no repeat": {plaintext: []byte("abcdefghijklmnopqrstuvwxzy01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ")}, + "256 repeat": {plaintext: 
[]byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")}, + } + for name, tc := range tests { + pr, pw := io.Pipe() + var dst bytes.Buffer + go func() { + cw := NewCompressionWriter(NopWriteCloseSyncer(pw)) + src1 := bytes.NewReader(tc.plaintext) + _, err := io.Copy(cw, src1) + assert.Nil(t, err, name) + err = cw.Sync() + src2 := bytes.NewReader(tc.plaintext) + _, err = io.Copy(cw, src2) + assert.Nil(t, err, name) + cw.Close() + }() + cr := NewCompressionReader(pr) + _, err := io.Copy(&dst, cr) + assert.Nil(t, err, name) + assert.Equal(t, tc.plaintext, dst.Bytes()[:len(tc.plaintext)], name) + assert.Equal(t, tc.plaintext, dst.Bytes()[len(tc.plaintext):], name) + } +} diff --git a/libbeat/publisher/queue/diskqueue/enc_compress_test.go b/libbeat/publisher/queue/diskqueue/enc_compress_test.go new file mode 100644 index 00000000000..637765c24cb --- /dev/null +++ b/libbeat/publisher/queue/diskqueue/enc_compress_test.go @@ -0,0 +1,76 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package diskqueue + +import ( + "bytes" + "io" + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestEncryptionCompressionRoundTrip(t *testing.T) { + tests := map[string]struct { + plaintext []byte + }{ + "1 rune": {plaintext: []byte("a")}, + "16 runes": {plaintext: []byte("bbbbbbbbbbbbbbbb")}, + "17 runes": {plaintext: []byte("ccccccccccccccccc")}, + "small json": {plaintext: []byte("{\"message\":\"2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA\"}")}, + "large json": {plaintext: []byte("{\"message\":\"{\\\"CacheCacheStatus\\\":\\\"hit\\\",\\\"CacheResponseBytes\\\":26888,\\\"CacheResponseStatus\\\":200,\\\"CacheTieredFill\\\":true,\\\"ClientASN\\\":1136,\\\"ClientCountry\\\":\\\"nl\\\",\\\"ClientDeviceType\\\":\\\"desktop\\\",\\\"ClientIP\\\":\\\"89.160.20.156\\\",\\\"ClientIPClass\\\":\\\"noRecord\\\",\\\"ClientRequestBytes\\\":5324,\\\"ClientRequestHost\\\":\\\"eqlplayground.io\\\",\\\"ClientRequestMethod\\\":\\\"GET\\\",\\\"ClientRequestPath\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestProtocol\\\":\\\"HTTP/1.1\\\",\\\"ClientRequestReferer\\\":\\\"https://eqlplayground.io/s/eqldemo/app/security/timelines/default?sourcerer=(default:!(.siem-signals-eqldemo))&timerange=(global:(linkTo:!(),timerange:(from:%272021-03-03T19:55:15.519Z%27,fromStr:now-24h,kind:relative,to:%272021-03-04T19:55:15.519Z%27,toStr:now)),timeline:(linkTo:!(),timerange:(from:%272020-03-04T19:55:28.684Z%27,fromStr:now-1y,kind:relative,to:%272021-03-04T19:55:28.692Z%27,toStr:now)))&timeline=(activeTab:eql,graphEventId:%27%27,id:%2769f93840-7d23-11eb-866c-79a0609409ba%27,isOpen:!t)\\\",\\\"ClientRequestURI\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestUserAgent\\\":\\\"Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/91.0.4472.124Safari/537.36\\\",\\\"ClientSSLCipher\\\":\\\"NONE\\\",\\\"ClientSSLProtocol\\\":\\\"none\\\",\\\"ClientSrcPort\\\":0,\\\"ClientXRequestedWith\\\":\\\"\\\",\\\"EdgeColoCode\\\":\\\"33.147.138.217\\\",\\\"EdgeColoID\\\":20,\\\"EdgeEndTimestamp\\\":1625752958875000000,\\\"EdgePathingOp\\\":\\\"wl\\\",\\\"EdgePathingSrc\\\":\\\"macro\\\",\\\"EdgePathingStatus\\\":\\\"nr\\\",\\\"EdgeRateLimitAction\\\":\\\"\\\",\\\"EdgeRateLimitID\\\":0,\\\"EdgeRequestHost\\\":\\\"eqlplayground.io\\\",\\\"EdgeResponseBytes\\\":24743,\\\"EdgeResponseCompressionRatio\\\":0,\\\"EdgeResponseContentType\\\":\\\"application/javascript\\\",\\\"EdgeResponseStatus\\\":200,\\\"EdgeServerIP\\\":\\\"89.160.20.156\\\",\\\"EdgeStartTimestamp\\\":1625752958812000000,\\\"FirewallMatchesActions\\\":[],\\\"FirewallMatchesRuleIDs\\\":[],\\\"FirewallMatchesSources\\\":[],\\\"OriginIP\\\":\\\"\\\",\\\"OriginResponseBytes\\\":0,\\\"OriginResponseHTTPExpires\\\":\\\"\\\",\\\"OriginResponseHTTPLastModified\\\":\\\"\\\",\\\"OriginResponseStatus\\\":0,\\\"OriginResponseTime\\\":0,\\\"OriginSSLProtocol\\\":\\\"unknown\\\",\\\"ParentRayID\\\":\\\"66b9d9f88b5b4c4f\\\",\\\"RayID\\\":\\\"66b9d9f890ae4c4f\\\",\\\"SecurityLevel\\\":\\\"off\\\",\\\"WAFAction\\\":\\\"unknown\\\",\\\"WAFFlags\\\":\\\"0\\\",\\\"WAFMatchedVar\\\":\\\"\\\",\\\"WAFProfile\\\":\\\"unknown\\\",\\\"WAFRuleID\\\":\\\"\\\",\\\"WAFRuleMessage\\\":\\\"\\\",\\\"WorkerCPUTime\\\":0,\\\"WorkerStatus\\\":\\\"unknown\\\",\\\"WorkerSubrequest\\\":true,\\\"WorkerSubrequestCount\\\":0,\\\"ZoneID\\\":393347122}\"}")}, + } + + for name, tc := range tests { + pr, pw := io.Pipe() + key := 
[]byte("keykeykeykeykeyk") + src := bytes.NewReader(tc.plaintext) + var dst bytes.Buffer + var tEncBuf bytes.Buffer + var tCompBuf bytes.Buffer + + go func() { + ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key) + assert.Nil(t, err, name) + cw := NewCompressionWriter(ew) + _, err = io.Copy(cw, src) + assert.Nil(t, err, name) + err = cw.Close() + assert.Nil(t, err, name) + }() + + ter := io.TeeReader(pr, &tEncBuf) + er, err := NewEncryptionReader(io.NopCloser(ter), key) + assert.Nil(t, err, name) + + tcr := io.TeeReader(er, &tCompBuf) + + cr := NewCompressionReader(io.NopCloser(tcr)) + + _, err = io.Copy(&dst, cr) + assert.Nil(t, err, name) + // Check round trip worked + assert.Equal(t, tc.plaintext, dst.Bytes(), name) + // Check that cipher text and plaintext don't match + assert.NotEqual(t, tc.plaintext, tEncBuf.Bytes(), name) + // Check that compressed text and plaintext don't match + assert.NotEqual(t, tc.plaintext, tCompBuf.Bytes(), name) + // Check that compressed text and ciphertext don't match + assert.NotEqual(t, tEncBuf.Bytes(), tCompBuf.Bytes(), name) + } +} diff --git a/libbeat/publisher/queue/diskqueue/encryption.go b/libbeat/publisher/queue/diskqueue/encryption.go index c21c7a8fba2..ecabfc06f16 100644 --- a/libbeat/publisher/queue/diskqueue/encryption.go +++ b/libbeat/publisher/queue/diskqueue/encryption.go @@ -100,12 +100,12 @@ func (er *EncryptionReader) Reset() error { //EncryptionWriter allows writing to a AES-128-CTR stream type EncryptionWriter struct { - dst io.WriteCloser + dst WriteCloseSyncer stream cipher.Stream } //NewEncryptionWriter returns a new AES-128-CTR stream encryptor -func NewEncryptionWriter(w io.WriteCloser, key []byte) (*EncryptionWriter, error) { +func NewEncryptionWriter(w WriteCloseSyncer, key []byte) (*EncryptionWriter, error) { if len(key) != KeySize { return nil, fmt.Errorf("key must be %d bytes long", KeySize) } @@ -150,3 +150,7 @@ func (ew *EncryptionWriter) Write(buf []byte) (int, error) { func (ew *EncryptionWriter) Close() error { return ew.dst.Close() } + +func (ew *EncryptionWriter) Sync() error { + return ew.dst.Sync() +} diff --git a/libbeat/publisher/queue/diskqueue/encryption_test.go b/libbeat/publisher/queue/diskqueue/encryption_test.go index 36998414780..8bb70eb6ae9 100644 --- a/libbeat/publisher/queue/diskqueue/encryption_test.go +++ b/libbeat/publisher/queue/diskqueue/encryption_test.go @@ -43,7 +43,7 @@ func TestEncryptionRoundTrip(t *testing.T) { go func() { //NewEncryptionWriter writes iv, so needs to be in go routine - ew, err := NewEncryptionWriter(pw, key) + ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key) assert.Nil(t, err, name) _, err = io.Copy(ew, src) assert.Nil(t, err, name) diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go index 559ee9ecbe0..fc4bafe59b5 100644 --- a/libbeat/publisher/queue/diskqueue/queue.go +++ b/libbeat/publisher/queue/diskqueue/queue.go @@ -268,9 +268,6 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig { func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer { encoder := newEventEncoder() - if dq.settings.SchemaVersion == 3 { - encoder.useCompression = true - } return &diskQueueProducer{ queue: dq, config: cfg, diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go index b14aaa47692..193cd73694a 100644 --- a/libbeat/publisher/queue/diskqueue/reader_loop.go +++ b/libbeat/publisher/queue/diskqueue/reader_loop.go @@ -106,9 +106,7 @@ func (rl 
*readerLoop) processRequest(request readerLoopRequest) readerLoopRespon return readerLoopResponse{err: err} } defer handle.Close() - if request.segment.schemaVersion == 3 { - rl.decoder.useCompression = true - } + _, err = handle.Seek(int64(request.startPosition), io.SeekStart) if err != nil { return readerLoopResponse{err: err} diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go index 3458153d80f..345f7e85379 100644 --- a/libbeat/publisher/queue/diskqueue/segments.go +++ b/libbeat/publisher/queue/diskqueue/segments.go @@ -132,6 +132,12 @@ type segmentHeader struct { frameCount uint32 } +type WriteCloseSyncer interface { + io.Writer + io.Closer + Sync() error +} + // segmentHeaderSizeV0 schemaVersion 0 header size, uint32 for version const segmentHeaderSizeV0 = 4 @@ -258,12 +264,18 @@ func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, return sr, nil } - sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey) - if err != nil { - sr.src.Close() - return nil, fmt.Errorf("couldn't create encryption reader: %w", err) + if sr.version == 2 || sr.version == 3 { + sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey) + if err != nil { + sr.src.Close() + return nil, fmt.Errorf("couldn't create encryption reader: %w", err) + } + } + if sr.version == 3 { + sr.cr = NewCompressionReader(sr.er) } return sr, nil + } // Should only be called from the writer loop. @@ -288,10 +300,16 @@ func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, return sw, nil } - sw.ew, err = NewEncryptionWriter(sw.dst, queueSettings.EncryptionKey) - if err != nil { - sw.dst.Close() - return nil, fmt.Errorf("couldn't create encryption writer: %w", err) + if sw.version == 2 || sw.version == 3 { + sw.ew, err = NewEncryptionWriter(sw.dst, queueSettings.EncryptionKey) + if err != nil { + sw.dst.Close() + return nil, fmt.Errorf("couldn't create encryption writer: %w", err) + } + } + + if sw.version == 3 { + sw.cw = NewCompressionWriter(sw.ew) } return sw, nil @@ -436,6 +454,7 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 { type segmentReader struct { src io.ReadSeekCloser er *EncryptionReader + cr *CompressionReader version uint32 } @@ -443,8 +462,10 @@ func (r *segmentReader) Read(p []byte) (int, error) { switch r.version { case 0, 1: return r.src.Read(p) - case 2, 3: + case 2: return r.er.Read(p) + case 3: + return r.cr.Read(p) default: return 0, fmt.Errorf("unsupported schema version: %d", r.version) } @@ -454,8 +475,10 @@ func (r *segmentReader) Close() error { switch r.version { case 0, 1: return r.src.Close() - case 2, 3: + case 2: return r.er.Close() + case 3: + return r.cr.Close() default: return fmt.Errorf("unsupported schema version: %d", r.version) } @@ -465,7 +488,7 @@ func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { switch r.version { case 0, 1: return r.src.Seek(offset, whence) - case 2, 3: + case 2: //can't seek before segment header if (offset + int64(whence)) < segmentHeaderSizeV2 { return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence) @@ -478,6 +501,22 @@ func (r *segmentReader) Seek(offset int64, whence int) (int64, error) { } written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSizeV2) return written + segmentHeaderSizeV2, err + case 3: + //can't seek before segment header + if (offset + int64(whence)) < segmentHeaderSizeV3 { + return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, 
whence) + } + if _, err := r.src.Seek(segmentHeaderSizeV3, io.SeekStart); err != nil { + return 0, err + } + if err := r.er.Reset(); err != nil { + return 0, err + } + if err := r.cr.Reset(); err != nil { + return 0, err + } + written, err := io.CopyN(io.Discard, r.cr, (offset+int64(whence))-segmentHeaderSizeV3) + return written + segmentHeaderSizeV3, err default: return 0, fmt.Errorf("unsupported schema version: %d", r.version) } @@ -486,6 +525,7 @@ type segmentWriter struct { dst *os.File ew *EncryptionWriter + cw *CompressionWriter version uint32 } @@ -493,8 +533,10 @@ func (w *segmentWriter) Write(p []byte) (int, error) { switch w.version { case 0, 1: return w.dst.Write(p) - case 2, 3: + case 2: return w.ew.Write(p) + case 3: + return w.cw.Write(p) default: return 0, fmt.Errorf("unsupported schema version: %d", w.version) } @@ -504,8 +546,10 @@ func (w *segmentWriter) Close() error { switch w.version { case 0, 1: return w.dst.Close() - case 2, 3: + case 2: return w.ew.Close() + case 3: + return w.cw.Close() default: return fmt.Errorf("unsupported schema version: %d", w.version) } @@ -521,7 +565,16 @@ func (w *segmentWriter) Seek(offset int64, whence int) (int64, error) { } func (w *segmentWriter) Sync() error { - return w.dst.Sync() + switch w.version { + case 0, 1: + return w.dst.Sync() + case 2: + return w.ew.Sync() + case 3: + return w.cw.Sync() + default: + return fmt.Errorf("unsupported schema version: %d", w.version) + } } func (w *segmentWriter) WriteHeader() error { diff --git a/libbeat/publisher/queue/diskqueue/segments_test.go b/libbeat/publisher/queue/diskqueue/segments_test.go index 1e10594009a..8261c0d78d9 100644 --- a/libbeat/publisher/queue/diskqueue/segments_test.go +++ b/libbeat/publisher/queue/diskqueue/segments_test.go @@ -46,6 +46,11 @@ func TestSchemasRoundTrip(t *testing.T) { schemaVersion: uint32(2), plaintext: []byte("abc"), }, + "version 3": { + id: 3, + schemaVersion: uint32(3), + plaintext: []byte("abc"), + }, } dir, err := os.MkdirTemp("", t.Name()) assert.Nil(t, err) @@ -85,7 +90,7 @@ func TestSchemasRoundTrip(t *testing.T) { } } -func TestSeek(t *testing.T) { +func TestSegmentReaderSeek(t *testing.T) { tests := map[string]struct { id segmentID schemaVersion uint32 diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go index 53a0580d948..5af619b5429 100644 --- a/libbeat/publisher/queue/diskqueue/serialize.go +++ b/libbeat/publisher/queue/diskqueue/serialize.go @@ -25,8 +25,6 @@ import ( "fmt" "time" - "github.com/pierrec/lz4" - "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/outputs/codec" "github.com/elastic/beats/v7/libbeat/publisher" @@ -39,8 +37,6 @@ import ( type eventEncoder struct { buf bytes.Buffer folder *gotype.Iterator - // Use Compression - useCompression bool } type eventDecoder struct { @@ -54,9 +50,6 @@ type eventDecoder struct { useJSON bool unfolder *gotype.Unfolder - - // Use Compression - useCompression bool } type entry struct { @@ -114,15 +107,6 @@ func (e *eventEncoder) encode(evt interface{}) ([]byte, error) { return nil, err } - // Copy the encoded bytes to a new array owned by the caller.
- if e.useCompression { - result := make([]byte, lz4.CompressBlockBound(e.buf.Len())) - n, err := lz4.CompressBlock(e.buf.Bytes(), result, nil) - if err != nil { - return nil, err - } - return result[:n], nil - } result := make([]byte, e.buf.Len()) copy(result, e.buf.Bytes()) return result, nil @@ -163,15 +147,6 @@ func (d *eventDecoder) Decode() (publisher.Event, error) { } defer d.unfolder.Reset() - if d.useCompression { - out := make([]byte, 10*len(d.buf)) - n, err := lz4.UncompressBlock(d.buf, out) - if err != nil { - return publisher.Event{}, err - } - d.buf = out[:n] - } - if d.useJSON { err = d.jsonParser.Parse(d.buf) } else { diff --git a/libbeat/publisher/queue/diskqueue/serialize_test.go b/libbeat/publisher/queue/diskqueue/serialize_test.go index 3cc80c6f963..64cb3e3342b 100644 --- a/libbeat/publisher/queue/diskqueue/serialize_test.go +++ b/libbeat/publisher/queue/diskqueue/serialize_test.go @@ -30,20 +30,15 @@ import ( // A test to make sure serialization works correctly on multi-byte characters. func TestSerialize(t *testing.T) { testCases := []struct { - name string - value string - useCompression bool + name string + value string }{ {name: "Ascii only", value: "{\"name\": \"Momotaro\"}"}, {name: "Multi-byte", value: "{\"name\": \"桃太郎\"}"}, - {name: "Compressed Ascii only", value: "{\"name\": \"Momotaro\"}", useCompression: true}, - {name: "Compressed Multi-byte", value: "{\"name\": \"桃太郎\"}", useCompression: true}, - {name: "Compressed high repeat", value: "{\"name\": \"aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa\"}", useCompression: true}, } for _, test := range testCases { encoder := newEventEncoder() - encoder.useCompression = test.useCompression event := publisher.Event{ Content: beat.Event{ Fields: mapstr.M{ @@ -58,7 +53,6 @@ func TestSerialize(t *testing.T) { // Use decoder to decode the serialized bytes. decoder := newEventDecoder() - decoder.useCompression = (test.useCompression) buf := decoder.Buffer(len(serialized)) copy(buf, serialized) decoded, err := decoder.Decode()
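For symmetry with the writer sketch earlier, the matching version 3 read path reverses the same layering; again a sketch with a hypothetical helper name, using the constructors and the segmentHeaderSizeV3 constant from this series:

```go
import (
	"io"
	"os"
)

// Sketch: segment file -> AES-128-CTR decryption (the IV is read
// first) -> LZ4 decompression -> plaintext frames, mirroring what
// getReader in segments.go sets up for schema version 3.
func openV3Reader(path string, key []byte) (*CompressionReader, error) {
	f, err := os.Open(path)
	if err != nil {
		return nil, err
	}
	// Skip the unencrypted 4-byte version header.
	if _, err := f.Seek(segmentHeaderSizeV3, io.SeekStart); err != nil {
		f.Close()
		return nil, err
	}
	er, err := NewEncryptionReader(f, key) // consumes the IV
	if err != nil {
		f.Close()
		return nil, err
	}
	return NewCompressionReader(er), nil
}
```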