diff --git a/NOTICE.txt b/NOTICE.txt
index c3cdb78e7d0..23789b3c138 100644
--- a/NOTICE.txt
+++ b/NOTICE.txt
@@ -13745,6 +13745,44 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
+--------------------------------------------------------------------------------
+Dependency : github.com/pierrec/lz4
+Version: v2.6.0+incompatible
+Licence type (autodetected): BSD-3-Clause
+--------------------------------------------------------------------------------
+
+Contents of probable licence file $GOMODCACHE/github.com/pierrec/lz4@v2.6.0+incompatible/LICENSE:
+
+Copyright (c) 2015, Pierre Curto
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+* Redistributions of source code must retain the above copyright notice, this
+ list of conditions and the following disclaimer.
+
+* Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+* Neither the name of xxHash nor the names of its
+ contributors may be used to endorse or promote products derived from
+ this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+
+
+
--------------------------------------------------------------------------------
Dependency : github.com/pierrre/gotestcover
Version: v0.0.0-20160517101806-924dca7d15f0
@@ -34505,44 +34543,6 @@ Contents of probable licence file $GOMODCACHE/github.com/oxtoacart/bpool@v0.0.0-
limitations under the License.
---------------------------------------------------------------------------------
-Dependency : github.com/pierrec/lz4
-Version: v2.6.0+incompatible
-Licence type (autodetected): BSD-3-Clause
---------------------------------------------------------------------------------
-
-Contents of probable licence file $GOMODCACHE/github.com/pierrec/lz4@v2.6.0+incompatible/LICENSE:
-
-Copyright (c) 2015, Pierre Curto
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions are met:
-
-* Redistributions of source code must retain the above copyright notice, this
- list of conditions and the following disclaimer.
-
-* Redistributions in binary form must reproduce the above copyright notice,
- this list of conditions and the following disclaimer in the documentation
- and/or other materials provided with the distribution.
-
-* Neither the name of xxHash nor the names of its
- contributors may be used to endorse or promote products derived from
- this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
-DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
-FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
-DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
-SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
-CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
-OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
-OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
-
-
-
--------------------------------------------------------------------------------
Dependency : github.com/power-devops/perfstat
Version: v0.0.0-20210106213030-5aafc221ea8c
diff --git a/go.mod b/go.mod
index 38c79f2ad87..5741d32703c 100644
--- a/go.mod
+++ b/go.mod
@@ -163,6 +163,7 @@ require (
github.com/elastic/elastic-agent-autodiscover v0.1.1
github.com/elastic/elastic-agent-libs v0.2.5
github.com/elastic/elastic-agent-system-metrics v0.3.1
+ github.com/pierrec/lz4 v2.6.0+incompatible
github.com/shirou/gopsutil/v3 v3.21.12
go.elastic.co/apm/module/apmelasticsearch/v2 v2.0.0
go.elastic.co/apm/module/apmhttp/v2 v2.0.0
@@ -262,7 +263,7 @@ require (
github.com/morikuni/aec v1.0.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.0.2 // indirect
- github.com/pierrec/lz4 v2.6.0+incompatible // indirect
+ github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/prometheus/client_golang v1.11.0 // indirect
github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e // indirect
@@ -316,6 +317,7 @@ replace (
github.com/google/gopacket => github.com/elastic/gopacket v1.1.20-0.20211202005954-d412fca7f83a
github.com/insomniacslk/dhcp => github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 // indirect
github.com/tonistiigi/fifo => github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c
+ github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 => /Users/leehinman/src/pierrec_lz4
)
// Exclude this version because the version has an invalid checksum.
diff --git a/go.sum b/go.sum
index e8ac417dccc..7b96e7c228d 100644
--- a/go.sum
+++ b/go.sum
@@ -1401,6 +1401,8 @@ github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A=
github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
+github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 h1:rfcsVKBrjqnG3+q599T+JFw+wG94uL/ehfyQtUdav6U=
+github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0 h1:i5VIxp6QB8oWZ8IkK8zrDgeT6ORGIUeiN+61iETwJbI=
github.com/pierrre/gotestcover v0.0.0-20160517101806-924dca7d15f0/go.mod h1:4xpMLz7RBWyB+ElzHu8Llua96TRCB3YwX+l5EP1wmHk=
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4/go.mod h1:4OwLy04Bl9Ef3GJJCoec+30X3LQs/0/m4HFRt/2LUSA=
diff --git a/libbeat/publisher/queue/diskqueue/acks_test.go b/libbeat/publisher/queue/diskqueue/acks_test.go
index bacdf6689d5..7a7f9e20ef5 100644
--- a/libbeat/publisher/queue/diskqueue/acks_test.go
+++ b/libbeat/publisher/queue/diskqueue/acks_test.go
@@ -66,7 +66,7 @@ func TestAddFrames(t *testing.T) {
rf(0, 0, true, 100),
},
frameID(1),
- &queuePosition{0, segmentHeaderSize + 100, 1},
+ &queuePosition{0, segmentHeaderSizeV1 + 100, 1},
nil,
},
{
@@ -85,7 +85,7 @@ func TestAddFrames(t *testing.T) {
rf(0, 1, false, 75),
},
frameID(2),
- &queuePosition{0, segmentHeaderSize + 175, 2},
+ &queuePosition{0, segmentHeaderSizeV1 + 175, 2},
nil,
},
{
@@ -94,7 +94,7 @@ func TestAddFrames(t *testing.T) {
rf(0, 2, false, 100),
},
frameID(5),
- &queuePosition{1, segmentHeaderSize + 150, 2},
+ &queuePosition{1, segmentHeaderSizeV1 + 150, 2},
// This time we crossed a boundary so we should get an ACK for segment
// 0 on the notification channel.
segmentIDRef(0),
@@ -122,7 +122,7 @@ func TestAddFrames(t *testing.T) {
rf(0, 0, true, 100),
},
frameID(2),
- &queuePosition{0, segmentHeaderSize + 150, 2},
+ &queuePosition{0, segmentHeaderSizeV1 + 150, 2},
nil,
},
{
@@ -131,7 +131,7 @@ func TestAddFrames(t *testing.T) {
rf(0, 2, false, 75),
},
frameID(3),
- &queuePosition{0, segmentHeaderSize + 225, 3},
+ &queuePosition{0, segmentHeaderSizeV1 + 225, 3},
nil,
},
{
@@ -140,7 +140,7 @@ func TestAddFrames(t *testing.T) {
rf(1, 3, false, 100),
},
frameID(7),
- &queuePosition{2, segmentHeaderSize + 100, 1},
+ &queuePosition{2, segmentHeaderSizeV1 + 100, 1},
segmentIDRef(1),
},
{
@@ -149,7 +149,7 @@ func TestAddFrames(t *testing.T) {
rf(2, 7, false, 100),
},
frameID(9),
- &queuePosition{2, segmentHeaderSize + 300, 3},
+ &queuePosition{2, segmentHeaderSizeV1 + 300, 3},
nil,
},
},
@@ -165,7 +165,7 @@ func TestAddFrames(t *testing.T) {
rf(0, 0, true, 100),
},
frameID(4),
- &queuePosition{3, segmentHeaderSize + 100, 1},
+ &queuePosition{3, segmentHeaderSizeV1 + 100, 1},
// We advanced from segment 0 to segment 3, so we expect
// segmentID 2 on the ACK channel.
segmentIDRef(2),
@@ -182,7 +182,7 @@ func TestAddFrames(t *testing.T) {
rf(10, 35, true, 100),
},
frameID(36),
- &queuePosition{10, segmentHeaderSize + 100, 1},
+ &queuePosition{10, segmentHeaderSizeV1 + 100, 1},
// We advanced to segment 10, so we expect segmentID 9 on
// the ACK channel.
segmentIDRef(9),
@@ -218,7 +218,7 @@ func TestAddFrames(t *testing.T) {
[]*readFrame{
{
segment: &queueSegment{
- schemaVersion: uint32Ref(0),
+ schemaVersion: 0,
},
bytesOnDisk: 100,
},
@@ -299,15 +299,11 @@ func (dqa *diskQueueACKs) assertACKedSegment(
}
}
-func uint32Ref(v uint32) *uint32 {
- return &v
-}
-
// rf assembles a readFrame with the given parameters and a spoofed
// queue segment, whose firstFrameID field is set to match the given frame
// if "first" is true.
func rf(seg segmentID, frame frameID, first bool, size uint64) *readFrame {
- s := &queueSegment{id: seg}
+ s := &queueSegment{id: seg, schemaVersion: 1}
if first {
s.firstFrameID = frame
}
diff --git a/libbeat/publisher/queue/diskqueue/benchmark_test.go b/libbeat/publisher/queue/diskqueue/benchmark_test.go
index 62d791f53b0..86c07a97db5 100644
--- a/libbeat/publisher/queue/diskqueue/benchmark_test.go
+++ b/libbeat/publisher/queue/diskqueue/benchmark_test.go
@@ -49,14 +49,47 @@ var (
)
//makeEvent creates a sample event, using a random message from msg above
-func makeEvent() publisher.Event {
- return publisher.Event{
- Content: beat.Event{
- Timestamp: eventTime,
- Fields: mapstr.M{
- "message": msgs[rand.Intn(len(msgs))],
+func makeEvent(kind string) publisher.Event {
+ switch kind {
+ // 81 bytes
+ case "small":
+ return publisher.Event{
+ Content: beat.Event{
+ Timestamp: eventTime,
+ Fields: mapstr.M{
+ "message": msgs[3],
+ },
},
- },
+ }
+ // 865 bytes
+ case "medium":
+ return publisher.Event{
+ Content: beat.Event{
+ Timestamp: eventTime,
+ Fields: mapstr.M{
+ "message": msgs[5],
+ },
+ },
+ }
+ // 2324 bytes
+ case "large":
+ return publisher.Event{
+ Content: beat.Event{
+ Timestamp: eventTime,
+ Fields: mapstr.M{
+ "message": msgs[2],
+ },
+ },
+ }
+ default:
+ return publisher.Event{
+ Content: beat.Event{
+ Timestamp: eventTime,
+ Fields: mapstr.M{
+ "message": msgs[rand.Intn(len(msgs))],
+ },
+ },
+ }
}
}
@@ -64,13 +97,15 @@ func makeEvent() publisher.Event {
// hold the queue. Location of the temporary directory is stored in
// the queue settings. Call `cleanup` when done with the queue to
// close the queue and remove the temp dir.
-func setup() (*diskQueue, queue.Producer) {
+func setup(schemaVersion int) (*diskQueue, queue.Producer) {
dir, err := os.MkdirTemp("", "benchmark")
if err != nil {
panic(err)
}
s := DefaultSettings()
s.Path = dir
+ s.SchemaVersion = uint32(schemaVersion)
+ s.EncryptionKey = []byte("testtesttesttest")
q, err := NewQueue(logp.NewLogger("benchmark"), s)
if err != nil {
os.RemoveAll(dir)
@@ -83,18 +118,23 @@ func setup() (*diskQueue, queue.Producer) {
//clean closes the queue and deletes the temporory directory that
// holds the queue.
func cleanup(q *diskQueue) {
- q.Close()
+ err := q.Close()
os.RemoveAll(q.settings.directoryPath())
+ if err != nil {
+ panic(err)
+ }
}
-//produceAndConsume does the interesting work. It generates events,
-// publishes them, consumes them, and ACKS them.
-func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int) error {
- go func() {
- for i := 0; i < num_events; i++ {
- p.Publish(makeEvent())
+func publishEvents(p queue.Producer, num int, kind string) {
+ for i := 0; i < num; i++ {
+ ok := p.Publish(makeEvent(kind))
+ if !ok {
+ panic("didn't publish")
}
- }()
+ }
+}
+
+func getAndAckEvents(q *diskQueue, num_events int, batch_size int) error {
var received int
for {
batch, err := q.Get(batch_size)
@@ -104,33 +144,66 @@ func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_siz
batch.ACK()
received = received + batch.Count()
if received == num_events {
- break
+ return nil
}
}
- return nil
+}
+
+//produceAndConsume generates and publishes events in a go routine, in
+// the main go routine it consumes and acks them. This interleaves
+// publish and consume.
+func produceAndConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, kind string) error {
+ go publishEvents(p, num_events, kind)
+ return getAndAckEvents(q, num_events, batch_size)
+}
+
+//produceThenConsume generates and publishes events, when all events
+// are published it consumes and acks them.
+func produceThenConsume(p queue.Producer, q *diskQueue, num_events int, batch_size int, kind string) error {
+ publishEvents(p, num_events, kind)
+ return getAndAckEvents(q, num_events, batch_size)
}
//benchmarkQueue is a wrapper for produceAndConsume, it tries to limit
// timers to just produceAndConsume
-func benchmarkQueue(num_events int, batch_size int, b *testing.B) {
- var err error
- rand.Seed(1)
- q, p := setup()
+func benchmarkQueue(num_events int, batch_size int, schemaVersion int, async bool, kind string, b *testing.B) {
b.ResetTimer()
+ var err error
+
for n := 0; n < b.N; n++ {
- if err = produceAndConsume(p, q, num_events, batch_size); err != nil {
- break
+ b.StopTimer()
+ rand.Seed(1)
+ q, p := setup(schemaVersion)
+ b.StartTimer()
+ if async {
+ if err = produceAndConsume(p, q, num_events, batch_size, kind); err != nil {
+ cleanup(q)
+ break
+ }
+ } else {
+ if err = produceThenConsume(p, q, num_events, batch_size, kind); err != nil {
+ cleanup(q)
+ break
+ }
}
+ cleanup(q)
}
- b.StopTimer()
- cleanup(q)
if err != nil {
b.Errorf("Error producing/consuming events: %v", err)
}
}
// Actual benchmark calls follow
-func Benchmark1M_10(b *testing.B) { benchmarkQueue(1000000, 10, b) }
-func Benchmark1M_100(b *testing.B) { benchmarkQueue(1000000, 100, b) }
-func Benchmark1M_1k(b *testing.B) { benchmarkQueue(1000000, 1000, b) }
-func Benchmark1M_10k(b *testing.B) { benchmarkQueue(1000000, 10000, b) }
+func BenchmarkV1Async1k(b *testing.B) { benchmarkQueue(1000, 10, 1, true, "random", b) }
+func BenchmarkV1Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, true, "random", b) }
+func BenchmarkV2Async1k(b *testing.B) { benchmarkQueue(1000, 10, 2, true, "random", b) }
+func BenchmarkV2Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, true, "random", b) }
+func BenchmarkV3Async1k(b *testing.B) { benchmarkQueue(1000, 10, 3, true, "random", b) }
+func BenchmarkV3Async1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, true, "random", b) }
+func BenchmarkV1Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 1, false, "random", b) }
+func BenchmarkV1Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 1, false, "random", b) }
+func BenchmarkV2Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 2, false, "random", b) }
+func BenchmarkV2Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 2, false, "random", b) }
+func BenchmarkV3Sync1k(b *testing.B) { benchmarkQueue(1000, 10, 3, false, "random", b) }
+func BenchmarkV3Sync1M(b *testing.B) { benchmarkQueue(1000000, 1000, 3, false, "random", b) }
+func BenchmarkV1SyncSize(b *testing.B) { benchmarkQueue(100000, 1000, 1, false, "medium", b) }
diff --git a/libbeat/publisher/queue/diskqueue/compression.go b/libbeat/publisher/queue/diskqueue/compression.go
new file mode 100644
index 00000000000..9dedc8e9b63
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/compression.go
@@ -0,0 +1,86 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+ "io"
+
+ lz4V4 "github.com/pierrec/lz4/v4"
+)
+
+//CompressionReader allows reading a stream compressed with LZ4
+type CompressionReader struct {
+ src io.ReadCloser
+ pLZ4Reader *lz4V4.Reader
+}
+
+//NewCompressionReader returns a new LZ4 frame decoder
+func NewCompressionReader(r io.ReadCloser) *CompressionReader {
+ zr := lz4V4.NewReader(r)
+ return &CompressionReader{
+ src: r,
+ pLZ4Reader: zr,
+ }
+}
+
+func (r *CompressionReader) Read(buf []byte) (int, error) {
+ return r.pLZ4Reader.Read(buf)
+}
+
+func (r *CompressionReader) Close() error {
+ return r.src.Close()
+}
+
+//Reset Sets up compression again, assumes that caller has already set
+// the src to the correct position
+func (r *CompressionReader) Reset() error {
+ r.pLZ4Reader.Reset(r.src)
+ return nil
+}
+
+//CompressionWriter allows writing an LZ4 stream
+type CompressionWriter struct {
+ dst WriteCloseSyncer
+ pLZ4Writer *lz4V4.Writer
+}
+
+//NewCompressionWriter returns a new LZ4 frame encoder
+func NewCompressionWriter(w WriteCloseSyncer) *CompressionWriter {
+ zw := lz4V4.NewWriter(w)
+ return &CompressionWriter{
+ dst: w,
+ pLZ4Writer: zw,
+ }
+}
+
+func (w *CompressionWriter) Write(p []byte) (int, error) {
+ return w.pLZ4Writer.Write(p)
+}
+
+func (w *CompressionWriter) Close() error {
+ err := w.pLZ4Writer.Close()
+ if err != nil {
+ return err
+ }
+ return w.dst.Close()
+}
+
+func (w *CompressionWriter) Sync() error {
+ w.pLZ4Writer.Flush()
+ return w.dst.Sync()
+}
diff --git a/libbeat/publisher/queue/diskqueue/compression_test.go b/libbeat/publisher/queue/diskqueue/compression_test.go
new file mode 100644
index 00000000000..5419317d71b
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/compression_test.go
@@ -0,0 +1,166 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+ "bytes"
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func NopWriteCloseSyncer(w io.WriteCloser) WriteCloseSyncer {
+ return nopWriteCloseSyncer{w}
+}
+
+type nopWriteCloseSyncer struct {
+ io.WriteCloser
+}
+
+func (nopWriteCloseSyncer) Sync() error { return nil }
+
+type nopWriteCloser struct {
+ io.Writer
+}
+
+func NopWriteCloser(w io.Writer) io.WriteCloser {
+ return nopWriteCloser{w}
+}
+func (nopWriteCloser) Close() error { return nil }
+
+func TestCompressionReader(t *testing.T) {
+ tests := map[string]struct {
+ plaintext []byte
+ compressed []byte
+ }{
+ "abc 1.9.3 lz4": {
+ plaintext: []byte("abc"),
+ compressed: []byte{
+ 0x04, 0x22, 0x4d, 0x18,
+ 0x64, 0x40, 0xa7, 0x04,
+ 0x00, 0x00, 0x80, 0x61,
+ 0x62, 0x63, 0x0a, 0x00,
+ 0x00, 0x00, 0x00, 0x6c,
+ 0x3e, 0x7b, 0x08, 0x00},
+ },
+ "abc pierrec lz4": {
+ plaintext: []byte("abc"),
+ compressed: []byte{
+ 0x04, 0x22, 0x4d, 0x18,
+ 0x64, 0x70, 0xb9, 0x03,
+ 0x00, 0x00, 0x80, 0x61,
+ 0x62, 0x63, 0x00, 0x00,
+ 0x00, 0x00, 0xff, 0x53,
+ 0xd1, 0x32},
+ },
+ }
+
+ for name, tc := range tests {
+ dst := make([]byte, len(tc.plaintext))
+ src := bytes.NewReader(tc.compressed)
+ cr := NewCompressionReader(io.NopCloser(src))
+ n, err := cr.Read(dst)
+ assert.Nil(t, err, name)
+ assert.Equal(t, len(tc.plaintext), n, name)
+ assert.Equal(t, tc.plaintext, dst, name)
+ }
+}
+
+func TestCompressionWriter(t *testing.T) {
+ tests := map[string]struct {
+ plaintext []byte
+ compressed []byte
+ }{
+ "abc pierrec lz4": {
+ plaintext: []byte("abc"),
+ compressed: []byte{
+ 0x04, 0x22, 0x4d, 0x18,
+ 0x64, 0x70, 0xb9, 0x03,
+ 0x00, 0x00, 0x80, 0x61,
+ 0x62, 0x63, 0x00, 0x00,
+ 0x00, 0x00, 0xff, 0x53,
+ 0xd1, 0x32},
+ },
+ }
+
+ for name, tc := range tests {
+ var dst bytes.Buffer
+ cw := NewCompressionWriter(NopWriteCloseSyncer(NopWriteCloser(&dst)))
+ n, err := cw.Write(tc.plaintext)
+ cw.Close()
+ assert.Nil(t, err, name)
+ assert.Equal(t, len(tc.plaintext), n, name)
+ assert.Equal(t, tc.compressed, dst.Bytes(), name)
+ }
+}
+
+func TestCompressionRoundTrip(t *testing.T) {
+ tests := map[string]struct {
+ plaintext []byte
+ }{
+ "no repeat": {plaintext: []byte("abcdefghijklmnopqrstuvwxzy01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ")},
+ "256 repeat": {plaintext: []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")},
+ }
+ for name, tc := range tests {
+ pr, pw := io.Pipe()
+ src := bytes.NewReader(tc.plaintext)
+ var dst bytes.Buffer
+
+ go func() {
+ cw := NewCompressionWriter(NopWriteCloseSyncer(pw))
+ _, err := io.Copy(cw, src)
+ assert.Nil(t, err, name)
+ cw.Close()
+ }()
+
+ cr := NewCompressionReader(pr)
+ _, err := io.Copy(&dst, cr)
+ assert.Nil(t, err, name)
+ assert.Equal(t, tc.plaintext, dst.Bytes(), name)
+ }
+}
+
+func TestCompressionSync(t *testing.T) {
+ tests := map[string]struct {
+ plaintext []byte
+ }{
+ "no repeat": {plaintext: []byte("abcdefghijklmnopqrstuvwxzy01234567890ABCDEFGHIJKLMNOPQRSTUVWXYZ")},
+ "256 repeat": {plaintext: []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")},
+ }
+ for name, tc := range tests {
+ pr, pw := io.Pipe()
+ var dst bytes.Buffer
+ go func() {
+ cw := NewCompressionWriter(NopWriteCloseSyncer(pw))
+ src1 := bytes.NewReader(tc.plaintext)
+ _, err := io.Copy(cw, src1)
+ assert.Nil(t, err, name)
+ err = cw.Sync()
+ src2 := bytes.NewReader(tc.plaintext)
+ _, err = io.Copy(cw, src2)
+ assert.Nil(t, err, name)
+ cw.Close()
+ }()
+ cr := NewCompressionReader(pr)
+ _, err := io.Copy(&dst, cr)
+ assert.Nil(t, err, name)
+ assert.Equal(t, tc.plaintext, dst.Bytes()[:len(tc.plaintext)], name)
+ assert.Equal(t, tc.plaintext, dst.Bytes()[len(tc.plaintext):], name)
+ }
+}
diff --git a/libbeat/publisher/queue/diskqueue/config.go b/libbeat/publisher/queue/diskqueue/config.go
index 009af69712b..c8fc42ba692 100644
--- a/libbeat/publisher/queue/diskqueue/config.go
+++ b/libbeat/publisher/queue/diskqueue/config.go
@@ -29,6 +29,9 @@ import (
"github.com/elastic/elastic-agent-libs/paths"
)
+//defaultSchemaVersion specifies which on disk schema (0,1,2) is the default.
+const defaultSchemaVersion = 1
+
// Settings contains the configuration fields to create a new disk queue
// or open an existing one.
type Settings struct {
@@ -68,6 +71,13 @@ type Settings struct {
// use exponential backoff up to the specified limit.
RetryInterval time.Duration
MaxRetryInterval time.Duration
+
+ // Schema Version specifies which on-disk format, serialization, and encryption to use.
+ // 0, 1, 2, or 3 are valid options
+ SchemaVersion uint32
+
+ // EncryptionKey is used to encrypt data if SchemaVersion 2 is used.
+ EncryptionKey []byte
}
// userConfig holds the parameters for a disk queue that are configurable
@@ -129,6 +139,8 @@ func DefaultSettings() Settings {
RetryInterval: 1 * time.Second,
MaxRetryInterval: 30 * time.Second,
+
+ SchemaVersion: defaultSchemaVersion,
}
}
@@ -192,7 +204,18 @@ func (settings Settings) segmentPath(segmentID segmentID) string {
// maxValidFrameSize returns the size of the largest possible frame that
// can be stored with the current queue settings.
func (settings Settings) maxValidFrameSize() uint64 {
- return settings.MaxSegmentSize - segmentHeaderSize
+ switch settings.SchemaVersion {
+ case 0:
+ return settings.MaxSegmentSize - segmentHeaderSizeV0
+ case 1:
+ return settings.MaxSegmentSize - segmentHeaderSizeV1
+ case 2:
+ return settings.MaxSegmentSize - segmentHeaderSizeV2
+ case 3:
+ return settings.MaxSegmentSize - segmentHeaderSizeV3
+ default:
+ return uint64(0)
+ }
}
// Given a retry interval, nextRetryInterval returns the next higher level
diff --git a/libbeat/publisher/queue/diskqueue/core_loop.go b/libbeat/publisher/queue/diskqueue/core_loop.go
index fd7e710372a..7499d0247a7 100644
--- a/libbeat/publisher/queue/diskqueue/core_loop.go
+++ b/libbeat/publisher/queue/diskqueue/core_loop.go
@@ -277,7 +277,7 @@ func (dq *diskQueue) handleShutdown() {
dq.acks.lock.Lock()
finalPosition := dq.acks.nextPosition
// We won't be updating the position anymore, so we can close the file.
- dq.acks.positionFile.Sync()
+ dq.acks.positionFile.Sync() //nolint:errcheck //No error recovery path
dq.acks.positionFile.Close()
dq.acks.lock.Unlock()
@@ -436,7 +436,16 @@ func (dq *diskQueue) enqueueWriteFrame(frame *writeFrame) {
dq.segments.nextID++
// Reset the on-disk size to its initial value, the file's header size
// with no frame data.
- newSegmentSize = segmentHeaderSize
+ switch dq.settings.SchemaVersion {
+ case 0:
+ newSegmentSize = segmentHeaderSizeV0
+ case 1:
+ newSegmentSize = segmentHeaderSizeV1
+ case 2:
+ newSegmentSize = segmentHeaderSizeV2
+ case 3:
+ newSegmentSize = segmentHeaderSizeV3
+ }
}
dq.segments.writingSegmentSize = newSegmentSize
diff --git a/libbeat/publisher/queue/diskqueue/core_loop_test.go b/libbeat/publisher/queue/diskqueue/core_loop_test.go
index 27f535b67c3..ab7ff5b33a5 100644
--- a/libbeat/publisher/queue/diskqueue/core_loop_test.go
+++ b/libbeat/publisher/queue/diskqueue/core_loop_test.go
@@ -122,7 +122,7 @@ func TestHandleProducerWriteRequest(t *testing.T) {
}
settings := DefaultSettings()
- settings.MaxSegmentSize = 1000 + segmentHeaderSize
+ settings.MaxSegmentSize = 1000 + segmentHeaderSizeV1
settings.MaxBufferSize = 10000
for description, test := range testCases {
dq := &diskQueue{
@@ -480,7 +480,7 @@ func TestMaybeReadPending(t *testing.T) {
"read one full segment": {
segments: diskQueueSegments{
reading: []*queueSegment{
- {id: 1, byteCount: 1000},
+ {id: 1, byteCount: 1000, schemaVersion: 1},
},
// The next read request should start with frame 5
nextReadFrameID: 5,
@@ -525,10 +525,10 @@ func TestMaybeReadPending(t *testing.T) {
"ignore writing segments if reading is available": {
segments: diskQueueSegments{
reading: []*queueSegment{
- {id: 1, byteCount: 1000},
+ {id: 1, byteCount: 1000, schemaVersion: 1},
},
writing: []*queueSegment{
- {id: 2, byteCount: 1000},
+ {id: 2, byteCount: 1000, schemaVersion: 1},
},
},
expectedRequest: &readerLoopRequest{
@@ -566,8 +566,8 @@ func TestMaybeReadPending(t *testing.T) {
"skip the first reading segment if it's already been fully read": {
segments: diskQueueSegments{
reading: []*queueSegment{
- {id: 1, byteCount: 1000},
- {id: 2, byteCount: 500},
+ {id: 1, byteCount: 1000, schemaVersion: 1},
+ {id: 2, byteCount: 500, schemaVersion: 1},
},
nextReadPosition: 1000,
},
@@ -594,7 +594,7 @@ func TestMaybeReadPending(t *testing.T) {
{
id: 1,
byteCount: 1000,
- schemaVersion: makeUint32Ptr(0)},
+ schemaVersion: 0},
},
// The next read request should start with frame 5
nextReadFrameID: 5,
@@ -969,12 +969,8 @@ func makeWriteFrameWithSize(size int) *writeFrame {
return &writeFrame{serialized: make([]byte, size-frameMetadataSize)}
}
-func makeUint32Ptr(value uint32) *uint32 {
- return &value
-}
-
func segmentWithSize(size int) *queueSegment {
- if size < segmentHeaderSize {
+ if size < segmentHeaderSizeV1 {
// Can't have a segment smaller than the segment header
return nil
}
diff --git a/libbeat/publisher/queue/diskqueue/docs/Makefile b/libbeat/publisher/queue/diskqueue/docs/Makefile
new file mode 100644
index 00000000000..bf7a34dd926
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/Makefile
@@ -0,0 +1,9 @@
+all : schemaV0.svg frameV0.svg schemaV1.svg frameV1.svg schemaV2.svg frameV2.svg schemaV3.svg frameV3.svg
+
+.PHONY : clean
+
+%.svg : %.pic
+ pikchr --svg-only $< > $@
+
+clean :
+ -rm *.svg
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV0.pic b/libbeat/publisher/queue/diskqueue/docs/frameV0.pic
new file mode 100644
index 00000000000..0b2ddb7249f
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV0.pic
@@ -0,0 +1,9 @@
+boxht = 0.25
+down
+box "size (uint32)" wid 4;
+down
+box "JSON serialized data" dashed wid 4 ht 2;
+down
+box "checksum (uint32)" wid 4;
+down
+box "size (uint32)" wid 4;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV0.svg b/libbeat/publisher/queue/diskqueue/docs/frameV0.svg
new file mode 100644
index 00000000000..1696bb91a4a
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV0.svg
@@ -0,0 +1,11 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV1.pic b/libbeat/publisher/queue/diskqueue/docs/frameV1.pic
new file mode 100644
index 00000000000..58e956b7ddd
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV1.pic
@@ -0,0 +1,9 @@
+boxht = 0.25
+down
+box "size (uint32)" wid 4;
+down
+box "CBOR serialized data" dashed wid 4 ht 2;
+down
+box "checksum (uint32)" wid 4;
+down
+box "size (uint32)" wid 4;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV1.svg b/libbeat/publisher/queue/diskqueue/docs/frameV1.svg
new file mode 100644
index 00000000000..8317e4b93e8
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV1.svg
@@ -0,0 +1,11 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV2.pic b/libbeat/publisher/queue/diskqueue/docs/frameV2.pic
new file mode 100644
index 00000000000..aac18259952
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV2.pic
@@ -0,0 +1,5 @@
+boxht = 0.25
+SIZE1: box "size (uint32)" wid 4;
+DATA: box "CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
+CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw;
+SIZE2: box "size (uint32)" wid 4 with nw at CHECKSUM.sw;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV2.svg b/libbeat/publisher/queue/diskqueue/docs/frameV2.svg
new file mode 100644
index 00000000000..8317e4b93e8
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV2.svg
@@ -0,0 +1,11 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV3.pic b/libbeat/publisher/queue/diskqueue/docs/frameV3.pic
new file mode 100644
index 00000000000..3e340d09071
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV3.pic
@@ -0,0 +1,5 @@
+boxht = 0.25
+SIZE1: box "size (uint32)" wid 4;
+DATA: box "LZ4 compressed CBOR serialized data" dashed wid 4 ht 2 with .nw at SIZE1.sw;
+CHECKSUM: box "checksum (uint32)" wid 4 with .nw at DATA.sw;
+SIZE2: box "size (uint32)" wid 4 with nw at CHECKSUM.sw;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/frameV3.svg b/libbeat/publisher/queue/diskqueue/docs/frameV3.svg
new file mode 100644
index 00000000000..8cf04362b72
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/frameV3.svg
@@ -0,0 +1,11 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
new file mode 100644
index 00000000000..55878906bbe
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/on-disk-structures.md
@@ -0,0 +1,97 @@
+# Disk Queue On Disk Structures
+
+The disk queue is a directory on disk that contains files. Each
+file is called a segment. The name of the file is the segment id in
+base 10 with the ".seg" suffix. For example: "42.seg". Each segment
+contains multiple frames. Each frame contains one event.
+
+There are currently 3 versions of the disk queue, and the current code
+base is able to write versions 1 & 2, while it is able to read version
+0, 1, and 2.
+
+## Version 0
+
+In version 0, the segments are made up of a header, followed by
+frames. The header contains one field which is an unsigned 32-bit
+integer in little-endian byte order, which signifies the version number.
+
+![Segment Schema Version 0](./schemaV0.svg)
+
+The frames for version 0, consist of a header, followed by the
+serialized event and a footer. The header contains one field which is
+the size of the frame, which is an unsigned 32-bit integer in
+little-endian byte order. The serialization format is JSON. The
+footer contains 2 fields, the first of which is a checksum which is an
+unsigned 32-bit integer in little-endian format, followed by a repeat
+of the size from the header.
+
+![Frame Version 0](./frameV0.svg)
+
+## Version 1
+
+In version 1, the segments are made up of a header, followed by
+frames. The header contains two fields. The first field in the
+version number, which is an unsigned 32-bit integer in little-endian
+format. The second field is a count of the number of frames in the
+segment, which is an unsigned 32-bit integer in little-endian format.
+
+![Segment Schema Version 1](./schemaV1.svg)
+
+The frames for version 1, consist of a header, followed by the
+serialized event and a footer. The header contains one field which is
+the size of the frame, which is an unsigned 32-bit integer in
+little-endian format. The serialization format is CBOR. The footer
+contains 2 fields, the first of which is a checksum which is an
+unsigned 32-bit integer in little-endian format, followed by a repeat
+of the size from the header.
+
+![Frame Version 1](./frameV1.svg)
+
+## Version 2
+
+In version 2, encryption is added to version 1. The
+segments are made of a header followed by an initialization vector,
+and then encrypted frames. The header consists of one field, the
+version number which is an unsigned 32-bit integer in little-endian
+format. The initialization vector is 128-bits in length. The count
+was dropped from version 1 for 2 reasons. The first, if it was
+outside the encrypted portion of the segment then it would be easy for
+an attacker to modify. The second, is that adding it to the encrypted
+segment in a meaningful way was problematic. The count is not known
+until the last frame is written. With encryption you cannot seek to
+the beginning of the segment and update the value. Adding the count
+to the end is less useful because you have to decrypt the entire
+segment before it can be read.
+
+![Segment Schema Version 2](./schemaV2.svg)
+
+The frames for version 2, consist of a header, followed by the
+serialized event and a footer. The header contains one field which is
+the size of the frame, which is an unsigned 32-bit integer in
+little-endian format. The serialization format is CBOR. The footer
+contains 2 fields, the first of which is a checksum which is an
+unsigned 32-bit integer in little-endian format, followed by a repeat
+of the size from the header. This is the same as version 1.
+
+![Frame Version 2](./frameV2.svg)
+
+## Version 3
+
+In version 2, compression is added to version 2. The
+segments are made of a header followed by an initialization vector,
+and then encrypted frames. The header consists of one field, the
+version number which is an unsigned 32-bit integer in little-endian
+format. The initialization vector is 128-bits in length.
+
+![Segment Schema Version 3](./schemaV3.svg)
+
+The frames for version 2, consist of a header, followed by the
+compressed serialized event and a footer. The header contains one
+field which is the size of the frame, which is an unsigned 32-bit
+integer in little-endian format. The compression is LZ4 with fast
+compression. The serialization format is CBOR. The footer contains 2
+fields, the first of which is a checksum which is an unsigned 32-bit
+integer in little-endian format, followed by a repeat of the size from
+the header.
+
+![Frame Version 3](./frameV3.svg)
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV0.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV0.pic
new file mode 100644
index 00000000000..99cd63b0cc9
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV0.pic
@@ -0,0 +1,9 @@
+boxht = 0.25
+down;
+box "version (uint32)" wid 4;
+down;
+box "frame 0" wid 4 ht 2;
+down;
+box "..." dashed wid 4;
+down;
+box "frame n" wid 4 ht 2;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV0.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV0.svg
new file mode 100644
index 00000000000..de9885d9d9a
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV0.svg
@@ -0,0 +1,11 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV1.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV1.pic
new file mode 100644
index 00000000000..34dd086d37f
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV1.pic
@@ -0,0 +1,11 @@
+boxht = 0.25
+down;
+box "version (uint32)" wid 4;
+down;
+box "count (uint32)" wid 4;
+down;
+box "frame 0" wid 4 ht 2;
+down;
+box "..." dashed wid 4;
+down;
+box "frame (count - 1)" wid 4 ht 2;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV1.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV1.svg
new file mode 100644
index 00000000000..0070fe6e5c9
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV1.svg
@@ -0,0 +1,13 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic
new file mode 100644
index 00000000000..006ae4bbf7b
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.pic
@@ -0,0 +1,4 @@
+boxht = 0.25
+VERSION: box "version (uint32)" wid 4;
+IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at VERSION.sw
+FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg
new file mode 100644
index 00000000000..33ec757bed3
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV2.svg
@@ -0,0 +1,9 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV3.pic b/libbeat/publisher/queue/diskqueue/docs/schemaV3.pic
new file mode 100644
index 00000000000..006ae4bbf7b
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV3.pic
@@ -0,0 +1,4 @@
+boxht = 0.25
+VERSION: box "version (uint32)" wid 4;
+IV: box "initialization vector (128 bits)" wid 4 ht 1 with .nw at VERSION.sw
+FRAME: box "Encrypted Frames" dashed wid 4 ht 2 with .nw at IV.sw;
\ No newline at end of file
diff --git a/libbeat/publisher/queue/diskqueue/docs/schemaV3.svg b/libbeat/publisher/queue/diskqueue/docs/schemaV3.svg
new file mode 100644
index 00000000000..33ec757bed3
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/docs/schemaV3.svg
@@ -0,0 +1,9 @@
+
+
diff --git a/libbeat/publisher/queue/diskqueue/enc_compress_test.go b/libbeat/publisher/queue/diskqueue/enc_compress_test.go
new file mode 100644
index 00000000000..637765c24cb
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/enc_compress_test.go
@@ -0,0 +1,76 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+ "bytes"
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestEncryptionCompressionRoundTrip(t *testing.T) {
+ tests := map[string]struct {
+ plaintext []byte
+ }{
+ "1 rune": {plaintext: []byte("a")},
+ "16 runes": {plaintext: []byte("bbbbbbbbbbbbbbbb")},
+ "17 runes": {plaintext: []byte("ccccccccccccccccc")},
+ "small json": {plaintext: []byte("{\"message\":\"2 123456789010 eni-1235b8ca123456789 - - - - - - - 1431280876 1431280934 - NODATA\"}")},
+ "large json": {plaintext: []byte("{\"message\":\"{\\\"CacheCacheStatus\\\":\\\"hit\\\",\\\"CacheResponseBytes\\\":26888,\\\"CacheResponseStatus\\\":200,\\\"CacheTieredFill\\\":true,\\\"ClientASN\\\":1136,\\\"ClientCountry\\\":\\\"nl\\\",\\\"ClientDeviceType\\\":\\\"desktop\\\",\\\"ClientIP\\\":\\\"89.160.20.156\\\",\\\"ClientIPClass\\\":\\\"noRecord\\\",\\\"ClientRequestBytes\\\":5324,\\\"ClientRequestHost\\\":\\\"eqlplayground.io\\\",\\\"ClientRequestMethod\\\":\\\"GET\\\",\\\"ClientRequestPath\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestProtocol\\\":\\\"HTTP/1.1\\\",\\\"ClientRequestReferer\\\":\\\"https://eqlplayground.io/s/eqldemo/app/security/timelines/default?sourcerer=(default:!(.siem-signals-eqldemo))&timerange=(global:(linkTo:!(),timerange:(from:%272021-03-03T19:55:15.519Z%27,fromStr:now-24h,kind:relative,to:%272021-03-04T19:55:15.519Z%27,toStr:now)),timeline:(linkTo:!(),timerange:(from:%272020-03-04T19:55:28.684Z%27,fromStr:now-1y,kind:relative,to:%272021-03-04T19:55:28.692Z%27,toStr:now)))&timeline=(activeTab:eql,graphEventId:%27%27,id:%2769f93840-7d23-11eb-866c-79a0609409ba%27,isOpen:!t)\\\",\\\"ClientRequestURI\\\":\\\"/40865/bundles/plugin/securitySolution/8.0.0/securitySolution.chunk.9.js\\\",\\\"ClientRequestUserAgent\\\":\\\"Mozilla/5.0(WindowsNT10.0;Win64;x64)AppleWebKit/537.36(KHTML,likeGecko)Chrome/91.0.4472.124Safari/537.36\\\",\\\"ClientSSLCipher\\\":\\\"NONE\\\",\\\"ClientSSLProtocol\\\":\\\"none\\\",\\\"ClientSrcPort\\\":0,\\\"ClientXRequestedWith\\\":\\\"\\\",\\\"EdgeColoCode\\\":\\\"33.147.138.217\\\",\\\"EdgeColoID\\\":20,\\\"EdgeEndTimestamp\\\":1625752958875000000,\\\"EdgePathingOp\\\":\\\"wl\\\",\\\"EdgePathingSrc\\\":\\\"macro\\\",\\\"EdgePathingStatus\\\":\\\"nr\\\",\\\"EdgeRateLimitAction\\\":\\\"\\\",\\\"EdgeRateLimitID\\\":0,\\\"EdgeRequestHost\\\":\\\"eqlplayground.io\\\",\\\"EdgeResponseBytes\\\":24743,\\\"EdgeResponseCompressionRatio\\\":0,\\\"EdgeResponseContentType\\\":\\\"application/javascript\\\",\\\"EdgeResponseStatus\\\":200,\\\"EdgeServerIP\\\":\\\"89.160.20.156\\\",\\\"EdgeStartTimestamp\\\":1625752958812000000,\\\"FirewallMatchesActions\\\":[],\\\"FirewallMatchesRuleIDs\\\":[],\\\"FirewallMatchesSources\\\":[],\\\"OriginIP\\\":\\\"\\\",\\\"OriginResponseBytes\\\":0,\\\"OriginResponseHTTPExpires\\\":\\\"\\\",\\\"OriginResponseHTTPLastModified\\\":\\\"\\\",\\\"OriginResponseStatus\\\":0,\\\"OriginResponseTime\\\":0,\\\"OriginSSLProtocol\\\":\\\"unknown\\\",\\\"ParentRayID\\\":\\\"66b9d9f88b5b4c4f\\\",\\\"RayID\\\":\\\"66b9d9f890ae4c4f\\\",\\\"SecurityLevel\\\":\\\"off\\\",\\\"WAFAction\\\":\\\"unknown\\\",\\\"WAFFlags\\\":\\\"0\\\",\\\"WAFMatchedVar\\\":\\\"\\\",\\\"WAFProfile\\\":\\\"unknown\\\",\\\"WAFRuleID\\\":\\\"\\\",\\\"WAFRuleMessage\\\":\\\"\\\",\\\"WorkerCPUTime\\\":0,\\\"WorkerStatus\\\":\\\"unknown\\\",\\\"WorkerSubrequest\\\":true,\\\"WorkerSubrequestCount\\\":0,\\\"ZoneID\\\":393347122}\"}")},
+ }
+
+ for name, tc := range tests {
+ pr, pw := io.Pipe()
+ key := []byte("keykeykeykeykeyk")
+ src := bytes.NewReader(tc.plaintext)
+ var dst bytes.Buffer
+ var tEncBuf bytes.Buffer
+ var tCompBuf bytes.Buffer
+
+ go func() {
+ ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key)
+ assert.Nil(t, err, name)
+ cw := NewCompressionWriter(ew)
+ _, err = io.Copy(cw, src)
+ assert.Nil(t, err, name)
+ err = cw.Close()
+ assert.Nil(t, err, name)
+ }()
+
+ ter := io.TeeReader(pr, &tEncBuf)
+ er, err := NewEncryptionReader(io.NopCloser(ter), key)
+ assert.Nil(t, err, name)
+
+ tcr := io.TeeReader(er, &tCompBuf)
+
+ cr := NewCompressionReader(io.NopCloser(tcr))
+
+ _, err = io.Copy(&dst, cr)
+ assert.Nil(t, err, name)
+ // Check round trip worked
+ assert.Equal(t, tc.plaintext, dst.Bytes(), name)
+ // Check that cipher text and plaintext don't match
+ assert.NotEqual(t, tc.plaintext, tEncBuf.Bytes(), name)
+ // Check that compressed text and plaintext don't match
+ assert.NotEqual(t, tc.plaintext, tCompBuf.Bytes(), name)
+ // Check that compressed text and ciphertext don't match
+ assert.NotEqual(t, tEncBuf.Bytes(), tCompBuf.Bytes(), name)
+ }
+}
diff --git a/libbeat/publisher/queue/diskqueue/encryption.go b/libbeat/publisher/queue/diskqueue/encryption.go
new file mode 100644
index 00000000000..ecabfc06f16
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/encryption.go
@@ -0,0 +1,156 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+ "bytes"
+ "crypto/aes"
+ "crypto/cipher"
+ "crypto/rand"
+ "fmt"
+ "io"
+)
+
+const (
+ // KeySize is 128-bit
+ KeySize = 16
+)
+
+//EncryptionReader allows reading from a AES-128-CTR stream
+type EncryptionReader struct {
+ src io.ReadCloser
+ stream cipher.Stream
+ block cipher.Block
+ iv []byte
+}
+
+//NewEncryptionReader returns a new AES-128-CTR decrypter
+func NewEncryptionReader(r io.ReadCloser, key []byte) (*EncryptionReader, error) {
+ if len(key) != KeySize {
+ return nil, fmt.Errorf("key must be %d bytes long", KeySize)
+ }
+
+ er := &EncryptionReader{}
+ er.src = r
+
+ // turn key into block & save
+ block, err := aes.NewCipher(key)
+ if err != nil {
+ return nil, err
+ }
+ er.block = block
+
+ // read IV from the io.ReadCloser
+ iv := make([]byte, aes.BlockSize)
+ if _, err := io.ReadFull(er.src, iv); err != nil {
+ return nil, err
+ }
+ er.iv = iv
+
+ // create Stream
+ er.stream = cipher.NewCTR(block, iv)
+
+ return er, nil
+}
+
+func (er *EncryptionReader) Read(buf []byte) (int, error) {
+ ciphertext := make([]byte, len(buf))
+ n, err := er.src.Read(ciphertext)
+ if err != nil {
+ return n, err
+ }
+ er.stream.XORKeyStream(buf, ciphertext)
+ return n, nil
+}
+
+func (er *EncryptionReader) Close() error {
+ return er.src.Close()
+}
+
+//Reset Sets up stream again, assumes that caller has already set the
+// src to the iv
+func (er *EncryptionReader) Reset() error {
+ iv := make([]byte, aes.BlockSize)
+ if _, err := io.ReadFull(er.src, iv); err != nil {
+ return err
+ }
+ if !bytes.Equal(iv, er.iv) {
+ return fmt.Errorf("different iv, something is wrong")
+ }
+
+ // create Stream
+ er.stream = cipher.NewCTR(er.block, iv)
+ return nil
+}
+
+//EncryptionWriter allows writing to a AES-128-CTR stream
+type EncryptionWriter struct {
+ dst WriteCloseSyncer
+ stream cipher.Stream
+}
+
+//NewEncryptionWriter returns a new AES-128-CTR stream encryptor
+func NewEncryptionWriter(w WriteCloseSyncer, key []byte) (*EncryptionWriter, error) {
+ if len(key) != KeySize {
+ return nil, fmt.Errorf("key must be %d bytes long", KeySize)
+ }
+
+ ew := &EncryptionWriter{}
+
+ // turn key into block
+ block, err := aes.NewCipher(key)
+ if err != nil {
+ return nil, err
+ }
+
+ // create random IV
+ iv := make([]byte, aes.BlockSize)
+ if _, err := io.ReadFull(rand.Reader, iv); err != nil {
+ return nil, err
+ }
+
+ // create stream
+ stream := cipher.NewCTR(block, iv)
+
+ //write IV
+ n, err := w.Write(iv)
+ if err != nil {
+ return nil, err
+ }
+ if n != len(iv) {
+ return nil, io.ErrShortWrite
+ }
+
+ ew.dst = w
+ ew.stream = stream
+ return ew, nil
+}
+
+func (ew *EncryptionWriter) Write(buf []byte) (int, error) {
+ ciphertext := make([]byte, len(buf))
+ ew.stream.XORKeyStream(ciphertext, buf)
+ return ew.dst.Write(ciphertext)
+}
+
+func (ew *EncryptionWriter) Close() error {
+ return ew.dst.Close()
+}
+
+func (ew *EncryptionWriter) Sync() error {
+ return ew.dst.Sync()
+}
diff --git a/libbeat/publisher/queue/diskqueue/encryption_test.go b/libbeat/publisher/queue/diskqueue/encryption_test.go
new file mode 100644
index 00000000000..8bb70eb6ae9
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/encryption_test.go
@@ -0,0 +1,65 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+ "bytes"
+ "crypto/aes"
+ "io"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestEncryptionRoundTrip(t *testing.T) {
+ tests := map[string]struct {
+ plaintext []byte
+ }{
+ "8 bits": {plaintext: []byte("a")},
+ "128 bits": {plaintext: []byte("bbbbbbbbbbbbbbbb")},
+ "136 bits": {plaintext: []byte("ccccccccccccccccc")},
+ }
+ for name, tc := range tests {
+ pr, pw := io.Pipe()
+ src := bytes.NewReader(tc.plaintext)
+ var dst bytes.Buffer
+ var teeBuf bytes.Buffer
+ key := []byte("kkkkkkkkkkkkkkkk")
+
+ go func() {
+ //NewEncryptionWriter writes iv, so needs to be in go routine
+ ew, err := NewEncryptionWriter(NopWriteCloseSyncer(pw), key)
+ assert.Nil(t, err, name)
+ _, err = io.Copy(ew, src)
+ assert.Nil(t, err, name)
+ ew.Close()
+ }()
+
+ tr := io.TeeReader(pr, &teeBuf)
+ er, err := NewEncryptionReader(io.NopCloser(tr), key)
+ assert.Nil(t, err, name)
+ _, err = io.Copy(&dst, er)
+ assert.Nil(t, err, name)
+ // Check round trip worked
+ assert.Equal(t, tc.plaintext, dst.Bytes(), name)
+ // Check that iv & cipher text were written
+ assert.Equal(t, len(tc.plaintext)+aes.BlockSize, teeBuf.Len(), name)
+ // Check that cipher text and plaintext don't match
+ assert.NotEqual(t, tc.plaintext, teeBuf.Bytes()[aes.BlockSize:], name)
+ }
+}
diff --git a/libbeat/publisher/queue/diskqueue/queue.go b/libbeat/publisher/queue/diskqueue/queue.go
index df6519edd01..fc4bafe59b5 100644
--- a/libbeat/publisher/queue/diskqueue/queue.go
+++ b/libbeat/publisher/queue/diskqueue/queue.go
@@ -267,10 +267,11 @@ func (dq *diskQueue) BufferConfig() queue.BufferConfig {
}
func (dq *diskQueue) Producer(cfg queue.ProducerConfig) queue.Producer {
+ encoder := newEventEncoder()
return &diskQueueProducer{
queue: dq,
config: cfg,
- encoder: newEventEncoder(),
+ encoder: encoder,
done: make(chan struct{}),
}
}
diff --git a/libbeat/publisher/queue/diskqueue/reader_loop.go b/libbeat/publisher/queue/diskqueue/reader_loop.go
index d6bb49494e0..193cd73694a 100644
--- a/libbeat/publisher/queue/diskqueue/reader_loop.go
+++ b/libbeat/publisher/queue/diskqueue/reader_loop.go
@@ -21,7 +21,6 @@ import (
"encoding/binary"
"fmt"
"io"
- "os"
)
// startPosition and endPosition are absolute byte offsets into the segment
@@ -71,13 +70,14 @@ type readerLoop struct {
}
func newReaderLoop(settings Settings) *readerLoop {
+ decoder := newEventDecoder()
return &readerLoop{
settings: settings,
requestChan: make(chan readerLoopRequest, 1),
responseChan: make(chan readerLoopResponse),
output: make(chan *readFrame, settings.ReadAheadLimit),
- decoder: newEventDecoder(),
+ decoder: decoder,
}
}
@@ -106,12 +106,13 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon
return readerLoopResponse{err: err}
}
defer handle.Close()
+
_, err = handle.Seek(int64(request.startPosition), io.SeekStart)
if err != nil {
return readerLoopResponse{err: err}
}
- targetLength := uint64(request.endPosition - request.startPosition)
+ targetLength := request.endPosition - request.startPosition
for {
remainingLength := targetLength - byteCount
@@ -173,9 +174,7 @@ func (rl *readerLoop) processRequest(request readerLoopRequest) readerLoopRespon
// it does not exceed the given length bound. The returned frame leaves the
// segment and frame IDs unset.
// The returned error will be set if and only if the returned frame is nil.
-func (rl *readerLoop) nextFrame(
- handle *os.File, maxLength uint64,
-) (*readFrame, error) {
+func (rl *readerLoop) nextFrame(handle *segmentReader, maxLength uint64) (*readFrame, error) {
// Ensure we are allowed to read the frame header.
if maxLength < frameHeaderSize {
return nil, fmt.Errorf(
diff --git a/libbeat/publisher/queue/diskqueue/segments.go b/libbeat/publisher/queue/diskqueue/segments.go
index a58381aa914..345f7e85379 100644
--- a/libbeat/publisher/queue/diskqueue/segments.go
+++ b/libbeat/publisher/queue/diskqueue/segments.go
@@ -19,6 +19,7 @@ package diskqueue
import (
"encoding/binary"
+ "errors"
"fmt"
"io"
"io/ioutil"
@@ -91,14 +92,9 @@ type queueSegment struct {
// A segment id is globally unique within its originating queue.
id segmentID
- // If this segment was loaded from a previous session, schemaVersion
- // points to the file schema version that was read from its header.
- // This is only used by queueSegment.headerSize(), which is used in
- // maybeReadPending to calculate the position of the first data frame,
- // and by queueSegment.shouldUseJSON(), which is used in the reader
- // loop to detect old segments that used JSON encoding instead of
- // the current CBOR.
- schemaVersion *uint32
+ // schemaVersion is used to determine on disk format, data serialization,
+ // and encryption
+ schemaVersion uint32
// The number of bytes occupied by this segment on-disk, as of the most
// recent completed writerLoop request.
@@ -125,7 +121,7 @@ type queueSegment struct {
}
type segmentHeader struct {
- // The schema version for this segment file. Current schema version is 1.
+ // The schema version for this segment file
version uint32
// If the segment file has been completely written, this field contains
@@ -136,13 +132,26 @@ type segmentHeader struct {
frameCount uint32
}
-const currentSegmentVersion = 1
+type WriteCloseSyncer interface {
+ io.Writer
+ io.Closer
+ Sync() error
+}
+
+// segmentHeaderSizeV0 schemaVersion 0 header size, uint32 for version
+const segmentHeaderSizeV0 = 4
+
+// segmentHeaderSizeV1 schemaVersion 1 header size, uint32 for version, uint32 for count
+const segmentHeaderSizeV1 = 8
+
+// segmentHeaderSizeV2 schemaVersion 2 header size uint32 for version
+const segmentHeaderSizeV2 = 4
-// Segment headers are currently a 4-byte version plus a 4-byte frame count.
-// In contexts where the segment may have been created by an earlier version,
-// instead use (queueSegment).headerSize() which accounts for the schema
-// version of the target segment.
-const segmentHeaderSize = 8
+// segmentHeaderSizeV3 schemaVersion 3 header size uint32 for version
+const segmentHeaderSizeV3 = 4
+
+// maxSegmentVersion is the highest supported version
+const maxSegmentVersion = 3
// Sort order: we store loaded segments in ascending order by their id.
type bySegmentID []*queueSegment
@@ -182,7 +191,7 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment,
}
segments = append(segments, &queueSegment{
id: segmentID(id),
- schemaVersion: &header.version,
+ schemaVersion: header.version,
frameCount: header.frameCount,
byteCount: uint64(file.Size()),
})
@@ -197,11 +206,18 @@ func scanExistingSegments(logger *logp.Logger, pathStr string) ([]*queueSegment,
// been written to disk yet) of this segment file's header region. The
// segment's first data frame begins immediately after the header.
func (segment *queueSegment) headerSize() uint64 {
- if segment.schemaVersion != nil && *segment.schemaVersion < 1 {
- // Schema 0 had nothing except the 4-byte version.
- return 4
+ switch segment.schemaVersion {
+ case 0:
+ return segmentHeaderSizeV0
+ case 1:
+ return segmentHeaderSizeV1
+ case 2:
+ return segmentHeaderSizeV2
+ case 3:
+ return segmentHeaderSizeV3
+ default:
+ return uint64(0)
}
- return segmentHeaderSize
}
// The initial release of the disk queue used JSON to encode events
@@ -209,46 +225,94 @@ func (segment *queueSegment) headerSize() uint64 {
// with encoding multi-byte characters, and for lower encoding
// overhead.
func (segment *queueSegment) shouldUseJSON() bool {
- return segment.schemaVersion != nil && *segment.schemaVersion == 0
+ return segment.schemaVersion == 0
}
// Should only be called from the reader loop. If successful, returns an open
// file handle positioned at the beginning of the segment's data region.
-func (segment *queueSegment) getReader(
- queueSettings Settings,
-) (*os.File, error) {
+func (segment *queueSegment) getReader(queueSettings Settings) (*segmentReader, error) {
path := queueSettings.segmentPath(segment.id)
file, err := os.Open(path)
if err != nil {
return nil, fmt.Errorf(
"couldn't open segment %d: %w", segment.id, err)
}
- // We don't need the header contents here, we just want to advance past the
- // header region, so discard the return value.
- _, err = readSegmentHeader(file)
+ err = binary.Read(file, binary.LittleEndian, &segment.schemaVersion)
if err != nil {
file.Close()
- return nil, fmt.Errorf("couldn't read segment header: %w", err)
+ return nil, fmt.Errorf("couldn't read segment version: %w", err)
}
- return file, nil
+ if segment.schemaVersion > maxSegmentVersion {
+ file.Close()
+ return nil, fmt.Errorf("unknown segment version %d: %w", segment.schemaVersion, err)
+ }
+
+ if segment.schemaVersion == 1 {
+ err = binary.Read(file, binary.LittleEndian, &segment.frameCount)
+ if err != nil {
+ file.Close()
+ return nil, fmt.Errorf("couldn't read segment frame count: %w", err)
+ }
+ }
+
+ sr := &segmentReader{}
+ sr.src = file
+ sr.version = segment.schemaVersion
+
+ if sr.version == 0 || sr.version == 1 {
+ return sr, nil
+ }
+
+ if sr.version == 2 || sr.version == 3 {
+ sr.er, err = NewEncryptionReader(sr.src, queueSettings.EncryptionKey)
+ if err != nil {
+ sr.src.Close()
+ return nil, fmt.Errorf("couldn't create encryption reader: %w", err)
+ }
+ }
+ if sr.version == 3 {
+ sr.cr = NewCompressionReader(sr.er)
+ }
+ return sr, nil
+
}
// Should only be called from the writer loop.
-func (segment *queueSegment) getWriter(
- queueSettings Settings,
-) (*os.File, error) {
+func (segment *queueSegment) getWriter(queueSettings Settings) (*segmentWriter, error) {
path := queueSettings.segmentPath(segment.id)
file, err := os.OpenFile(path, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return nil, err
}
- err = writeSegmentHeader(file, 0)
- if err != nil {
- return nil, fmt.Errorf("couldn't write segment header: %w", err)
+
+ sw := &segmentWriter{}
+ sw.dst = file
+
+ segment.schemaVersion = queueSettings.SchemaVersion
+ sw.version = queueSettings.SchemaVersion
+
+ if err := sw.WriteHeader(); err != nil {
+ return nil, err
}
- return file, nil
+ if sw.version == 0 || sw.version == 1 {
+ return sw, nil
+ }
+
+ if sw.version == 2 || sw.version == 3 {
+ sw.ew, err = NewEncryptionWriter(sw.dst, queueSettings.EncryptionKey)
+ if err != nil {
+ sw.dst.Close()
+ return nil, fmt.Errorf("couldn't create encryption writer: %w", err)
+ }
+ }
+
+ if sw.version == 3 {
+ sw.cw = NewCompressionWriter(sw.ew)
+ }
+
+ return sw, nil
}
// getWriterWithRetry tries to create a file handle for writing via
@@ -257,7 +321,7 @@ func (segment *queueSegment) getWriter(
// creating a queue segment from the writer loop.
func (segment *queueSegment) getWriterWithRetry(
queueSettings Settings, retry func(err error, firstTime bool) bool,
-) (*os.File, error) {
+) (*segmentWriter, error) {
firstTime := true
file, err := segment.getWriter(queueSettings)
for err != nil && retry(err, firstTime) {
@@ -309,7 +373,7 @@ func readSegmentHeaderWithFrameCount(path string) (*segmentHeader, error) {
err = binary.Read(reader, binary.LittleEndian, &frameLength)
if err != nil {
// EOF at a frame boundary means we successfully scanned all frames.
- if err == io.EOF && header.frameCount > 0 {
+ if errors.Is(err, io.EOF) && header.frameCount > 0 {
return header, nil
}
// All other errors mean we are done scanning, exit the loop.
@@ -354,36 +418,20 @@ func readSegmentHeader(in io.Reader) (*segmentHeader, error) {
if err != nil {
return nil, err
}
- if header.version > currentSegmentVersion {
- return nil, fmt.Errorf("unrecognized schema version %d", header.version)
- }
- if header.version >= 1 {
+ switch header.version {
+ case 0:
+ case 1:
err = binary.Read(in, binary.LittleEndian, &header.frameCount)
if err != nil {
return nil, err
}
+ case 2, 3:
+ default:
+ return nil, fmt.Errorf("unrecognized schema version %d", header.version)
}
return header, nil
}
-// writeSegmentHeader seeks to the beginning of the given file handle and
-// writes a segment header with the current schema version, containing the
-// given frameCount.
-func writeSegmentHeader(out *os.File, frameCount uint32) error {
- _, err := out.Seek(0, io.SeekStart)
- if err != nil {
- return err
- }
-
- version := uint32(currentSegmentVersion)
- err = binary.Write(out, binary.LittleEndian, version)
- if err != nil {
- return err
- }
- err = binary.Write(out, binary.LittleEndian, frameCount)
- return err
-}
-
// The number of bytes occupied by all the queue's segment files. This
// should only be called from the core loop.
func (segments *diskQueueSegments) sizeOnDisk() uint64 {
@@ -402,3 +450,162 @@ func (segments *diskQueueSegments) sizeOnDisk() uint64 {
}
return total
}
+
+type segmentReader struct {
+ src io.ReadSeekCloser
+ er *EncryptionReader
+ cr *CompressionReader
+ version uint32
+}
+
+func (r *segmentReader) Read(p []byte) (int, error) {
+ switch r.version {
+ case 0, 1:
+ return r.src.Read(p)
+ case 2:
+ return r.er.Read(p)
+ case 3:
+ return r.cr.Read(p)
+ default:
+ return 0, fmt.Errorf("unsupported schema version: %d", r.version)
+ }
+}
+
+func (r *segmentReader) Close() error {
+ switch r.version {
+ case 0, 1:
+ return r.src.Close()
+ case 2:
+ return r.er.Close()
+ case 3:
+ return r.cr.Close()
+ default:
+ return fmt.Errorf("unsupported schema version: %d", r.version)
+ }
+}
+
+func (r *segmentReader) Seek(offset int64, whence int) (int64, error) {
+ switch r.version {
+ case 0, 1:
+ return r.src.Seek(offset, whence)
+ case 2:
+ //can't seek before segment header
+ if (offset + int64(whence)) < segmentHeaderSizeV2 {
+ return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence)
+ }
+ if _, err := r.src.Seek(segmentHeaderSizeV2, io.SeekStart); err != nil {
+ return 0, err
+ }
+ if err := r.er.Reset(); err != nil {
+ return 0, err
+ }
+ written, err := io.CopyN(io.Discard, r.er, (offset+int64(whence))-segmentHeaderSizeV2)
+ return written + segmentHeaderSizeV2, err
+ case 3:
+ //can't seek before segment header
+ if (offset + int64(whence)) < segmentHeaderSizeV3 {
+ return 0, fmt.Errorf("illegal seek offset %d, whence %d", offset, whence)
+ }
+ if _, err := r.src.Seek(segmentHeaderSizeV3, io.SeekStart); err != nil {
+ return 0, err
+ }
+ if err := r.er.Reset(); err != nil {
+ return 0, err
+ }
+ if err := r.cr.Reset(); err != nil {
+ return 0, err
+ }
+ written, err := io.CopyN(io.Discard, r.cr, (offset+int64(whence))-segmentHeaderSizeV3)
+ return written + segmentHeaderSizeV3, err
+ default:
+ return 0, fmt.Errorf("unsupported schema version: %d", r.version)
+ }
+}
+
+type segmentWriter struct {
+ dst *os.File
+ ew *EncryptionWriter
+ cw *CompressionWriter
+ version uint32
+}
+
+func (w *segmentWriter) Write(p []byte) (int, error) {
+ switch w.version {
+ case 0, 1:
+ return w.dst.Write(p)
+ case 2:
+ return w.ew.Write(p)
+ case 3:
+ return w.cw.Write(p)
+ default:
+ return 0, fmt.Errorf("unsupported schema version: %d", w.version)
+ }
+}
+
+func (w *segmentWriter) Close() error {
+ switch w.version {
+ case 0, 1:
+ return w.dst.Close()
+ case 2:
+ return w.ew.Close()
+ case 3:
+ return w.cw.Close()
+ default:
+ return fmt.Errorf("unsupported schema version: %d", w.version)
+ }
+}
+
+func (w *segmentWriter) Seek(offset int64, whence int) (int64, error) {
+ switch w.version {
+ case 0, 1:
+ return w.dst.Seek(offset, whence)
+ default:
+ return 0, fmt.Errorf("unsupported schema version: %d", w.version)
+ }
+}
+
+func (w *segmentWriter) Sync() error {
+ switch w.version {
+ case 0, 1:
+ return w.dst.Sync()
+ case 2:
+ return w.ew.Sync()
+ case 3:
+ return w.cw.Sync()
+ default:
+ return fmt.Errorf("unsupported schema version: %d", w.version)
+ }
+}
+
+func (w *segmentWriter) WriteHeader() error {
+ if _, err := w.dst.Seek(0, io.SeekStart); err != nil {
+ return err
+ }
+
+ if err := binary.Write(w.dst, binary.LittleEndian, w.version); err != nil {
+ return err
+ }
+
+ // Version 1 has count
+ if w.version == 1 {
+ if err := binary.Write(w.dst, binary.LittleEndian, uint32(0)); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func (w *segmentWriter) UpdateCount(count uint32) error {
+ // Only Version 1 stores count
+ if w.version != 1 {
+ return nil
+ }
+
+ // Seek to count on disk
+ if _, err := w.dst.Seek(4, io.SeekStart); err != nil {
+ return err
+ }
+
+ return binary.Write(w.dst, binary.LittleEndian, count)
+}
diff --git a/libbeat/publisher/queue/diskqueue/segments_test.go b/libbeat/publisher/queue/diskqueue/segments_test.go
new file mode 100644
index 00000000000..8261c0d78d9
--- /dev/null
+++ b/libbeat/publisher/queue/diskqueue/segments_test.go
@@ -0,0 +1,160 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package diskqueue
+
+import (
+ "io"
+ "os"
+ "testing"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestSchemasRoundTrip(t *testing.T) {
+ tests := map[string]struct {
+ id segmentID
+ schemaVersion uint32
+ plaintext []byte
+ }{
+ "version 0": {
+ id: 0,
+ schemaVersion: uint32(0),
+ plaintext: []byte("abc"),
+ },
+ "version 1": {
+ id: 1,
+ schemaVersion: uint32(1),
+ plaintext: []byte("abc"),
+ },
+ "version 2": {
+ id: 2,
+ schemaVersion: uint32(2),
+ plaintext: []byte("abc"),
+ },
+ "version 3": {
+ id: 2,
+ schemaVersion: uint32(2),
+ plaintext: []byte("abc"),
+ },
+ }
+ dir, err := os.MkdirTemp("", t.Name())
+ assert.Nil(t, err)
+ defer os.RemoveAll(dir)
+ for name, tc := range tests {
+ dst := make([]byte, len(tc.plaintext))
+ settings := DefaultSettings()
+ settings.Path = dir
+ settings.SchemaVersion = tc.schemaVersion
+ settings.EncryptionKey = []byte("keykeykeykeykeyk")
+ qs := &queueSegment{
+ id: tc.id,
+ }
+ sw, err := qs.getWriter(settings)
+ assert.Nil(t, err, name)
+
+ n, err := sw.Write(tc.plaintext)
+ assert.Nil(t, err, name)
+ assert.Equal(t, len(tc.plaintext), n, name)
+
+ err = sw.Close()
+ assert.Nil(t, err, name)
+
+ sr, err := qs.getReader(settings)
+ assert.Nil(t, err, name)
+
+ n, err = sr.Read(dst)
+ assert.Nil(t, err, name)
+
+ err = sr.Close()
+ assert.Nil(t, err, name)
+ assert.Equal(t, len(dst), n, name)
+
+ //make sure we read back what we wrote
+ assert.Equal(t, tc.plaintext, dst, name)
+
+ }
+}
+
+func TestSegmentReaderSeek(t *testing.T) {
+ tests := map[string]struct {
+ id segmentID
+ schemaVersion uint32
+ headerSize int64
+ plaintexts [][]byte
+ }{
+ "version 0": {
+ id: 0,
+ schemaVersion: uint32(0),
+ headerSize: segmentHeaderSizeV0,
+ plaintexts: [][]byte{[]byte("abc"), []byte("defg")},
+ },
+ "version 1": {
+ id: 1,
+ schemaVersion: uint32(1),
+ headerSize: segmentHeaderSizeV1,
+ plaintexts: [][]byte{[]byte("abc"), []byte("defg")},
+ },
+ "version 2": {
+ id: 2,
+ schemaVersion: uint32(2),
+ headerSize: segmentHeaderSizeV2,
+ plaintexts: [][]byte{[]byte("abc"), []byte("defg")},
+ },
+ "version 3": {
+ id: 3,
+ schemaVersion: uint32(2),
+ headerSize: segmentHeaderSizeV3,
+ plaintexts: [][]byte{[]byte("abc"), []byte("defg")},
+ },
+ }
+ dir, err := os.MkdirTemp("", t.Name())
+ assert.Nil(t, err)
+ // defer os.RemoveAll(dir)
+ for name, tc := range tests {
+ settings := DefaultSettings()
+ settings.Path = dir
+ settings.SchemaVersion = tc.schemaVersion
+ settings.EncryptionKey = []byte("keykeykeykeykeyk")
+ qs := &queueSegment{
+ id: tc.id,
+ }
+ sw, err := qs.getWriter(settings)
+ assert.Nil(t, err, name)
+ for _, plaintext := range tc.plaintexts {
+ n, err := sw.Write(plaintext)
+ assert.Nil(t, err, name)
+ assert.Equal(t, len(plaintext), n, name)
+ err = sw.Sync()
+ assert.Nil(t, err, name)
+ }
+ sw.Close()
+ sr, err := qs.getReader(settings)
+ assert.Nil(t, err, name)
+ //seek to second data piece
+ n, err := sr.Seek(tc.headerSize+int64(len(tc.plaintexts[0])), io.SeekStart)
+ assert.Nil(t, err, name)
+ assert.Equal(t, tc.headerSize+int64(len(tc.plaintexts[0])), n, name)
+ dst := make([]byte, len(tc.plaintexts[1]))
+
+ _, err = sr.Read(dst)
+ assert.Nil(t, err, name)
+ assert.Equal(t, tc.plaintexts[1], dst, name)
+
+ sw.Close()
+ }
+}
diff --git a/libbeat/publisher/queue/diskqueue/serialize.go b/libbeat/publisher/queue/diskqueue/serialize.go
index 3c75534cdce..5af619b5429 100644
--- a/libbeat/publisher/queue/diskqueue/serialize.go
+++ b/libbeat/publisher/queue/diskqueue/serialize.go
@@ -107,11 +107,8 @@ func (e *eventEncoder) encode(evt interface{}) ([]byte, error) {
return nil, err
}
- // Copy the encoded bytes to a new array owned by the caller.
- bytes := e.buf.Bytes()
- result := make([]byte, len(bytes))
- copy(result, bytes)
-
+ result := make([]byte, e.buf.Len())
+ copy(result, e.buf.Bytes())
return result, nil
}
diff --git a/libbeat/publisher/queue/diskqueue/writer_loop.go b/libbeat/publisher/queue/diskqueue/writer_loop.go
index b47f3a2baef..4dd8117aeb6 100644
--- a/libbeat/publisher/queue/diskqueue/writer_loop.go
+++ b/libbeat/publisher/queue/diskqueue/writer_loop.go
@@ -18,8 +18,8 @@
package diskqueue
import (
+ "bytes"
"encoding/binary"
- "os"
"time"
"github.com/elastic/elastic-agent-libs/logp"
@@ -88,12 +88,16 @@ type writerLoop struct {
// The file handle corresponding to currentSegment. When currentSegment
// changes, this handle is closed and a new one is created.
- outputFile *os.File
+ outputFile *segmentWriter
currentRetryInterval time.Duration
+
+ // buffer Used to gather write information so there is only one write syscall
+ buffer *bytes.Buffer
}
func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
+ buffer := &bytes.Buffer{}
return &writerLoop{
logger: logger,
settings: settings,
@@ -102,6 +106,7 @@ func newWriterLoop(logger *logp.Logger, settings Settings) *writerLoop {
responseChan: make(chan writerLoopResponse),
currentRetryInterval: settings.RetryInterval,
+ buffer: buffer,
}
}
@@ -112,8 +117,8 @@ func (wl *writerLoop) run() {
// The request channel is closed, we are done. If there is an active
// segment file, finalize its frame count and close it.
if wl.outputFile != nil {
- writeSegmentHeader(wl.outputFile, wl.currentSegment.frameCount)
- wl.outputFile.Sync()
+ wl.outputFile.UpdateCount(wl.currentSegment.frameCount) //nolint:errcheck //No error recovery path
+ wl.outputFile.Sync() //nolint:errcheck //No error recovery path
wl.outputFile.Close()
wl.outputFile = nil
}
@@ -156,9 +161,8 @@ outerLoop:
if wl.outputFile != nil {
// Update the header with the frame count (including the ones we
// just wrote), try to sync to disk, then close the file.
- writeSegmentHeader(wl.outputFile,
- wl.currentSegment.frameCount+curSegmentResponse.framesWritten)
- wl.outputFile.Sync()
+ wl.outputFile.UpdateCount(wl.currentSegment.frameCount + curSegmentResponse.framesWritten) //nolint:errcheck //No error recovery path
+ wl.outputFile.Sync() //nolint:errcheck //No error recovery path
wl.outputFile.Close()
wl.outputFile = nil
// We are done with this segment, add the totals to the response and
@@ -188,22 +192,27 @@ outerLoop:
// The Write calls below all pass through retryWriter, so they can
// only return an error if the write should be aborted. Thus, all we
// need to do when we see an error is break out of the request loop.
- err := binary.Write(retryWriter, binary.LittleEndian, frameSize)
+ err := binary.Write(wl.buffer, binary.LittleEndian, frameSize)
if err != nil {
break
}
- _, err = retryWriter.Write(frameRequest.frame.serialized)
+ _, err = wl.buffer.Write(frameRequest.frame.serialized)
if err != nil {
break
}
// Compute / write the frame's checksum
checksum := computeChecksum(frameRequest.frame.serialized)
- err = binary.Write(wl.outputFile, binary.LittleEndian, checksum)
+ err = binary.Write(wl.buffer, binary.LittleEndian, checksum)
if err != nil {
break
}
// Write the frame footer's (duplicate) length
- err = binary.Write(wl.outputFile, binary.LittleEndian, frameSize)
+ err = binary.Write(wl.buffer, binary.LittleEndian, frameSize)
+ if err != nil {
+ break
+ }
+ _, err = retryWriter.Write(wl.buffer.Bytes())
+ wl.buffer.Reset()
if err != nil {
break
}
@@ -228,7 +237,7 @@ outerLoop:
}
}
// Try to sync the written data to disk.
- wl.outputFile.Sync()
+ wl.outputFile.Sync() //nolint:errcheck //No error recovery path
// If the queue has an ACK listener, notify it the frames were written.
if wl.settings.WriteToDiskListener != nil {