diff --git a/Cargo.lock b/Cargo.lock
index bcccfa1134..49e1e76e3e 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -94,6 +94,7 @@ dependencies = [
  "common_util",
  "datafusion 12.0.0",
  "env_logger",
+ "ethbloom",
  "futures 0.3.21",
  "lazy_static",
  "log",
@@ -1908,6 +1909,19 @@ dependencies = [
  "termcolor",
 ]
 
+[[package]]
+name = "ethbloom"
+version = "0.13.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "c22d4b5885b6aa2fe5e8b9329fb8d232bf739e434e6b87347c63bdd00c120f60"
+dependencies = [
+ "crunchy",
+ "fixed-hash",
+ "impl-rlp",
+ "impl-serde",
+ "tiny-keccak",
+]
+
 [[package]]
 name = "fail"
 version = "0.4.0"
@@ -1985,6 +1999,18 @@ dependencies = [
  "winapi 0.3.9",
 ]
 
+[[package]]
+name = "fixed-hash"
+version = "0.8.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "835c052cb0c08c1acf6ffd71c022172e18723949c8282f2b9f27efbc51e64534"
+dependencies = [
+ "byteorder",
+ "rand 0.8.5",
+ "rustc-hex",
+ "static_assertions 1.1.0",
+]
+
 [[package]]
 name = "fixedbitset"
 version = "0.4.2"
@@ -2700,6 +2726,24 @@ dependencies = [
  "unicode-normalization",
 ]
 
+[[package]]
+name = "impl-rlp"
+version = "0.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f28220f89297a075ddc7245cd538076ee98b01f2a9c23a53a4f1105d5a322808"
+dependencies = [
+ "rlp",
+]
+
+[[package]]
+name = "impl-serde"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ebc88fc67028ae3db0c853baa36269d398d5f45b6982f95549ff5def78c935cd"
+dependencies = [
+ "serde",
+]
+
 [[package]]
 name = "indexmap"
 version = "1.9.1"
@@ -4967,6 +5011,16 @@ version = "1.0.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "3582f63211428f83597b51b2ddb88e2a91a9d52d12831f9d08f5e624e8977422"
 
+[[package]]
+name = "rlp"
+version = "0.5.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bb919243f34364b6bd2fc10ef797edbfa75f33c252e7998527479c6d6b47e1ec"
+dependencies = [
+ "bytes 1.2.1",
+ "rustc-hex",
+]
+
 [[package]]
 name = "rocksdb"
 version = "0.3.0"
@@ -5065,6 +5119,12 @@ version = "1.1.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
 
+[[package]]
+name = "rustc-hex"
+version = "2.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3e75f6a532d0fd9f7f13144f392b6ad56a32696bfcd9c78f797f16bbb6f072d6"
+
 [[package]]
 name = "rustc-serialize"
 version = "0.3.24"
diff --git a/Cargo.toml b/Cargo.toml
index fb072ebf73..c9768be2d1 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -69,6 +69,7 @@ common_types = { path = "common_types" }
 common_util = { path = "common_util" }
 df_operator = { path = "df_operator" }
 env_logger = "0.6"
+ethbloom = "0.13.0"
 futures = "0.3"
 lazy_static = "1.4.0"
 log = "0.4"
diff --git a/analytic_engine/Cargo.toml b/analytic_engine/Cargo.toml
index 977acffc5f..de356d7b98 100644
--- a/analytic_engine/Cargo.toml
+++ b/analytic_engine/Cargo.toml
@@ -20,6 +20,7 @@ bytes = { workspace = true }
 common_types = { workspace = true }
 common_util = { workspace = true }
 datafusion = { workspace = true }
+ethbloom = { workspace = true }
 futures = { workspace = true }
 lazy_static = { workspace = true }
 log = { workspace = true }
diff --git a/analytic_engine/src/compaction/picker.rs b/analytic_engine/src/compaction/picker.rs
index 309aa39b46..fddfb507f6 100644
--- a/analytic_engine/src/compaction/picker.rs
+++ b/analytic_engine/src/compaction/picker.rs
@@ -594,7 +594,6 @@ mod tests {
             file::SstMetaData,
             manager::{tests::LevelsControllerMockBuilder, LevelsController},
         },
-        table_options::StorageFormatOptions,
     };
 
     fn build_sst_meta_data(time_range: TimeRange, size: u64) -> SstMetaData {
@@ -606,7 +605,8 @@ mod tests {
             schema: build_schema(),
             size,
             row_num: 2,
-            storage_format_opts: StorageFormatOptions::default(),
+            storage_format_opts: Default::default(),
+            bloom_filter: Default::default(),
         }
     }
diff --git a/analytic_engine/src/instance/flush_compaction.rs b/analytic_engine/src/instance/flush_compaction.rs
index 828af1961c..dbd73c9487 100644
--- a/analytic_engine/src/instance/flush_compaction.rs
+++ b/analytic_engine/src/instance/flush_compaction.rs
@@ -620,6 +620,7 @@ impl Instance {
                 storage_format_opts: StorageFormatOptions::new(
                     table_data.table_options().storage_format,
                 ),
+                bloom_filter: Default::default(),
             };
 
             let store = self.space_store.clone();
@@ -725,6 +726,7 @@ impl Instance {
             size: 0,
             row_num: 0,
             storage_format_opts: StorageFormatOptions::new(table_data.storage_format()),
+            bloom_filter: Default::default(),
         };
 
         // Alloc file id for next sst file
diff --git a/analytic_engine/src/sst/file.rs b/analytic_engine/src/sst/file.rs
index 961d0d8c1c..825913455d 100644
--- a/analytic_engine/src/sst/file.rs
+++ b/analytic_engine/src/sst/file.rs
@@ -27,6 +27,7 @@ use common_util::{
     metric::Meter,
     runtime::{JoinHandle, Runtime},
 };
+use ethbloom::Bloom;
 use log::{debug, error, info};
 use object_store::ObjectStoreRef;
 use proto::{common as common_pb, sst as sst_pb};
@@ -56,6 +57,16 @@ pub enum Error {
     #[snafu(display("Storage format options are not found.\nBacktrace\n:{}", backtrace))]
     StorageFormatOptionsNotFound { backtrace: Backtrace },
 
+    #[snafu(display("Bloom filter is not found.\nBacktrace\n:{}", backtrace))]
+    BloomFilterNotFound { backtrace: Backtrace },
+
+    #[snafu(display(
+        "Bloom filter should be 256 bytes, current:{}.\nBacktrace\n:{}",
+        size,
+        backtrace
+    ))]
+    InvalidBloomFilterSize { size: usize, backtrace: Backtrace },
+
     #[snafu(display("Failed to convert time range, err:{}", source))]
     ConvertTimeRange { source: common_types::time::Error },
 
@@ -425,6 +436,66 @@ impl FileMeta {
     }
 }
 
+#[derive(Debug, Clone, PartialEq, Eq, Default)]
+pub struct BloomFilter {
+    // Two-level vector:
+    // 1. the outer level is indexed by row group
+    // 2. the inner level is indexed by column
+    filters: Vec<Vec<Bloom>>,
+}
+
+impl BloomFilter {
+    pub fn new(filters: Vec<Vec<Bloom>>) -> Self {
+        Self { filters }
+    }
+}
+
+impl From<BloomFilter> for sst_pb::SstBloomFilter {
+    fn from(bloom_filter: BloomFilter) -> Self {
+        let row_group_filters = bloom_filter
+            .filters
+            .iter()
+            .map(|row_group_filter| {
+                let column_filters = row_group_filter
+                    .iter()
+                    .map(|column_filter| column_filter.data().to_vec())
+                    .collect::<Vec<_>>();
+                sst_pb::sst_bloom_filter::RowGroupFilter { column_filters }
+            })
+            .collect::<Vec<_>>();
+
+        sst_pb::SstBloomFilter { row_group_filters }
+    }
+}
+
+impl TryFrom<sst_pb::SstBloomFilter> for BloomFilter {
+    type Error = Error;
+
+    fn try_from(src: sst_pb::SstBloomFilter) -> Result<Self> {
+        let filters = src
+            .row_group_filters
+            .into_iter()
+            .map(|row_group_filter| {
+                row_group_filter
+                    .column_filters
+                    .into_iter()
+                    .map(|encoded_bytes| {
+                        let size = encoded_bytes.len();
+                        let bs: [u8; 256] = encoded_bytes
+                            .try_into()
+                            .ok()
+                            .context(InvalidBloomFilterSize { size })?;
+
+                        Ok(Bloom::from(bs))
+                    })
+                    .collect::<Result<Vec<_>>>()
+            })
+            .collect::<Result<Vec<_>>>()?;
+
+        Ok(BloomFilter { filters })
+    }
+}
+
 /// Meta data of a sst file
 #[derive(Debug, Clone, PartialEq)]
 pub struct SstMetaData {
@@ -440,6 +511,7 @@ pub struct SstMetaData {
     // total row number
     pub row_num: u64,
     pub storage_format_opts: StorageFormatOptions,
+    pub bloom_filter: BloomFilter,
 }
 
 impl SstMetaData {
@@ -459,6 +531,7 @@ impl From<SstMetaData> for sst_pb::SstMetaData {
             size: src.size,
             row_num: src.row_num,
             storage_format_opts: Some(src.storage_format_opts.into()),
+            bloom_filter: Some(src.bloom_filter.into()),
         }
     }
 }
@@ -479,6 +552,11 @@ impl TryFrom<sst_pb::SstMetaData> for SstMetaData {
             src.storage_format_opts
                 .context(StorageFormatOptionsNotFound)?,
         );
+        let bloom_filter = {
+            let pb_filter = src.bloom_filter.context(BloomFilterNotFound)?;
+            BloomFilter::try_from(pb_filter)?
+        };
+
         Ok(Self {
             min_key: src.min_key.into(),
             max_key: src.max_key.into(),
@@ -488,6 +566,7 @@ impl TryFrom<sst_pb::SstMetaData> for SstMetaData {
             size: src.size,
             row_num: src.row_num,
             storage_format_opts,
+            bloom_filter,
         })
     }
 }
@@ -670,6 +749,8 @@ pub fn merge_sst_meta(files: &[FileHandle], schema: Schema) -> SstMetaData {
         size: 0,
         row_num: 0,
         storage_format_opts: StorageFormatOptions::new(storage_format),
+        // the bloom filter is rebuilt when writing the sst, so use the default here
+        bloom_filter: Default::default(),
     }
 }
 
@@ -723,9 +804,10 @@ pub mod tests {
             time_range: self.time_range,
             max_sequence: self.max_sequence,
             schema: self.schema.clone(),
-            size: 0,
             row_num: 0,
-            storage_format_opts: StorageFormatOptions::default(),
+            size: 0,
+            storage_format_opts: Default::default(),
+            bloom_filter: Default::default(),
         }
     }
 }
diff --git a/analytic_engine/src/sst/parquet/builder.rs b/analytic_engine/src/sst/parquet/builder.rs
index edeeaacf8c..33831fb51b 100644
--- a/analytic_engine/src/sst/parquet/builder.rs
+++ b/analytic_engine/src/sst/parquet/builder.rs
@@ -2,14 +2,18 @@
 
 //! Sst builder implementation based on parquet.
 
-use std::sync::{
-    atomic::{AtomicUsize, Ordering},
-    Arc,
+use std::{
+    collections::VecDeque,
+    sync::{
+        atomic::{AtomicUsize, Ordering},
+        Arc,
+    },
 };
 
 use async_trait::async_trait;
-use common_types::request_id::RequestId;
+use common_types::{record_batch::RecordBatchWithKey, request_id::RequestId};
 use datafusion::parquet::basic::Compression;
+use ethbloom::{Bloom, Input};
 use futures::StreamExt;
 use log::debug;
 use object_store::{ObjectStoreRef, Path};
@@ -18,7 +22,7 @@ use snafu::ResultExt;
 use crate::sst::{
     builder::{RecordBatchStream, SstBuilder, *},
     factory::SstBuilderOptions,
-    file::SstMetaData,
+    file::{BloomFilter, SstMetaData},
     parquet::encoding::ParquetEncoder,
 };
 
@@ -54,23 +58,20 @@ struct RecordBytesReader {
     compression: Compression,
     meta_data: SstMetaData,
     total_row_num: Arc<AtomicUsize>,
-
-    fetched_row_num: usize,
+    // Record batches partitioned into row groups of exactly `num_rows_per_row_group` rows.
+    // There may be more than one `RecordBatchWithKey` inside each partition.
+    partitioned_record_batch: Vec<Vec<RecordBatchWithKey>>,
 }
 
 impl RecordBytesReader {
-    async fn read_all(mut self) -> Result<Vec<u8>> {
-        let mut arrow_record_batch_vec = Vec::new();
+    // Partition the record batch stream into a vector of row groups, each holding
+    // exactly `num_rows_per_row_group` rows (except possibly the last one).
+    async fn partition_record_batch(&mut self) -> Result<()> {
+        let mut fetched_row_num = 0;
+        let mut pending_record_batch: VecDeque<RecordBatchWithKey> = Default::default();
+        let mut current_batch = Vec::new();
+        let mut remaining = self.num_rows_per_row_group; // how many rows are left for current_batch
 
-        let mut parquet_encoder = ParquetEncoder::try_new(
-            self.num_rows_per_row_group,
-            self.compression,
-            self.meta_data,
-        )
-        .map_err(|e| Box::new(e) as _)
-        .context(EncodeRecordBatch)?;
-
-        // process record batch stream
         while let Some(record_batch) = self.record_stream.next().await {
             let record_batch = record_batch.context(PollRecordBatch)?;
@@ -80,28 +81,120 @@ impl RecordBytesReader {
                 self.request_id
             );
 
-            self.fetched_row_num += record_batch.num_rows();
-            arrow_record_batch_vec.push(record_batch.into_record_batch().into_arrow_record_batch());
-
-            if self.fetched_row_num >= self.num_rows_per_row_group {
-                let buf_len = arrow_record_batch_vec.len();
-                self.fetched_row_num = 0;
-                let row_num = parquet_encoder
-                    .encode_record_batch(arrow_record_batch_vec)
-                    .map_err(|e| Box::new(e) as _)
-                    .context(EncodeRecordBatch)?;
-                arrow_record_batch_vec = Vec::with_capacity(buf_len);
-                self.total_row_num.fetch_add(row_num, Ordering::Relaxed);
+            fetched_row_num += record_batch.num_rows();
+            pending_record_batch.push_back(record_batch);
+
+            // The row group limit is reached: drain the pending batches into row groups
+            // and reset the counters.
+            // Note: pending_record_batch may contain multiple batches.
+            while fetched_row_num >= self.num_rows_per_row_group {
+                match pending_record_batch.pop_front() {
+                    // The accumulated rows are enough for one row group.
+                    Some(next) if next.num_rows() >= remaining => {
+                        debug!(
+                            "enough target:{}, remaining:{}, fetched:{}, current:{}",
+                            self.num_rows_per_row_group,
+                            remaining,
+                            fetched_row_num,
+                            next.num_rows(),
+                        );
+                        current_batch.push(next.slice(0, remaining));
+                        let left = next.num_rows() - remaining;
+                        if left > 0 {
+                            pending_record_batch.push_front(next.slice(remaining, left));
+                        }
+
+                        self.partitioned_record_batch
+                            .push(std::mem::take(&mut current_batch));
+                        fetched_row_num -= self.num_rows_per_row_group;
+                        remaining = self.num_rows_per_row_group;
+                    }
+                    // Not enough rows for one row group yet.
+                    Some(next) => {
+                        debug!(
+                            "not enough target:{}, remaining:{}, fetched:{}, current:{}",
+                            self.num_rows_per_row_group,
+                            remaining,
+                            fetched_row_num,
+                            next.num_rows(),
+                        );
+                        remaining -= next.num_rows();
+                        current_batch.push(next);
+                    }
+                    // Nothing left: put the partial batches back into pending_record_batch.
+                    _ => {
+                        for records in std::mem::take(&mut current_batch) {
+                            pending_record_batch.push_front(records);
+                        }
+                        break;
+                    }
+                }
             }
         }
 
-        // final check if there is any record batch left
-        if self.fetched_row_num != 0 {
+        // Collect the remaining records into one last (possibly smaller) row group.
+        let mut remaining = Vec::with_capacity(pending_record_batch.len());
+        while let Some(batch) = pending_record_batch.pop_front() {
+            remaining.push(batch);
+        }
+        if !remaining.is_empty() {
+            self.partitioned_record_batch.push(remaining);
+        }
+
+        Ok(())
+    }
+
+    // Build one bloom filter per column for every partitioned row group by accruing
+    // the byte encoding of each datum.
+    fn build_bloom_filter(&self) -> BloomFilter {
+        let filters = self
+            .partitioned_record_batch
+            .iter()
+            .map(|row_group_batch| {
+                let mut row_group_filters =
+                    vec![Bloom::default(); row_group_batch[0].num_columns()];
+
+                for partial_batch in row_group_batch {
+                    for (col_idx, column) in partial_batch.columns().iter().enumerate() {
+                        for row in 0..column.num_rows() {
+                            let datum = column.datum(row);
+                            let bytes = datum.to_bytes();
+                            row_group_filters[col_idx].accrue(Input::Raw(&bytes));
+                        }
+                    }
+                }
+
+                row_group_filters
+            })
+            .collect::<Vec<_>>();
+
+        BloomFilter::new(filters)
+    }
+
+    async fn read_all(mut self) -> Result<Vec<u8>> {
+        self.partition_record_batch().await?;
+        let filters = self.build_bloom_filter();
+        self.meta_data.bloom_filter = filters;
+
+        let mut parquet_encoder = ParquetEncoder::try_new(
+            self.num_rows_per_row_group,
+            self.compression,
+            self.meta_data,
+        )
+        .map_err(|e| Box::new(e) as _)
+        .context(EncodeRecordBatch)?;
+
+        // Encode the partitioned record batches, one row group at a time.
+        let mut arrow_record_batch_vec = Vec::new();
+        for record_batches in self.partitioned_record_batch {
+            for batch in record_batches {
+                arrow_record_batch_vec.push(batch.into_record_batch().into_arrow_record_batch());
+            }
+
+            let buf_len = arrow_record_batch_vec.len();
             let row_num = parquet_encoder
                 .encode_record_batch(arrow_record_batch_vec)
                 .map_err(|e| Box::new(e) as _)
                 .context(EncodeRecordBatch)?;
             self.total_row_num.fetch_add(row_num, Ordering::Relaxed);
+            arrow_record_batch_vec = Vec::with_capacity(buf_len);
         }
 
         let bytes = parquet_encoder
@@ -134,7 +227,7 @@ impl<'a> SstBuilder for ParquetSstBuilder<'a> {
             total_row_num: total_row_num.clone(),
             // TODO(xikai): should we avoid this clone?
             meta_data: meta.to_owned(),
-            fetched_row_num: 0,
+            partitioned_record_batch: Default::default(),
         };
 
         let bytes = reader.read_all().await?;
         self.storage
@@ -162,7 +255,10 @@ mod tests {
         tests::{build_row, build_schema},
         time::{TimeRange, Timestamp},
     };
-    use common_util::runtime::{self, Runtime};
+    use common_util::{
+        runtime::{self, Runtime},
+        tests::init_log_for_test,
+    };
     use futures::stream;
     use object_store::LocalFileSystem;
     use table_engine::predicate::Predicate;
@@ -176,13 +272,15 @@ mod tests {
             parquet::{reader::ParquetSstReader, AsyncParquetReader},
             reader::{tests::check_stream, SstReader},
         },
-        table_options::{self, StorageFormatOptions},
+        table_options,
     };
 
     // TODO(xikai): add test for reverse reader
     #[test]
     fn test_parquet_build_and_read() {
+        init_log_for_test();
+
        let runtime = Arc::new(runtime::Builder::default().build().unwrap());
        parquet_write_and_then_read_back(runtime.clone(), 3, vec![3, 3, 3, 3, 3]);
        parquet_write_and_then_read_back(runtime.clone(), 4, vec![4, 4, 4, 3]);
@@ -237,7 +335,8 @@ mod tests {
             schema: schema.clone(),
             size: 10,
             row_num: 2,
-            storage_format_opts: StorageFormatOptions::default(),
+            storage_format_opts: Default::default(),
+            bloom_filter: Default::default(),
         };
 
         let mut counter = 10;
@@ -286,13 +385,16 @@ mod tests {
         let mut reader: Box<dyn SstReader> = if async_reader {
             let mut reader = AsyncParquetReader::new(&sst_file_path, &store, &sst_reader_options);
-            let sst_meta_readback = {
+            let mut sst_meta_readback = {
                 // FIXME: size of SstMetaData is not what this file's size, so overwrite it
                 // https://github.com/CeresDB/ceresdb/issues/321
                 let mut meta = reader.meta_data().await.unwrap().clone();
                 meta.size = sst_meta.size;
                 meta
             };
+            // The bloom filter is built inside the sst writer, so overwrite it with the
+            // default value for comparison.
+            sst_meta_readback.bloom_filter = Default::default();
             assert_eq!(&sst_meta_readback, &sst_meta);
             assert_eq!(
                 expected_num_rows,
@@ -307,7 +409,15 @@
             Box::new(reader)
         } else {
             let mut reader = ParquetSstReader::new(&sst_file_path, &store, &sst_reader_options);
-            assert_eq!(reader.meta_data().await.unwrap(), &sst_meta);
+            let sst_meta_readback = {
+                let mut meta = reader.meta_data().await.unwrap().clone();
+                // The bloom filter is built inside the sst writer, so overwrite it with the
+                // default value for comparison.
+                meta.bloom_filter = Default::default();
+                meta
+            };
+
+            assert_eq!(&sst_meta_readback, &sst_meta);
             assert_eq!(
                 expected_num_rows,
                 reader
@@ -331,4 +441,78 @@ mod tests {
             check_stream(&mut stream, expect_rows).await;
         });
     }
+
+    #[tokio::test]
+    async fn test_partition_record_batch() {
+        // rows per group: 10
+        let testcases = vec![
+            // input, expected
+            (vec![], vec![]),
+            (vec![10, 10], vec![10, 10]),
+            (vec![10, 10, 1], vec![10, 10, 1]),
+            (vec![10, 10, 21], vec![10, 10, 10, 10, 1]),
+            (vec![5, 6, 10], vec![10, 10, 1]),
+            (vec![5, 4, 4, 30], vec![10, 10, 10, 10, 3]),
+            (vec![20, 7, 23, 20], vec![10, 10, 10, 10, 10, 10, 10]),
+            (vec![21], vec![10, 10, 1]),
+        ];
+
+        for (input, expected) in testcases {
+            test_partition_record_batch_inner(input, expected).await;
+        }
+    }
+
+    async fn test_partition_record_batch_inner(
+        input_row_nums: Vec<usize>,
+        expected_row_nums: Vec<usize>,
+    ) {
+        init_log_for_test();
+        let schema = build_schema();
+        let mut poll_cnt = 0;
+        let schema_clone = schema.clone();
+        let record_batch_stream = Box::new(stream::poll_fn(
+            move |_ctx| -> Poll<Option<Result<RecordBatchWithKey>>> {
+                if poll_cnt == input_row_nums.len() {
+                    return Poll::Ready(None);
+                }
+
+                let rows = (0..input_row_nums[poll_cnt])
+                    .map(|_| build_row(b"a", 100, 10.0, "v4"))
+                    .collect::<Vec<_>>();
+
+                let batch = build_record_batch_with_key(schema_clone.clone(), rows);
+                poll_cnt += 1;
+
+                Poll::Ready(Some(Ok(batch)))
+            },
+        ));
+
+        let mut reader = RecordBytesReader {
+            request_id: RequestId::next_id(),
+            record_stream: record_batch_stream,
+            num_rows_per_row_group: 10,
+            compression: Compression::UNCOMPRESSED,
+            meta_data: SstMetaData {
+                min_key: Default::default(),
+                max_key: Default::default(),
+                time_range: Default::default(),
+                max_sequence: 1,
+                schema,
+                size: 0,
+                row_num: 0,
+                storage_format_opts: Default::default(),
+                bloom_filter: Default::default(),
+            },
+            total_row_num: Arc::new(AtomicUsize::new(0)),
+            partitioned_record_batch: Vec::new(),
+        };
+
+        reader.partition_record_batch().await.unwrap();
+
+        for (i, expected_row_num) in expected_row_nums.into_iter().enumerate() {
+            let actual: usize = reader.partitioned_record_batch[i]
+                .iter()
+                .map(|b| b.num_rows())
+                .sum();
+            assert_eq!(expected_row_num, actual);
+        }
+    }
 }
diff --git a/analytic_engine/src/sst/parquet/encoding.rs b/analytic_engine/src/sst/parquet/encoding.rs
index 43b6955212..43688ae3ce 100644
--- a/analytic_engine/src/sst/parquet/encoding.rs
+++ b/analytic_engine/src/sst/parquet/encoding.rs
@@ -903,6 +903,7 @@ mod tests {
             size: 10,
             row_num: 4,
             storage_format_opts,
+            bloom_filter: Default::default(),
         };
 
         let mut encoder =
             HybridRecordEncoder::try_new(100, Compression::ZSTD, meta_data.clone()).unwrap();
diff --git a/analytic_engine/src/table/version_edit.rs b/analytic_engine/src/table/version_edit.rs
index 6eece4799e..c0a10071aa 100644
--- a/analytic_engine/src/table/version_edit.rs
+++ b/analytic_engine/src/table/version_edit.rs
@@ -100,6 +100,7 @@ impl TryFrom for AddFile {
                 size: src.size,
                 row_num: src.row_num,
                 storage_format_opts: StorageFormatOptions::new(storage_format.into()),
+                bloom_filter: Default::default(),
             },
         },
     };
diff --git a/common_types/src/datum.rs b/common_types/src/datum.rs
index 50ffbfdaab..32b669616d 100644
--- a/common_types/src/datum.rs
+++ b/common_types/src/datum.rs
@@ -484,6 +484,32 @@ impl Datum {
         }
     }
 
+    pub fn to_bytes(&self) -> Vec<u8> {
+        match self {
+            Datum::Double(v) => v.to_le_bytes().to_vec(),
+            Datum::Float(v) => v.to_le_bytes().to_vec(),
+            Datum::UInt64(v) => v.to_le_bytes().to_vec(),
+            Datum::UInt32(v) => v.to_le_bytes().to_vec(),
+            Datum::UInt16(v) => v.to_le_bytes().to_vec(),
+            Datum::UInt8(v) => v.to_le_bytes().to_vec(),
+            Datum::Int64(v) => v.to_le_bytes().to_vec(),
+            Datum::Int32(v) => v.to_le_bytes().to_vec(),
+            Datum::Int16(v) => v.to_le_bytes().to_vec(),
+            Datum::Int8(v) => v.to_le_bytes().to_vec(),
+            Datum::Boolean(v) => {
+                if *v {
+                    vec![1]
+                } else {
+                    vec![0]
+                }
+            }
+            Datum::Null => vec![0],
+            Datum::Timestamp(ts) => ts.as_i64().to_le_bytes().to_vec(),
+            Datum::Varbinary(b) => b.to_vec(),
+            Datum::String(string) => string.as_bytes().to_vec(),
+        }
+    }
+
     /// Generate a negative datum if possible.
     ///
     /// It will return `None` if:
diff --git a/proto/protos/sst.proto b/proto/protos/sst.proto
index d29cae4760..32b136e447 100644
--- a/proto/protos/sst.proto
+++ b/proto/protos/sst.proto
@@ -7,6 +7,14 @@ package sst;
 import "common.proto";
 import "analytic_common.proto";
 
+message SstBloomFilter {
+  message RowGroupFilter {
+    repeated bytes column_filters = 1;
+  };
+
+  repeated RowGroupFilter row_group_filters = 1;
+}
+
 message SstMetaData {
   // Min key in the sst
   bytes min_key = 1;
@@ -20,4 +28,5 @@ message SstMetaData {
   uint64 size = 6;
   uint64 row_num = 7;
   analytic_common.StorageFormatOptions storage_format_opts = 8;
+  SstBloomFilter bloom_filter = 9;
 }
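
Usage note (not part of the patch above): this change only builds the per-row-group, per-column bloom filters and persists them in the sst meta data; the read path that consults them is untouched here. The sketch below illustrates, with the same ethbloom calls the writer uses (`Bloom::accrue` over `Input::Raw` byte encodings), how a reader could later use such a filter to decide whether a row group can be skipped for an equality predicate. The `row_group_may_contain` helper and the literal values are hypothetical and only demonstrate the check, not an API of this repository.

use ethbloom::{Bloom, Input};

/// Hypothetical helper: returns true if the row group *may* contain `value_bytes`
/// in the column at `col_idx`. `filters` holds one `Bloom` per column of a single
/// row group, in the same column order used when the filters were accrued.
fn row_group_may_contain(filters: &[Bloom], col_idx: usize, value_bytes: &[u8]) -> bool {
    // A bloom filter can report false positives but never false negatives,
    // so a `false` result means the row group can be skipped safely.
    filters[col_idx].contains_input(Input::Raw(value_bytes))
}

fn main() {
    // Build a column filter the same way the sst writer does: accrue the raw
    // byte encoding of every datum in the row group.
    let mut host_column_filter = Bloom::default();
    host_column_filter.accrue(Input::Raw(b"host-001"));
    host_column_filter.accrue(Input::Raw(b"host-002"));

    let row_group_filters = vec![host_column_filter];

    // A value that was accrued must be reported as possibly present.
    assert!(row_group_may_contain(&row_group_filters, 0, b"host-001"));

    // An absent value is almost certainly reported as not present, letting a
    // reader skip decoding this row group entirely.
    println!(
        "may contain host-999: {}",
        row_group_may_contain(&row_group_filters, 0, b"host-999")
    );
}

Since every per-column filter serializes to the fixed 256-byte ethbloom payload (the size enforced by `InvalidBloomFilterSize` above), the cost of carrying one filter per column per row group inside `SstMetaData` stays small and predictable.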