apache / druid

Apache Druid: a high performance real-time analytics database.
https://druid.apache.org/
Apache License 2.0

If SQL Input Source can support Hive, Iceberg, and Presto JDBC, that would be super awesome and I will tell you why. #12746

Closed didip closed 7 months ago

didip commented 2 years ago

Description

The way index_parallel downloads *.parquet files is a bit too low level. It has no idea about Iceberg's versioning scheme. So if you just point index_parallel to Iceberg's S3 folder, you will get duplicated data.

Now, what if you use SqlInputSource to run SELECT * FROM iceberg_table WHERE date = '2022-07-01'? Then you will get the latest version of the Iceberg data and the ingestion will be correct.
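For illustration, the existing SQL input source takes a database block plus a list of queries. The sketch below builds that fragment of an index_parallel spec in Python. The "presto" database type and the JDBC URI are hypothetical placeholders: as noted later in this thread, the current input source only works with the MySQL and PostgreSQL extensions.

```python
import json


def sql_input_source(database_type, connect_uri, sqls):
    """Build the inputSource fragment of an index_parallel spec as a dict."""
    return {
        "type": "sql",
        "database": {
            "type": database_type,
            "connectorConfig": {"connectURI": connect_uri},
        },
        "sqls": list(sqls),
    }


spec = sql_input_source(
    "presto",  # hypothetical: not a database type Druid supports today
    "jdbc:presto://presto.example.com:8080/iceberg/default",  # placeholder URI
    ["SELECT * FROM iceberg_table WHERE date = '2022-07-01'"],
)
print(json.dumps(spec, indent=2))
```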

Motivation

Now let's expand this "new feature" to also support Hive and Presto JDBC... Druid's index_parallel will become so much more flexible, to the point where it's no longer necessary to have a Spark connector plugin. The entire Druid ecosystem will be so much more awesome (provided that the ingestion is fast enough and parallelizable enough).

gianm commented 2 years ago

I agree totally that it would be a cool feature. My 2¢ on implementation:

The SQL input source is built to be super generic and pull from any database over JDBC. It's flexible, but I think performance and scalability would not be sufficient for pulling large amounts of data from a data lake. IMHO the best place to do Iceberg (or Delta Lake) integration would be in the InputSources. There's a couple of ways to do it:

  1. Add a "file chooser" option to existing input sources like s3, google, hdfs, etc. Implement Iceberg and Delta Lake file choosers. The file lists would be filtered through these choosers, and then our regular code would take over.
  2. Add iceberg and deltaLake input sources that accept data specs in whatever form is most natural for the system. Internally those input sources would need to have some code for talking to S3, GCS, HDFS, or wherever else data is stored. They'd likely do it through libraries provided by those other projects. This wouldn't compose with our existing s3, google, hdfs, etc., input sources.

To me, the first option is preferable, unless there is some downside that requires the second option. Experience with Hadoop suggests it's not a great idea to let the integration fully control communication with remote storage, and it's better to do composable approaches.
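A minimal sketch of the first option, assuming the set of live data files can be obtained from the table metadata. `FileChooser`, `SnapshotFileChooser`, and `files_to_ingest` are hypothetical names, not Druid classes, and the current snapshot is a hard-coded set rather than anything read through an Iceberg or Delta Lake library.

```python
from typing import Iterable, List, Protocol, Set


class FileChooser(Protocol):
    """Hypothetical hook: narrows a raw file listing before ingestion."""

    def choose(self, listed: Iterable[str]) -> List[str]: ...


class SnapshotFileChooser:
    """Keeps only files referenced by a table's current snapshot.

    `live_files` stands in for what a real chooser would read from the
    table format's metadata; no Iceberg or Delta Lake API is used here.
    """

    def __init__(self, live_files: Set[str]) -> None:
        self.live_files = live_files

    def choose(self, listed: Iterable[str]) -> List[str]:
        return [path for path in listed if path in self.live_files]


def files_to_ingest(listed: Iterable[str], chooser: FileChooser) -> List[str]:
    """The storage-specific input source lists; the chooser only filters."""
    return chooser.choose(listed)


# A raw listing of the table folder includes a file that a later
# snapshot replaced, which is how duplicated rows get ingested.
listing = [
    "s3://bucket/tbl/data/part-0-v1.parquet",  # superseded by v2
    "s3://bucket/tbl/data/part-0-v2.parquet",
    "s3://bucket/tbl/data/part-1-v1.parquet",
]
current_snapshot = {
    "s3://bucket/tbl/data/part-0-v2.parquet",
    "s3://bucket/tbl/data/part-1-v1.parquet",
}
selected = files_to_ingest(listing, SnapshotFileChooser(current_snapshot))
print(selected)
```

The point of the shape is composability: listing and reading stay with the existing s3, google, or hdfs code, and the chooser never talks to storage itself.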

didip commented 2 years ago

About file choosers, I am not sure that it is possible to pick and choose the parquet files that contain the latest version in Iceberg. Is it possible?

I have been having a hard time trying to get an answer to that question.

gianm commented 2 years ago

> I am not sure that it is possible to pick and choose the parquet files that contain the latest version in Iceberg. Is it possible?

I don't know either. I'm not too familiar with either Iceberg or Delta Lake.

FrankChen021 commented 2 years ago

If we want to build this feature upon the SQL input source, I think we need to improve it first. The current SQL input source requires either postgres-metadata-storage or mysql-metadata-storage to be loaded. But if we want to ingest data from both postgres and mysql, it's not able to do that (see #11733). Making the SQL input source depend on metadata storage is also very weird.

I agree that the current SQL input source may not be sufficient or scalable enough for large amounts of data, but if we want Druid to integrate with more SQL-based external data sources and enrich its ecosystem, it's better to improve it as a first-class citizen in Druid so that we don't need to make any changes to support a new SQL-based data source.

abhishekagarwal87 commented 2 years ago

IMO it seems a bit weird to model iceberg and delta as input sources. They do look like file formats to me - though more of a logical file format than a physical one. If I understand correctly, one reason that we want to use SQLInputSource is the flexibility and ease of use of SQL as an interface. The underlying protocol itself to read data from these formats can still be anything that is splittable and performant. We will get the former once we support batch ingestion via SQL. For the latter, we should implement a custom input format for these formats.

There will be some work here that goes beyond implementing a format, e.g. if I write "select * from iceberg_table where date=XYZ", how does that translate into a set of Iceberg folders/files to be read? I think that this is where option 1) suggested by Gian is handy, though it's limiting in the sense that it cannot do anything fancier. Say I want to read just one column C of a parquet file on S3. In such a case, I would want to issue a range GET request to S3 with start and end pointing to the column section C within that parquet file. This is possible if the parquet format itself is constructing S3 requests.
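As a rough sketch of the range-read idea, the function below computes an HTTP Range header value for one column chunk. It assumes the chunk's offsets and compressed size have already been read from the Parquet footer, and that a missing dictionary page is represented as None; `column_chunk_range` is a hypothetical helper, not part of any Parquet or S3 library.

```python
from typing import Optional


def column_chunk_range(
    data_page_offset: int,
    total_compressed_size: int,
    dictionary_page_offset: Optional[int] = None,
) -> str:
    """Return an HTTP Range header value covering one Parquet column chunk.

    The chunk starts at its dictionary page when it has one, otherwise at
    its first data page, and spans total_compressed_size bytes.
    """
    if dictionary_page_offset is None:
        start = data_page_offset
    else:
        start = dictionary_page_offset
    end = start + total_compressed_size - 1  # HTTP byte ranges are inclusive
    return f"bytes={start}-{end}"


# Illustrative numbers only; real values come from the file footer.
header = column_chunk_range(
    data_page_offset=4096,
    total_compressed_size=1024,
    dictionary_page_offset=4000,
)
print(header)
```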

There is also potential synergy with the Druid catalog proposal. If this catalog understands Iceberg or Delta tables, then the catalog can be used to filter the files that need to be accessed for that table.

FrankChen021 commented 2 years ago

Batch ingestion via SQL is a good idea and the ultimate way to go. I don't know whether, at the first stage, the ingestion SQL will be planned and converted into a SQL input source.

paul-rogers commented 2 years ago

For a bit of color, Apache Drill and Presto/Trino both support extensible input sources, as, of course, does Spark. To support them well, the tool has to have optimizer support to push as much work to the input source as possible. That means Parquet row group pruning, Iceberg versioning, pushing WHERE and join clauses into the JDBC data source, etc. These tools have, over time, built a bundle of tricks that can be used to work out what can be pushed, and how to do that for each source. This work is never done; there are always more tweaks.

For Druid, we'd want a connector that can not only do the mechanics of sending requests and reading data, but also provide metadata that says whether the work can be distributed, or single-threaded, what can be pushed down, etc.
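One possible shape for that metadata, as a sketch only: `ConnectorCapabilities` and `plan_parallelism` are hypothetical names, and the three flags are just the properties mentioned above, not an existing Druid interface.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ConnectorCapabilities:
    """Hypothetical metadata a connector could report to the planner."""

    splittable: bool         # can reads be distributed across workers?
    pushes_filters: bool     # can WHERE clauses run at the source?
    pushes_projection: bool  # can the source return only selected columns?


def plan_parallelism(caps: ConnectorCapabilities, requested_workers: int) -> int:
    """Use the requested worker count only when the source can be split."""
    if not caps.splittable:
        return 1
    return max(1, requested_workers)


jdbc_like = ConnectorCapabilities(
    splittable=False, pushes_filters=True, pushes_projection=True
)
lake_like = ConnectorCapabilities(
    splittable=True, pushes_filters=True, pushes_projection=True
)
print(plan_parallelism(jdbc_like, 10), plan_parallelism(lake_like, 10))
```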

This can get fancy: we could partition SQL queries and push them to multiple Presto servers. For example, rather than doing SELECT * FROM sales WHERE saleDate > ?, split it into ten queries, each reading a subset of data. This is called "sharding" in the DB world: Vitess does this for queries against sharded MySQL databases at YouTube.
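A minimal sketch of that split, assuming the open-ended saleDate filter can be bounded by a known end date so the range can be cut into contiguous day ranges. `split_date_range` and `shard_queries` are hypothetical helpers, and the DATE literal syntax is an assumption about the target SQL dialect.

```python
from datetime import date, timedelta
from typing import List, Tuple


def split_date_range(start: date, end: date, shards: int) -> List[Tuple[date, date]]:
    """Split [start, end) into up to `shards` contiguous half-open ranges."""
    total_days = (end - start).days
    if total_days <= 0 or shards <= 0:
        return []
    shards = min(shards, total_days)  # never produce an empty range
    base, extra = divmod(total_days, shards)
    ranges = []
    lo = start
    for i in range(shards):
        # The first `extra` ranges take one additional day each.
        hi = lo + timedelta(days=base + (1 if i < extra else 0))
        ranges.append((lo, hi))
        lo = hi
    return ranges


def shard_queries(start: date, end: date, shards: int) -> List[str]:
    """Render one query per range; together they cover [start, end) once."""
    template = (
        "SELECT * FROM sales "
        "WHERE saleDate >= DATE '{lo}' AND saleDate < DATE '{hi}'"
    )
    return [
        template.format(lo=lo.isoformat(), hi=hi.isoformat())
        for lo, hi in split_date_range(start, end, shards)
    ]


queries = shard_queries(date(2022, 7, 1), date(2022, 7, 11), 10)
for query in queries:
    print(query)
```

Half-open ranges are used so that no row falls into two shards or between them.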

One then gets into failure handling: what happens if one of those queries fails? Do we fail the entire insert job in Druid? Can we retry that one item? The connector would have to give us that information.
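One possible policy, sketched under the assumption that a shard can safely be re-run on its own (its partial output is discarded or the read is idempotent): retry each failed shard a few times, and fail the whole job only when a shard exhausts its attempts. `run_shards` and `run_one` are hypothetical names.

```python
from typing import Callable, Dict, List, Sequence


def run_shards(
    shards: Sequence[str],
    run_one: Callable[[str], int],
    max_attempts: int = 3,
) -> Dict[str, int]:
    """Run each shard, retrying it alone on failure.

    Raises RuntimeError naming the shards that exhausted their attempts,
    so the caller can fail the whole job.
    """
    results: Dict[str, int] = {}
    failed: List[str] = []
    for shard in shards:
        for attempt in range(1, max_attempts + 1):
            try:
                results[shard] = run_one(shard)
                break  # this shard succeeded; move on to the next one
            except Exception:
                if attempt == max_attempts:
                    failed.append(shard)
    if failed:
        raise RuntimeError(
            f"shards failed after {max_attempts} attempts: {failed}"
        )
    return results


# Demo: shard "b" fails once with a transient error, then succeeds.
calls: Dict[str, int] = {}


def flaky(shard: str) -> int:
    calls[shard] = calls.get(shard, 0) + 1
    if shard == "b" and calls[shard] == 1:
        raise IOError("transient failure")
    return len(shard)


out = run_shards(["a", "b"], flaky)
print(out, calls)
```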

Would be great to understand the use case here. For exploratory use, SQL might be pretty cool: just grab some data from somewhere, load it in Druid, and try out ideas for a new app. Once the app goes into production, however, it would seem more stable to use some tool to pump data into Kafka, then have Druid read from there. It decouples the two operations, which would allow for a simpler system.

github-actions[bot] commented 8 months ago

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.

github-actions[bot] commented 7 months ago

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.