tresata / spark-sorted

Secondary sort and streaming reduce for Apache Spark
Apache License 2.0

Need help finding out why spilling is so slow #7

Closed: nhsieh168 closed this issue 9 years ago

nhsieh168 commented 9 years ago

Hi, I used a large data set with GroupSorted: 25 million raw records, which come down to 2 million rows after filtering, and the group sort runs forever without finishing. A jstack dump of the process seems to indicate it is spilling. The allocated memory should in theory be enough, or close to it. There are a lot of keys, but most keys have fewer than a hundred rows.

```
java.io.ObjectInputStream.defaultReadFields(java.lang.Object, java.io.ObjectStreamClass) @bci=105, line=1986 (Compiled frame)
```

koertkuipers commented 9 years ago

2 million rows, with at most hundreds of values per key, should not be an issue at all.

if i understand the stack dump correctly it is reading from the external sorter (which means sort-based shuffle), and de-serializing using the java serialization framework.

i do not know if it matters, but i have never used java serialization. we always use kryo instead. can you try setting spark.serializer=org.apache.spark.serializer.KryoSerializer?
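
a minimal sketch of what that looks like, assuming a plain SparkConf-based setup; MyRecord is just a stand-in for whatever class actually flows through the shuffle, and registerKryoClasses is available on SparkConf from spark 1.2 on:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// hypothetical record type standing in for whatever goes through the shuffle
case class MyRecord(key: String, time: Long, value: Double)

val conf = new SparkConf()
  .setAppName("group-sorted-job")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // optional: registering classes lets kryo write a small numeric id
  // instead of the full class name with every record
  .registerKryoClasses(Array(classOf[MyRecord]))

val sc = new SparkContext(conf)
```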

the spark application web ui should provide more information about what is going on. it includes info on the shuffle as well.

nhsieh168 commented 9 years ago

I figured it out. The original root cause was a typo in the executor.memory setting, so I wasn't allocating the memory I thought I was, and GC was eating all the CPU. Before identifying the root cause I went through a big round of data-structure refactoring in the comparator used for sorting and in the windowed function I wrote with mapStreamByKey, which will be good to have anyway.
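
For anyone else hitting this, a small sketch of why such a typo goes unnoticed (the key names below are illustrative): SparkConf accepts arbitrary keys, so a misspelled property is stored but never read, and the executors silently fall back to the default heap size.

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.mem", "8g")     // typo: accepted by SparkConf, ignored by spark
  .set("spark.executor.memory", "8g")  // the intended setting

// a cheap sanity check before launching a long-running job
assert(conf.get("spark.executor.memory", "<unset>") == "8g")
```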

My discoveries:

1. Because the filter drops on the order of 90% of the original data, I ended up with many small partitions. Repartitioning to a smaller number of partitions helped shuffle/sort performance, but this kind of manual tuning is ugly in my view, and a more automatic solution would be nice in the future.
2. In a naive attempt to save CPU, I created data structures in the comparator and in the windowed function, which triggered unnecessary GC. Computing values on the fly ended up being much better.

A sketch of both points follows.
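This is a sketch under assumptions, not my actual code: Event, the filter predicate, and the partition arithmetic are all hypothetical.

```scala
import org.apache.spark.rdd.RDD

// hypothetical record type
case class Event(id: String, time: Long, payload: String)

// point 1: after a filter that keeps ~10% of the data, shrink the partition
// count to match; coalesce merges partitions without a full shuffle
def compact(raw: RDD[(String, Event)]): RDD[(String, Event)] = {
  val filtered = raw.filter { case (_, e) => e.payload.nonEmpty } // illustrative predicate
  filtered.coalesce(math.max(1, filtered.partitions.length / 10))
}

// point 2: an allocation-free comparator that compares a field directly,
// instead of building a tuple or wrapper object on every comparison
val byTime: Ordering[Event] = new Ordering[Event] {
  def compare(a: Event, b: Event): Int = java.lang.Long.compare(a.time, b.time)
}
```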