eyalfa opened this issue 6 years ago
Hi @eyalfa, thank you for raising the issue.
I'm not sure I fully understood the use case. I'd appreciate it if you could share more info or a code example.
Regarding the memory reclamation, I don't think that `finalize()` is a good idea. Would the `AutoCloseable` Java interface help somehow? Does Spark (broadcast?) call `close` for classes implementing it?
Agreed that managed vs unmanaged memory in Spark is not an easy thing, especially with large chunks of memory.
> my thought is that implementing `UnsafeBitArray` in terms of a plain Java long array (`long[]`) should not harm capabilities or performance
Right, it shouldn't. In fact it's even faster, but just one CPU cycle faster 😀. On the other hand, managed memory could put pressure on the GC. It's a trade-off.
Probably the best idea would be to expose a `BitArray` interface in the `BloomFilter` class and have a few implementations: `UnsafeBitArray`, `LongBasedBitArray`, `CompositeBitArray`, etc. WDYT? With only one implementation used, the JVM should optimize it down to one more `CMP` instruction.
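For discussion's sake, a rough sketch of that shape; the trait and `LongBasedBitArray` are placeholders taken from the comment above, not existing library code:

```scala
// Sketch of the proposed abstraction; names follow the comment above.
trait BitArray {
  def get(index: Long): Boolean
  def set(index: Long): Unit
  def dispose(): Unit
}

// Heap-backed implementation over a plain long[]: the GC manages it, so
// dispose() has nothing to free.
class LongBasedBitArray(numBits: Long) extends BitArray {
  private val words = new Array[Long](((numBits + 63) >>> 6).toInt)

  override def get(index: Long): Boolean =
    (words((index >>> 6).toInt) & (1L << index)) != 0

  override def set(index: Long): Unit = {
    val w = (index >>> 6).toInt
    words(w) = words(w) | (1L << index) // shift amount is taken mod 64
  }

  override def dispose(): Unit = ()
}
```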
Hi @alexandrnikitin, and thanks for your swift reply! I'd start by saying that your answer is the one I wanted; I was just afraid you'd be reluctant to make the `BitArray` class polymorphic, for potential performance reasons, hence my suggestion for a benchmark.
Regarding my use case with Spark: in this specific case I'm using the RDD API (I've found the DataFrame API to be far superior and better performing for most other use cases), and I specifically leverage two methods:
`RDD.mapPartitions`, which also allows me to maintain partitioning, so I'm doing something like this:
```scala
rdd
  .keys
  .mapPartitions { iter =>
    // todo: parametrize bloom filter's n and fpp parameters via cfg
    val bloomFilter = new BloomFilterWrapper[K](10 * 1000 * 1000, 0.03) // roughly 8.7MB
    iter.foreach(bloomFilter.add)
    Iterator(bloomFilter)
  }
```
to create an RDD with a single BloomFilter object per partition.
`RDD.zipPartitions`, which also allows me to maintain partitioning:
```scala
otherRdd
  .zipPartitions(bloomFiltersRdd, preservesPartitioning = true) { (iter1, iter2) =>
    val bloomFilter = iter2.next()
    iter1.filter { p =>
      bloomFilter.mightContain(p._1)
    }
  }
```
I'm actually using this pattern twice: in one place I also persist (cache) the bloom-filters RDD in memory and reuse it multiple times; in the second place the bloom-filters RDD is used as a one-off.
There are a few issues here related to the off-heap memory allocations:

1. Spark attempts to estimate its memory usage; this is done with a visitor that traverses the object graph using reflection. The visitor only sees `long` fields actually standing for pointers, so Spark can't determine the actual memory usage of these cached partitions and therefore cannot take the actions needed to free up space for them (i.e. by evicting unused blocks or spilling them to disk).
2. `finalize`: it seems the GC is also unaware of the off-heap memory, and I've experienced some test crashes that I could only remedy by introducing explicit `System.gc()` calls in the test code.
3. `AutoCloseable` may apply to my second usage example (as this RDD is not cached), but in general it doesn't suit Spark code, or any other code that chains iterators to form pipelines (see the iterator sketch below).

Sorry for the long write-up :smile:. How would you like to proceed? I still think we have to benchmark at every step:

- off-heap vs array-based implementations
I definitely think the library should support multiple implementations, but we must first be able to determine the performance penalty (if any) for this approach.
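A hedged way to reconcile explicit disposal with lazily-evaluated iterator pipelines (a hypothetical helper, not something the library or Spark provides) is to dispose the filter only once the returned iterator has been exhausted:

```scala
import bloomfilter.mutable.BloomFilter

// Wraps the lazy filtering iterator and frees the off-heap memory once the
// partition has been fully consumed. A plain try/finally would not work here,
// because zipPartitions returns before the iterator is actually evaluated.
def filterWithBloom[K, V](rows: Iterator[(K, V)], bf: BloomFilter[K]): Iterator[(K, V)] =
  new Iterator[(K, V)] {
    private val underlying = rows.filter { case (k, _) => bf.mightContain(k) }
    private var disposed = false

    override def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more && !disposed) { disposed = true; bf.dispose() }
      more
    }

    override def next(): (K, V) = underlying.next()
  }
```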
Thank you for the thorough answer. Now it's clear.
> Spark attempts to estimate its memory usage; this is done with a visitor that traverses the object graph using reflection.

Is there a way to help Spark find out the memory usage? A special interface?
Re 2 and 3: can you iterate over the RDD with BFs and close them explicitly after you've finished working with them?
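For example, something along these lines, assuming the cached `bloomFiltersRdd` from the snippets above and a `dispose()` method on its elements:

```scala
// Walk the cached RDD once more, release the off-heap memory explicitly,
// then drop the cached blocks themselves.
bloomFiltersRdd.foreachPartition { iter =>
  iter.foreach(bf => bf.dispose())
}
bloomFiltersRdd.unpersist()
```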
> off-heap vs array-based implementations
There's a memory access benchmark already in UnsafeBitArrayBenchmark.scala. It gives the following numbers on my laptop:
```
[info] Benchmark                              Mode  Cnt          Score         Error  Units
[info] UnsafeBitArrayBenchmark.getBitSet     thrpt   40  268406318.673 ± 4304838.896  ops/s
[info] UnsafeBitArrayBenchmark.getChronicle  thrpt   40  154137794.237 ± 1175038.454  ops/s
[info] UnsafeBitArrayBenchmark.getUnsafe     thrpt   40  161828817.042 ± 5424124.171  ops/s
```
The difference is just a few CPU cycles. I don't have the asm listings to show yet.
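To make that comparison concrete, here is an illustrative pair of read paths (not the benchmark's own code); the reflective `theUnsafe` lookup is the usual idiom, and the address arithmetic mirrors what an off-heap bit array does:

```scala
import java.lang.reflect.Field

// Acquire sun.misc.Unsafe via reflection (common idiom; not a public API).
val unsafe: sun.misc.Unsafe = {
  val f: Field = classOf[sun.misc.Unsafe].getDeclaredField("theUnsafe")
  f.setAccessible(true)
  f.get(null).asInstanceOf[sun.misc.Unsafe]
}

// Heap-backed read: an indexed array load, which includes a JVM bounds check.
def getHeap(words: Array[Long], index: Long): Boolean =
  (words((index >>> 6).toInt) & (1L << index)) != 0

// Off-heap read: a raw 8-byte load at base + wordIndex * 8, no bounds check.
def getOffHeap(base: Long, index: Long): Boolean =
  (unsafe.getLong(base + ((index >>> 6) << 3)) & (1L << index)) != 0
```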
Hi @alexandrnikitin, not completely related to this issue, but looking at the benchmark you sent:

- `unsafeBits.dispose` is not called in the benchmark.
- `sandbox.bloomfilter.mutable.ChronicleBitArray` seems to allocate less memory than it actually needs: it determines the number of `long`s needed but then allocates only that many bytes. I'm not sure whether it leaks or not, as I'm unfamiliar with the underlying library.
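For reference, the sizing arithmetic in question, spelled out as plain numbers (the values are only an example):

```scala
val numBits  = 70L * 1000 * 1000     // e.g. ~70M bits
val numLongs = (numBits + 63) >>> 6  // 64 bits per long => ceil(numBits / 64)
val numBytes = numLongs << 3         // 8 bytes per long
// Allocating only `numLongs` bytes instead of `numBytes` would be ~8x too small.
```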
I'm using the library with Apache Spark. Specifically, I'm optimizing a sparse join by creating a bloom filter per partition; this required wrapping the bloom filter instances with a class that implements `finalize`. If I hadn't done so I'd get a massive memory leak :cry:. The wrapper solved part of my problems but not all of them, as Spark is unaware of the memory usage of the bloom filters, which may lead to situations where it 'misses' a memory spill (Spark's way of freeing up memory).

My thought is that implementing `UnsafeBitArray` in terms of a plain Java long array (`long[]`) should not harm the capabilities or performance of this class, while still allowing proper GC of these objects. I think an even further possibility is using multiple arrays to avoid huge allocations; the JVM heap (like many other memory allocators) 'suffers' from both tiny and huge allocations, each with its own issues.

@alexandrnikitin, what do you think? Is it worth a benchmark? (Perhaps in a separate branch, to avoid the complexities of side-by-side support for both code paths.)
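For context, a sketch of the kind of wrapper described above; the class shape is an illustrative reconstruction, not the actual code, and it only assumes the library's `BloomFilter[T](n, fpp)` factory and its `dispose()` method:

```scala
import bloomfilter.CanGenerateHashFrom
import bloomfilter.mutable.BloomFilter

// finalize() acts only as a safety net that frees the off-heap memory if the
// filter is collected without an explicit dispose() call.
class BloomFilterWrapper[T](expectedItems: Long, fpp: Double)(
    implicit hash: CanGenerateHashFrom[T]) {

  private val underlying = BloomFilter[T](expectedItems, fpp)
  @volatile private var disposed = false

  def add(x: T): Unit = underlying.add(x)
  def mightContain(x: T): Boolean = underlying.mightContain(x)

  def dispose(): Unit = synchronized {
    if (!disposed) { disposed = true; underlying.dispose() }
  }

  // Safety net only: finalization is non-deterministic, and the GC still
  // cannot see how much off-heap memory hides behind this small on-heap object.
  override protected def finalize(): Unit = {
    try dispose() finally super.finalize()
  }
}
```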