apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Ballista: Executor must return statistics in CompletedTask / CompletedJob #18

Open andygrove opened 2 years ago

andygrove commented 2 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

We cannot fix the shuffle mechanism until we have partition stats; otherwise ShuffleReaderExec will attempt to read empty partitions, which causes an error.

Describe the solution you'd like

The scheduler should receive partition stats and only try to read from non-empty shuffle partitions.
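The proposed scheduler behaviour can be sketched as follows. This is a minimal illustration, assuming a hypothetical `PartitionStats` record that each executor attaches per output partition when reporting a completed map task; `PartitionStats`, `CompletedTaskReport` and `non_empty_sources` are illustrative names, not Ballista's actual API. For a given reduce partition, the scheduler keeps only the map tasks that actually produced rows for it:

```rust
use std::collections::HashMap;

/// Hypothetical per-partition statistics an executor could report
/// alongside a completed task (not Ballista's real type).
#[derive(Debug, Clone, Copy)]
pub struct PartitionStats {
    pub num_rows: u64,
    pub num_bytes: u64,
}

/// Hypothetical report for one completed map task:
/// output partition id -> stats for that partition.
pub struct CompletedTaskReport {
    pub map_task_id: usize,
    pub partition_stats: HashMap<usize, PartitionStats>,
}

/// For one reduce partition, return the ids (sorted) of the map tasks whose
/// output for that partition is non-empty. A partition with no stats entry
/// or zero rows is skipped, so the reader never tries to fetch it.
pub fn non_empty_sources(reports: &[CompletedTaskReport], reduce_partition: usize) -> Vec<usize> {
    let mut sources: Vec<usize> = reports
        .iter()
        .filter(|r| {
            r.partition_stats
                .get(&reduce_partition)
                .map_or(false, |s| s.num_rows > 0)
        })
        .map(|r| r.map_task_id)
        .collect();
    sources.sort_unstable();
    sources
}
```

With three map tasks where only tasks 0 and 2 wrote rows to partition 0, `non_empty_sources(&reports, 0)` returns `[0, 2]`, and the reader would be given only those two locations.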

Describe alternatives you've considered

As a workaround, we could write empty shuffle files for empty partitions.
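The workaround amounts to having the shuffle writer create a file for every output partition, even when no rows were routed to it, so the reader always finds something to open. A minimal sketch under stated assumptions: the `{partition}.data` file layout and the `write_all_partitions` function are hypothetical, and rows are written as plain text lines purely for illustration (a real shuffle writer would emit Arrow IPC data):

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Hypothetical writer: `rows_by_partition[p]` holds the rows routed to
/// output partition `p`. One file is created per partition, including
/// partitions that received no rows.
pub fn write_all_partitions(dir: &Path, rows_by_partition: &[Vec<String>]) -> io::Result<Vec<PathBuf>> {
    fs::create_dir_all(dir)?;
    let mut paths = Vec::with_capacity(rows_by_partition.len());
    for (partition, rows) in rows_by_partition.iter().enumerate() {
        let path = dir.join(format!("{partition}.data"));
        // File::create always creates (or truncates) the file, so an empty
        // partition still leaves a zero-length file behind for the reader.
        let mut file = File::create(&path)?;
        for row in rows {
            writeln!(file, "{row}")?;
        }
        paths.push(path);
    }
    Ok(paths)
}
```

The trade-off is one extra (empty) file per empty partition per map task, in exchange for not needing any stats on the scheduler side.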

Additional context

None.

mingmwang commented 1 year ago

I can work on this improvement.

mingmwang commented 1 year ago

Another improvement I can think of: if the shuffle data is colocated on the same host as the shuffle reader, we should allow the reader to read directly from disk (LocalShuffle Reader) instead of fetching it over remote RPC.
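The local-read idea boils down to a per-partition decision: compare the host that wrote the shuffle data with the host running the reader. A minimal sketch; `ShuffleLocation`, `FetchPlan` and `plan_fetch` are hypothetical names, and a real implementation would also need to confirm that the file path is actually reachable from the reader's process:

```rust
/// Hypothetical description of where a shuffle partition was written.
#[derive(Debug, Clone, PartialEq)]
pub struct ShuffleLocation {
    pub executor_host: String,
    pub path: String,
}

/// How a reader would fetch one shuffle partition.
#[derive(Debug, PartialEq)]
pub enum FetchPlan {
    /// Same host: read the file straight from local disk.
    LocalFile(String),
    /// Different host: fetch over the network from the executor that wrote it.
    RemoteRpc { host: String, path: String },
}

/// Prefer a local disk read when the data is colocated with the reader,
/// otherwise fall back to a remote fetch.
pub fn plan_fetch(reader_host: &str, loc: &ShuffleLocation) -> FetchPlan {
    if loc.executor_host == reader_host {
        FetchPlan::LocalFile(loc.path.clone())
    } else {
        FetchPlan::RemoteRpc {
            host: loc.executor_host.clone(),
            path: loc.path.clone(),
        }
    }
}
```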

metesynnada commented 1 year ago

@mingmwang Do you need help? I may help with the implementation.

mingmwang commented 1 year ago

@metesynnada @andygrove

Sorry, I haven't had a chance to look into this. Regarding the issue, I'm not sure we want to fix it this way. In my opinion, even when it reads empty partitions, ShuffleReaderExec should not return an error or cause any data quality issue. We will do some testing to see what the specific error is.
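One way a reader could tolerate empty partitions, as suggested here, is to treat a missing or zero-length partition file as "no rows" rather than as an error, while still surfacing every other I/O failure. A minimal sketch, not ShuffleReaderExec's actual logic: `read_partition_tolerant` is a hypothetical function, and rows are newline-separated text only for illustration:

```rust
use std::fs;
use std::io::{self, ErrorKind};
use std::path::Path;

/// Read the rows of one shuffle partition. A missing file or a zero-length
/// file yields an empty Vec; any other I/O error is still returned.
pub fn read_partition_tolerant(path: &Path) -> io::Result<Vec<String>> {
    match fs::read_to_string(path) {
        // A zero-length file produces "" here, which has no lines.
        Ok(contents) => Ok(contents.lines().map(str::to_owned).collect()),
        // No file: the map task produced nothing for this partition.
        Err(e) if e.kind() == ErrorKind::NotFound => Ok(Vec::new()),
        Err(e) => Err(e),
    }
}
```

Treating `NotFound` as empty is only safe if a missing file can never mean lost data (for example, an executor that died after the task was reported as complete), which is one reason explicit partition stats may still be preferable.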

@yahoNanJing

mingmwang commented 1 year ago

Having CompletedTask return the partition stats is quite heavy. Imagine we have 1000 map tasks and 1000 reduce tasks (partition = 1000): the stats grow to 1000 × 1000 = 1M entries.
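The size concern is simple arithmetic: one stats entry per (map task, reduce partition) pair. The sketch below makes it concrete under two labelled assumptions: a hypothetical stats record of three `u64` counters (rows, batches, bytes) at 24 bytes per entry, and, for comparison only, a hypothetical one-bit-per-partition "non-empty" bitmap, which would be enough to skip empty partitions but was not proposed in this thread:

```rust
/// Number of stats entries if every map task reports stats for every
/// reduce partition.
pub fn stats_entries(map_tasks: u64, reduce_partitions: u64) -> u64 {
    map_tasks * reduce_partitions
}

/// Raw payload in bytes, assuming a hypothetical 24-byte record
/// (three u64 counters) per entry.
pub fn full_stats_bytes(map_tasks: u64, reduce_partitions: u64) -> u64 {
    stats_entries(map_tasks, reduce_partitions) * 3 * 8
}

/// Payload if each task sent only a bitmap of non-empty partitions:
/// one bit per partition, rounded up to whole bytes per task.
pub fn bitmap_bytes(map_tasks: u64, reduce_partitions: u64) -> u64 {
    map_tasks * ((reduce_partitions + 7) / 8)
}
```

Under these assumptions, 1000 map tasks × 1000 partitions gives 1,000,000 entries, about 24 MB of raw counters, versus about 125 KB for the bitmap.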

mingmwang commented 1 year ago

@thinkharderdev

Ted-Jiang commented 1 year ago

I updated tpch-q3 to test this, using o_shippriority = 'none' to try to reproduce the error:

select
    l_orderkey,
    sum(l_extendedprice * (1 - l_discount)) as revenue,
    o_orderdate,
    o_shippriority
from
    customer,
    orders,
    lineitem
where
        c_mktsegment = 'BUILDING'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < date '1995-03-15'
  and o_shippriority = 'none'
group by
    l_orderkey,
    o_orderdate,
    o_shippriority
order by
    revenue desc,
    o_orderdate;
0 rows in set. Query took 11.347 seconds.

@andygrove how did you reproduce this error?