The basic idea is to reuse Spark's BloomFilterAggregate. For some reason, Spark doesn't register it in the built-in function registry, and it only accepts an expected item count and a bit size (FPP cannot be specified directly).
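Although the aggregate only takes an expected item count and a bit size, a target FPP can still be translated into a bit size with the standard Bloom filter sizing formula m = -n * ln(p) / (ln 2)^2 (the same formula Spark's sketch library uses internally). A minimal sketch, with an illustrative helper name:
// Standard Bloom filter sizing: m = -n * ln(p) / (ln 2)^2
// (helper name is illustrative, not an API used by this PoC)
def optimalNumOfBits(expectedItems: Long, fpp: Double): Long =
  math.ceil(-expectedItems * math.log(fpp) / (math.log(2) * math.log(2))).toLong

optimalNumOfBits(100000L, 0.03)  // ~730K bits ~= 0.09 MB, consistent with the ~0.1MB per file observed below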
The HTTP logs dataset is used for testing. There are 1045 files, and the clientip column cardinality per file is roughly 50K - 200K:
SELECT input_file_name(), approx_count_distinct(clientip) FROM ds_tables.http_logs GROUP BY input_file_name();
s3://httplogs/year=1998/month=6/day=10/part-00451-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 160818
s3://httplogs/year=1998/month=6/day=11/part-00631-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 163687
s3://httplogs/year=1998/month=6/day=10/part-00386-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 143661
s3://httplogs/year=1998/month=6/day=11/part-00544-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 158091
s3://httplogs/year=1998/month=6/day=11/part-00616-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 173761
s3://httplogs/year=1998/month=6/day=10/part-00434-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 138823
s3://httplogs/year=1998/month=6/day=11/part-00646-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 176312
s3://httplogs/year=1998/month=5/day=27/part-00198-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 17872
s3://httplogs/year=1998/month=6/day=12/part-00725-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 136647
s3://httplogs/year=1998/month=6/day=11/part-00602-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 147839
s3://httplogs/year=1998/month=6/day=12/part-00852-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 162665
s3://httplogs/year=1998/month=5/day=25/part-00175-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 17109
s3://httplogs/year=1998/month=6/day=12/part-00770-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 127598
s3://httplogs/year=1998/month=6/day=11/part-00612-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 177900
s3://httplogs/year=1998/month=6/day=12/part-00828-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2 157072
...
Use a UDF to convert the IP address to a Long. Test the Bloom filter with 100K expected items and the default 0.03 FPP:
PUT bf_ip_test
{
"mappings": {
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary"
}
}
}
}
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.expressions._
import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterAggregate
val funcId_bloom_filter_agg = new FunctionIdentifier("bloom_filter_agg")
val funcId_might_contain = new FunctionIdentifier("might_contain")
// Register 'bloom_filter_agg' to builtin.
spark.sessionState.functionRegistry.registerFunction(funcId_bloom_filter_agg,
new ExpressionInfo(classOf[BloomFilterAggregate].getName, "bloom_filter_agg"),
(children: Seq[Expression]) => children.size match {
case 1 => new BloomFilterAggregate(children.head)
case 2 => new BloomFilterAggregate(children.head, children(1))
case 3 => new BloomFilterAggregate(children.head, children(1), children(2))
})
import org.opensearch.flint.spark.skipping.bloomfilter.BloomFilterMightContain
// Register 'might_contain' to builtin.
spark.sessionState.functionRegistry.registerFunction(funcId_might_contain,
new ExpressionInfo(classOf[BloomFilterMightContain].getName, "might_contain"),
(children: Seq[Expression]) => BloomFilterMightContain(children.head, children(1)))
val ip_to_num = udf((ip: String) => {
val arr = ip.split('.').map(_.toLong)
require(arr.length == 4, s"Invalid IPv4: ${ip}")
arr(0) << 24 | arr(1) << 16 | arr(2) << 8 | arr(3)
})
spark.udf.register("ip_to_num", ip_to_num)
spark.sql("SELECT input_file_name() AS file_path, bloom_filter_agg(ip_to_num(clientip), 100000L) AS clientip_bloom_filter FROM ds_tables.http_logs GROUP BY input_file_name()").write.format("flint").mode("overwrite").save("bf_ip_test_3")
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.sql("SELECT clientip, COUNT(*) AS file_count FROM (SELECT DISTINCT clientip, input_file_name() FROM ds_tables.http_logs) GROUP BY clientip ORDER BY file_count ASC LIMIT 5").show
+-----------+----------+
| clientip|file_count|
+-----------+----------+
| 6.100.18.0| 1|
|139.127.0.0| 1|
| 77.204.4.0| 1|
|205.132.4.0| 1|
|79.176.13.0| 1|
+-----------+----------+
spark.read.format("flint").load("bf_ip_test_3").filter(expr(s"might_contain(clientip_bloom_filter, ip_to_num('6.100.18.0'))")).select("file_path").count
res18: Long = 19
The lookup returns 19 candidate files even though the IP actually appears in only 1 file, because many files have more distinct IPs (up to ~180K) than the 100K expected items, which inflates the actual FPP. The Bloom filter size is ~0.1MB per file (a BF with 1000K expected items takes ~0.6MB, but the Flint data source has a problem with long binary values):
health status index uuid pri rep docs.count docs.deleted store.size pri.store.size
green open bf_ip_test_3 O1DESZ9TQASrmJMlA0L2bA 5 2 1045 0 349.8mb 116.6mb
Tested the BF with 1000K expected items:
// Reduce scroll page size to avoid overflow 100MB response limit
spark.conf.set("spark.datasource.flint.read.scroll_size", 10)
spark.read.format("flint").load("bf_ip_test_2").filter(expr(s"might_contain(clientip_bloom_filter, ip_to_num('6.100.18.0'))")).select("file_path").count
res9: Long = 1
By default, doc values are disabled for the OpenSearch binary field type, but they are needed so a Painless script can access the field via doc['...'].value. In this PoC, we create a new index with doc values enabled:
PUT bf_ip_test_4
{
"mappings": {
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary",
"doc_values": true
}
}
}
}
scala> spark.sql("SELECT input_file_name() AS file_path, bloom_filter_agg(ip_to_num(clientip), 1000000L) AS clientip_bloom_filter FROM ds_tables.http_logs GROUP BY input_file_name()").write.format("flint").mode("overwrite").save("bf_ip_test_4")
The index size for the BF with 1M expected items increases to 1.6GB (1045 docs in total => ~1.6MB BF per file):
green open bf_ip_test_4 NQwe5hqGQqiilHR_pttA7w 5 2 1045 0 4.8gb 1.6gb
Push down Spark's BloomFilterImpl.mightContain() to a Painless script. Because Painless only supports a basic subset of the Java API, we need to inline any unsupported class, such as ByteArrayInputStream:
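For reference, the byte layout the script decodes is what Spark's BloomFilterImpl.writeTo() produces: a version int, the number of hash functions, then the BitArray serialized as a word count followed by its 64-bit words. A minimal Scala sketch of the same deserialization (outside Painless, where DataInputStream is available):
import java.io.{ByteArrayInputStream, DataInputStream}

// Decode the bytes written by Spark's BloomFilterImpl.writeTo(), mirroring the Painless script below:
// version, numHashFunctions, numWords, then the BitArray words.
def readBloomFilter(bytes: Array[Byte]): (Int, Int, Array[Long]) = {
  val in = new DataInputStream(new ByteArrayInputStream(bytes))
  val version = in.readInt()           // serialization format version (V1 = 1)
  val numHashFunctions = in.readInt()  // number of hash functions k
  val numWords = in.readInt()          // BitArray length in 64-bit words
  val words = Array.fill(numWords)(in.readLong())
  (version, numHashFunctions, words)
}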
GET bf_ip_test_4/_search
{
"query": {
"bool": {
"filter": {
"script": {
"script": {
"source": """
int hashLong(long input, int seed) {
int low = (int) input;
int high = (int) (input >>> 32);
int k1 = mixK1(low);
int h1 = mixH1(seed, k1);
k1 = mixK1(high);
h1 = mixH1(h1, k1);
return fmix(h1, 8);
}
int mixK1(int k1) {
k1 *= 0xcc9e2d51L;
k1 = Integer.rotateLeft(k1, 15);
k1 *= 0x1b873593L;
return k1;
}
int mixH1(int h1, int k1) {
h1 ^= k1;
h1 = Integer.rotateLeft(h1, 13);
h1 = h1 * 5 + (int) 0xe6546b64L;
return h1;
}
int fmix(int h1, int length) {
h1 ^= length;
h1 ^= h1 >>> 16;
h1 *= 0x85ebca6bL;
h1 ^= h1 >>> 13;
h1 *= 0xc2b2ae35L;
h1 ^= h1 >>> 16;
return h1;
}
BytesRef bfBytes = doc['clientip_bloom_filter'].value;
byte[] buf = bfBytes.bytes;
int pos = 0;
int count = buf.length;
// int version = dis.readInt();
int ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int version = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
// int numHashFunctions = dis.readInt();
ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int numHashFunctions = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
// int numWords = dis.readInt();
ch1 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch2 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch3 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
ch4 = (pos < count) ? (buf[pos++] & (int) 0xffL) : -1;
int numWords = ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
// Create BitArray internally
long[] data = new long[numWords];
byte[] readBuffer = new byte[8];
for (int i = 0; i < numWords; i++) {
// data[i] = dis.readLong()
int n = 0;
while (n < 8) {
int count2;
// int count2 = in.read(readBuffer, off + n, len - n);
int off = n;
int len = 8 - n;
if (pos >= count) {
count2 = -1;
} else {
int avail = count - pos;
if (len > avail) {
len = avail;
}
if (len <= 0) {
count2 = 0;
} else {
System.arraycopy(buf, pos, readBuffer, off, len);
pos += len;
count2 = len;
}
}
n += count2;
}
data[i] = (((long) readBuffer[0] << 56) +
((long) (readBuffer[1] & 255) << 48) +
((long) (readBuffer[2] & 255) << 40) +
((long) (readBuffer[3] & 255) << 32) +
((long) (readBuffer[4] & 255) << 24) +
((readBuffer[5] & 255) << 16) +
((readBuffer[6] & 255) << 8) +
((readBuffer[7] & 255) << 0));
}
long bitCount = 0;
for (long word : data) {
bitCount += Long.bitCount(word);
}
// BloomFilterImpl.mightContainLong(item)
long item = params.ip;
int h1 = hashLong(item, 0);
int h2 = hashLong(item, h1);
long bitSize = (long) data.length * Long.SIZE;
for (int i = 1; i <= numHashFunctions; i++) {
int combinedHash = h1 + (i * h2);
// Flip all the bits if it's negative (guaranteed positive number)
if (combinedHash < 0) {
combinedHash = ~combinedHash;
}
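// BitArray bit test: bitIndex = combinedHash % bitSize, word index = bitIndex >>> 6, mask = 1L << bitIndex (shift is mod 64)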
if ((data[(int) (combinedHash % bitSize >>> 6)] & (1L << combinedHash % bitSize)) == 0) {
return false;
}
}
return true
""",
"params": {
"ip": 107221504
}
}
}
}
}
}
}
Result:
"hits": [
{
"_index": "bf_ip_test_4",
"_id": "ojr4GI0B9arJAA1FN11b",
"_score": 0,
"_source": {
"file_path": "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=12/part-00820-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2",
"clientip_bloom_filter": "AAAAAQAAAAYAAehIAAAAAAAg..."
Verify:
scala> spark.sql("SELECT clientip, input_file_name() FROM ds_tables.http_logs WHERE clientip = '6.100.18.0'").show(false)
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|clientip |input_file_name() |
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
|6.100.18.0|s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=12/part-00820-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2|
+----------+-----------------------------------------------------------------------------------------------------------------------------------------------------+
More tests:
scala> spark.sql("SELECT clientip, COUNT(*) AS file_count FROM (SELECT DISTINCT clientip, input_file_name() FROM ds_tables.http_logs) GROUP BY clientip HAVING COUNT(*) = 3 ORDER BY file_count ASC LIMIT 5").show
+-----------+----------+
| clientip|file_count|
+-----------+----------+
|186.207.8.0| 3|
|243.254.9.0| 3|
| 160.69.1.0| 3|
| 165.71.0.0| 3|
| 41.96.1.0| 3|
+-----------+----------+
scala> spark.sql("select ip_to_num('243.254.9.0')").show
+----------------------+
|ip_to_num(243.254.9.0)|
+----------------------+
| 4093511936|
+----------------------+
# Run the query with IP 243.254.9.0
"hits": [
{
"_index": "bf_ip_test_4",
"_id": "sDX3GI0BWdGHpYUCxqxK",
"_score": 0,
"fields": {
"file_path": [
"s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=5/day=30/part-00217-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2"
]
}
},
{
"_index": "bf_ip_test_4",
"_id": "UzX3GI0BWdGHpYUCpKyP",
"_score": 0,
"fields": {
"file_path": [
"s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=5/day=30/part-00215-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2"
]
}
},
{
"_index": "bf_ip_test_4",
"_id": "LDr3GI0B9arJAA1Fx1wB",
"_score": 0,
"fields": {
"file_path": [
"s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=5/day=30/part-00216-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2"
]
}
}
]
Quickly tested the following index configurations but found not much difference in index size:
POST binary_test_X/_doc
{
"file_path": "s3://httplogs/http_logs_partitioned_json_bz2/year=1998/month=6/day=10/part-00409-76cb51e0-1b8f-41ea-8bfd-e77261483002.c000.json.bz2",
"clientip_bloom_filter": "VGhpcyBpcyBhIG5lZWRzY3JpcHQgMTAwMEIgc3RyaW5nOgoKR2V0IHlvdSBhIGJhc2U2NCBzdHJpbmc6CkkgbmVlZCBpdCBmb3IgdGVzdAoKRmFpbGVkIHRvIGJlIGluIFhNTDoKLS0tLS1CRUdJTiBDRVJUSUZJQ0FURS0tLS0tCk1JSUMwakNDQXhTZ0F3SUJBZ0lKQU5oOGJ5dGdXNFlBYXpMdXZINHRzclNuTThOS0VhWllVSzlYdDgxZjRjc1MKVnpKN2RReVNKT2hBSWh2N3ZaNkYydjJ0T2xONmtHcG14QXZZVmdBNXpLWGhVUExQR2xEa2w2OHlwaGg3YkRFUQoxYytqM0FLcXRkRFZrR1VSS1NWMjRXbDBuVWZwZllTSERrWVZWT0dWcmQzOHc2dCtMaExSRG0vaFdCclVsSnQwY0N4CkI0RmtVUjB0SVU3Rk5aUG1iZlJxRzRZRmlaeTB0ZEdaRXRkcXdYcXd6Q1F3R0Z1dTRFbUhwZ3hRVVNENjU1SHNkM1gKMjY1Q2tIaGg5eEw5V2pxK2JkcXg4OXkxdTFIVHJyS1FUaFBxMmxLTFZMZUxJczJYN2pmUTBqZ0t0d2M1ajFLQgp5RWpFbnE3TVhRSnp4Z0k1eGQ0cHdJWjNJSWxURlRKWUhUc09rWEt6eVNvb0htM0lGMFQwRkdSTUhhWjdpUEhrCkhwSmZJTE5wVE5zYkpUdG1KVjJ5dUN2S3JjRFBsOVZSLlpwMjlXN3I1YWJvVUxZS1hxdDRaNVU1RnVtZVJnS0UwMApFRS9lQklHbz0KLS0tLS1FTkQgQ0VSVElGSUNBVEUtLS0tLQo="
}
# DocValue is disabled by default
PUT binary_test_1
{
"mappings": {
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary"
}
}
}
}
# Enable docValue required by pushdown
PUT binary_test_2
{
"mappings": {
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary",
"doc_values": true
}
}
}
}
# Disable _source
PUT binary_test_3
{
"mappings": {
"_source": {
"enabled": false
},
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary",
"doc_values": true
}
}
}
}
# Exclude instead of disabling _source
# because a _restore_source field found in Lucene index
PUT binary_test_4
{
"mappings": {
"_source": {
"excludes": ["file_path", "clientip_bloom_filter"]
},
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary",
"doc_values": true
}
}
}
}
# Store BF only in stored field
PUT binary_test_5
{
"mappings": {
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary",
"store": true
}
}
}
}
# Use best compression
PUT binary_test_6
{
"settings": {
"index.codec": "best_compression"
},
"mappings": {
"properties": {
"file_path": {
"type": "keyword"
},
"clientip_bloom_filter": {
"type": "binary"
}
}
}
}
Here are different strategies for determining Bloom filter algorithm parameters:
- Bloom Filter with Global Parameters
- Adaptive Bloom Filter per File

Possible solutions:
1. Approximate Parameter Sampling: sample each file to estimate approximate column cardinality (see the sampling sketch after this list).
2. Adaptive Bloom Filter: build multiple bloom filters and choose the best candidate (PARQUET-2254).
3. Scalable Bloom Filter: dynamically adjust the filter size to accommodate varying amounts of data; discussed in the context of Redis (Scalable Bloom Filter in Redis).
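A hedged sketch of the sampling idea above, reusing approx_count_distinct with Spark SQL's TABLESAMPLE (the 10 percent fraction is illustrative; note this estimates the NDV of the sample, which undercounts the full-file NDV):
// Sketch: estimate per-file cardinality from a sample before picking BF parameters
spark.sql("""
  SELECT input_file_name() AS file_path,
         approx_count_distinct(clientip) AS approx_ndv
  FROM ds_tables.http_logs TABLESAMPLE (10 PERCENT)
  GROUP BY input_file_name()
""").show()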
A sketch of the Adaptive Bloom Filter approach, rewritten from pseudocode into runnable Scala using Spark's org.apache.spark.util.sketch.BloomFilter:
import org.apache.spark.util.sketch.BloomFilter

class AdaptiveBloomFilter(fpp: Double = 0.01) {
  // Initialize 10 candidate BFs whose expected NDV increases exponentially: 1K, 2K, ..., 512K
  private val ranges: Seq[Long] = (0 until 10).map(i => 1024L << i)
  private val candidates: Seq[BloomFilter] = ranges.map(ndv => BloomFilter.create(ndv, fpp))

  // How many unique values seen so far (approximated by the BF.putLong() result)
  private var total: Long = 0L

  def put(item: Long): Unit = {
    // bitChanged == true means this is definitely the first time the item was inserted;
    // bitChanged == false means it may or may not have been seen before.
    // Use the last (largest) BF for the most accurate result.
    val bitChanged = candidates.map(_.putLong(item)).last
    if (bitChanged) {
      total += 1
    }
  }

  def bestCandidate(): BloomFilter = {
    // Use the candidate whose expected NDV is slightly greater than the unique counter
    val index = ranges.indexWhere(range => total < range)
    candidates(if (index >= 0) index else candidates.size - 1)
  }
}
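A possible usage sketch of the class above (variable names are hypothetical):
// Hypothetical usage: feed one file's column values, then index only the best-sized candidate
val abf = new AdaptiveBloomFilter()
clientIpsOfOneFile.foreach(ip => abf.put(ipToNum(ip)))  // ipToNum: same IPv4-to-Long conversion as the UDF above
val bestBf = abf.bestCandidate()                        // smallest BF whose expected NDV covers the values seen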
Is your feature request related to a problem?
What solution would you like?
User Experience
Here is an example. See more details in the comments below.
Skipping index data in OpenSearch:
Proposed Solution
Design decision from problem space to solution space:
Reuse Spark's BloomFilterAggregate and BloomFilterMightContain functions and map the serialized filter to the OpenSearch binary field type.

Proof of Concept
PoC branch: https://github.com/dai-chen/opensearch-spark/tree/bloom-filter-skipping-poc
- Spark functions (BloomFilterAggregate and BloomFilterMightContain) registered in the Flint Spark layer
- FlintClient extended to support the binary type and might-contain function pushdown (cannot delegate to the SQL plugin because it is not a Flint dependency at the moment)

Benchmark
Preparation
Setup
a. OpenSearch cluster
b. Index setting/mapping: binary field with doc values enabled
c. Query cache disabled

Test Data
a. http_logs log files in compressed JSON format
b. Total file number = 1045, in different partitions on an S3 bucket
c. File size: 100K -> 25MB (compressed)
d. clientip column cardinality per file: 1 -> 180K

Test Case (FPP is always 1%)
a. Static NDV 512K
b. Static NDV 256K: assumes the user has prior knowledge of the max clientip cardinality
c. Adaptive NDV with 10 BloomFilters from 1K to 512K
Test Result
Static NDV 512K: Q2: 4.3 - 5.2
Static NDV 256K (prior knowledge of column cardinality): Q2: 3.8 - 4.3
Adaptive NDV (10 BFs from 1K to 512K): Q2: 6.7 - 7.6
Test Result Analysis
More Tests on FPP
FPP impact on size:
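For context, the standard sizing formula m = -n * ln(p) / (ln 2)^2 predicts the per-item bit cost for a given FPP (these numbers come from the formula, not from the benchmark):
// Bits per item as a function of FPP, independent of n (values rounded):
// 10% -> ~4.8, 3% -> ~7.3, 1% -> ~9.6, 0.1% -> ~14.4 bits per item
Seq(0.1, 0.03, 0.01, 0.001).map(p => p -> -math.log(p) / (math.log(2) * math.log(2)))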