Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Zstd compression support, Make block size configurable via IndexSettings #1374

Merged
merged 8 commits into from May 25, 2022

Conversation

kryesh
Copy link
Contributor

@kryesh kryesh commented May 18, 2022

Made block_size a configurable option in IndexSettings - making this configurable rather than a constant is important because using larger block sizes allows for compression to benefit from using a dictionary for larger chunks of data, resulting in better compression ratios.

Also added zstd support via the zstd crate - implementation is based on the lz4 one, just adapted to the interface provided by the zstd crate.

Compression ratios for a sample dataset ingesting syslog for an app of mine (highly compressible, ~22GB raw):
loggen -i -n 1000000 --size 2000 --active-connections 12 127.0.0.1 5514

Lz4, 16k blocks (current lz4 config): 744M
Lz4, 512k blocks: 526M
Zstd, 16k blocks: 573M
Zstd, 512k blocks: 379M

.try_into()
.unwrap();

let uncompressed_size = usize::from_le_bytes(*uncompressed_size_bytes);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
let uncompressed_size = usize::from_le_bytes(*uncompressed_size_bytes);
let uncompressed_size = u64::from_le_bytes(*uncompressed_size_bytes);

use zstd::bulk::{compress_to_buffer, decompress_to_buffer};
use zstd::DEFAULT_COMPRESSION_LEVEL;

const USIZE_SIZE: usize = std::mem::size_of::<usize>();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that const is not helpful

)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;

compressed[0..USIZE_SIZE].copy_from_slice(&uncompressed.len().to_le_bytes());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

usize is platform dependent.
Let's use u64 instead of usize for stuff that is serialized.

pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();

let max_size: usize = uncompressed.len() + USIZE_SIZE;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this is correct.
It is not possible to create a non-destructive compression algorithm that creates only payload <= to their original size.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

src/store/compression_zstd_block.rs Outdated Show resolved Hide resolved
Copy link
Collaborator

@fulmicoton fulmicoton left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comments inline

@fulmicoton fulmicoton requested a review from PSeitz May 18, 2022 09:07
pub fn compress(uncompressed: &[u8], compressed: &mut Vec<u8>) -> io::Result<()> {
compressed.clear();

let count_size = std::mem::size_of::<u64>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

4GB (u32) should be enough, same limitation as in lz4


decompressed.resize(uncompressed_size as usize, 0);
let decompressed_size = decompress_to_buffer(&compressed[count_size..], decompressed)
.map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err.to_string()))?;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is already an io Result. No need to convert it.

@PSeitz
Copy link
Contributor

PSeitz commented May 19, 2022

Can you add the feature to the tests in .github/workflows/test.yml?

@@ -139,11 +145,11 @@ pub mod tests {
Ok(())
}

fn test_store(compressor: Compressor) -> crate::Result<()> {
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test in our test_store suite that test for random (high entropy) payloads?

We spotted a bug in the original form of this PR.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can look into it later today - but just wanted to confirm, would the test be for the case where the input is incompressible to the point where the compressed output is larger than the input?
Just making sure the test is for the right thing

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

@@ -139,11 +145,11 @@ pub mod tests {
Ok(())
}

fn test_store(compressor: Compressor) -> crate::Result<()> {
fn test_store(compressor: Compressor, blocksize: usize) -> crate::Result<()> {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test in our test_store suite that test for random payloads?

@@ -65,7 +65,7 @@ impl StoreWriter {
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's true for the other stores, and it does not matter much, but shouldn't we make that >=?.

This is almost philosophy at this point, but my mental model is
"We close a block once it is full". "Being full means being greater or equal to BLOCK_SIZE"

@@ -65,7 +65,7 @@ impl StoreWriter {
VInt(doc_num_bytes as u64).serialize(&mut self.current_block)?;
self.current_block.write_all(serialized_document)?;
self.doc += 1;
if self.current_block.len() > BLOCK_SIZE {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

that's true for the other stores, and it does not matter much, but shouldn't we make that >=?.

This is almost philosophy at this point, but my mental model is
"We close a block once it is full". "Being full means being greater or equal to BLOCK_SIZE"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I just went with what was already there, personally I don't think it makes much difference since blocks are allowed to overflow anyway;
Happy to go either way on this one

@fulmicoton
Copy link
Collaborator

@kryesh this is a great contribution thank you!

1 similar comment
@fulmicoton
Copy link
Collaborator

@kryesh this is a great contribution thank you!

@kryesh
Copy link
Contributor Author

kryesh commented May 20, 2022

@kryesh this is a great contribution thank you!

Thanks!
I stumbled across the block size thing while trying to figure out why an index that was compressed was getting compressed further when stored on a zfs filesystem with compression enabled, as for zstd I figured that while I was looking at it I might as well add it.
Hopefully it proves useful!

@fulmicoton
Copy link
Collaborator

Should we use a common dictionary for all of the blocks?

@PSeitz PSeitz merged commit 89e19f1 into quickwit-oss:main May 25, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants