Is your feature request related to a problem or challenge? Please describe what you are trying to do.
In DataFusion, which is described as one of Ballista's components, I can load a hive-style partitioned dataset using the `ListingOptions` struct:
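```rust
use std::sync::Arc;
use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig};

// Hive-style partition columns, taken from directory names such as `uf=CE/municipio=Fortaleza`.
let partition_columns: Vec<(String, DataType)> = vec![("uf".to_owned(), DataType::Utf8), ("municipio".to_owned(), DataType::Utf8)];
let listing_options = ListingOptions::new(Arc::new(file_format))
    .with_table_partition_cols(partition_columns);
let config = ListingTableConfig::new(listing_table_url)
    .with_listing_options(listing_options)
    .with_schema(resolved_schema);
let listing_table = ListingTable::try_new(config)?;
let table_provider = Arc::new(listing_table);
session_context.register_table(table_name, table_provider)?;
```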
I can pass the same `table_provider` object to `register_table` on a Ballista context. It recognizes the file fields and returns success, but it ignores the hive-style partition columns.
Describe the solution you'd like
Given a hive-style partitioned dataset, stored locally or in object storage, like the following example:
```text
$> tree resources/age_gender_draw/
resources/age_gender_draw/
├── uf=CE
│   └── municipio=Fortaleza
│       └── fortaleza.parquet
├── uf=DF
│   └── municipio=Brasilia
│       └── brasilia.parquet
├── uf=RJ
│   └── municipio=Rio De Janeiro
│       └── rio_de_janeiro.parquet
└── uf=SP
    └── municipio=Sao Paulo
        └── sao_paulo.parquet
```
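In this layout the partition values are not stored inside the Parquet files; they are encoded in the directory names as `key=value` segments, and the reader is expected to turn them into columns. The std-only sketch below illustrates that mapping for one file path relative to the table root. It is a simplified illustration, not DataFusion's actual implementation, and the function name `parse_partition_values` is hypothetical.

```rust
/// Hypothetical helper: extracts hive-style partition values from a path
/// relative to the table root, e.g. `uf=CE/municipio=Fortaleza/fortaleza.parquet`.
/// Directory segments must match the declared partition columns, in order.
fn parse_partition_values(relative_path: &str, partition_cols: &[&str]) -> Option<Vec<(String, String)>> {
    let segments: Vec<&str> = relative_path.split('/').collect();
    // The last segment is the file name; everything before it is a directory.
    let (_file_name, dirs) = segments.split_last()?;
    if dirs.len() != partition_cols.len() {
        return None;
    }
    let mut values = Vec::with_capacity(partition_cols.len());
    for (dir, expected) in dirs.iter().zip(partition_cols) {
        // Each directory must look like `key=value` with the expected key.
        let (key, value) = dir.split_once('=')?;
        if key != *expected {
            return None;
        }
        values.push((key.to_string(), value.to_string()));
    }
    Some(values)
}
```

With `partition_cols = ["uf", "municipio"]`, the path `uf=RJ/municipio=Rio De Janeiro/rio_de_janeiro.parquet` yields `uf = "RJ"` and `municipio = "Rio De Janeiro"`, which is what a partition-aware table should expose as the `uf` and `municipio` columns for every row of that file.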
Ballista should honor the `listing_options` configuration when loading a dataset, whether registering it as a table or loading it as a DataFrame. Basically, the following code should work.
Given the registering function:
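```rust
use std::sync::Arc;
use ballista::prelude::BallistaContext;
use datafusion::arrow::datatypes::DataType;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl};
use datafusion::prelude::SessionContext;

/// Registers a hive-style partitioned Parquet dataset in both the DataFusion and the Ballista context.
pub async fn register_parquet(
    table_name: &str,
    source_path: &str,
    session_context: &SessionContext,
    ballista_context: &BallistaContext,
) -> anyhow::Result<()> {
    let session_state = session_context.state();
    // Directory keys of the hive-style layout, e.g. `uf=CE/municipio=Fortaleza`.
    let partition_columns: Vec<(String, DataType)> = vec![
        ("uf".to_owned(), DataType::Utf8),
        ("municipio".to_owned(), DataType::Utf8),
    ];
    let listing_table_url = ListingTableUrl::parse(source_path)?;
    let file_format = ParquetFormat::new()
        .with_enable_pruning(Some(true))
        .with_skip_metadata(Some(true));
    let listing_options = ListingOptions::new(Arc::new(file_format))
        .with_table_partition_cols(partition_columns);
    // Infer the schema of the Parquet files found under the table root.
    let resolved_schema = listing_options
        .infer_schema(&session_state, &listing_table_url)
        .await?;
    let config = ListingTableConfig::new(listing_table_url)
        .with_listing_options(listing_options)
        .with_schema(resolved_schema);
    let listing_table = ListingTable::try_new(config)?;
    let table_provider = Arc::new(listing_table);
    // The same provider is registered in both contexts.
    ballista_context.register_table(table_name, table_provider.clone())?;
    session_context.register_table(table_name, table_provider)?;
    Ok(())
}
```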
and using it as:
```rust
let query = "SELECT distinct uf, age_group from age_gender_draw LIMIT 10";
register_parquet(table_name, source_path, &session_context, &ballista_context).await?;
let ballista_processed_data_frame = run_query_in_ballista(&query, &ballista_context).await?;
ballista_processed_data_frame.show().await?;
```
The following error, which occurs when generating `ballista_processed_data_frame`, should not happen:
```text
Error: Schema error: No field named 'uf'. Valid fields are 'age_gender_draw'.'maid', 'age_gender_draw'.'per_capita_income', 'age_gender_draw'.'gender', 'age_gender_draw'.'age_group'.
Caused by:
    No field named 'uf'. Valid fields are 'age_gender_draw'.'maid', 'age_gender_draw'.'per_capita_income', 'age_gender_draw'.'gender', 'age_gender_draw'.'age_group'.
```
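The list of valid fields in the error contains only the columns stored in the Parquet files. In DataFusion, a `ListingTable` configured with partition columns is expected to expose the file fields followed by the declared partition columns, so the error suggests that the schema seen on the Ballista side has lost the partition columns somewhere between registration and planning. The std-only sketch below models the two situations. `Field`, `table_schema` and `resolve_column` are hypothetical stand-ins, not Arrow or DataFusion types, and the file column types are assumptions.

```rust
/// Hypothetical stand-in for an Arrow field: a name plus a type name.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: &'static str,
}

/// Schema a partition-aware table should expose:
/// the fields found in the files, followed by the partition columns.
fn table_schema(file_fields: &[Field], partition_cols: &[(&str, &'static str)]) -> Vec<Field> {
    let mut fields = file_fields.to_vec();
    for &(name, data_type) in partition_cols {
        fields.push(Field { name: name.to_string(), data_type });
    }
    fields
}

/// Looks a column up by name, producing a message shaped like the error above when it is missing.
fn resolve_column<'a>(schema: &'a [Field], name: &str) -> Result<&'a Field, String> {
    schema.iter().find(|f| f.name == name).ok_or_else(|| {
        let valid: Vec<String> = schema.iter().map(|f| format!("'{}'", f.name)).collect();
        format!("No field named '{}'. Valid fields are {}.", name, valid.join(", "))
    })
}
```

Resolving `uf` against the file fields alone fails with the "No field named 'uf'" message (the reported Ballista behavior), while resolving it against `table_schema(file_fields, &[("uf", "Utf8"), ("municipio", "Utf8")])` succeeds (the DataFusion behavior).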
The result should be the same as generating `fusion_processed_data_frame`:
```rust
let fusion_processed_data_frame = run_query_in_fusion(&query, &session_context).await?;
fusion_processed_data_frame.show().await?;
```
Describe alternatives you've considered
Using DataFusion alone is a possible alternative for small datasets, but in my case, where huge datasets are common, Ballista should be used to scale processing across distributed nodes.
Additional context
There is nothing more I can share now, sorry.
With some guidance, I could implement this and open a pull request, but I would need help knowing where to start.