apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0

Upgrade to Datafusion 42 #1059

Closed by palaska 3 weeks ago

palaska commented 2 months ago

Upgrades datafusion and certain libraries to the versions that DataFusion itself uses (mainly arrow and its crates, which depend on prost, tonic, hyper, etc.). warp was still on hyper v0.14, and hyper v1 breaks quite a lot of things, so I replaced it with axum.


andygrove commented 2 months ago

Thanks @palaska. This looks great. I wasn't able to run any queries with these changes, though.

I built with cargo build --release.

I started the scheduler:

$ ./target/release/ballista-scheduler 
2024-09-25T12:57:29.760309Z  INFO main ThreadId(01) ballista_scheduler::cluster: Initializing Sled database in temp directory    
2024-09-25T12:57:29.769023Z  INFO main ThreadId(01) ballista_scheduler::scheduler_process: Ballista v0.12.0 Scheduler listening on 0.0.0.0:50050    
2024-09-25T12:57:29.769038Z  INFO main ThreadId(01) ballista_scheduler::scheduler_process: Starting Scheduler grpc server with task scheduling policy of PullStaged    
2024-09-25T12:57:29.769137Z  INFO main ThreadId(01) ballista_scheduler::cluster::kv: Initializing heartbeat listener    
2024-09-25T12:57:29.769155Z  INFO main ThreadId(01) ballista_scheduler::scheduler_server::query_stage_scheduler: Starting QueryStageScheduler    
2024-09-25T12:57:29.769175Z  INFO tokio-runtime-worker ThreadId(31) ballista_core::event_loop: Starting the event loop query_stage  

I started the executor:

$ ./target/release/ballista-executor -c 8
2024-09-25T12:57:34.575905Z  INFO main ThreadId(01) ballista_executor::executor_process: Running with config:    
2024-09-25T12:57:34.575919Z  INFO main ThreadId(01) ballista_executor::executor_process: work_dir: /tmp/.tmpnbFKWN    
2024-09-25T12:57:34.575921Z  INFO main ThreadId(01) ballista_executor::executor_process: concurrent_tasks: 8    
2024-09-25T12:57:34.576719Z  INFO tokio-runtime-worker ThreadId(32) ballista_executor::executor_process: Ballista v0.12.0 Rust Executor Flight Server listening on 0.0.0.0:50051    
2024-09-25T12:57:34.576759Z  INFO tokio-runtime-worker ThreadId(30) ballista_executor::execution_loop: Starting poll work loop with scheduler    

The executor immediately started producing errors:

2024-09-25T12:57:34.577270Z  WARN tokio-runtime-worker ThreadId(30) ballista_executor::execution_loop: Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: status: Cancelled, message: "h2 protocol error: http2 error", details: [], metadata: MetadataMap { headers: {} }    
2024-09-25T12:57:34.678711Z  WARN tokio-runtime-worker ThreadId(02) ballista_executor::execution_loop: Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: status: Cancelled, message: "h2 protocol error: http2 error", details: [], metadata: MetadataMap { headers: {} }    

I then went back to the scheduler process and saw errors there as well:

thread 'tokio-runtime-worker' panicked at /home/andy/git/apache/datafusion-ballista/ballista/scheduler/src/scheduler_server/grpc.rs:95:56:
called `Option::unwrap()` on a `None` value
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
thread 'tokio-runtime-worker' panicked at /home/andy/git/apache/datafusion-ballista/ballista/scheduler/src/scheduler_server/grpc.rs:95:56:
called `Option::unwrap()` on a `None` value

Here is the backtrace:

thread 'tokio-runtime-worker' panicked at /home/andy/git/apache/datafusion-ballista/ballista/scheduler/src/scheduler_server/grpc.rs:95:56:
called `Option::unwrap()` on a `None` value
stack backtrace:
   0: rust_begin_unwind
   1: core::panicking::panic_fmt
   2: core::panicking::panic
   3: core::option::unwrap_failed
   4: ballista_scheduler::scheduler_server::grpc::<impl ballista_core::serde::generated::ballista::scheduler_grpc_server::SchedulerGrpc for ballista_scheduler::scheduler_server::SchedulerServer<T,U>>::poll_work::{{closure}}
   5: <<ballista_core::serde::generated::ballista::scheduler_grpc_server::SchedulerGrpcServer<T> as tower_service::Service<http::request::Request<B>>>::call::PollWorkSvc<T> as tonic::server::service::UnaryService<ballista_core::serde::generated::ballista::PollWorkParams>>::call::{{closure}}
   6: <ballista_core::serde::generated::ballista::scheduler_grpc_server::SchedulerGrpcServer<T> as tower_service::Service<http::request::Request<B>>>::call::{{closure}}
andygrove commented 2 months ago

The scheduler receives this request from the executor:

Received poll_work request for ExecutorRegistration { id: "b81acaa8-2fd8-400d-aa4c-3faea28b60ed", port: 50051, grpc_port: 50052, specification: Some(ExecutorSpecification { resources: [ExecutorResource { resource: Some(TaskSlots(8)) }] }), optional_host: None }

This code panics when trying to get the IP address (which is missing):

                let metadata = ExecutorMetadata {
                    id: metadata.id,
                    host: metadata
                        .optional_host
                        .map(|h| match h {
                            OptionalHost::Host(host) => host,
                        })
                        .unwrap_or_else(|| remote_addr.unwrap().ip().to_string()),
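The panic comes from `remote_addr.unwrap()` firing when the transport exposes no peer address. A minimal sketch of the safer pattern, using hypothetical stand-in types rather than the real `ExecutorRegistration`/`ExecutorMetadata` structs:

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

// Hypothetical stand-in for resolving the executor's host: prefer the
// host the executor reported; otherwise fall back to the peer address,
// propagating None instead of panicking when neither is available.
fn resolve_host(optional_host: Option<String>, remote_addr: Option<SocketAddr>) -> Option<String> {
    optional_host.or_else(|| remote_addr.map(|a| a.ip().to_string()))
}

fn main() {
    let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 50051);
    // Peer address available: fall back to it.
    assert_eq!(resolve_host(None, Some(addr)), Some("127.0.0.1".to_string()));
    // No host and no peer address: None, rather than a panic.
    assert_eq!(resolve_host(None, None), None);
    println!("ok");
}
```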
andygrove commented 2 months ago

I managed to get it working. In the scheduler poll_work method, you need to change:

        let remote_addr = request.remote_addr();

to

        let remote_addr = request
            .extensions()
            .get::<ConnectInfo<SocketAddr>>()
            .cloned();

I saw a similar change in register_executor
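For context, the `ConnectInfo` lookup works because the request carries a type-keyed extensions map, where values are stored and retrieved by their Rust type. A self-contained toy sketch of that pattern (not tonic's or axum's actual implementation):

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

// Toy version of the type-keyed extensions map that requests carry.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Box<dyn Any>>,
}

impl Extensions {
    fn insert<T: Any>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Box::new(value));
    }
    fn get<T: Any>(&self) -> Option<&T> {
        self.map.get(&TypeId::of::<T>()).and_then(|b| b.downcast_ref::<T>())
    }
}

// Mirrors the shape of axum's `ConnectInfo` wrapper around the peer address.
#[derive(Clone, Debug, PartialEq)]
struct ConnectInfo<T>(T);

fn main() {
    let mut ext = Extensions::default();
    let peer = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)), 50051);
    ext.insert(ConnectInfo(peer));

    // The fix reads the peer address the same way: look up by type, clone out.
    let remote_addr = ext.get::<ConnectInfo<SocketAddr>>().cloned();
    assert_eq!(remote_addr, Some(ConnectInfo(peer)));
    println!("ok");
}
```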

andygrove commented 2 months ago

I see that you pushed that fix while I was typing that!

palaska commented 2 months ago

Hey @andygrove, thanks for catching that. I only tested with the -push_staged flag, my bad. It should be fixed now; we have a new way of extracting the connect info from the request.

andygrove commented 2 months ago

I have been testing with TPC-H, and many queries work. However, some queries, such as queries 2, 7, and 8, never complete, and I do not see any errors logged.

I won't have time to debug this issue until the weekend, and perhaps I should instead focus on adding some CI tests for TPC-H so that we can catch regressions like this in CI.

palaska commented 2 months ago

> I have been testing with TPC-H, and many queries work. However, some queries, such as queries 2, 7, and 8, never complete, and I do not see any errors logged.
>
> I won't have time to debug this issue until the weekend, and perhaps I should instead focus on adding some CI tests for TPC-H so that we can catch regressions like this in CI.

I've just looked at this, I think the never completing queries just try to return too many rows and they take too long. That is because the logical optimizer pushes down Limit to Sort but on logical plan deserialization, we seem to be ignoring it. This might be a quick fix: https://github.com/apache/datafusion/pull/12626.
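The failure mode described here can be modeled in a few lines: the optimizer pushes a limit down into the sort's `fetch`, and if serialization drops that field, the deserialized plan loses the limit and the sort materializes every row. A toy sketch with stand-in types (not DataFusion's actual plan or proto definitions):

```rust
// Toy stand-in for a sort node in a logical plan; the real DataFusion
// types differ.
#[derive(Clone, Debug, PartialEq)]
struct Sort {
    exprs: Vec<String>,
    fetch: Option<usize>, // the limit pushed down into the sort
}

// Buggy round-trip: the serializer ignores `fetch`, so after
// deserialization the sort returns all rows instead of the top-k.
fn roundtrip_lossy(s: &Sort) -> Sort {
    Sort { exprs: s.exprs.clone(), fetch: None }
}

// Fixed round-trip: `fetch` survives serialization.
fn roundtrip_fixed(s: &Sort) -> Sort {
    s.clone()
}

fn main() {
    let plan = Sort { exprs: vec!["l_extendedprice DESC".into()], fetch: Some(100) };
    // With the wrapping Limit node removed by the v42 optimizer, losing
    // `fetch` means the distributed sort returns the full table.
    assert_eq!(roundtrip_lossy(&plan).fetch, None);
    assert_eq!(roundtrip_fixed(&plan).fetch, Some(100));
    println!("ok");
}
```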

andygrove commented 2 months ago

> I've just looked at this, I think the never completing queries just try to return too many rows and they take too long. That is because the logical optimizer pushes down Limit to Sort but on logical plan deserialization, we seem to be ignoring it. This might be a quick fix: apache/datafusion#12626.

It works fine with the main branch though. Was this bug not present in earlier DataFusion versions?

palaska commented 2 months ago

> I've just looked at this, I think the never completing queries just try to return too many rows and they take too long. That is because the logical optimizer pushes down Limit to Sort but on logical plan deserialization, we seem to be ignoring it. This might be a quick fix: apache/datafusion#12626.
>
> It works fine with the main branch though. Was this bug not present in earlier DataFusion versions?

I think this bug was always there, but DataFusion v42's logical optimizer removes the Limit logical node after pushing down its fetch, while v41 doesn't remove it. Even though we haven't been deserializing Sort properly, the wrapping Limit was saving us. I didn't pinpoint which commit changed the behavior in the logical optimizer, though.

andygrove commented 2 months ago

Would it make sense just to upgrade to DF 41 in this PR?

palaska commented 2 months ago

> Would it make sense just to upgrade to DF 41 in this PR?

Raised a separate PR that upgrades DataFusion to 41: https://github.com/apache/datafusion-ballista/pull/1062. Please see the last commit on that PR, which reverts from 42 to 41 (it's not very small; maybe we can merge the hotfix and depend on the git revision until we get a new release). I had to disable two tests that fail due to other bugs which are fixed in v42. All TPC-H queries seem to be working.

andygrove commented 2 months ago

I'm fine with pinning to a revision of DataFusion once your PR is merged over there.

palaska commented 2 months ago

> I'm fine with pinning to a revision of DataFusion once your PR is merged over there.

Just explored this: pinning datafusion to a git rev breaks the python crate, since we cannot pin datafusion-python's datafusion dependency at the same time. Maybe we can just merge the upgrade to v41 and wait for v43 for now? 😅
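For reference, pinning a crate to a git revision looks like this in Cargo.toml (the rev hash below is a placeholder, not an actual DataFusion commit):

```toml
[dependencies]
# Placeholder rev for illustration; not an actual DataFusion commit.
datafusion = { git = "https://github.com/apache/datafusion", rev = "<commit-sha>" }
```

Cargo treats a git source and a crates.io source as different packages even at the same version, which is why a registry-sourced dependent like datafusion-python cannot be unified with a git-pinned datafusion in the same workspace.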

andygrove commented 3 weeks ago

This upgrade has now happened in another PR. Thanks for starting this, @palaska.