I'd like to try
How could we update our code in RAPIDS? I see the following comments in GpuFileFormatWriter.scala:
val bucketIdExpression = bucketSpec.map { _ =>
  // Use `HashPartitioning.partitionIdExpression` as our bucket id expression, so that we can
  // guarantee the data distribution is same between shuffle and bucketed data source, which
  // enables us to only shuffle one side when join a bucketed table and a normal one.
  // HashPartitioning(bucketColumns, spec.numBuckets).partitionIdExpression
  //
  // TODO: Cannot support this until we either:
  //   Guarantee join and bucketing are both on the GPU and disable GPU-writing if join not on GPU
  //   OR
  //   Guarantee GPU hash partitioning is 100% compatible with CPU hashing
  throw new UnsupportedOperationException("GPU hash partitioning for bucketed data is not "
    + "compatible with the CPU version")
}
But in Spark, HashPartitioning is used:
val writerBucketSpec = bucketSpec.map { spec =>
  val bucketColumns = spec.bucketColumnNames.map(c => dataColumns.find(_.name == c).get)

  if (options.getOrElse(BucketingUtils.optionForHiveCompatibleBucketWrite, "false") ==
    "true") {
    // Hive bucketed table: use `HiveHash` and bitwise-and as bucket id expression.
    // Without the extra bitwise-and operation, we can get wrong bucket id when hash value of
    // columns is negative. See Hive implementation in
    // `org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils#getBucketNumber()`.
    val hashId = BitwiseAnd(HiveHash(bucketColumns), Literal(Int.MaxValue))
    val bucketIdExpression = Pmod(hashId, Literal(spec.numBuckets))

    // The bucket file name prefix is following Hive, Presto and Trino conversion, so this
    // makes sure Hive bucketed table written by Spark, can be read by other SQL engines.
    //
    // Hive: `org.apache.hadoop.hive.ql.exec.Utilities#getBucketIdFromFile()`.
    // Trino: `io.trino.plugin.hive.BackgroundHiveSplitLoader#BUCKET_PATTERNS`.
    val fileNamePrefix = (bucketId: Int) => f"$bucketId%05d_0_"
    WriterBucketSpec(bucketIdExpression, fileNamePrefix)
  } else {
    // Spark bucketed table: use `HashPartitioning.partitionIdExpression` as bucket id
    // expression, so that we can guarantee the data distribution is same between shuffle and
    // bucketed data source, which enables us to only shuffle one side when join a bucketed
    // table and a normal one.
    val bucketIdExpression = HashPartitioning(bucketColumns, spec.numBuckets)
      .partitionIdExpression
    WriterBucketSpec(bucketIdExpression, (_: Int) => "")
  }
}
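For reference, `HashPartitioning.partitionIdExpression` is a `Pmod` over a `Murmur3Hash` of the bucket columns, while the Hive-compatible branch above hashes with `HiveHash`, so the two modes produce different bucket ids by design. A minimal sketch of the two bucket id expressions side by side, built directly from Spark catalyst expression classes (no SparkSession needed; the object and column names here are just for illustration):

import org.apache.spark.sql.catalyst.expressions.{AttributeReference, BitwiseAnd, HiveHash, Literal, Murmur3Hash, Pmod}
import org.apache.spark.sql.types.IntegerType

object BucketIdExpressions {
  def main(args: Array[String]): Unit = {
    val col = AttributeReference("key", IntegerType)()
    val numBuckets = 8

    // Spark bucketed table: the same shape HashPartitioning.partitionIdExpression builds.
    val sparkBucketId = Pmod(new Murmur3Hash(Seq(col)), Literal(numBuckets))

    // Hive-compatible bucketed table: HiveHash masked to non-negative, then modded.
    val hiveBucketId = Pmod(BitwiseAnd(HiveHash(Seq(col)), Literal(Int.MaxValue)), Literal(numBuckets))

    println(sparkBucketId.sql)
    println(hiveBucketId.sql)
  }
}

A GPU writer therefore has to reproduce the matching hash bit-for-bit for whichever mode is in effect, which is why GPU Murmur3 compatibility alone does not cover the Hive path.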
For a long time our hash partitioning didn't match Spark's. We now match 100% of the time, so those comments are out of date. But we never put in support for doing bucketed writes. We do not support Hive hash partitioning on the GPU, so for now I would keep doing the same thing. If you see code that deals with bucketed writes, you can throw an exception saying we don't support it. Then we need to add tests to make sure that we are falling back to the CPU when someone tries to do a Hive bucketed write.
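As a rough sketch of the kind of write such a fallback test would exercise (this is not the plugin's actual test harness; it assumes the RAPIDS Accelerator jar is on the classpath, and how the test asserts the fallback, for example by checking the output produced with spark.rapids.sql.explain=NOT_ON_GPU, is left open here):

import org.apache.spark.sql.SparkSession

object BucketedWriteFallbackRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .config("spark.plugins", "com.nvidia.spark.SQLPlugin")  // enable the RAPIDS Accelerator
      .config("spark.rapids.sql.explain", "NOT_ON_GPU")       // log operators that stay on the CPU
      .enableHiveSupport()
      .getOrCreate()
    import spark.implicits._

    // A Spark bucketed write; the data-writing command is expected to fall back to the CPU.
    // (The Hive-compatible path additionally needs a Hive table created with CLUSTERED BY.)
    (0 until 1000).map(i => (i, i % 10)).toDF("id", "key")
      .write
      .bucketBy(8, "key")
      .sortBy("key")
      .mode("overwrite")
      .saveAsTable("bucketed_fallback_check")

    spark.stop()
  }
}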
Makes sense!
Reopening because we still do not support writing Hive bucketed tables.
@sameerz this originally came from an audit. We fixed all of the issues, so we will not crash if someone asks us to write Hive bucketed data. But actually supporting it would turn this into a feature request rather than an audit task. Should we remove the audit labels and add Needs Triage so we can take a look at it again from that standpoint?
Closing this since it was an audit task. If we want to support Hive Bucketed tables in the future we will open a new separate issue.
Is your feature request related to a problem? Please describe. We have copied some code (def write) from FileFormatWriter.scala. We need to keep GpuFileFormatWriter.scala in sync with Apache Spark's file. PR: https://github.com/apache/spark/commit/4a34db9a17