apache / datafusion-ballista

Apache DataFusion Ballista Distributed Query Engine
https://datafusion.apache.org/ballista
Apache License 2.0
1.56k stars 197 forks source link

[Ballista] Support to access remote object store, like HDFS, S3, etc #6

Open yahoNanJing opened 2 years ago

yahoNanJing commented 2 years ago

Is your feature request related to a problem or challenge? Please describe what you are trying to do.

After introducing the object store API, to support to access remote object store for Ballista executors, there are still some gap. For example, as apache/arrow-ballista#22 and apache/arrow-ballista#10 mentioned, ballista is not able to support remote object store.

Describe the solution you'd like

Our workaround is to make the file path self described. For example, a local file path should be file://tmp/..., a hdfs file path should hdfs://localhost:xxx:/tmp/...

matthewmturner commented 2 years ago

@yahoNanJing fyi @seddonm1 and I have been working on https://github.com/datafusion-contrib/datafusion-objectstore-s3. Still early stages but would be great to have more use cases that we could steer development with.

thinkharderdev commented 2 years ago

This PR would support this use case: https://github.com/apache/arrow-datafusion/pull/1677

yahoNanJing commented 2 years ago

Hi @matthewmturner, thanks for your info. It's really helpful example for creating another independent crate, like datafusion-objectstore-hdfs. However, it would better to avoid new object store registration for cases like providing sql service without any places for users to do manually registration. And it's not a good way to do the registration for each sql or each session, since it's quite stable for the data sources the service provides for.

yahoNanJing commented 2 years ago

This PR would support this use case: https://github.com/apache/arrow-datafusion/pull/1677

Thanks @thinkharderdev. I think it's better for the path to be self-described, like s3://hostname:port/xxx/... or hdfs://hostname:port/xxx/.... Then the object store management service can detect which remote object store will be used.

If possible, it's better to provide a way to avoid remote object store registration.

matthewmturner commented 2 years ago

@yahoNanJing there actually already is a hdfs extension here https://github.com/datafusion-contrib/datafusion-hdfs-native.

Understood on your point - in that case what would be your ideal API for accessing S3? I would have thought the service could register the ObjectStore to share with clients and clients could access after registration at service level had been completed.

thinkharderdev commented 2 years ago

This PR would support this use case: apache/arrow-datafusion#1677

Thanks @thinkharderdev. I think it's better for the path to be self-described, like s3://hostname:port/xxx/... or hdfs://hostname:port/xxx/.... Then the object store management service can detect which remote object store will be used.

If possible, it's better to provide a way to avoid remote object store registration.

Not sure I follow. I agree that the object store should be resolved from the file URI, but we would still have to register an ObjectStore in the ExecutionContext in order to use it right?

yahoNanJing commented 2 years ago

Hi @matthewmturner and @thinkharderdev, what I'm thinking is to introduce some remote object store extensions as Datafusion features. These extensions will be inclusive crates and are managed independently so that this solution will not introduce additional maintenance effort for Datafusion.

However, this solution is blocked by apache/arrow-datafusion#1772. After the datasource module is splitted into an independent crate. Then the remote object store extensions will be able to just depend on that crate. In other places, we can setup whether to enable the extension features or not. By this way, we can avoid cyclic dependency.

Further to say, for the default object stores in the ObjectStoreRegistry, maybe better to make it configurable so that we can avoid remote object store registration manually.

milenkovicm commented 2 years ago

Had a quick look into this issue, and from what I can see, there is not nothing missing on datafusion side to have this functionality (apart from some hard work :)).

Team did a great job to add support for object store in datafusion:

use std::sync::Arc;
use datafusion::{
    datasource::listing::{ListingTable, ListingTableConfig, ListingTableUrl},
    prelude::SessionContext,
};
use log::info;
use object_store::aws::AmazonS3Builder;

    let ctx = SessionContext::new();

    let s3 = AmazonS3Builder::new()
        .with_region("us-east-1")
        .with_bucket_name("testbucket")
        .with_access_key_id("MINIO")
        .with_secret_access_key("MINIO/MINIO")
        .with_endpoint("http://localhost:9000")
        .with_allow_http(true)
        .build()
        .unwrap();

    let s3 = Arc::new(s3);

    ctx.runtime_env()
        .register_object_store("s3", "localhost:9000", s3);

    let url = ListingTableUrl::parse("s3://localhost:9000/testpath/").unwrap();

    let config = ListingTableConfig::new(url)
        .infer(&ctx.state())
        .await
        .unwrap();

    let table = ListingTable::try_new(config).unwrap();
    ctx.register_table("test", Arc::new(table)).unwrap();

    ctx.sql("SELECT * FROM test")
        .await
        .unwrap()
        .show()
        .await
        .unwrap();

I give quick try with ballista standalone, changing code a bit to expose RuntimeEnv on client, scheduler, and executor and registering store on each of them manually. At the end, it did produce correct result. Currently getting to a RuntimeEnv is not "walk in a park", few hacks here and there were needed, but it is not hard to make it easier. It would then be possible load object store providers from configuration files.

Alternatively register_object_store can be provided directly on the BallistaContext and then somehow object store configuration may be magically serialized and handled on other actors in the system. AmazonS3Builder should probably be modified so it can be serialized.

avantgardnerio commented 2 years ago

@yahoNanJing this issue seems related to https://github.com/apache/arrow-datafusion/pull/3311 where we are working towards allowing users to register delta-rs tables dynamically through SQL at runtime in Ballista.

milenkovicm commented 2 years ago

@avantgardnerio SQL would be perfect fit

avantgardnerio commented 2 years ago

ObjectStore in the ExecutionContext in order to use it right?

I think the problem is that this must happen dynamically in the case of a DataFusion executor in Ballista. The solution I am proposing is a TableProviderFactory in https://github.com/apache/arrow-datafusion/pull/3311

Edit: by dynamically, I mean the name of the table and the path to it will not be known at compile time.

saikrishna1-bidgely commented 1 year ago

Is loading the data from S3 possible with a distributed setup right now? If so, can you provide a small example? I tried with a path on S3 and it failed with "no object store available for s3" error. This happened after I added the s3 feature to my project. Looks like s3 feature hasn't been added to scheduler and executor.

ahmedriza commented 1 year ago

@saikrishna1-bidgely, here's an example of what I tried and this worked. I'm not 100% sure that this is how it is supposed to be :-)

I tested with the following sample code:

use ballista::prelude::BallistaContext;
use ballista_core::config::BallistaConfig;
use datafusion::prelude::ParquetReadOptions;

#[tokio::main]
pub async fn main()  {
    let config = BallistaConfig::builder().build().unwrap();
    let ctx = BallistaContext::remote("localhost", 50050, &config).await.unwrap();
    let filename = "s3://foo/test.parquet";
    let df = ctx
        .read_parquet(filename, ParquetReadOptions::default())
        .await?;
    let rows = df.count().await?;
    println!("rows: {}", rows);
}

The code correctly returns the number of rows in the Parquet file:

rows: 15309

Last bit of logging from the scheduler process:

2023-02-10T23:16:11.345868Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::display: === [pRdAqhp/2] Stage finished, physical plan with metrics ===
ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, repart_time=1ns, write_time=721.748µs]
  AggregateExec: mode=Final, gby=[], aggr=[COUNT(NULL)], metrics=[output_rows=1, elapsed_compute=28.791µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalescePartitionsExec, metrics=[]
      ShuffleReaderExec: partitions=1, metrics=[]

2023-02-10T23:16:11.346163Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::state::execution_graph: Job pRdAqhp is success, finalizing output partitions
2023-02-10T23:16:11.346354Z  INFO tokio-runtime-worker ThreadId(05) ballista_scheduler::scheduler_server::query_stage_scheduler: Job pRdAqhp success

From the executor process:

2023-02-10T23:16:11.241689Z  INFO          task_runner ThreadId(22) ballista_executor::metrics: === [pRdAqhp/2/0] Physical plan with metrics ===
ShuffleWriterExec: None, metrics=[output_rows=1, input_rows=1, write_time=721.748µs, repart_time=1ns]
  AggregateExec: mode=Final, gby=[], aggr=[COUNT(NULL)], metrics=[output_rows=1, elapsed_compute=28.791µs, spill_count=0, spilled_bytes=0, mem_used=0]
    CoalescePartitionsExec, metrics=[]
      ShuffleReaderExec: partitions=1, metrics=[]

Hope this helps.

saikrishna1-bidgely commented 1 year ago

@ahmedriza can you make this into a PR pls. That would be a lot helpful.

saikrishna1-bidgely commented 1 year ago

@ahmedriza I tried what you suggested. I built the scheduler and executor with the s3 feature added to ballista-core dependency in cargo.toml. But I'm getting the same error: Error: DataFusionError(Execution("No object store available for s3://ballista-test-bucket/temp.csv")). I'm running this on Windows 10. Also, I'm running the code in another repo with this as cargo.toml:

[package]
name = "ballista-test"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
ballista = "0.11.0"
datafusion = "18.0.0"
tokio = "1.0"
parquet = "29.0.0"
YuriyGavrilov commented 1 year ago

There is also could be nice to have an uplink support of storj network https://github.com/storj/uplink (Rust bindings for libuplink https://github.com/storj-thirdparty/uplink-rust). It provide a direct access to the storj network avoiding s3 gateway bottleneck. (https://github.com/storj/storj/wiki/Libuplink-Walkthrough)

benedetto73 commented 4 months ago

@ahmedriza is this going to be fixed? I still experience the same issue as @saikrishna1-bidgely reported.