NVIDIA / spark-rapids

Spark RAPIDS plugin - accelerate Apache Spark with GPUs
https://nvidia.github.io/spark-rapids
Apache License 2.0

Look into semaphore usage in q78 and q95 #5058

Open abellina opened 2 years ago

abellina commented 2 years ago

In https://github.com/NVIDIA/spark-rapids/pull/4588 we removed some of the cases where we were holding onto the semaphore during IO in the SHJ, but not all of them. There are fallback scenarios: specifically, when the host-side batch is larger than the batch limit (at which point we want to ensure things are spillable, etc.), or when our incoming batches aren't just serialized batches from the shuffle.

This scenario may already be covered, but the task is to add some nvtx ranges, figure out what this is, and document it here. Maybe there's something we can do to release the semaphore here as well, but I am not sure yet.
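For context, a minimal sketch of the kind of instrumentation being proposed, using the NvtxRange helper from the cudf Java API; the range name and the wrapper function are illustrative, not actual plugin code:

```scala
import ai.rapids.cudf.{NvtxColor, NvtxRange}

// Hypothetical helper: wall off the suspect host-side fallback path in a
// named nvtx range so it shows up as a labeled span in an Nsight Systems
// timeline and we can see how long we sit in this path.
def withFallbackRange[T](work: => T): T = {
  val range = new NvtxRange("SHJ host-side fallback", NvtxColor.GREEN)
  try {
    work // the IO/concat work we want to be able to see and measure
  } finally {
    range.close()
  }
}
```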

[Screenshot from 2022-03-25 13-00-21]

Zoomed in:

[Screenshot from 2022-03-25 12-44-49]

abellina commented 2 years ago

I'll take the "add nvtx ranges and report back" task for now. It seems the issue as it stands isn't specific enough to yield a change.

jbrennan333 commented 2 years ago

See also #4078

abellina commented 2 years ago

For q78: OK, in this case it is a join that is pulling on either side from an aggregate. The aggregate is the one pulling on a shuffle coalesce iterator to get blocks from the shuffle.

Because of this the SHJ optimization doesn't kick in; by the time we get the blocks it is too late. So this is a different case that can be optimized separately. It seems we need an iterator that knows how to release the semaphore in a generic way for different types of execs (aggs, joins, others?), along the lines of the sketch below.
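A rough sketch of what such an iterator could look like, assuming the plugin's GpuSemaphore.acquireIfNecessary/releaseIfNecessary helpers; the wrapper itself, its name, and the assumption that upstream produces host-side batches are all hypothetical:

```scala
import org.apache.spark.TaskContext
import com.nvidia.spark.rapids.GpuSemaphore

// Hypothetical generic wrapper, not plugin code. It assumes the wrapped
// iterator produces host-side batches, so it is safe to give up the GPU
// while we block on upstream IO such as a shuffle fetch.
class SemaphoreReleasingIterator[T](wrapped: Iterator[T]) extends Iterator[T] {
  override def hasNext: Boolean = {
    // Let other tasks use the GPU while we may block on IO.
    GpuSemaphore.releaseIfNecessary(TaskContext.get())
    wrapped.hasNext
  }

  override def next(): T = {
    val hostBatch = wrapped.next()
    // Data is in host memory now; take the GPU back before processing it.
    GpuSemaphore.acquireIfNecessary(TaskContext.get())
    hostBatch
  }
}
```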

For q95: it is a similar pattern, but it is not the aggregate this time. The SHJ is pulling the build side from another exec node, so the data is already on the GPU (and it has no way to release the semaphore).

[Screenshot from 2022-03-28 10-50-41]

revans2 commented 2 years ago

Like I said in #4970, I think that if we try to generalize all of this to the task/worker level it is going to make things a lot simpler. I really like what @jlowe said about #4964: using CPU cores that are stuck waiting on the semaphore. If we had both in place we could do something like start fetching shuffle data for an entire task, not just the one stream we are on right now. We could also release the semaphore when we know that I/O is required to start processing data. We could even look at starting to process a subset of the data if we have it ready to go; see the sketch below.
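To make that idea concrete, here is a minimal sketch of the task-level fetch pattern using plain Scala futures; every name here is hypothetical and stands in for whatever the plugin's shuffle machinery would actually do:

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Illustrative only: kick off the shuffle fetches for every stream in the
// task up front, then process whichever one completes first instead of
// serially blocking on the single stream we happen to be on.
def processFirstReady[T](fetches: Seq[() => T], process: T => Unit)
                        (implicit ec: ExecutionContext): Unit = {
  val inflight = fetches.map(f => Future(f())) // task-wide I/O starts now
  // Process a subset of the data as soon as it is ready to go; this is
  // where the GPU semaphore would be acquired in a real implementation.
  val first = Future.firstCompletedOf(inflight)
  process(Await.result(first, Duration.Inf))
}
```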