Skip to content

Commit

Permalink
encode writing
Browse files Browse the repository at this point in the history
  • Loading branch information
jimexist committed Nov 15, 2022
1 parent 9b8a0f5 commit 5d76248
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 5 deletions.
26 changes: 26 additions & 0 deletions parquet/src/bloom_filter/mod.rs
Expand Up @@ -24,9 +24,11 @@ use crate::file::metadata::ColumnChunkMetaData;
use crate::file::reader::ChunkReader;
use crate::format::{
BloomFilterAlgorithm, BloomFilterCompression, BloomFilterHash, BloomFilterHeader,
SplitBlockAlgorithm, Uncompressed, XxHash,
};
use bytes::{Buf, Bytes};
use std::hash::Hasher;
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::{TCompactInputProtocol, TSerializable};
use twox_hash::XxHash64;
Expand Down Expand Up @@ -129,6 +131,30 @@ impl Sbbf {
Self(data)
}

/// Serialize the filter's bit set to `writer`, emitting every 32-bit
/// word in little-endian byte order, block by block.
///
/// Any I/O failure from the underlying writer is wrapped in a
/// `ParquetError::General` with context.
pub fn write_bitset<W: Write>(&self, mut writer: W) -> Result<(), ParquetError> {
    // Flatten blocks into a single stream of words and write each
    // word's little-endian encoding.
    for word in self.0.iter().flatten() {
        let encoded = word.to_le_bytes();
        if let Err(e) = writer.write_all(&encoded) {
            return Err(ParquetError::General(format!(
                "Could not write bloom filter bit set: {}",
                e
            )));
        }
    }
    Ok(())
}

/// Build the Thrift `BloomFilterHeader` describing this filter:
/// split-block algorithm, xxHash hashing, and an uncompressed bit set,
/// with `num_bytes` derived from the number of blocks.
pub fn header(&self) -> BloomFilterHeader {
    // Each block consists of 8 i32 values of 4 bytes apiece.
    let num_bytes = self.0.len() as i32 * 4 * 8;
    BloomFilterHeader {
        num_bytes,
        algorithm: BloomFilterAlgorithm::BLOCK(SplitBlockAlgorithm {}),
        hash: BloomFilterHash::XXHASH(XxHash {}),
        compression: BloomFilterCompression::UNCOMPRESSED(Uncompressed {}),
    }
}

pub fn read_from_column_chunk<R: ChunkReader>(
column_metadata: &ColumnChunkMetaData,
reader: Arc<R>,
Expand Down
16 changes: 11 additions & 5 deletions parquet/src/file/writer.rs
Expand Up @@ -230,11 +230,16 @@ impl<W: Write> SerializedFileWriter<W> {
Some(bloom_filter) => {
let start_offset = self.buf.bytes_written();
let mut protocol = TCompactOutputProtocol::new(&mut self.buf);
bloom_filter.write_to_out_protocol(&mut protocol)?;
let header = bloom_filter.header();
header.write_to_out_protocol(&mut protocol)?;
protocol.flush()?;
let end_offset = self.buf.bytes_written();
bloom_filter.write_bitset(&mut self.buf)?;
// set offset and index for bloom filter
column_metadata.bloom_filter_offset = Some(start_offset as i64);
column_metadata
.meta_data
.as_mut()
.expect("can't have bloom filter without column metadata")
.bloom_filter_offset = Some(start_offset as i64);
}
None => {}
}
Expand Down Expand Up @@ -424,10 +429,10 @@ impl<'a, W: Write> SerializedRowGroupWriter<'a, W> {
// Update row group writer metrics
*total_bytes_written += r.bytes_written;
column_chunks.push(r.metadata);
column_indexes.push(r.column_index);
offset_indexes.push(r.offset_index);
#[cfg(feature = "bloom")]
bloom_filters.push(r.bloom_filter);
column_indexes.push(r.column_index);
offset_indexes.push(r.offset_index);

if let Some(rows) = *total_rows_written {
if rows != r.rows_written {
Expand Down Expand Up @@ -663,6 +668,7 @@ impl<'a, W: Write> PageWriter for SerializedPageWriter<'a, W> {

Ok(spec)
}

fn write_metadata(&mut self, metadata: &ColumnChunkMetaData) -> Result<()> {
let mut protocol = TCompactOutputProtocol::new(&mut self.sink);
metadata
Expand Down

0 comments on commit 5d76248

Please sign in to comment.