Add encrypted diskqueue #31808
Conversation
💚 Flaky test report: Tests succeeded.
Force-pushed from a25481e to 710f0ce.
Couldn't we just disable it by default and let our users decide?
These results look suspicious to me. Shouldn't the number of events decrease the same way as in sync mode? According to the results, encrypted events are more performant than non-encrypted.
I might be misinterpreting the results completely, but doesn't it mean that compressed events are larger in byte size than uncompressed events?
It is little surprise we are not getting much value from compression. As far as I can tell we are compressing one event at a time. Compression shows its value when we compress many events as a stream. The tokens for the field names are highly repetitive, as are the values. It is difficult to tell from the above what compression rate we are getting with the current implementation. 20%? 40%?
IIRC from uni, it could be the compression algorithm not actually being able to compress, but adding its "metadata" anyway.
// SchemaVersion specifies which on-disk format, serialization, and encryption to use.
// 0, 1, or 2 are valid options.
SchemaVersion uint32
Schema version shouldn't be a configurable option, as we don't actually want to support multiple versions, we just need to be able to read old data after a user update. So we should be able to read old versions of the schema (though we might at some point remove 0, since it only appeared in beta), but new segments should always be written in the most recent version. (This also means we can assume schema-2 fields are present on any segments we create, and just need to pick reasonable defaults for them when loading an older version.)
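For illustration, a minimal sketch of that read-old/write-newest approach. The header layout and helper names below are hypothetical, not the queue's actual code; the point is that only reads branch on the version, and writes always use the latest one.

```go
package diskqueue

import (
	"encoding/binary"
	"fmt"
	"io"
)

// currentSchemaVersion is the only version new segments are written with.
const currentSchemaVersion uint32 = 2

// segmentHeader is a simplified stand-in; the field layout is illustrative only.
type segmentHeader struct {
	version    uint32
	frameCount uint32
}

// readSegmentHeader accepts older schema versions and fills in defaults for
// fields they lack, so old data survives an upgrade.
func readSegmentHeader(r io.Reader) (*segmentHeader, error) {
	h := &segmentHeader{}
	if err := binary.Read(r, binary.LittleEndian, &h.version); err != nil {
		return nil, err
	}
	switch h.version {
	case 0:
		// Beta-only layout with no frame count; leave the zero value as a default.
	case 1, 2:
		if err := binary.Read(r, binary.LittleEndian, &h.frameCount); err != nil {
			return nil, err
		}
	default:
		return nil, fmt.Errorf("unknown segment schema version %d", h.version)
	}
	return h, nil
}

// writeSegmentHeader always emits the newest schema version.
func writeSegmentHeader(w io.Writer, frameCount uint32) error {
	if err := binary.Write(w, binary.LittleEndian, currentSchemaVersion); err != nil {
		return err
	}
	return binary.Write(w, binary.LittleEndian, frameCount)
}
```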
So this gets back to the feedback from the design doc. The feedback was to have version 3 be encrypted, and not have fields in the segment header specifying things like encrypted, ciphers, keys, etc. So if version 3 is always encrypted, the option exists so you can choose the non-encrypted version 2.
That being said, the original design had more fields in the segment header to control things like encryption; I'd be very happy to go back towards that.
@@ -71,13 +70,15 @@ type readerLoop struct {
}

func newReaderLoop(settings Settings) *readerLoop {
	decoder := newEventDecoder()
	decoder.SetCompression(settings.UseCompression)
The compression field can't be set based on the global queue settings -- the reader might be processing a segment created by a previous run of the program, or even a previous version, so the compression flag needs to be set for each segment from the metadata in the segment header.
Absolutely, good catch. For right now, it lets me test compression vs. no compression. But yeah, if it ends up being something configurable we need to go with either a new schema version or a header option.
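As a rough sketch of the per-segment alternative, the flag names below are made up, while newEventDecoder and SetCompression are the functions from the diff above; the decoder is configured from what is actually on disk rather than from the current queue settings.

```go
// Hypothetical per-segment flags, read from the segment header on disk.
const (
	segmentFlagEncrypted uint32 = 1 << iota
	segmentFlagCompressed
)

// decoderForSegment configures the decoder from the header of the segment
// being read, which may have been written by an older run with different
// settings than the current configuration.
func decoderForSegment(headerFlags uint32) *eventDecoder {
	decoder := newEventDecoder()
	decoder.SetCompression(headerFlags&segmentFlagCompressed != 0)
	return decoder
}
```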
const segmentHeaderSizeV1 = 8

// segmentHeaderSizeV2: schema version 2 header size (a single uint32 for the version)
const segmentHeaderSizeV2 = 4
Why does this header size remove the frameCount field? That is part of the queue's bookkeeping and is used in metrics reporting.
It seems to me segmentHeader instead needs to grow to also include flags for encryption and compression (since these can in principle vary for every segment if the process is restarted with a different configuration).
This goes back to the design doc. The concern that was brought up was that the frame count sits outside the encryption, and as such an attacker could modify it to a known value, making its use in the bookkeeping suspect. Moving it inside the encryption has its own issues: you can't put it at the front, because you can't seek back in the encryption stream and re-write the first bits without throwing everything off. You could add it to the end, but then you have to decrypt the whole thing anyway, in which case it behaves a lot like schema version 0.
We could, but I think if we are going to do that we need to either add whether compression is used to the segmentHeader, or have encryption + compression be version 3. So version 2 can be encryption or encryption + compression. I have it as an option here so I could test it easily. Back in the design doc, the feedback was to have the schema version be the deciding factor for what features were enabled vs. having them specified in the header.
:-) This is why there is an async and sync benchmark. But it isn't all that unusual to see changes like this with concurrent code.
The "Mean Bytes/event" is coming from the -benchmem statistics, and what it really means is this is how much memory was allocated for processing the event. Not how much memory the event actually takes up when it is written to disk. So both encryption is increasing it because at some point you have to have a []byte that is the same size as the plaintext to encrypt into. Compression is increasing this because you have a similar situation where you need a []byte to write the compressed data into. |
The original plan in the design doc was to compress all the frames, not each individual event inside the frame. Unfortunately, the stream implementation for lz4 still requires a "Close" call that flushes the last block. That is a problem because the queue expects to read and write the segment file at the same time, so it is possible (and happened in testing) that you can't read what you think has been written because a "Close" hasn't occurred yet. And once the Close happens you can't write to the compression writer anymore. I can do some quick tests to see how "compressed" each of the sample events is in the benchmark.
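For context, here is roughly what stream-level compression across frames could look like with github.com/pierrec/lz4/v4, assuming a Flush that pushes buffered data through while leaving the stream writable (the behavior the patch discussed later in this thread is meant to provide).

```go
package main

import (
	"bytes"
	"fmt"

	"github.com/pierrec/lz4/v4"
)

func main() {
	var segment bytes.Buffer // stands in for the segment file
	zw := lz4.NewWriter(&segment)

	frames := [][]byte{
		[]byte(`{"message":"event one"}`),
		[]byte(`{"message":"event two"}`),
	}
	for _, frame := range frames {
		if _, err := zw.Write(frame); err != nil {
			panic(err)
		}
		// Flush after each frame so a reader can decode everything written so
		// far; without it, data sits in the writer until Close ends the stream.
		if err := zw.Flush(); err != nil {
			panic(err)
		}
	}
	if err := zw.Close(); err != nil {
		panic(err)
	}
	fmt.Printf("compressed segment is %d bytes\n", segment.Len())
}
```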
Yep, we had a similar issue when I implemented this in endpoint. You basically have to flush the last block through on a read call. Since the streaming API in the golang implementation doesn't directly support flush, you'd end up having to do the block/frame management yourself. In essence, aggregate events in a buffer until a threshold or read signal, then write the block/frame and flush it through the crypto layer to disk. I am concerned that if we do not do some form of stream-level compression, we will see disk I/O degradation with very high-speed event generation. In the Endgame product, we would see 10-20x compression with very little CPU overhead, which took some pressure off the crypto and a lot of pressure off the I/O spool to disk. Of course, the impact of compression will vary considerably depending on workload.
@@ -0,0 +1,69 @@
# Disk Queue On Disk Structures |
Thanks for adding this! I am a big fan of documentation/design living next to the source code.
assert.Nil(t, err, name)
_, err = io.Copy(ew, src)
assert.Nil(t, err, name)
ew.Close()
Should there be a check at the end here that the encrypted result is not equal to the plaintext? I think this test might still pass if these functions did not encrypt anything.
Yeah, that's a problem with round trip tests.
I added a TeeReader, and check that the length of the IV + length of the plaintext gets written, and that the plaintext and ciphertext don't match.
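Roughly what that guard can look like; NewEncryptionWriter and key are stand-ins for the PR's actual helpers, and the exact length accounting may differ.

```go
import (
	"bytes"
	"io"
	"testing"

	"github.com/stretchr/testify/assert"
)

func TestEncryptionChangesBytes(t *testing.T) {
	plaintext := []byte("payload that must not appear verbatim on disk")

	// Tee the plaintext off as it is consumed so we can compare it afterwards.
	var seen bytes.Buffer
	src := io.TeeReader(bytes.NewReader(plaintext), &seen)

	var ciphertext bytes.Buffer
	ew, err := NewEncryptionWriter(&ciphertext, key) // stand-in constructor and key
	assert.Nil(t, err)

	n, err := io.Copy(ew, src)
	assert.Nil(t, err)
	assert.Nil(t, ew.Close())

	// All of the plaintext was consumed, the output holds at least IV + payload,
	// and the bytes written are not simply the plaintext passed through.
	assert.Equal(t, int64(len(plaintext)), n)
	assert.GreaterOrEqual(t, ciphertext.Len(), len(plaintext))
	assert.NotEqual(t, seen.Bytes(), ciphertext.Bytes())
}
```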
Changes so V2 is encryption only, and V3 is encryption + compression. Also, a small performance change with a large impact: ~65-75% improvement depending on the test.
Async Comparison
Sync Comparison
- encryption reader/writer
- compression on frames
- change to segmentReader/segmentWriter instead of *os.File
- updated tests
- update benchmark
I think a potential downside to this is that the last bytes on disk don't necessarily represent a full frame. So you could have one or more events lost in the event of a crash or power failure.
For the benchmark I have 7 sample events. Here is how they were compressed with the current implementation:
For right now I think compression gets put on pause until a minimum shipper exists, and then we can start measuring performance bottlenecks and start addressing them.
Looking at the compression rate for small individual events, I'm not surprised it isn't great. Compression minimizes repeated information, and for small payloads that can't compress well it will just increase their size by adding the block/frame headers. I would agree that compression is not valuable when applied to individual events. I am not convinced that it wouldn't be valuable when applied to all the frames or entire frames.
We are close to this, but I am not entirely sure how much it would affect the queue benchmarks. I'd expect the disk queue performance in isolation to be close to what we see with it running the shipper. How difficult is it to change the queue implementation to optimize the effect of compression? Are there any risks besides the small possibility of data loss already mentioned?
You would also potentially lose any in flight events that were not yet written to disk, for example events sitting in memory waiting to be flushed to disk or on the wire between the input and shipper. I don't think we can completely eliminate data loss as a possibility, we can just minimize the window where it can occur. The best we can do is only acknowledge events that have been safely persisted so the input side is able to retry them where possible. Perhaps we just need to avoid acknowledging events until a full frame has been built up, but that is adding more complexity. How does the endpoint queue deal with this situation?
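One hypothetical shape for "only acknowledge what is on disk" (not the queue's real API, just the ordering): hold the ack callbacks with the frame and release them only after the frame has been flushed through the compression layer.

```go
package diskqueue

import (
	"bytes"
	"io"
)

// flushWriter is whatever sits on top of the segment file (e.g. the lz4
// writer), as long as Flush pushes buffered data all the way down to disk.
type flushWriter interface {
	io.Writer
	Flush() error
}

// frameBuffer collects serialized events and their ack callbacks until the
// frame containing them has actually been persisted.
type frameBuffer struct {
	payload bytes.Buffer
	acks    []func()
}

func (fb *frameBuffer) add(serializedEvent []byte, ack func()) {
	fb.payload.Write(serializedEvent)
	fb.acks = append(fb.acks, ack)
}

// flush writes the frame, flushes it through, and only then runs the acks, so
// the input side never retires an event that isn't persisted yet.
func (fb *frameBuffer) flush(w flushWriter) error {
	if _, err := w.Write(fb.payload.Bytes()); err != nil {
		return err
	}
	if err := w.Flush(); err != nil {
		return err
	}
	for _, ack := range fb.acks {
		ack()
	}
	fb.payload.Reset()
	fb.acks = fb.acks[:0]
	return nil
}
```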
These results are for compressing individual events, correct? Do we have stats on compressing a realistic stream of events? Disk IOPS matter quite a bit to the security use case. If we decide to drop compression, the 8x-10x increase in IOPS may be a considerable regression for many of our security customers, particularly on servers that are already I/O bound.
There is always a risk of data loss. There are events in RAM that will not flush if the app crashes. There are blocks in the disk cache that may not get committed if the machine faults. It is really a matter of how high the risk is, and what the tolerance is for the occasional failure. If the process is stable 99.99% of the time, then we would rather err on the side of performance. On endpoint, we made heavy use of compression, as the data we see is highly repetitive and compresses extremely well. As mentioned before, we generally would see 10-20x compression on the security data. Less data to push through means less crypto and fewer IOPS, so in our use case the tradeoff of performance over the occasional data loss was well worth it. FWIW, I've yet to receive a data loss complaint in the 5 years this code has been in production. I would strongly suggest pushing real-world data streams through the implementation for a benchmark, particularly with full stream compression enabled. Finding the sweet spot of CPU overhead, memory usage, and minimal IOPS is tricky.
This pull request is now in conflicts. Could you fix it? 🙏
ok, I got stream compression with flush working :-)
And the size of the segment files is reduced:
The compression should be really good in this case, since it is the same 7 messages repeated. I'm going to split this into 3 PRs: one for the performance tweak, one for encryption, and one for compression. To get the streaming compression to work we need the development version of the golang lz4 library, and it needs a small patch too.
@@ -316,6 +317,7 @@ replace (
github.com/google/gopacket => github.com/elastic/gopacket v1.1.20-0.20211202005954-d412fca7f83a
github.com/insomniacslk/dhcp => github.com/elastic/dhcp v0.0.0-20200227161230-57ec251c7eb3 // indirect
github.com/tonistiigi/fifo => github.com/containerd/fifo v0.0.0-20190816180239-bda0ff6ed73c
github.com/pierrec/lz4/v4 v4.1.15-0.20220523073431-885e900f46f2 => /Users/leehinman/src/pierrec_lz4
The replace is necessary because the Flush function needs a small patch: it currently prevents further writing to the stream. The patch allows you to write to the stream after a flush. So before merging, it would be good to see if upstream will take the fix or if we need to maintain a fork.
The upstream project looks active enough, hopefully they'll just take the PR. I'm curious what the change here looks like as well.
Already merged! Nice, that was fast.
Just to make sure I remember what these benchmarks mean: V1 is the original implementation, V2 is encryption + per-event compression, and V3 is encryption + streaming segment compression? The results look much better assuming I am understanding this all correctly. Nice work!
What does this PR do?
Adds support for encryption and encryption+compression to the persistent disk queue.
Why is it important?
Necessary for endpoint running under elastic-agent
Checklist
- [ ] I have made corresponding changes to the documentation
- [ ] I have made corresponding changes to the default configuration files
- [ ] I have added an entry in CHANGELOG.next.asciidoc or CHANGELOG-developer.next.asciidoc
How to test this PR locally
Test
go test
Benchmark
go test -bench=. -benchtime 3x -timeout 60m
Related issues
Benchmark Results
Async Benchmarks
Sync Benchmarks
Raw Data
Open Questions
Should we add the frame count back to schemaV2? It is easy, and would speed up reading a new queue. It is outside encryption.