Open TFdream opened 4 years ago
转载 Java SPI 机制在 Flink SQL 中的应用:https://mp.weixin.qq.com/s/dYi9_CSwKTanJjl6C4qEgg
Java SPI机制,即Java Service Provider Interface,是Java提供的基于“接口编程 + 策略模式 + 配置文件”组合实现的动态加载机制。调用者可以根据实际使用需要,来启用、扩展或者替换框架的现有实现策略。在Java中,基于该SPI思想,提供了具体的实现,ServiceLoader,利用该类可以轻松实现面向服务的注册与发现,完成服务提供与使用的解耦。
Java SPI机制常见的例子,如:
在Flink SQL程序中用到了Java SPI机制动态加载各种Factory的实现类。比如说,对于TableFactory接口,Flink程序会从程序所使用到的依赖中找到META-INF/services/org.apache.flink.table.factories.TableFactory,并通过反射实例化TableFactory接口的实现,并通过TableFactoryService#filter()方法筛选出符合条件的TableFactory实现类。以Flink SQL程序从Kafka(版本0.11)读取数据为例,Flink SQL程序会首先获得TableFactory所有可用的实现类,通过TableFactoryService#filter()得到符合条件的TableFactory实现类Kafka011TableSourceSinkFactory实例。本文主要说明Java SPI机制在Flink SQL程序中的应用,对于对TableFactory实现类的筛选将在另一篇文章中说明。
特别说明:本文涉及的flink源码版本为1.9。
tEnv .connect( new Kafka() .version("0.11") .topic(topic) .startFromLatest() .properties(props)) .withSchema(schema) .withFormat(format) .registerTableSource("record");
上述程序用于与Kafka建立连接,并指定了读取数据的结构与格式,最后使用registerTableSource完成table source的注册工作。我们跟进代码,可以发现内部调用了TableFactoryService#find()方法查找到符合条件的TableSourceFactory实例,并调用createTableSource()方法创建Kafka011TableSource实例。
Java SPI 机制简介
Java SPI机制,即Java Service Provider Interface,是Java提供的基于“接口编程 + 策略模式 + 配置文件”组合实现的动态加载机制。调用者可以根据实际使用需要,来启用、扩展或者替换框架的现有实现策略。在Java中,基于该SPI思想,提供了具体的实现,ServiceLoader,利用该类可以轻松实现面向服务的注册与发现,完成服务提供与使用的解耦。
Java SPI机制常见的例子,如:
Java SPI机制在Flink中的应用
在Flink SQL程序中用到了Java SPI机制动态加载各种Factory的实现类。比如说,对于TableFactory接口,Flink程序会从程序所使用到的依赖中找到META-INF/services/org.apache.flink.table.factories.TableFactory,并通过反射实例化TableFactory接口的实现,并通过TableFactoryService#filter()方法筛选出符合条件的TableFactory实现类。以Flink SQL程序从Kafka(版本0.11)读取数据为例,Flink SQL程序会首先获得TableFactory所有可用的实现类,通过TableFactoryService#filter()得到符合条件的TableFactory实现类Kafka011TableSourceSinkFactory实例。本文主要说明Java SPI机制在Flink SQL程序中的应用,对于对TableFactory实现类的筛选将在另一篇文章中说明。
特别说明:本文涉及的flink源码版本为1.9。
上述程序用于与Kafka建立连接,并指定了读取数据的结构与格式,最后使用registerTableSource完成table source的注册工作。我们跟进代码,可以发现内部调用了TableFactoryService#find()方法查找到符合条件的TableSourceFactory实例,并调用createTableSource()方法创建Kafka011TableSource实例。