Closed: metasim closed this 4 years ago
The error is thrown at the end of this function in SparkPlan.scala (org.apache.spark.sql.execution.SparkPlan):
  /**
   * Packing the UnsafeRows into byte array for faster serialization.
   * The byte arrays are in the following format:
   * [size] [bytes of UnsafeRow] [size] [bytes of UnsafeRow] ... [-1]
   *
   * UnsafeRow is highly compressible (at least 8 bytes for any column), the byte array is also
   * compressed.
   */
  private def getByteArrayRdd(n: Int = -1): RDD[(Long, Array[Byte])] = {
    execute().mapPartitionsInternal { iter =>
      var count = 0
      val buffer = new Array[Byte](4 << 10)  // 4K
      val codec = CompressionCodec.createCodec(SparkEnv.get.conf)
      val bos = new ByteArrayOutputStream()
      val out = new DataOutputStream(codec.compressedOutputStream(bos))
      while (iter.hasNext && (n < 0 || count < n)) {
        val row = iter.next().asInstanceOf[UnsafeRow]
        out.writeInt(row.getSizeInBytes)
        row.writeToStream(out, buffer)
        count += 1
      }
      out.writeInt(-1)
      out.flush()
      out.close()
      Iterator((count, bos.toByteArray))
    }
  }
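To make the framing described in that comment concrete, here is a minimal round-trip sketch of the [size] [bytes] ... [-1] layout. It is not Spark code: RowFraming, pack and unpack are hypothetical names, plain byte arrays stand in for UnsafeRow, and the compression step is left out so the framing stays visible.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable.ArrayBuffer

// Hypothetical helper, not part of Spark. Plain byte arrays stand in for
// UnsafeRow, and no compression codec is applied.
object RowFraming {
  // Writes [size][bytes] for up to n rows (all rows if n < 0), then a -1 terminator.
  def pack(rows: Iterator[Array[Byte]], n: Int = -1): (Long, Array[Byte]) = {
    var count = 0L
    val bos = new ByteArrayOutputStream()
    val out = new DataOutputStream(bos)
    while (rows.hasNext && (n < 0 || count < n)) {
      val row = rows.next()
      out.writeInt(row.length)
      out.write(row)
      count += 1
    }
    out.writeInt(-1)
    out.close()
    // toByteArray copies the whole buffer into a new array.
    (count, bos.toByteArray)
  }

  // Reads [size][bytes] records until the -1 terminator is reached.
  def unpack(bytes: Array[Byte]): Seq[Array[Byte]] = {
    val in = new DataInputStream(new ByteArrayInputStream(bytes))
    val rows = ArrayBuffer.empty[Array[Byte]]
    var size = in.readInt()
    while (size >= 0) {
      val row = new Array[Byte](size)
      in.readFully(row)
      rows += row
      size = in.readInt()
    }
    rows.toSeq
  }
}
```

As in the Spark function, everything written for a partition accumulates in one in-memory buffer, and the final toByteArray call makes a second copy of it. That may be worth keeping in mind if the failure at the end of the function turns out to be memory-related.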
spark.checkpoint.compress is an option, as is spark.shuffle.compress.
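For reference, a sketch of how those two settings could be passed through SparkConf, assuming spark-core is on the classpath. The values are illustrative only, and nothing in this thread establishes that either setting affects the error.

```scala
import org.apache.spark.SparkConf

// Illustrative values only; whether these help here is an open question.
val conf = new SparkConf()
  .set("spark.checkpoint.compress", "true") // compress RDD checkpoints
  .set("spark.shuffle.compress", "true")    // compress map output files
```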
Turns out the notebook was setting the default driver memory to 512MB.
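One quick way to check what heap the driver actually got is to ask the JVM directly. This is a minimal sketch, assuming the cell runs inside the driver JVM; HeapCheck is a hypothetical name, and the reported figure is usually somewhat below the configured maximum.

```scala
// Hypothetical helper: reports the maximum heap the current JVM will try to use.
object HeapCheck {
  def maxHeapMiB: Long = Runtime.getRuntime.maxMemory / (1024L * 1024L)
}
```

Evaluating HeapCheck.maxHeapMiB in a notebook cell should show a value near 512 if the 512MB default is in effect. In client mode, spark.driver.memory has to be set before the driver JVM starts (for example with --driver-memory or in the default properties file), since setting it through SparkConf inside the application comes too late.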
Haven't yet been able to boil this down to a publishable test case.
Stacktrace