
Stream load Parquet file #13319

Open · alanpaulkwan opened this issue 1 year ago

alanpaulkwan commented 1 year ago

I imagine it would be a lot more performant: the bandwidth required would be lower, the types are pre-defined, and Parquet's columnar layout is similar to StarRocks' own storage.

chaoyli commented 1 year ago

@alanpaulkwan It's a good feature. We have not supported Parquet in Stream Load so far because we are refactoring the ingestion framework and trying to unify the ingestion interface, and we may need time to discuss whether the old interface should support it. Could you elaborate on the cases where you have to use Parquet in Stream Load?

alanpaulkwan commented 1 year ago

The use case is simple: I am working on a local file in a scripting language like Python/R and want to insert it into StarRocks. Parquet is faster to write, takes less space (and is thus faster to transfer), and programs like R/Python write Parquet just as easily as they write .csv files. It also avoids having to maintain Hive/Hudi/Delta Lake etc. just to load Parquet files.

I store most of my files as Parquet; right now I must convert them to .csv or go through ClickHouse to get them into StarRocks.

alanpaulkwan commented 1 year ago

A cool add-on might be to leverage Parquet's self-describing format to infer the schema of the file without the user having to define one.
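
For what it's worth, newer StarRocks releases (3.1+, if I recall correctly) expose exactly this kind of schema inference for files on object storage through the FILES() table function; Stream Load itself still does not. A rough sketch, with placeholder bucket, credentials, and region:

```sql
-- Rough sketch assuming the FILES() table function (StarRocks 3.1+).
-- The bucket, credentials, and region below are placeholders.
SELECT *
FROM FILES(
    "path" = "s3://my-bucket/companies.parquet",
    "format" = "parquet",
    "aws.s3.access_key" = "<access_key>",
    "aws.s3.secret_key" = "<secret_key>",
    "aws.s3.region" = "us-east-1"
)
LIMIT 10;
```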

chaoyli commented 1 year ago

@alanpaulkwan You mean not having to define a schema beforehand?

alanpaulkwan commented 1 year ago

Yes, currently I define a schema first.

I am going to reiterate this suggestion. I have a very simple file containing company names. If I stream load a CSV, rows get filtered out because the comma in 'HEDGE FUND NAME, LLC' is mistaken for a delimiter. That is simply not tenable, I think. Parquet is designed for exactly this. Right now, to use Parquet I have to bounce it off a MinIO bucket, guess the schema, etc., to get it into StarRocks. It's really painful to use.
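
For reference, the MinIO detour described above boils down to staging the file in an S3-compatible bucket and broker-loading it, roughly as sketched below. The label, table, bucket, credentials, and endpoint are placeholders, and the aws.s3.* property names may differ across StarRocks versions.

```sql
-- Hypothetical sketch of the MinIO workaround: stage the Parquet file in an
-- S3-compatible bucket, then broker-load it into an existing table.
LOAD LABEL test.companies_from_minio (
    DATA INFILE("s3a://my-bucket/companies.parquet")
    INTO TABLE companies
    FORMAT AS "parquet"
)
WITH BROKER
(
    "aws.s3.endpoint" = "http://minio:9000",      -- placeholder endpoint
    "aws.s3.access_key" = "<access_key>",
    "aws.s3.secret_key" = "<secret_key>",
    "aws.s3.enable_path_style_access" = "true",   -- usually needed for MinIO
    "aws.s3.enable_ssl" = "false"
)
PROPERTIES ("timeout" = "3600");
```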

alberttwong commented 1 year ago

https://github.com/StarRocks/starrocks/issues/26060

alberttwong commented 1 year ago

broker isn't available in the allin1 image yet.

```sql
ALTER SYSTEM ADD BROKER local_load "172.26.199.40:8000";

LOAD LABEL xxxx1 (
    DATA INFILE("file:///home/disk1/zhaoheng/ssb-poc-0.10.0/ssb-poc/output/data_dir/user_behavior_sample_data.parquet")
    INTO TABLE user_behavior
    FORMAT AS "parquet"
    (UserID, ItemID, CategoryID, BehaviorType, Timestamp)
)
WITH BROKER local_load
PROPERTIES ("timeout" = "3600");
```

DanRoscigno commented 1 year ago

> broker isn't available in the allin1 image yet.
>
> `alter system add broker local_load "172.26.199.40:8000";`
> `load label xxxx1 (data infile("file:///home/disk1/zhaoheng/ssb-poc-0.10.0/ssb-poc/output/data_dir/user_behavior_sample_data.parquet") into table user_behavior format as "parquet"(UserID,ItemID,CategoryID,BehaviorType,Timestamp) ) with broker local_load properties("timeout"="3600");`

@alberttwong Are the commands above sufficient to add the broker to the running allin1 image, or are those the commands that will work once the broker is added? I tried the above and I am seeing `type:ETL_RUN_FAIL; msg:failed to find alive broker: local_load` when I try to load the Parquet file.

kevincai commented 1 year ago

> broker isn't available in the allin1 image yet.
>
> `alter system add broker local_load "172.26.199.40:8000";`
> `load label xxxx1 (data infile("file:///home/disk1/zhaoheng/ssb-poc-0.10.0/ssb-poc/output/data_dir/user_behavior_sample_data.parquet") into table user_behavior format as "parquet"(UserID,ItemID,CategoryID,BehaviorType,Timestamp) ) with broker local_load properties("timeout"="3600");`
>
> @alberttwong Are the commands above sufficient to add the broker to the running allin1 image, or are those the commands that will work once the broker is added? I tried the above and I am seeing `type:ETL_RUN_FAIL; msg:failed to find alive broker: local_load` when I try to load the Parquet file.

The commands will work once the broker is added to the allin1 image, which is not done yet. Stay tuned.

alberttwong commented 1 year ago

@DanRoscigno The broker in the allin1 image won't be ready until the 3.1 GA release. If you are using the Linux binary install, this will work.

kevincai commented 1 year ago

I have built a private allin1 image at lvlouisaslia/allin1-ubi:test based on PR #28240; a broker service is added to the image.

I tried the following scenario:

Example Parquet file downloaded from userdata1.parquet.

```sql
MySQL [(none)]> use test;
Database changed
```

* create test table
```sql
CREATE TABLE IF NOT EXISTS sr_user (
    id          int,
    first_name  string,
    last_name   string,
    email       string,
    gender      string,
    ip_address  string,
    cc          string,
    country     string,
    birthdate   string,
    salary      double,
    title       string,
    comments    string
) DISTRIBUTED BY HASH(id);
```

```sql
MySQL [test]> select count(*) from sr_user;
+----------+
| count(*) |
+----------+
|        0 |
+----------+
1 row in set (0.11 sec)
```
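
The load statement itself is not shown in the comment; judging from the Label and the `timeout(s):3600` in the SHOW LOAD output below, it was presumably something along these lines (the file path is assumed, and the broker name `local_load` is carried over from the earlier examples):

```sql
-- Assumed reconstruction of the load step; the path and broker name are placeholders.
LOAD LABEL test.BROKER_LOAD_LOCAL_FILE (
    DATA INFILE("file:///path/to/userdata1.parquet")
    INTO TABLE sr_user
    FORMAT AS "parquet"
)
WITH BROKER local_load
PROPERTIES ("timeout" = "3600");
```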

* check the load job status
```sql
MySQL [test]> show load \G
*************************** 1. row ***************************
         JobId: 11082
         Label: BROKER_LOAD_LOCAL_FILE
         State: FINISHED
      Progress: ETL:100%; LOAD:100%
          Type: BROKER
      Priority: NORMAL
      ScanRows: 1000
  FilteredRows: 0
UnselectedRows: 0
      SinkRows: 1000
       EtlInfo: NULL
      TaskInfo: resource:N/A; timeout(s):3600; max_filter_ratio:0.0
      ErrorMsg: NULL
    CreateTime: 2023-07-30 10:14:02
  EtlStartTime: 2023-07-30 10:14:08
 EtlFinishTime: 2023-07-30 10:14:08
 LoadStartTime: 2023-07-30 10:14:08
LoadFinishTime: 2023-07-30 10:14:08
   TrackingSQL:
    JobDetails: {"All backends":{"079c44be-518f-4a10-8fe4-aa6aeb8dcdc6":[11003]},"FileNumber":1,"FileSize":113629,"InternalTableLoadBytes":162150,"InternalTableLoadRows":1000,"ScanBytes":113629,"ScanRows":1000,"TaskNumber":1,"Unfinished backends":{"079c44be-518f-4a10-8fe4-aa6aeb8dcdc6":[]}}
1 row in set (0.00 sec)
```


* check the loaded data
```sql
MySQL [test]> select count(*) from sr_user;
+----------+
| count(*) |
+----------+
|     1000 |
+----------+
1 row in set (0.04 sec)
```

alberttwong commented 1 year ago

https://github.com/StarRocks/starrocks/issues/23625

alanpaulkwan commented 1 year ago

If I understand correctly, this requires the local broker to be running on the same machine where SR is running?

Would this use case be able to accommodate the scenario where I have my laptop in one place and SR running remotely?

alberttwong commented 7 months ago

As of right now, we still do not support stream load for parquet files.

jaogoy commented 4 months ago

@alanpaulkwan Will the Parquet files loaded from local be larger than 4 GB? Even if we implement it in Stream Load, it needs to either buffer a whole Parquet file in memory, or save the entire uploaded file to the BE's disk and read it back from the BE's local disk to continue loading.

johnpyp commented 1 month ago

I'd like to reiterate this request as well, as stream load is definitely the most flexible ingestion option. Broker load has had performance issues, sometimes takes a long time to start up, etc., and I'd prefer to rely on my own parallel ingestion code to handle it. Particularly when handling millions of Parquet files, the other ingestion options end up far slower than I'd like, presumably due to scanning/buffering.

Apache Doris has implemented Parquet support for stream load, and it's ended up being the most performant ingestion option for me so far.

Regarding the file size limit, it makes sense to me that individual Parquet files would need to fit within a configurable memory limit and should be expected to be read entirely into memory. As long as parallelism doesn't make this OOM trivially, a limit should be fine.

kevincai commented 3 weeks ago

The broker part for the allin1 image is done.

Assigning to @jaogoy to continue the discussion of Stream Load support for Parquet files.

jaogoy commented 3 weeks ago

It would be good to support loading Parquet files through Stream Load. However, currently few users load Parquet files through Stream Load, because most Parquet files are on cloud storage.