Skip to content

Commit d460b77

Browse files
feat: Predicate Filtering via ORC Bloom Filters (#72)
* feat: Add Bloom filter support for row group filtering * add case * feat: Enhance Bloom filter generation with additional columns and data types * taplo format * feat: Refactor bloom_value_bytes to use Cow for improved memory efficiency * fix
1 parent 0761dc6 commit d460b77

File tree

9 files changed

+617
-4
lines changed

9 files changed

+617
-4
lines changed

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,10 @@ chrono = { version = "0.4.41", default-features = false, features = ["std"] }
3939
chrono-tz = "0.10"
4040
fallible-streaming-iterator = "0.1"
4141
flate2 = "1"
42+
log = "0.4"
4243
lz4_flex = "0.11"
4344
lzokay-native = "0.1"
45+
murmur3 = "0.5"
4446
num = "0.4.1"
4547
prost = "0.13"
4648
snafu = "0.8"
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
#
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
#
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
"""
19+
Generate a small ORC file that contains Bloom filters for regression tests.
20+
21+
The generated file is written to:
22+
tests/integration/data/bloom_filter.orc
23+
24+
Dependencies:
25+
pip install pyorc
26+
27+
Usage:
28+
python scripts/generate_orc_with_bloom_filter.py
29+
"""
30+
31+
from pathlib import Path
32+
33+
import pyorc
34+
from datetime import date
35+
from decimal import Decimal
36+
37+
OUT_PATH = Path(__file__).parent.parent / "tests" / "integration" / "data" / "bloom_filter.orc"
38+
39+
40+
EXTRA_ROWS = 200
41+
42+
43+
def main() -> None:
    """Write the Bloom-filter regression fixture to ``OUT_PATH``.

    The file holds four hand-written rows plus ``EXTRA_ROWS`` generated rows.
    Bloom filters are requested for every column. Some values (id=2,
    2023-01-02, byte 0x02, decimal 2.22) are deliberately never written even
    though they fall inside the columns' min/max ranges.
    """
    OUT_PATH.parent.mkdir(parents=True, exist_ok=True)

    schema = (
        "struct<id:int,name:string,score:double,event_date:date,flag:boolean,data:binary,dec:decimal(10,2)>"
    )

    # Non-contiguous values make some predicates fall within min/max but absent
    # from the data (to exercise Bloom pruning).
    base_rows = [
        (1, "alpha", 1.0, date(2023, 1, 1), True, b"\x01", Decimal("1.11")),
        (3, "gamma", 3.0, date(2023, 1, 3), False, b"\x03", Decimal("3.33")),
        (5, "delta", 5.0, date(2023, 1, 5), True, b"\x05", Decimal("5.55")),
        (10, "epsilon", 10.0, date(2023, 1, 10), False, b"\x0a", Decimal("10.10")),
    ]

    # Add many more rows, still skipping id=2, date=2023-01-02, binary 0x02 and
    # decimal 2.22, so predicates on those values can only be pruned through
    # the Bloom filters.
    day_choices = [1, 3, 4, 5, 6, 7]  # exclude day=2
    extra_rows = []
    for i in range(EXTRA_ROWS):
        id_v = 101 + i * 2  # odd ids; still keeps id=2 absent
        extra_rows.append(
            (
                id_v,
                f"name_{i}",
                float(id_v),
                date(2023, 1, day_choices[i % len(day_choices)]),
                i % 2 == 0,
                bytes([((i * 2) + 1) % 256]),  # always odd, so never byte 0x02
                Decimal(f"{id_v}.01"),
            )
        )

    rows = base_rows + extra_rows

    # Enable Bloom filters for all columns with a small false positive probability.
    # Using the writer as a context manager closes it even if a write raises.
    with OUT_PATH.open("wb") as f:
        with pyorc.Writer(
            f,
            schema,
            bloom_filter_columns=[
                "id",
                "name",
                "score",
                "event_date",
                "flag",
                "data",
                "dec",
            ],
            bloom_filter_fpp=0.01,
            stripe_size=1024,
        ) as writer:
            for row in rows:
                writer.write(row)

    print(f"Wrote ORC file with bloom filters to {OUT_PATH}")


if __name__ == "__main__":
    main()

src/bloom_filter.rs

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
//! ORC Bloom filter decoding and evaluation.
19+
//!
20+
//! This follows the ORC v1 spec (<https://orc.apache.org/specification/ORCv1/>):
21+
//! - Stream kinds `BLOOM_FILTER` / `BLOOM_FILTER_UTF8` provide per-row-group filters.
22+
//! - Bits are set using Murmur3 x64_128 with seed 0, deriving h1/h2 and the
23+
//! double-hash sequence `h1 + i*h2 (mod m)` for `numHashFunctions`.
24+
//! - A cleared bit means the value is **definitely absent**; set bits mean
25+
//! **possible presence** (false positives allowed).
26+
//!
27+
//! Bloom filters are attached to row groups and can quickly rule out equality
28+
//! predicates (e.g. `col = 'abc'`) before any data decoding.
29+
30+
use murmur3::murmur3_x64_128;
31+
32+
use crate::proto;
33+
34+
/// A Bloom filter parsed from the ORC index stream.
35+
#[derive(Debug, Clone)]
36+
pub struct BloomFilter {
37+
num_hash_functions: u32,
38+
bitset: Vec<u64>,
39+
}
40+
41+
impl BloomFilter {
42+
/// Create a Bloom filter from a decoded protobuf value.
43+
pub fn try_from_proto(proto: &proto::BloomFilter) -> Option<Self> {
44+
// Ensure only one of bitset / utf8bitset is populated
45+
assert!(
46+
proto.bitset.is_empty() || proto.utf8bitset.is_none(),
47+
"Bloom filter proto has both bitset and utf8bitset populated"
48+
);
49+
50+
let num_hash_functions = proto.num_hash_functions();
51+
if proto.bitset.is_empty() && proto.utf8bitset.is_none() {
52+
return None;
53+
}
54+
55+
let bitset = if !proto.bitset.is_empty() {
56+
proto.bitset.clone()
57+
} else {
58+
// utf8bitset is encoded as bytes; convert to u64 words (little-endian)
59+
proto
60+
.utf8bitset
61+
.as_ref()
62+
.map(|bytes| {
63+
bytes
64+
.chunks(8)
65+
.map(|chunk| {
66+
let mut padded = [0u8; 8];
67+
for (idx, value) in chunk.iter().enumerate() {
68+
padded[idx] = *value;
69+
}
70+
u64::from_le_bytes(padded)
71+
})
72+
.collect::<Vec<_>>()
73+
})
74+
.unwrap_or_default()
75+
};
76+
77+
Some(Self {
78+
num_hash_functions: if num_hash_functions == 0 {
79+
// Writers are expected to set this, but default to a safe value
80+
3
81+
} else {
82+
num_hash_functions
83+
},
84+
bitset,
85+
})
86+
}
87+
88+
#[cfg(test)]
89+
/// Create a Bloom filter from raw parts (mainly for tests)
90+
pub fn from_parts(num_hash_functions: u32, bitset: Vec<u64>) -> Self {
91+
Self {
92+
num_hash_functions: num_hash_functions.max(1),
93+
bitset,
94+
}
95+
}
96+
97+
/// Returns true if the value *might* be contained. False means *definitely not*.
98+
pub fn might_contain(&self, value: &[u8]) -> bool {
99+
let bit_count = self.bitset.len() * 64;
100+
if bit_count == 0 {
101+
// Defensive: no bits means we cannot use the filter
102+
return true;
103+
}
104+
105+
let hash = self.hash128(value);
106+
let h1 = hash as u64;
107+
let h2 = (hash >> 64) as u64;
108+
109+
for i in 0..self.num_hash_functions {
110+
// ORC uses the standard double-hash scheme: h1 + i*h2 (mod m)
111+
let combined = h1.wrapping_add((i as u64).wrapping_mul(h2));
112+
let bit_idx = (combined % (bit_count as u64)) as usize;
113+
if !self.test_bit(bit_idx) {
114+
return false;
115+
}
116+
}
117+
118+
true
119+
}
120+
121+
fn hash128(&self, value: &[u8]) -> u128 {
122+
// The ORC specification uses Murmur3 (64-bit) for bloom filters.
123+
// murmur3_x64_128 matches the Java reference implementation, where
124+
// the lower 64 bits are treated as h1 and the upper 64 bits as h2.
125+
let mut cursor = std::io::Cursor::new(value);
126+
murmur3_x64_128(&mut cursor, 0).unwrap_or(0)
127+
}
128+
129+
fn test_bit(&self, bit_idx: usize) -> bool {
130+
let word = bit_idx / 64;
131+
let bit = bit_idx % 64;
132+
if let Some(bits) = self.bitset.get(word) {
133+
(bits & (1u64 << bit)) != 0
134+
} else {
135+
false
136+
}
137+
}
138+
}
139+
140+
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a filter of `bitset_words` 64-bit words with the probe bits of
    /// every entry in `values` set, using `hash_funcs` probes per value.
    fn build_filter(values: &[&[u8]], bitset_words: usize, hash_funcs: u32) -> BloomFilter {
        let total_bits = (bitset_words * 64) as u64;
        let mut words = vec![0u64; bitset_words];

        for &value in values {
            let digest = murmur3_x64_128(&mut std::io::Cursor::new(value), 0).unwrap();
            let (low, high) = (digest as u64, (digest >> 64) as u64);

            for round in 0..hash_funcs {
                let probe = low.wrapping_add(u64::from(round).wrapping_mul(high));
                let position = (probe % total_bits) as usize;
                words[position / 64] |= 1u64 << (position % 64);
            }
        }

        BloomFilter::from_parts(hash_funcs, words)
    }

    /// An inserted value is reported as possibly present; an absent one is ruled out.
    #[test]
    fn test_bloom_filter_hit_and_miss() {
        let filter = build_filter(&[b"abc", b"def"], 2, 3);

        let hit = filter.might_contain(b"abc");
        let miss = filter.might_contain(b"xyz");

        assert!(hit);
        assert!(!miss);
    }

    /// A filter serialised through `utf8bitset` (little-endian bytes) decodes
    /// back into one that gives the same answers.
    #[test]
    fn test_try_from_proto_utf8_bitset() {
        let source = build_filter(&[b"foo"], 1, 2);

        let encoded = proto::BloomFilter {
            num_hash_functions: Some(source.num_hash_functions),
            bitset: Vec::new(),
            utf8bitset: Some(
                source
                    .bitset
                    .iter()
                    .flat_map(|word| word.to_le_bytes())
                    .collect(),
            ),
        };

        let decoded = BloomFilter::try_from_proto(&encoded).unwrap();
        assert!(decoded.might_contain(b"foo"));
        assert!(!decoded.might_contain(b"bar"));
    }
}

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ pub mod arrow_reader;
5151
pub mod arrow_writer;
5252
#[cfg(feature = "async")]
5353
pub mod async_arrow_reader;
54+
mod bloom_filter;
5455
mod column;
5556
pub mod compression;
5657
#[allow(dead_code)]

0 commit comments

Comments
 (0)