This is a good idea; we are designing such a catalog management system, but we have to take lower Flink versions (<1.18) into account.
@dpengpeng Thanks for your feedback. Which tool do you use to submit Flink SQL jobs: the Flink SQL Client, the Flink SQL Gateway, or an internal submission tool?
@YesOrNo828 We are considering using the Flink SQL Client; we also support writing Java jobs with the Table/SQL API.
@YesOrNo828 Are you considering external data sources such as MysqlCatalog, KafkaCatalog, etc.?
We have discussed the possibility of using AMS as a stream metastore to register Kafka stream tables, which is valuable in the context of Flink SQL, but it is not currently included in the roadmap. Also, we haven't figured out a good way to support this feature in lower-version (<1.18) Flink SQL Client/Gateway.
@baiyangtx On the Alibaba Cloud Flink VVP platform, a variety of catalogs are already supported, such as KafkaCatalog and MysqlCatalog. Users can directly use Flink SQL to query table data and metadata. We wonder whether it is also possible to use a custom Catalog to operate on external data sources.
Expanding support for more data source types and enabling smoother usage of multiple catalogs in Flink SQL is a fairly broad topic. Here, I will break it down and discuss it further.
Expanding support for more formats within a single Catalog.
Currently, for computing engines, Catalogs are essentially equivalent to data source types, but from the perspective of Catalog Services, a Catalog is an index of tables. A Catalog is not limited to a single type of table. Traditional Metastores like Hive can register tables of various types. Users are concerned with the business attributes of tables rather than storage formats.
Currently, the community is working on a feature called UnifiedCatalog. This feature allows multiple formats to be supported within one Catalog created on AMS. Rather than creating multiple Catalogs for each table type, the engine can use various data lake formats under a single database based on business needs. This is a planned feature in this year's roadmap, and you can track its progress through this issue.
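To make this concrete, usage from Flink could look roughly like the sketch below once the feature lands. The `amoro` catalog type, the option keys, and the table names are assumptions for illustration, not the final design:

```java
// Illustrative only: the 'amoro' catalog type, the option keys, and the table names are
// assumptions about how a UnifiedCatalog might eventually be exposed from Flink SQL.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedCatalogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // A single catalog registered against AMS ...
        tEnv.executeSql(
                "CREATE CATALOG unified WITH ("
                        + " 'type' = 'amoro',"
                        + " 'metastore.url' = 'thrift://ams-host:1260/my_catalog'"
                        + ")");

        // ... can serve tables of different data lake formats under one database.
        tEnv.executeSql("SELECT * FROM unified.db.iceberg_orders").print(); // an Iceberg table
        tEnv.executeSql("SELECT * FROM unified.db.paimon_events").print();  // a Paimon table
    }
}
```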
Eliminating the need for the computing engine to repeatedly create Catalogs.
Even with UnifiedCatalog, if multiple Catalogs are created due to business needs, or federation is required across multiple Hive clusters, the computing engine still needs to run CREATE CATALOG multiple times to point to the Catalogs on AMS. We hope to provide a way for the computing engine to only configure the URI of AMS rather than the specific URI of each Catalog, so that when using Flink SQL, the user can write SQL like SELECT * FROM catalog.db.table without having to write CREATE CATALOG. This feature is called CatalogManager or CatalogStore, and this issue is currently discussing it.
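The intended user experience would be roughly the following. Note that only Flink 1.18's generic `table.catalog-store.kind` mechanism exists today; the `amoro` store kind and its `uri` option are hypothetical:

```java
// Hypothetical end state: the job only knows the AMS address, catalogs are resolved on
// demand, and no CREATE CATALOG statements are needed.
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CatalogStoreEndState {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.setString("table.catalog-store.kind", "amoro");                        // assumption
        conf.setString("table.catalog-store.amoro.uri", "thrift://ams-host:1260");  // assumption

        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().withConfiguration(conf).build());

        // Fully qualified names resolve against catalogs already registered on AMS.
        tEnv.executeSql("SELECT * FROM hive_catalog.db.orders").print();
        tEnv.executeSql("SELECT * FROM iceberg_catalog.db.users").print();
    }
}
```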
For Flink 1.18, CatalogStore is a natively supported feature, and we can complete this process by implementing the standard Flink CatalogStore API. However, for lower versions of Flink, it is not possible to provide such a non-invasive solution. A feasible approach is to provide an Amoro version of TableEnvironment, which is easy to adopt for SQL jobs packaged as jars, since they can pull it in as a Maven dependency. However, if the Flink SQL Client or Flink SQL Gateway is used directly, the corresponding Flink source code must be modified to use this feature.
Extend UnifiedCatalog and CatalogStore to non-data-lake tables, such as Kafka Topic Tables.
Since Amoro was originally positioned as a management platform for lake tables on the data lake, the initial design only considered how to connect to lake tables. However, it now appears that serving as a metadata center for stream processing is also valuable, so we are also discussing how to expand the supported table format types.
One proposed solution is to allow InternalCatalog to register Kafka Topic Tables. Since Kafka Topic Tables naturally have no metadata service and Kafka itself can be considered a storage cluster, Kafka topics can be registered in an InternalCatalog and accessed through UnifiedCatalog and CatalogStore. Other tables, such as MySQL tables, can be registered via an ExternalCatalog. This ultimately achieves the goal of querying all the table types a stream processing task needs through Flink SQL, as in the sketch below.
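Purely as an illustration of that end state (the catalog, database, and table names are made up):

```java
// Illustrative only: shows a stream job enriching a Kafka topic table (registered in an
// InternalCatalog) with a MySQL dimension table (registered through an ExternalCatalog),
// all by fully qualified names.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class StreamFederationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        tEnv.executeSql(
                "SELECT o.order_id, o.amount, u.user_name "
                        + "FROM kafka_catalog.ods.orders AS o "
                        + "JOIN mysql_catalog.dim.users AS u ON o.user_id = u.id")
                .print();
    }
}
```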
However, this process has not been included in the roadmap yet.
If you are interested in these features, you are welcome to participate in the development. Step 3, expanding the data source types, can begin once UnifiedCatalog from Step 1 is supported, without waiting for Step 2. @dpengpeng
@YesOrNo828 Are you considering external data sources such as MysqlCatalog, KafkaCatalog, etc.?
@dpengpeng @baiyangtx Glad to see the active discussion on enhancing the seamless integration of AMS and compute engines.
I'd like to share some thoughts on this:
1. We are considering expanding the supported data sources, such as message queues and relational databases. That is mentioned [here](https://amoro.netease.com/docs/latest/catalogs/#future-work), but when the data sources will be expanded depends on the community.
2. Expanding support for more formats within a single Catalog: how can conflicts be avoided and resolved if different catalogs have the same database or table name? Some methods to consider: give the user an option to choose, keep the former db/tables, keep the latter db/tables, or delete the conflicting db/tables. @baiyangtx @dpengpeng WDYT?
3. It makes sense to store the table metadata of a KafkaCatalog (PulsarCatalog) in AMS.
How can this conflict be avoided and resolved if different catalogs have the same database or table name?
There won't be such a conflict issue. Tables are always referenced by their fully qualified catalog.db.table name, so identical database or table names under different catalogs never collide; resolving names within its own namespace is exactly the core capability of a Catalog.
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
This issue has been closed because it has not received any activity in the last 14 days since being marked as 'stale'
Description
AMS offers a Catalog service that can handle various formats such as Iceberg, Mixed Hive, and Paimon (as per #1269). The aim is to provide a quick way of interfacing with the Flink engine without needing to create each catalog through Flink SQL DDL or Java; ideally, Flink would integrate with AMS seamlessly, so users never have to create AMS catalogs by hand.
Use case/motivation
One potential use case for the AMS Catalog Service's support for multiple formats is companies looking to improve the speed and efficiency of their data processing. By integrating seamlessly with the Flink engine, the AMS Catalog Service enables faster and more efficient data processing without the need to create catalogs through Flink SQL or Java. This can save companies time and resources while improving their overall data processing capabilities.
Now, creating catalogs in AMS: to access AMS metadata from the Flink engine, we must create Flink Catalogs and register them in Flink's CatalogManager individually.
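For example (the catalog type and option keys below are illustrative; the exact DDL options depend on the Amoro Flink connector version in use):

```java
// Current state: one CREATE CATALOG statement per AMS catalog the job needs.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CreateCatalogsToday {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Repeated for every catalog registered on AMS that the job wants to read.
        tEnv.executeSql(
                "CREATE CATALOG catalog_a WITH ("
                        + " 'type' = 'amoro',"
                        + " 'metastore.url' = 'thrift://ams-host:1260/catalog_a')");
        tEnv.executeSql(
                "CREATE CATALOG catalog_b WITH ("
                        + " 'type' = 'amoro',"
                        + " 'metastore.url' = 'thrift://ams-host:1260/catalog_b')");

        tEnv.executeSql("SELECT * FROM catalog_a.db.sample_table").print();
    }
}
```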
Or through the Java language:
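A sketch of the equivalent registration in Java; `FactoryUtil.createCatalog` is one way to build a Catalog instance from options, and the option keys are the same illustrative ones as above:

```java
// Register an AMS catalog programmatically instead of via CREATE CATALOG DDL.
import java.util.HashMap;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

public class RegisterCatalogWithJava {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        Map<String, String> options = new HashMap<>();
        options.put("type", "amoro"); // illustrative connector identifier
        options.put("metastore.url", "thrift://ams-host:1260/my_catalog");

        Catalog catalog = FactoryUtil.createCatalog(
                "my_catalog", options, new Configuration(),
                RegisterCatalogWithJava.class.getClassLoader());
        tEnv.registerCatalog("my_catalog", catalog);
    }
}
```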
Expected: I want to introduce a simple way that allows the Flink engine to access all AMS Catalogs directly.
Avoid users creating and registering AMS catalogs.
Describe the solution
1. Based on FLIP-295, provide a CatalogStoreFactory to store AMS catalogs.
Using Configuration: the AMSCatalogStore will fetch and save AMS catalogs through the configured AMS thrift address.
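A minimal sketch of what such a store could look like against Flink 1.18's `CatalogStore` API (FLIP-295); every AMS call below is a placeholder for the real thrift client:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.CatalogDescriptor;
import org.apache.flink.table.catalog.CatalogStore;
import org.apache.flink.table.catalog.exceptions.CatalogException;

/** Sketch of an AMS-backed CatalogStore (FLIP-295, Flink 1.18+). All AMS calls are placeholders. */
public class AmsCatalogStore implements CatalogStore {

    private final String amsUri; // e.g. "thrift://ams-host:1260"
    private final Map<String, CatalogDescriptor> cache = new HashMap<>();

    public AmsCatalogStore(String amsUri) {
        this.amsUri = amsUri;
    }

    @Override
    public void open() throws CatalogException {
        // Placeholder: create the AMS thrift client and verify connectivity.
    }

    @Override
    public void close() throws CatalogException {
        // Placeholder: release the AMS thrift client.
    }

    @Override
    public void storeCatalog(String catalogName, CatalogDescriptor catalog) throws CatalogException {
        // Placeholder: persist the catalog definition back to AMS.
        cache.put(catalogName, catalog);
    }

    @Override
    public void removeCatalog(String catalogName, boolean ignoreIfNotExists) throws CatalogException {
        // Placeholder: delete the catalog on AMS.
        if (cache.remove(catalogName) == null && !ignoreIfNotExists) {
            throw new CatalogException(
                    "Catalog " + catalogName + " does not exist on AMS at " + amsUri);
        }
    }

    @Override
    public Optional<CatalogDescriptor> getCatalog(String catalogName) {
        return Optional.ofNullable(cache.computeIfAbsent(catalogName, this::fetchFromAms));
    }

    @Override
    public Set<String> listCatalogs() {
        // Placeholder: should list all catalog names registered on AMS.
        return cache.keySet();
    }

    @Override
    public boolean contains(String catalogName) {
        return getCatalog(catalogName).isPresent();
    }

    private CatalogDescriptor fetchFromAms(String catalogName) {
        // Placeholder: look the catalog up via the AMS thrift API and translate its
        // properties into Flink catalog options; null means AMS does not know the catalog.
        Map<String, String> props = lookupCatalogPropertiesOnAms(catalogName);
        return props == null ? null : CatalogDescriptor.of(catalogName, Configuration.fromMap(props));
    }

    private Map<String, String> lookupCatalogPropertiesOnAms(String catalogName) {
        return null; // TODO: real AMS thrift call
    }
}
```

A matching CatalogStoreFactory registered under an identifier such as `amoro` (hypothetical) would then let users select the store via `table.catalog-store.kind` in the Flink configuration.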
Using Table API
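With the Flink 1.18 Table API, the store can also be registered programmatically, e.g. (reusing the AmsCatalogStore sketch above):

```java
// Register the AMS-backed catalog store when building the TableEnvironment.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CatalogStoreTableApi {
    public static void main(String[] args) {
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                .withCatalogStore(new AmsCatalogStore("thrift://ams-host:1260"))
                .build();
        TableEnvironment tEnv = TableEnvironment.create(settings);

        // AMS catalogs can now be referenced directly, without CREATE CATALOG.
        tEnv.executeSql("SELECT * FROM my_catalog.db.sample_table").print();
    }
}
```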
Limitation: FLIP-295 is only implemented as of Flink 1.18.
2. Provide a custom TableEnvironment with built-in AMS catalogs.
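A rough sketch of such a wrapper for pre-1.18 Flink; the class name is made up and the AMS lookup is a placeholder:

```java
// Creates a TableEnvironment and eagerly registers every catalog known to AMS.
import java.util.Collections;
import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

public final class AmoroTableEnvironments {

    private AmoroTableEnvironments() {}

    public static TableEnvironment create(EnvironmentSettings settings, String amsUri) {
        TableEnvironment tEnv = TableEnvironment.create(settings);
        // Register every AMS catalog up front so SQL can use fully qualified names directly.
        for (Map.Entry<String, Map<String, String>> e : fetchCatalogsFromAms(amsUri).entrySet()) {
            Catalog catalog = FactoryUtil.createCatalog(
                    e.getKey(), e.getValue(), new Configuration(),
                    AmoroTableEnvironments.class.getClassLoader());
            tEnv.registerCatalog(e.getKey(), catalog);
        }
        return tEnv;
    }

    private static Map<String, Map<String, String>> fetchCatalogsFromAms(String amsUri) {
        // Placeholder: call the AMS thrift API and return, per catalog name, the Flink
        // connector options needed to instantiate it.
        return Collections.emptyMap();
    }
}
```

Jar-packaged jobs would pull this in as a Maven dependency and call AmoroTableEnvironments.create(...) instead of TableEnvironment.create(...); as noted above, the SQL Client/Gateway cannot pick this up without patching Flink.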
The two approaches above are just a simplified outline; the detailed design will be proposed later. Anyone who is interested can take part in the discussion.