datafusion-contrib / datafusion-objectstore-s3

S3 as an ObjectStore for DataFusion
Apache License 2.0
59 stars 13 forks source link

Experiment with rust-s3 and caching #53

Open mateuszkj opened 2 years ago

mateuszkj commented 2 years ago

This merge requests provide:

Caching

Motivation why caching is needed is because current datafusion implementation is not optimal for network file operations:

Results of very unscientific tests on local machine with my example SQL query:

Why not aws-sdk-rust?

I don't know how AWS works, units test on my local machine took several minutes to complete (with success). I think it was because of IMDS timeouts. I couldn't find how to disable it, so I have chosen much simpler implementation of S3 client.

I had also problems with rust_s3 and other S3 self-hosted server. I created MRs for fix it: https://github.com/durch/rust-s3/pull/267 and https://github.com/datenlord/s3-server/pull/114

What can be done next?

matthewmturner commented 2 years ago

thank you very much for the contribution @mateuszkj πŸ‘

CC @seddonm1 (as the original core author) and @houqp (has helped with reviewing changes / design here and has experience with delta-rs which i believe connects to s3 with rusoto).

To confirm the primary reason for switching to rust-s3 is performance?

Indeed rust-s3 does look much lighter, has significant usage, appears to be actively maintained (release a month ago), and it seems it might enable GCS as well. however, im a little concerned about moving from official aws implementation. i think we know that the official implementation will continue to be maintained and enhanced moving forward while its unclear what what the status of 3rd party libraries will be (i.e. rusoto going into maintenance mode https://github.com/rusoto/rusoto). my specific concern being we will have to rewrite again if the same thing happens to this library. can you provide any insight on this?

regarding the cache - can you please include a more implementation details and a description of how specifically it is used? Could we perhaps start with just using the cache functionality with aws-sdk-rust?

regarding the IMDS timeout - do you have an example in other languages of how to disable it that we could use as reference? and rust-s3 allowed disabling IMDS timeout?

regarding testing other s3 servers - if it isnt too much work would you be able to run CI against them using docker files (it looks like seaweed makes that easy)? I think this would be a great value add to the project regardless and would make comparing results easier.

Given the size of the change i will need some time to review the code - so please bare with me as i will likely need at least a few days to review and test.

thank you again for the contribution.

mateuszkj commented 2 years ago

thanks for your fast response @matthewmturner

After second thought I think this MR should not be merged. It can be done better. Can you mark this MR as draft?.

Moving cache mechanism to different crate

Now I consider that caching mechanism could be moved to its own ObjectStore implementation in another crate.

for example

let s3_file_system = S3FileSystem::new(...).await?;
let s3_cached_file_system = Arc::new(ObjectStoreCache::new(s3_file_system, ObjectStoreCacheOptions::from_envs()?));
runtime_env.register_object_store("s3", s3_cached_file_system);

with cache in different crate, we could have two S3 client implementations or other remote storage systems.

Cache description

First let look how datafusion read files without cache.

To disable cache set environment variables:

export RUST_LOG=info,datafusion_objectstore_s3=debug
export DATAFUSION_S3_MIN_REQUEST_SIZE=0
export DATAFUSION_S3_USE_METADATA_PREFETCH=false

Below is example of querying data using SQL simillar to:

SELECT
  s.col3_text,
  s.col4_int
FROM stops s
WHERE s.col1_time >= CAST('2022-03-02 00:00:00') AS Timestamp)
  AND s.col1_time < CAST('2022-03-03 00:00:00') AS Timestamp)
  AND s.col2_text = 'xxx'
ORDER BY s.col1_time;

Query uses 4 columns and filters by time and some text field. I have multiple *.parquet files in my table (89 files). I greped only one for clarity.

# Regier table. datafusion read metadata of every *.parquet file to collect schemas.
2022-04-10T18:28:13.751524Z direct id: 25 file: s3://data-2022-02.parquet start: 982029 len: 65536, rlen: 65536
2022-04-10T18:28:13.760407Z direct id: 25 file: s3://data-2022-02.parquet start: 951802 len: 30227, rlen: 30227

# datafusion read metadata of every *.parquet file to create phisical plan
2022-04-10T18:28:17.134288Z direct id: 203 file: s3://data-2022-02.parquet start: 982029 len: 65536, rlen: 65536
2022-04-10T18:28:17.150239Z direct id: 203 file: s3://data-2022-02.parquet start: 951802 len: 30227, rlen: 30227

# datafusion read metadata of every *.parquet file to execute phisical plan.
2022-04-10T18:28:19.187678Z direct id: 290 file: s3://data-2022-02.parquet start: 982029 len: 65536, rlen: 65536
2022-04-10T18:28:19.203489Z direct id: 290 file: s3://data-2022-02.parquet start: 951802 len: 30227, rlen: 30227

# datafusion read data from first column (iterdate over row groups)
2022-04-10T18:28:19.223254Z direct id: 290 file: s3://data-2022-02.parquet start: 220862 len: 14, rlen: 14
2022-04-10T18:28:19.235615Z direct id: 290 file: s3://data-2022-02.parquet start: 225370 len: 14, rlen: 14
2022-04-10T18:28:19.248903Z direct id: 290 file: s3://data-2022-02.parquet start: 229878 len: 14, rlen: 14
2022-04-10T18:28:19.257919Z direct id: 290 file: s3://data-2022-02.parquet start: 234386 len: 14, rlen: 14
2022-04-10T18:28:19.271822Z direct id: 290 file: s3://data-2022-02.parquet start: 238894 len: 14, rlen: 14
2022-04-10T18:28:19.284739Z direct id: 290 file: s3://data-2022-02.parquet start: 462074 len: 14, rlen: 14
2022-04-10T18:28:19.298708Z direct id: 290 file: s3://data-2022-02.parquet start: 466582 len: 14, rlen: 14
2022-04-10T18:28:19.313049Z direct id: 290 file: s3://data-2022-02.parquet start: 471090 len: 14, rlen: 14
2022-04-10T18:28:19.325878Z direct id: 290 file: s3://data-2022-02.parquet start: 475598 len: 14, rlen: 14
2022-04-10T18:28:19.338194Z direct id: 290 file: s3://data-2022-02.parquet start: 480106 len: 14, rlen: 14
2022-04-10T18:28:19.350122Z direct id: 290 file: s3://data-2022-02.parquet start: 704027 len: 14, rlen: 14
2022-04-10T18:28:19.362426Z direct id: 290 file: s3://data-2022-02.parquet start: 708535 len: 14, rlen: 14
2022-04-10T18:28:19.374202Z direct id: 290 file: s3://data-2022-02.parquet start: 713043 len: 14, rlen: 14
2022-04-10T18:28:19.386540Z direct id: 290 file: s3://data-2022-02.parquet start: 717551 len: 14, rlen: 14
2022-04-10T18:28:19.400017Z direct id: 290 file: s3://data-2022-02.parquet start: 722059 len: 14, rlen: 14
2022-04-10T18:28:19.412182Z direct id: 290 file: s3://data-2022-02.parquet start: 947346 len: 14, rlen: 14

# datafusion read data from second column (iterdate over row groups)
2022-04-10T18:28:19.424140Z direct id: 290 file: s3://data-2022-02.parquet start: 220981 len: 14, rlen: 14
2022-04-10T18:28:19.436095Z direct id: 290 file: s3://data-2022-02.parquet start: 225489 len: 14, rlen: 14
2022-04-10T18:28:19.448831Z direct id: 290 file: s3://data-2022-02.parquet start: 229997 len: 14, rlen: 14
2022-04-10T18:28:19.461895Z direct id: 290 file: s3://data-2022-02.parquet start: 234505 len: 14, rlen: 14
2022-04-10T18:28:19.474107Z direct id: 290 file: s3://data-2022-02.parquet start: 239013 len: 14, rlen: 14
2022-04-10T18:28:19.488363Z direct id: 290 file: s3://data-2022-02.parquet start: 462193 len: 14, rlen: 14
2022-04-10T18:28:19.503114Z direct id: 290 file: s3://data-2022-02.parquet start: 466701 len: 14, rlen: 14
2022-04-10T18:28:19.515595Z direct id: 290 file: s3://data-2022-02.parquet start: 471209 len: 14, rlen: 14
2022-04-10T18:28:19.527923Z direct id: 290 file: s3://data-2022-02.parquet start: 475717 len: 14, rlen: 14
2022-04-10T18:28:19.539998Z direct id: 290 file: s3://data-2022-02.parquet start: 480225 len: 14, rlen: 14
2022-04-10T18:28:19.553284Z direct id: 290 file: s3://data-2022-02.parquet start: 704146 len: 14, rlen: 14
2022-04-10T18:28:19.566201Z direct id: 290 file: s3://data-2022-02.parquet start: 708654 len: 14, rlen: 14
2022-04-10T18:28:19.578198Z direct id: 290 file: s3://data-2022-02.parquet start: 713162 len: 14, rlen: 14
2022-04-10T18:28:19.589092Z direct id: 290 file: s3://data-2022-02.parquet start: 717670 len: 14, rlen: 14
2022-04-10T18:28:19.602429Z direct id: 290 file: s3://data-2022-02.parquet start: 722178 len: 14, rlen: 14
2022-04-10T18:28:19.615445Z direct id: 290 file: s3://data-2022-02.parquet start: 947465 len: 14, rlen: 14

# datafusion read data from third column (iterdate over row groups)
2022-04-10T18:28:19.628900Z direct id: 290 file: s3://data-2022-02.parquet start: 221621 len: 14, rlen: 14
2022-04-10T18:28:19.642048Z direct id: 290 file: s3://data-2022-02.parquet start: 226129 len: 14, rlen: 14
2022-04-10T18:28:19.656429Z direct id: 290 file: s3://data-2022-02.parquet start: 230637 len: 14, rlen: 14
2022-04-10T18:28:19.669087Z direct id: 290 file: s3://data-2022-02.parquet start: 235145 len: 14, rlen: 14
2022-04-10T18:28:19.681621Z direct id: 290 file: s3://data-2022-02.parquet start: 239653 len: 14, rlen: 14
2022-04-10T18:28:19.696082Z direct id: 290 file: s3://data-2022-02.parquet start: 462833 len: 14, rlen: 14
2022-04-10T18:28:19.708401Z direct id: 290 file: s3://data-2022-02.parquet start: 467341 len: 14, rlen: 14
2022-04-10T18:28:19.721417Z direct id: 290 file: s3://data-2022-02.parquet start: 471849 len: 14, rlen: 14
2022-04-10T18:28:19.733059Z direct id: 290 file: s3://data-2022-02.parquet start: 476357 len: 14, rlen: 14
2022-04-10T18:28:19.745427Z direct id: 290 file: s3://data-2022-02.parquet start: 480865 len: 14, rlen: 14
2022-04-10T18:28:19.758242Z direct id: 290 file: s3://data-2022-02.parquet start: 704786 len: 14, rlen: 14
2022-04-10T18:28:19.770439Z direct id: 290 file: s3://data-2022-02.parquet start: 709294 len: 14, rlen: 14
2022-04-10T18:28:19.783198Z direct id: 290 file: s3://data-2022-02.parquet start: 713802 len: 14, rlen: 14
2022-04-10T18:28:19.795203Z direct id: 290 file: s3://data-2022-02.parquet start: 718310 len: 14, rlen: 14
2022-04-10T18:28:19.808062Z direct id: 290 file: s3://data-2022-02.parquet start: 722818 len: 14, rlen: 14
2022-04-10T18:28:19.827283Z direct id: 290 file: s3://data-2022-02.parquet start: 948105 len: 14, rlen: 14

# datafusion read data from four column (iterdate over row groups)
2022-04-10T18:28:19.840199Z direct id: 290 file: s3://data-2022-02.parquet start: 223858 len: 14, rlen: 14
2022-04-10T18:28:19.853255Z direct id: 290 file: s3://data-2022-02.parquet start: 228366 len: 14, rlen: 14
2022-04-10T18:28:19.866894Z direct id: 290 file: s3://data-2022-02.parquet start: 232874 len: 14, rlen: 14
2022-04-10T18:28:19.878996Z direct id: 290 file: s3://data-2022-02.parquet start: 237382 len: 14, rlen: 14
2022-04-10T18:28:19.892441Z direct id: 290 file: s3://data-2022-02.parquet start: 241890 len: 14, rlen: 14
2022-04-10T18:28:19.906870Z direct id: 290 file: s3://data-2022-02.parquet start: 465070 len: 14, rlen: 14
2022-04-10T18:28:19.925541Z direct id: 290 file: s3://data-2022-02.parquet start: 469578 len: 14, rlen: 14
2022-04-10T18:28:19.939221Z direct id: 290 file: s3://data-2022-02.parquet start: 474086 len: 14, rlen: 14
2022-04-10T18:28:19.951420Z direct id: 290 file: s3://data-2022-02.parquet start: 478594 len: 14, rlen: 14
2022-04-10T18:28:19.964162Z direct id: 290 file: s3://data-2022-02.parquet start: 483102 len: 14, rlen: 14
2022-04-10T18:28:19.977930Z direct id: 290 file: s3://data-2022-02.parquet start: 707023 len: 14, rlen: 14
2022-04-10T18:28:19.992568Z direct id: 290 file: s3://data-2022-02.parquet start: 711531 len: 14, rlen: 14
2022-04-10T18:28:20.005620Z direct id: 290 file: s3://data-2022-02.parquet start: 716039 len: 14, rlen: 14
2022-04-10T18:28:20.016705Z direct id: 290 file: s3://data-2022-02.parquet start: 720547 len: 14, rlen: 14
2022-04-10T18:28:20.028415Z direct id: 290 file: s3://data-2022-02.parquet start: 725055 len: 14, rlen: 14
2022-04-10T18:28:20.040173Z direct id: 290 file: s3://data-2022-02.parquet start: 950342 len: 14, rlen: 14

What we see in logs:

My understanding to implement cache which was:

CI with other self-hosted S3 servers

About running CI with other self-hosted S3 servers is great idea. I could do it when i will moving cache to another crate. But for now I want to focus on sth else.

aws-sdk-rust and rust-s3

About why I dropped aws-sdk-rust. I don't have experience with AWS. I don't have a clue what IMDS is and why is required (?). When I run unit tests on my linux machine I got logs with IMDS timeout for every S3 request (somehow unit tests passed successfully but took long time to complete). Maybe I have to install something or disable (?). After spending haft of day on this problem i just rewritten it to rust-s3 which worked at first try and it acceptable fast.

One of drawbacks of rust-s3 is that only one person can publish to crates.io.

Maybe we need two S3 implementations, one for AWS services with official aws-sdk-rust crate and another for self-hosted S3 servers without AWS complexity. Or maybe IMDS complexity can be disabled with aws-sdk-rust.

matthewmturner commented 2 years ago

Thanks for the detailed write up @mateuszkj - it makes sense.

I think that bringing up the cache on the main datafusion repo would be a good starting point to see what the community thinks about this. For example there is an active HDFS community using datafusion as well (https://github.com/datafusion-contrib/datafusion-objectstore-hdfs) and perhaps this could benefit them. In general there is a lot of focus on improving performance - and parallelism within datafusion (for example https://github.com/apache/arrow-datafusion/issues/2199) so i think its a good time to bring it up. I can help with creating that if you would like.

Regarding CI and testing with other S3 providers - I can work on this. I think its a valuable addition to the project regardless and should help with whatever changes come from this.

I will look into reproducing the IMDS issue. I think once we have a better understanding of it then we can consider whether it makes sense to use alternative s3 implementations.

Thanks working with me on this!

tustvold commented 2 years ago

Hi, thanks for working on this. I'm currently actively working on making it so that IOx can make use of more of the DataFusion functionality for object storage, where it currently does its own thing. I am therefore very interested in helping drive improvements in this space :+1:

70 requests for one file. 2744 request for total of 89 files.

This might be a behaviour we want to fix upstream in DataFusion/arrow-rs, as opposed to working around with caching. For reference https://github.com/apache/arrow-rs/issues/1473 contains some investigative work refining the interface exposed by the parquet crate, and in particular https://github.com/apache/arrow-rs/pull/1509 might be of interest. It is all still proposals and experiments at this stage, but I would be more than happy to help draft up some issues if you (or anyone else) wanted to lend a hand getting this over the line.

Store metadata for all parquet files in RAM. So, we don't have to read it from S3 server every single SQL query.

Not needing to scan or list object storage in order to plan the query seems very sensible. One thought might be to integrate with Object Storage at the TableProvider level, as opposed to the ObjectStore level to allow using a dedicated catalog. In IOx we use a custom catalog, but I could see adding support for a Hive compatible MetaStore as being a valuable feature for the DataFusion ecosystem.

Using caching to accelerate queries on datasets that lack a dedicated catalog is definitely still useful, just mentioning that typically one would have some sort of catalog. I've personally had a lot of success using lambdas to populate an AWS Glue catalog, but there are loads of alternatives in this space.

https://github.com/apache/arrow-datafusion/issues/2079 might also be relevant and concerns how the TableProvider translates the catalog information into a physical query plan.

IMDS timeout for every S3 request

I'm not familiar with aws-sdk-rust, as I've only ever used rusoto, but IMDS is the mechanism by which instances running in AWS obtain IAM credentials with which to authorize with object storage. It does this by talking to an HTTP service running at a special IP. This won't exist when running on your local machine and so you might need to provide dummy AWS access credentials as environment variables, e.g. AWS_ACCESS_KEY_ID=local, or otherwise manipulate the credential chain being used by the SDK to make this work.

matthewmturner commented 2 years ago

@tustvold thank you for the insight - very helpful. personally, i am interested in learning about and implementing these types of performance improvements. so if you could draft some issues outlining what you are looking to accomplish i would be happy to try and help.

tustvold commented 2 years ago

Ok I'll see what I can do, I have quite a few plates in the air at the moment so might be a day or two before I can produce anything coherent, but I'll be sure to tag you on any issues once I file them πŸ˜€

matthewmturner commented 2 years ago

@tustvold No rush - I have a couple things I'll need to close before being able to start on that anyway. Thanks again.

seddonm1 commented 2 years ago

Hi everyone. I wrote the base implementation so any blame can be directed at me πŸ˜‰.

I think moving away from the official AWS SDK would be a mistake as they are developing a strong Rust practice so I would expect it to be better maintained than the others going forward (maintaining a library for someone else's API is a bit of a thankless task).

This was always a bit of a hacky implementation until Async datasources are properly implemented. It is good the IOx team is onboard as I hope we can arrive on one canonical implementation - that deals with all of the issues above. That means any caching ideas would be good to incorporate. πŸ‘

matthewmturner commented 2 years ago

@seddonm1 No blame being directed - we wouldn't be having this conversation without your efforts.

houqp commented 2 years ago

As @tustvold mentioned, IMDS won't work for custom S3 implementations. If this is the problem that's causing the slow down, then it's likely that the official S3 client is not as bad as we observed here. It is usually used when a default credential chain is configured for the client, then it will use IMDS to discover what AWS environment the client is in and what methods to use to fetch the IAM credential for S3 access. The client might use IMDS to discover which region it's in as well. As a quick test, you should be able to turn it off by using a static client credential by hard coding the access keys and region. If that doesn't work, I recommend filing an issue in the upstream official AWS repo, it should be a fairly straight forward fix.

houqp commented 2 years ago

Another place that I think caching could help is at the object store level to perform look ahead scan, i.e. fetch the next chunk of data from S3 while the system is performing compute on the previous chunk.