Open thinkharderdev opened 2 years ago
To be transparent, my team is building a query engine which is sensitive to time-to-first-result latency, so we are very interested in fully streaming execution (and hoping to upstream as much as we can), but we want to make sure that this is in line with the desired direction of Ballista for the rest of the community.
cc @andygrove @yahoNanJing @realno @Ted-Jiang
@thinkharderdev Thanks for the nice write-up. Among the three options, I am leaning towards option 3 as the more realistic one to achieve. That is, the community can focus on making it work really well for a specific set of use cases first, which will hopefully grow the community further.
For streaming vs. batch I don't have a strong opinion at this point; I believe whoever can use it in a real use case should try to drive the project forward.
We are not ready yet to do anything serious, though we have two major use cases in mind:
Use case 2 seems similar to your use case. I am curious: when you said fully streaming execution, did you mean like Flink? I think there is value in supporting operators/algorithms that need to see the entire dataset/partition multiple times (for example ML), so a hybrid model would be good. For example, the compiler could analyze the query and turn part of it "fully streaming" when possible.
Some other requirements for us are:
@thinkharderdev Thanks for writing it up ❤️.
To be transparent, my team is building a query engine which is sensitive to time-to-first-result latency, so we are very interested in fully streaming execution (and hoping to upstream as much as we can), but we want to make sure that this is in line with the desired direction of Ballista for the rest of the community.
We have the same purpose: using Ballista to build a latency-sensitive query system. So we are also very interested in fully streaming execution (avoiding shuffle to disk).
To bring this back around to some concrete options, I think there are a few different ways we can go:
For now, I have no strong opinion about these choices.
I am curious: when you said fully streaming execution, did you mean like Flink?
Exactly, the current execution model is basically Flink Batch execution, but what we ultimately want (for our use case) is streaming execution with RecordBatch as the event type.
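To make the difference in time to first result concrete, here is a minimal sketch in plain Rust. It does not use any Ballista or DataFusion API: `Batch` is a hypothetical stand-in for an Arrow `RecordBatch`, and `source`, `filter_even` and the two `first_result_*` functions are names invented for this illustration. It only counts how many input batches must be read before the first output batch is available in each model.

```rust
use std::cell::Cell;

/// Hypothetical stand-in for an Arrow `RecordBatch`: just a vector of values.
type Batch = Vec<i64>;

/// A source that counts how many batches have been pulled from it.
fn source<'a>(
    batches: &'a [Batch],
    reads: &'a Cell<usize>,
) -> impl Iterator<Item = Batch> + 'a {
    batches.iter().map(move |b| {
        reads.set(reads.get() + 1);
        b.clone()
    })
}

/// A per-batch operator: keep only even values.
fn filter_even(batch: Batch) -> Batch {
    batch.into_iter().filter(|v| v % 2 == 0).collect()
}

/// Batch-style execution: the whole stage output is materialized before
/// anything downstream can see the first result.
/// Returns (first output batch, input batches read before it was available).
fn first_result_batch_mode(batches: &[Batch]) -> (Option<Batch>, usize) {
    let reads = Cell::new(0);
    let stage_output: Vec<Batch> = source(batches, &reads).map(filter_even).collect();
    let first = stage_output.into_iter().next();
    (first, reads.get())
}

/// Streaming-style execution: each batch flows through the operator as soon
/// as it is produced, so the first result needs only the first input batch.
fn first_result_streaming_mode(batches: &[Batch]) -> (Option<Batch>, usize) {
    let reads = Cell::new(0);
    let first = source(batches, &reads).map(filter_even).next();
    (first, reads.get())
}
```

Both functions return the same first batch; the only difference is how much input had to be consumed first, which is the latency the streaming use case cares about.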
Somewhat of a tangent for this discussion, but my project (https://github.com/influxdata/influxdb_iox) is also interested in time-to-first-result (aka streaming) execution, as well as the lower resource (memory) usage such execution provides. While we will likely not use Ballista directly, we intend to ensure DataFusion can be used for this type of system.
One concrete example is the work @tustvold has been doing recently to get parquet decoding / IO better integrated (so the parquet decoding work can be done in a streaming fashion as well).
Thanks for starting this discussion @thinkharderdev :heart:
With Ballista moving to this new repository I think it is an excellent time to "reboot" the project and assess what we are trying to build here.
I'd like to provide some historical context for how we ended up where we are today:
The original goal with Ballista was essentially "rewrite Apache Spark in Rust" but avoiding an architecture that heavily favors a particular programming language (Scala, in Spark's case). This is why we serialize plans to protobuf format rather than just using Rust's "serde" crate, which would have been much easier. I now hope that we can eventually adopt https://substrait.io/ as the serialization format to make it easier for Ballista to leverage query engines other than DataFusion.
Quite early in the development process, I discovered Apache Arrow and made that a core part of the design as well. This in my mind was another clear advantage over Apache Spark, which is largely row-based.
Obviously, the choice of Rust is another major differentiator with its unique approach to memory management and safety.
I designed Ballista based on my experience of using Apache Spark for SQL/ETL batch jobs.
I would fully support seeing Ballista support both batch and streaming and I think it would be fine for a user to pick one or the other when executing a query and use different APIs for each case. That said, I have not looked into this so there are likely complications that I am not even aware of here. I will start learning more about streaming in Spark and Flink so that I can better contribute to the discussion.
To be transparent, my team is building a query engine which is sensitive to time-to-first-result latency, so we are very interested in fully streaming execution (and hoping to upstream as much as we can), but we want to make sure that this is in line with the desired direction of Ballista for the rest of the community.
I also have major use cases for latency-sensitive, potentially multi-source queries. It boils down to being able to use it for end-user/interactive applications.
One of the biggest bummers to me about Spark is that its architecture cripples it for latency-sensitive workloads. I wanted to see what the latency was like for a basic, two-DB join query between in-memory databases:
Something like:
SELECT ... FROM db1.foo JOIN db2.bar ON ... LIMIT 1
Using the latest Spark nightly snapshot, this takes 150-200ms on my personal machine.
A significant portion of this is spent on things relevant to multi-node computation but not required for doing in-memory on a single node (serialization, broadcasts, scheduling/coordination)
The codegen + execution time isn't that bad
Understandably Spark isn't tailored for this. But there's a lot of great technology in there (Catalyst, Tungsten) that is state-of-the-art for query optimization and performance, and it's a bummer that you can't configure Spark (to my knowledge) for a "local" mode or directly interact with just the pieces you need to manually evaluate expressions/do query optimization.
Would be great if the future of Ballista accommodated for this. Opens up interesting possibilities.
Great, thanks for the feedback everyone!
I would summarize the key points as:
Please chime in if anyone came to different conclusions :)
I have a PR open (https://github.com/apache/arrow-ballista/pull/33) to improve the top-level README to better describe the current state of the project and the future direction, linking to this discussion.
I have also created a PR against "This Week In Rust" (https://github.com/rust-lang/this-week-in-rust/pull/3276) to promote the new repo and hopefully direct more people to contribute to the discussion here.
Hi @thinkharderdev, regarding point 2, whether to use Ballista for batch processing or latency-sensitive query processing, a few things should be clarified:
- How should the Ballista cluster be deployed?
Since k8s is so common and popular today, I think we should support it natively. As with Spark for batch processing, we can provide a way to deploy Ballista on k8s on demand.
- Should the Ballista cluster be long-running?
I prefer the cluster to be a standalone, long-running system. For a long-running system, we have to pay close attention to several aspects, like avoiding memory leaks, managing historical state, etc. Spark, by contrast, originally focused on batch processing rather than long-running systems, so its cluster is typically destroyed after the batch job finishes and it does not need to pay much attention to these long-running concerns.
- Should the tasks be long-running?
For streaming engines like Flink, the tasks are always long-running, which brings other challenges. My team is not interested in this; we will focus on latency-sensitive interactive queries.
- How should data be exchanged: push-based or pull-based?
The pull-based way may not be as efficient as the push-based way. However, the push-based way needs the whole task pipeline topology to be determined before task execution, whereas with the pull-based way, which Spark employs, adaptive query execution (AQE) can be introduced to make it possible to change the whole query plan adaptively during query execution. Every coin has two sides. Therefore, I propose to implement both.
- Should the exchanged data be flushed to disk?
It also depends. Flushing to disk is good for error recovery and makes memory management easier. However, it's not efficient. Therefore, I also propose to implement both.
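The push/pull distinction in the list above can be sketched in a few lines of plain Rust. This is only an illustration under stated assumptions, not how Ballista or Spark implement data exchange: `Batch` stands in for an Arrow `RecordBatch`, and `pull_sum`, `Sink`, `SumSink` and `push_all` are hypothetical names. In the pull version the consumer decides when (and whether) more data is produced; in the push version the sink has to be wired up before any data flows, which mirrors the point that the pipeline topology must be known up front.

```rust
/// Hypothetical stand-in for an Arrow `RecordBatch`.
type Batch = Vec<i64>;

/// Pull-based exchange: the consumer asks the producer for the next batch,
/// so it can stop early (here after `limit_batches`) or change its mind
/// between batches.
fn pull_sum(mut producer: impl Iterator<Item = Batch>, limit_batches: usize) -> i64 {
    let mut total = 0;
    for _ in 0..limit_batches {
        match producer.next() {
            Some(batch) => total += batch.iter().sum::<i64>(),
            None => break,
        }
    }
    total
}

/// Push-based exchange: the producer drives execution and hands each batch
/// to a sink that was connected before execution started.
trait Sink {
    fn push(&mut self, batch: Batch);
    fn finish(&mut self) -> i64;
}

/// A sink that keeps a running sum of everything pushed into it.
struct SumSink {
    total: i64,
}

impl Sink for SumSink {
    fn push(&mut self, batch: Batch) {
        self.total += batch.iter().sum::<i64>();
    }

    fn finish(&mut self) -> i64 {
        self.total
    }
}

/// The producer side of the push model: it never waits to be asked.
fn push_all(batches: Vec<Batch>, sink: &mut dyn Sink) -> i64 {
    for batch in batches {
        sink.push(batch);
    }
    sink.finish()
}
```

Supporting both, as proposed, would mean an operator can be driven either way; the sketch keeps the two paths separate only to make the control-flow difference visible.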
I agree with @yahoNanJing on his answers above. The only thing I would add is that with respect to deployment we should be able to support different auto-scaling behaviors for long running clusters. Push-based, latency sensitive query execution probably will have more strict demands for auto-scaling since we need to schedule a lot of tasks all at once. I know that this is something my team is very interested in.
Regarding k8s support mentioned by @yahoNanJing - this is already supported and documented in the user guide (which needs updating to remove the DataFusion parts and we also need to get this published).
Hi everyone, first, thanks for the great work on DataFusion and Ballista. I am currently reading @andygrove's book on query engines, and it's pretty interesting as it helps us learn more about how processing engines work and go a step further in understanding engines such as Spark.
I am not an expert in this area, but I have a few questions in mind, as I am willing to explore how Ballista answers these and to help make it a production-grade data processing engine.
One of the things I find interesting is the Spark/Redis integration, which bumps up Spark performance due to the in-memory nature of Redis. However, I am not sure if that already holds in the context of Ballista/DataFusion due to the nature of Arrow, especially with the integration of the Plasma project in Arrow, made by the Ray team, so I wonder if you can shed some light on this.
One more thing: how does Ballista compare to engines such as Ray and Dask in general, and could Ballista potentially be a direct competitor of those frameworks?
Finally, as Ballista compares directly to Spark, how do we see the implementation of Ballista ML and Graph?
Thanks everyone for the great work.
For spark integration, check out https://github.com/blaze-init/blaze by @yjshen
In terms of the current status of the Plasma project I think you may have to ask on the dev@arrow.apache.org list -- I remember some discussions about it previously but not sure
Hi @ziedbouf. I'm glad you are finding the book helpful! Let me try and answer some of your questions.
I have not used Plasma and there is no support for it in DataFusion/Ballista. I recall a discussion about it being unmaintained but I could be wrong. Ballista doesn't provide an in-memory cache like redis, although it would be possible to implement a custom datasource to connect to redis.
Ballista is similar to Spark SQL and Dask-SQL in terms of architecture, so yes, could be seen as a competitor although it is not as mature yet. Ballista does not have any stream, graph, or ML capabilities yet though. As mentioned in this discussion, some contributors are planning on working on streaming.
My personal view is that we need to get Ballista to the point of maturity where it can run industry-standard benchmarks at scale with performance and scalability at least as good as Spark. With that, and some better docs, the project hopefully starts to gain more traction and more contributors and that would eventually lead to people building ML features perhaps.
The blaze project is really interesting because it leverages the mature Spark scheduler and uses DataFusion for execution.
What are the downsides of Apache Spark, and why should somebody use Ballista?
IMO, as far as I have read in posts and seen in a talk on YouTube, the memory consumption of Spark is huge. Even a hello world in Spark will consume a lot of memory. The reduced memory consumption can be a big advantage for Ballista. I always have to increase the memory of our production Spark jobs, sometimes to as much as 32GB per executor.
What does Apache Spark do well?
Its integration with the Hadoop distributed file system is very good. From my perspective this is the big advantage: the computation takes place as close to the data as possible, moving data only when really required. Again, as far as I understand, this is currently not possible with Ballista?
And, as I am a data engineer doing a lot of analytics, I really like the spark-shell for playing around with the data.
I just started using Rust, so maybe I am not the biggest help for implementing features, but I know Spark from a developer perspective quite well as I am using it daily.
What could be a direction: keep the data mostly/only in RAM in the cluster, reducing the (slow) HDD/SSD reads, and then run the computation on the Arrow in-memory data frames. That may be a niche to fit into.
What does Apache Spark do well?
One major advantage is that it is very mature (so it has many features, documentation, integrations, etc., such as spark-shell).
@andygrove what benefits are you expecting from migrating to Substrait?
I think that Substrait will become the de facto standard for serializing relational algebra, and it has lots of smart people working on it, so I think we will benefit from this work and not have to build all new features ourselves. It will also potentially open up interesting integrations with the rest of the ecosystem.
I created a Google doc where we can collaborate more on this discussion and define the Ballista architecture that we are aiming for. This will provide documentation that we can put in the repo to help everyone understand how everything works and the direction we are moving towards. Contributors welcome as always.
https://docs.google.com/document/d/1Fd44vVmjSD6NuSGaFjHeI9pFSRxOIWmZ6Rv7aZ3S5Sw/edit?usp=sharing
open up interesting integrations with the rest of the ecosystem.
e.g.:
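I also have some thoughts about a unified execution engine; you're welcome to take a look and comment: https://www.notion.so/liurenjie1024/A-Cloud-Native-Universal-Execution-Engine-7903dd9eeea143c48049631a2d1cb845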
cc @andygrove @mingmwang
Thanks @liurenjie1024. I recently read the F1 paper (https://static.googleusercontent.com/media/research.google.com/en//pubs/archive/41344.pdf) which I found very interesting. It's quite a bit larger in scope than Ballista (it's intended to be more of a full DBMS than a query engine like Ballista), but the query processing section is very interesting. It has some very interesting properties:
Yes, deciding the execution mode (batched or pipelined) at runtime is an interesting topic, and I believe the first step is to make Ballista flexible enough to support complex job graphs.
Ideally we can implement the bubble execution model, which mixes the pipeline and batch execution models in one engine. Based on the complexity of the query DAG, some parts can be scheduled and executed in the batch model and other parts can be scheduled and executed in the pipeline model.
There are huge gaps in DataFusion's physical planning phase, and I think we need to put more effort into addressing those gaps first. Only with an efficient and sophisticated physical planner can we go further.
I also have some thoughts about a unified execution engine, welcome to take a look and comment:
cc @andygrove @mingmwang
How can I get access to the doc ?
I also have some thoughts about a unified execution engine, welcome to take a look and comment: https://www.notion.so/liurenjie1024/A-Cloud-Native-Universal-Execution-Engine-7903dd9eeea143c48049631a2d1cb845 cc @andygrove @mingmwang
How can I get access to the doc ?
I was able to view it. I think you have to be signed into Notion though (to any account)
@thinkharderdev @liurenjie1024 I think the 'Bubble Execution' model is exactly what you would like to achieve. And as far as I know, Alibaba's MaxCompute engine has already implemented the 'Bubble Execution' model.
Some questions are still not clear to me as of now, particularly if we plan to bring pipeline-based scheduling (Morsel-Driven Parallelism) into Ballista: what is the relationship between Bubble, Stage and Pipeline, how do we divide the DAG into Bubbles, how do we split a Stage into multiple Pipelines, etc.?
I think the 'Bubble Execution' model is what exactly you would like to achieve
Yes, I agree 100%.
then what's the relationship between Bubble, Stage and Pipeline,
I think generally a stage and pipeline would be the same thing. A pipeline breaker would be a map/reduce (aka repartition). A bubble would be a layer on top of that which breaks the graph into streaming subgraphs which are fully materialized at the bubble boundary.
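One way to read the layering described above is sketched below in plain Rust. It is an illustration of that description only, under simplifying assumptions: the plan is a linear chain rather than a DAG, `Op` is a hypothetical operator enum, and a bubble boundary is modelled as an explicit `Materialize` operator. None of these names come from Ballista. Stages (pipelines) end at a `Repartition`, and bubbles end where output is fully materialized.

```rust
/// Hypothetical operators in a linear plan, listed in execution order.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Op {
    Scan,
    Filter,
    Repartition, // pipeline breaker: ends a stage
    Aggregate,
    Materialize, // full materialization: ends a bubble
    Sort,
}

/// Split a chain of operators into groups, closing a group right after
/// each occurrence of `boundary`.
fn split_after(ops: &[Op], boundary: Op) -> Vec<Vec<Op>> {
    let mut groups = Vec::new();
    let mut current = Vec::new();
    for &op in ops {
        current.push(op);
        if op == boundary {
            groups.push(std::mem::take(&mut current));
        }
    }
    if !current.is_empty() {
        groups.push(current);
    }
    groups
}

/// Two-level split: bubbles on the outside, stages inside each bubble.
/// Result shape: bubbles -> stages -> operators.
fn plan_bubbles(ops: &[Op]) -> Vec<Vec<Vec<Op>>> {
    split_after(ops, Op::Materialize)
        .iter()
        .map(|bubble| split_after(bubble, Op::Repartition))
        .collect()
}
```

Within a bubble, stages would stream into each other; between bubbles, the downstream side would only start once the upstream output has been fully written.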
cc @mingmwang @thinkharderdev Sorry for late reply from a long vacation.
Some questions are still not clear to me as of now, particularly if we plan to bring pipeline-based scheduling (Morsel-Driven Parallelism) into Ballista: what is the relationship between Bubble, Stage and Pipeline, how do we divide the DAG into Bubbles, how do we split a Stage into multiple Pipelines, etc.?
Bubble and stage
My motivation for dividing the DAG into bubbles comes from two cases:
- Subplan reuse in complex OLAP queries, e.g. the spool operator in this paper, which materializes results to disk.
- The dataflow API, which can persist data explicitly.
The bubble execution paper you mentioned takes into account the resource limitations of query execution and adds a vertical cut when forming a bubble.
The relationship between stage and bubble is clearly described by this pic from Alibaba's article:
Stage and pipeline
In fact, in my proposal I didn't mention bringing morsel-driven execution into Ballista. The key problem with the morsel-driven approach is that it uses shared state so that each pipeline is parallelized, which is not practical in distributed execution. Presto uses a morsel-driven approach to execute each stage, i.e. it breaks stage plan fragments into pipelines and executes them in parallel. This way it can implement two-level scheduling, i.e. each stage has one degree of parallelism and each pipeline has another. They claim that this is more flexible, e.g. a query can have more resources when the cluster is idle and fewer resources when it's busy. I'm not a big fan of this approach and not sure whether it's state of the art.
I think pipeline execution is the trend. Presto, Clickhouse, Starrocks, DuckDb, Velox, all those are in this direction or trying on.
I'm neutral on this. But if we want to support pipeline execution, the current DataFusion operators can't be reused.
In terms of pipeline execution (at least in terms of a push based, pipelined execution model), I wanted to point out that @tustvold investigated this approach in DataFusion (and figured out a way to reuse the current operators). See https://github.com/apache/arrow-datafusion/pull/2226 which added a scheduler under a feature flag
Our eventual goal is to support running a plan on 100s of parquet files without having to fetch them all before (or concurrently). However, we currently have other things blocking this goal so additional work to the scheduler is on hold for now
You can find more detail on https://github.com/apache/arrow-datafusion/issues/2504
Our eventual goal is to support running a plan on 100s of parquet files without having to fetch them all before (or concurrently). However, we currently have other things blocking this goal so additional work to the scheduler is on hold for now
I'm a little confused here. Avoiding fetching 100s of parquet files is more like an optimizer issue?
What I mean is that now, if you give a datafusion plan 5000 parquet files (maybe for a SUM() type query), it will likely try to start reading / decoding all 5000 files concurrently, even if the downstream operators can only consume a small fraction at once. This means the resources (file handles and/or memory buffers) for reading all 5000 are held open during the plan.
It also means if the plan terminates early (e.g. ... LIMIT 10) a large amount of IO will be done / wasted.
What we would like to happen is that a smaller subset are read and fully processed before new ones are opened. Of course, one challenge with doing this is we still need sufficient parallelism to hide IO latencies.
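The desired behaviour can be sketched as a windowed scan. This is a simplified illustration in plain Rust, not DataFusion's scheduler or parquet reader: `scan_with_window`, `max_open` and the `read_file` callback are hypothetical, and files inside a window are read one after another here, whereas a real engine would read them concurrently to hide IO latency.

```rust
/// Read `files` in windows of at most `max_open` files, fully processing one
/// window before opening the next, and stop as soon as `limit` rows have
/// been collected.
/// Returns (rows, number of files actually opened).
fn scan_with_window<F>(
    files: &[&str],
    max_open: usize,
    limit: usize,
    mut read_file: F,
) -> (Vec<i64>, usize)
where
    F: FnMut(&str) -> Vec<i64>,
{
    let mut rows = Vec::new();
    let mut opened = 0;
    // `chunks` panics on a size of 0, so always use a window of at least 1.
    for window in files.chunks(max_open.max(1)) {
        for &file in window {
            opened += 1;
            rows.extend(read_file(file));
        }
        // Early termination (e.g. for a LIMIT): later windows are never opened.
        if rows.len() >= limit {
            rows.truncate(limit);
            break;
        }
    }
    (rows, opened)
}
```

With six files, a window of two and a limit of three rows, only the first two files are opened; the window size is the knob that trades resource usage against the parallelism needed to hide IO latency.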
Within a given partition, FileStream will still process them sequentially, right?
Yes that is my understanding -- maybe the parquet reader has gotten fancier since I last looked at it 🤔 Sorry if I am causing confusion
I have written a more detailed doc here, welcome to comment and discuss cc @alamb @mingmwang @thinkharderdev
Now that Ballista is its own top-level project, I want to extend a previous discussion around what exactly Ballista is (previous discussion: https://github.com/apache/arrow-datafusion/issues/1916).
I believe the consensus from that discussion was that Ballista should be a standalone system, but in practice I think we have adopted somewhat of a “both and” approach in the sense that we have added several extension points to Ballista (while also providing default implementations that allow you to run Ballista out-of-the-box). I think this is a reasonable direction and agree that it is important that Ballista be something you can use “as is”.
That raises the question of what are the use cases for Ballista that we would like to optimize for?
To take a concrete example, I think the current architecture is optimized for batch processing using a straightforward map-reduce implementation. This has many advantages:
However, for non-batch oriented work it has some serious drawbacks
There has already been some excellent proof-of-concept work done on a different scheduling paradigm in https://github.com/apache/arrow-datafusion/pull/1842 (something my team hopes to help push forward in the near future), but this raises some questions of its own. Namely, when do we use streaming vs map-reduce execution? In the PoC it is a very simple heuristic, but in real use cases I’m not sure there is a one-size-fits-all solution. In some cases you may want to use only streaming execution and use auto-scaling or dynamic partitioning to make sure each query can be scheduled promptly. Or you may only care about resource utilization and want to disable streaming execution entirely.
This is one particular example but you can imagine many others.
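As a rough illustration of how such a choice could be made configurable, here is a sketch in plain Rust. It is not the heuristic used in the PoC, which is not described here; `ExecutionPolicy`, `ExecutionMode` and `choose_mode` are hypothetical names, and the rule for `Auto` (stream only when every task of the query can be scheduled at once) is an assumption made for the example.

```rust
/// Hypothetical user-facing setting for how queries may be executed.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecutionPolicy {
    StreamingOnly, // always stream; rely on auto-scaling to find capacity
    BatchOnly,     // streaming disabled; optimize for resource utilization
    Auto,          // decide per query
}

/// The execution model actually used for one query.
#[derive(Debug, Clone, Copy, PartialEq)]
enum ExecutionMode {
    Streaming,
    MapReduce,
}

/// Assumed rule for `Auto`: streaming needs all tasks of the query running
/// at the same time, so use it only if the free task slots cover them all.
fn choose_mode(policy: ExecutionPolicy, total_tasks: usize, free_slots: usize) -> ExecutionMode {
    match policy {
        ExecutionPolicy::StreamingOnly => ExecutionMode::Streaming,
        ExecutionPolicy::BatchOnly => ExecutionMode::MapReduce,
        ExecutionPolicy::Auto if total_tasks <= free_slots => ExecutionMode::Streaming,
        ExecutionPolicy::Auto => ExecutionMode::MapReduce,
    }
}
```

A deployment that only cares about latency would pin `StreamingOnly`, one that only cares about utilization would pin `BatchOnly`, and everything in between needs some per-query rule like the one assumed for `Auto`.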
To bring this back around to some concrete options, I think there are a few different ways we can go: