Today, the Presto Kafka connector only supports a single Kafka cluster and a static list of tables/schemas.
This approach enables:
The ability to query multiple Kafka clusters through a single connector
Dynamic table/schema discovery without restarting the application
Background
At Uber, we run Kafka at scale, with more than two dozen Kafka clusters and tens of thousands of topics. This proposal changes the implementation of the existing Kafka connector to better suit this large-scale setup. The goals are:
One connector for multiple clusters: a single connector serves every cluster, simplifying connector configuration.
Dynamic cluster discovery: no need to restart Presto when adding or removing a Kafka cluster
Dynamic table/schema discovery: no need to restart Presto when adding a new table/schema
Overview
For backward compatibility, the existing static configuration will remain supported and will be the default. Users must set cluster-description-supplier=DYNAMIC to enable dynamic multi-cluster support and table-description-supplier=DYNAMIC to enable dynamic table descriptor support. The dynamic suppliers populate metadata from the supplier directory periodically, instead of once during the connector initialization phase.
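As a sketch, a connector catalog file with both dynamic suppliers enabled might look like the following. Only the two supplier properties and kafka.table-description-dir come from this proposal; connector.name and kafka.nodes are the connector's existing properties, and the broker addresses are placeholders:

```properties
connector.name=kafka
# Static bootstrap list, still used when the suppliers are left at their defaults
kafka.nodes=broker1:9092,broker2:9092
# Enable dynamic multi-cluster support
cluster-description-supplier=DYNAMIC
# Enable dynamic table descriptor support
table-description-supplier=DYNAMIC
# Directory scanned periodically for table description files
kafka.table-description-dir=etc/kafka
```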
Query syntax
To support multiple Kafka clusters, each Kafka cluster will have an alias (clusterName), and the Presto schemaName is used to specify the Kafka cluster name in the SQL query syntax.
For example, the SQL query for cluster foo on topic bar will be
select * from kafka.foo.bar
Implementation
Extract a pluggable KafkaClusterMetadataSupplier interface
The KafkaClusterMetadataSupplier interface will be
public interface KafkaClusterMetadataSupplier
{
List<HostAddress> getNodes(String clusterName);
}
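To illustrate how the interface might be consumed, here is a hypothetical sketch of resolving a query such as select * from kafka.foo.bar to cluster foo and topic bar. The KafkaTableHandle record and resolve method are illustrative, not part of the proposal, and plain host:port strings stand in for Presto's HostAddress so the sketch is self-contained:

```java
import java.util.List;
import java.util.Map;

public class SchemaToClusterResolution {
    // Simplified stand-in for the proposed interface, with host:port strings
    // replacing Presto's HostAddress.
    interface KafkaClusterMetadataSupplier {
        List<String> getNodes(String clusterName);
    }

    // Hypothetical handle carrying what a split would need to reach the cluster.
    record KafkaTableHandle(String cluster, String topic, List<String> bootstrapNodes) {}

    // The Presto schema name doubles as the cluster alias, so resolving
    // "kafka.foo.bar" asks the supplier for cluster "foo" and topic "bar".
    static KafkaTableHandle resolve(String schemaName, String tableName,
            KafkaClusterMetadataSupplier supplier) {
        List<String> nodes = supplier.getNodes(schemaName);
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("Unknown Kafka cluster: " + schemaName);
        }
        return new KafkaTableHandle(schemaName, tableName, nodes);
    }

    public static void main(String[] args) {
        KafkaClusterMetadataSupplier supplier =
                cluster -> Map.of("foo", List.of("broker1:9092")).getOrDefault(cluster, List.of());
        System.out.println(resolve("foo", "bar", supplier));
    }
}
```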
The default implementation, FileKafkaClusterMetadataSupplier, fetches the Kafka cluster metadata (broker bootstrap list) from the Kafka connector configuration file.
A parallel implementation, DynamicKafkaClusterMetadataSupplier, fetches the Kafka cluster metadata (broker bootstrap list) from a supplier directory and keeps it in memory. A scheduled job scans the supplier directory periodically and updates the in-memory cache.
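A minimal sketch of that refresh mechanism, under the assumption that the supplier directory holds one file per cluster (named after the cluster alias) whose contents are a comma-separated bootstrap list. The class name and file layout are assumptions, and host:port strings stand in for Presto's HostAddress:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class DynamicClusterMetadataCache {
    private final Path supplierDir;
    // Replaced wholesale on each refresh, so readers always see a consistent snapshot.
    private volatile Map<String, List<String>> clusterToNodes = Map.of();

    public DynamicClusterMetadataCache(Path supplierDir) {
        this.supplierDir = supplierDir;
    }

    // Scan the supplier dir: each file is named after a cluster alias and contains
    // a comma-separated broker bootstrap list, e.g. "broker1:9092,broker2:9092".
    public void refresh() throws IOException {
        Map<String, List<String>> fresh = new HashMap<>();
        try (Stream<Path> files = Files.list(supplierDir)) {
            for (Path file : (Iterable<Path>) files::iterator) {
                String cluster = file.getFileName().toString();
                String brokers = Files.readString(file).trim();
                fresh.put(cluster, List.of(brokers.split(",")));
            }
        }
        clusterToNodes = Map.copyOf(fresh); // atomic swap of the cache
    }

    public List<String> getNodes(String clusterName) {
        return clusterToNodes.getOrDefault(clusterName, List.of());
    }

    // Periodic background refresh, mirroring the scheduled job described above.
    public ScheduledFuture<?> scheduleRefresh(ScheduledExecutorService executor, long periodSeconds) {
        return executor.scheduleAtFixedRate(() -> {
            try {
                refresh();
            } catch (IOException ignored) {
                // Keep serving the last good snapshot if a scan fails.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        Path dir = Files.createTempDirectory("clusters");
        Files.writeString(dir.resolve("foo"), "broker1:9092,broker2:9092");
        DynamicClusterMetadataCache cache = new DynamicClusterMetadataCache(dir);
        cache.refresh();
        System.out.println(cache.getNodes("foo")); // prints [broker1:9092, broker2:9092]
    }
}
```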
Extract a pluggable KafkaTableDescriptionSupplier interface (cherry-pick of a Trino commit)
The default implementation, FileKafkaTableDescriptionSupplier, fetches table descriptions from static files. (cherry-pick of a Trino commit)
A parallel implementation, DynamicKafkaTableDescriptionSupplier, builds on FileKafkaTableDescriptionSupplier and adds dynamic table discovery through the Kafka broker; a scheduled job scans kafka.table-description-dir periodically and updates the in-memory cache.
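For reference, a table description file in the connector's existing JSON format looks roughly like this; the field layout follows the Presto Kafka connector's documented schema, while the topic and field names are invented for illustration. Under this proposal, schemaName would carry the cluster alias, so kafka.foo.bar would resolve to this description:

```json
{
    "tableName": "bar",
    "schemaName": "foo",
    "topicName": "bar",
    "message": {
        "dataFormat": "json",
        "fields": [
            {"name": "user_id", "mapping": "userId", "type": "BIGINT"},
            {"name": "event_time", "mapping": "ts", "type": "TIMESTAMP", "dataFormat": "milliseconds-since-epoch"}
        ]
    }
}
```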