apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Add Accumulator API #650

Open thinkharderdev opened 1 year ago

thinkharderdev commented 1 year ago

Is your feature request related to a problem or challenge?

Various optimizations and protection mechanisms can be implemented if we have a way of having a shared global counter. Examples:

  1. Apply a global limit for a stage across all executors. The scheduler can pre-empt tasks during execution once the global limit is satisfied by the sum of output rows across all tasks (similar to https://github.com/apache/arrow-ballista/issues/628, but it could pre-empt tasks more quickly since it would not require any individual task to complete).
  2. Add a global bytes scanned limit on a query which can pre-empt a large query if it exceeds a certain amount of data scanned across all tasks (similar to mechanisms available in Presto/AWS Athena). This can protect the level of service in cases of overly large user queries.
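As a rough illustration of the first example, the scheduler-side check could be as small as the sketch below. Everything in it is hypothetical (`StageRowLimit`, the `u32` task IDs, the idea that tasks report cumulative row counts while still running); none of it is an existing Ballista type.

```rust
use std::collections::HashMap;

/// Hypothetical scheduler-side counter for one stage (not a Ballista type).
/// Tracks the latest cumulative output row count reported by each task.
#[derive(Debug, Default)]
pub struct StageRowLimit {
    limit: u64,
    rows_by_task: HashMap<u32, u64>,
}

impl StageRowLimit {
    pub fn new(limit: u64) -> Self {
        Self {
            limit,
            rows_by_task: HashMap::new(),
        }
    }

    /// Record a task's cumulative row count and return `true` once the
    /// sum across all tasks has reached the stage-wide limit.
    pub fn report(&mut self, task_id: u32, rows_so_far: u64) -> bool {
        // Reports are cumulative, so a newer report replaces the older one.
        self.rows_by_task.insert(task_id, rows_so_far);
        self.total() >= self.limit
    }

    /// Sum of the latest row counts over all tasks seen so far.
    pub fn total(&self) -> u64 {
        self.rows_by_task.values().sum()
    }
}
```

When `report` returns `true`, the scheduler could pre-empt the remaining tasks of the stage without waiting for any of them to complete. A bytes-scanned limit would have the same shape with a different counter.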

Describe the solution you'd like

This would be similar in spirit to Spark accumulators, but less general. It would mostly be an internal implementation detail rather than something exposed to users (for now).

The general shape of the solution could look something like:

  1. Scheduler adds a new RPC which is a bidirectional stream between scheduler and executors
  2. Executors can send accumulator values over this stream (each executor has a single stream to the scheduler and multiplexes all updates on this channel)
  3. Scheduler is responsible for managing accumulators and merging updates.
  4. Scheduler can send response events in the response stream (for example, to pre-empt tasks when a global limit is satisfied)
  5. Accumulators are scoped to a particular job so they do not need to be shared between schedulers.
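To make steps 3 to 5 a bit more concrete, here is a minimal sketch of the scheduler-side bookkeeping, leaving out the RPC stream itself. All names (`AccumulatorUpdate`, `SchedulerEvent`, `AccumulatorRegistry`) are made up for illustration, and it assumes executors send deltas rather than absolute values.

```rust
use std::collections::HashMap;

/// Hypothetical message an executor would send over the stream.
#[derive(Debug, Clone, PartialEq)]
pub struct AccumulatorUpdate {
    pub job_id: String,
    pub name: String,
    pub delta: u64,
}

/// Hypothetical event the scheduler would send back on the response stream.
#[derive(Debug, Clone, PartialEq)]
pub enum SchedulerEvent {
    CancelJob { job_id: String },
}

struct Counter {
    value: u64,
    limit: Option<u64>,
}

/// Accumulators keyed by (job_id, name), so each one is scoped to a job.
pub struct AccumulatorRegistry {
    counters: HashMap<(String, String), Counter>,
}

impl AccumulatorRegistry {
    pub fn new() -> Self {
        Self { counters: HashMap::new() }
    }

    /// Register a counter for a job, optionally with a global limit.
    pub fn register(&mut self, job_id: &str, name: &str, limit: Option<u64>) {
        let key = (job_id.to_string(), name.to_string());
        self.counters.insert(key, Counter { value: 0, limit });
    }

    /// Merge one update; return an event if the limit is now satisfied.
    /// Updates for unknown accumulators are ignored.
    pub fn merge(&mut self, update: &AccumulatorUpdate) -> Option<SchedulerEvent> {
        let key = (update.job_id.clone(), update.name.clone());
        let counter = self.counters.get_mut(&key)?;
        counter.value = counter.value.saturating_add(update.delta);
        match counter.limit {
            Some(limit) if counter.value >= limit => Some(SchedulerEvent::CancelJob {
                job_id: update.job_id.clone(),
            }),
            _ => None,
        }
    }

    /// Current merged value, if the accumulator exists.
    pub fn value(&self, job_id: &str, name: &str) -> Option<u64> {
        let key = (job_id.to_string(), name.to_string());
        self.counters.get(&key).map(|c| c.value)
    }

    /// Drop all accumulators for a job once it finishes or is cancelled.
    pub fn remove_job(&mut self, job_id: &str) {
        self.counters.retain(|key, _| key.0.as_str() != job_id);
    }
}
```

Because the state is keyed by job and dropped with `remove_job`, nothing here would need to outlive the job or be shared between schedulers.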

Describe alternatives you've considered

Do nothing


mingmwang commented 1 year ago

If we just want to cancel tasks early and protect the system from heavy queries or heavy scans, I don't think we need to introduce an Accumulator. Spark's Accumulator and Metrics system is very heavy and causes lots of memory issues and management burden for the Spark Driver: each Accumulator (with a unique ID) needs to be registered in the Spark Driver, and its life cycle is managed by the Spark Driver.

We can just leverage the current DataFusion Metrics system and the TaskStatus update RPC, and add the necessary throttling/checking/aborting logic when we handle the task finish event in the Ballista Scheduler.
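A minimal sketch of what that check might look like, assuming the scheduler can read a bytes-scanned figure from the metrics attached to a finished task. The names `FinishedTaskBudget` and `Decision` are hypothetical, and the sketch does not use the real TaskStatus or DataFusion metrics types.

```rust
use std::collections::HashMap;

/// Hypothetical outcome of the check run when a task finishes.
#[derive(Debug, PartialEq)]
pub enum Decision {
    Continue,
    AbortJob,
}

/// Per-job running total of bytes scanned, built only from finished tasks.
#[derive(Default)]
pub struct FinishedTaskBudget {
    bytes_scanned: HashMap<String, u64>,
}

impl FinishedTaskBudget {
    /// Called from the task-finish handler with the finished task's metric.
    /// Returns `AbortJob` once the job's total exceeds `limit`.
    pub fn on_task_finished(&mut self, job_id: &str, task_bytes: u64, limit: u64) -> Decision {
        let total = self.bytes_scanned.entry(job_id.to_string()).or_insert(0);
        *total = total.saturating_add(task_bytes);
        if *total > limit {
            Decision::AbortJob
        } else {
            Decision::Continue
        }
    }
}
```

Note that the total only moves when a task reports completion, so the check cannot fire while every task of a job is still running.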

thinkharderdev commented 1 year ago

> We can just leverage the current DataFusion Metrics system and TaskStatus update rpc and add necessary throttling/checking/aborting logic when we handle the Task finish event in the Ballista Scheduler.

This would be a good first step, but I don't think it really solves the issue. We would need to wait for tasks to finish, so if several really long-running tasks were running concurrently they could easily overload the system, with no way to cancel them because none of them has completed yet.

Agreed that Spark's accumulators cause issues, but I think if we design something more purpose-built (used only internally by the scheduler and not exposed through the public API), it could be relatively lightweight.