diff --git a/parquet/src/bloom_filter/mod.rs b/parquet/src/bloom_filter/mod.rs index d0bee8a5fec..0122a3a7679 100644 --- a/parquet/src/bloom_filter/mod.rs +++ b/parquet/src/bloom_filter/mod.rs @@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData; use crate::file::reader::ChunkReader; use crate::format::{ BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader, + SplitBlockAlgorithm, Uncompressed, XxHash, }; use bytes::{Buf, Bytes}; use std::hash::Hasher; +use std::io::Write; use std::sync::Arc; use thrift::protocol::{TCompactInputProtocol, TSerializable}; use twox_hash::XxHash64; @@ -129,6 +131,30 @@ impl Sbbf { Self(data) } + pub fn write_bitset(&self, mut writer: W) -> Result<(), ParquetError> { + for block in &self.0 { + for word in block { + writer.write_all(&word.to_le_bytes()).map_err(|e| { + ParquetError::General(format!( + "Could not write bloom filter bit set: {}", + e + )) + })?; + } + } + Ok(()) + } + + pub fn header(&self) -> BloomFilterHeader { + BloomFilterHeader { + // 8 i32 per block, 4 bytes per i32 + num_bytes: self.0.len() as i32 * 4 * 8, + algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}), + hash: BloomFilterHash::XXHASH(XxHash {}), + compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}), + } + } + pub fn read_from_column_chunk( column_metadata: &ColumnChunkMetaData, reader: Arc, diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs index 90c9b6bfc67..bf6ec93fa36 100644 --- a/parquet/src/file/writer.rs +++ b/parquet/src/file/writer.rs @@ -230,11 +230,16 @@ impl SerializedFileWriter { Some(bloom_filter) => { let start_offset = self.buf.bytes_written(); let mut protocol = TCompactOutputProtocol::new(&mut self.buf); - bloom_filter.write_to_out_protocol(&mut protocol)?; + let header = bloom_filter.header(); + header.write_to_out_protocol(&mut protocol)?; protocol.flush()?; - let end_offset = self.buf.bytes_written(); + bloom_filter.write_bitset(&mut self.buf)?; // set offset and index for bloom filter - column_metadata.bloom_filter_offset = Some(start_offset as i64); + column_metadata + .meta_data + .as_mut() + .expect("can't have bloom filter without column metadata") + .bloom_filter_offset = Some(start_offset as i64); } None => {} } @@ -424,10 +429,10 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> { // Update row group writer metrics *total_bytes_written += r.bytes_written; column_chunks.push(r.metadata); - column_indexes.push(r.column_index); - offset_indexes.push(r.offset_index); #[cfg(feature = "bloom")] bloom_filters.push(r.bloom_filter); + column_indexes.push(r.column_index); + offset_indexes.push(r.offset_index); if let Some(rows) = *total_rows_written { if rows != r.rows_written { @@ -663,6 +668,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> { Ok(spec) } + fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> { let mut protocol = TCompactOutputProtocol::new(&mut self.sink); metadata