基于 PyFlink 的学习文档,通过一个个小实践,便于小伙伴们快速入手 PyFlink
首先本地的 java 版本需要升级到 8 或 11
java -version
# 可能会看到 java version "1.8.0_111"
然后使用 brew 安装 Flink ,目前 Flink 的最新版本为 1.11.2
brew switch apache-flink 1.11.2
cd 到 /usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh
路径下,启动 flink
cd /usr/local/Cellar/apache-flink/1.11.2/libexec/bin
sh start-cluster.sh
启动后,运行 jps
命令,可以看到本地所有的 java 进程,如果 Flink 被正确安装的话,应该可以看到这两个进程 TaskManagerRunner
与 StandaloneSessionClusterEntrypoint
,代表现在 jobmanager 和 taskmanager 都已经正常启动了。
此时,我们也可以打开网页 http://localhost:8081/ ,看到 Flink 作业的管理面板,目前应该显示 Available Task Slots 为 1 (代表现在只有 1 个 taskmanager,且其中只有 1 个 task slot,并行度为 1),还可以看到 Running Jobs 为 0(代表此时没有 Flink 作业在执行)。
另外 flink 的关闭命令为
sh stop-cluster.sh
为了方便,可以修改本地的 ~/.bash_profile
文件,插入下面的 3 行内容(注意修改版本)然后运行 source ~/.bash_profile
来激活修改。
alias start-flink='/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/start-cluster.sh'
alias stop-flink='/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/stop-cluster.sh'
alias flink='/usr/local/Cellar/apache-flink/1.11.2/libexec/bin/flink'
请参考 官方文档
本教程会用到 MySQL、Kafka、Zookeeper 等数据库或大数据组件,为了便于统一部署和管理,这里选择使用 docker。
从开发角度来看,以最快的速度搭建起一个可以运行的环境最为重要。基于如下的 3 个角度,解释了为何使用 Docker:
首先,安装 docker 。
然后,在本教程的项目根目录下,启动 docker 编排服务:
# windows 系统先加下面这句
# set COMPOSE_CONVERT_WINDOWS_PATHS=1
docker-compose up -d
启动后,运行 docker ps
可以看到起了 5 个容器,如下所示
CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES
32d6b6cdf30b mysql:8.0.22 "docker-entrypoint.s…" 5 days ago Up 3 seconds 0.0.0.0:3306->3306/tcp, 33060/tcp mysql1
cc8246824903 mysql:8.0.22 "docker-entrypoint.s…" 5 days ago Up 3 seconds 33060/tcp, 0.0.0.0:3307->3306/tcp mysql2
f732effb7559 redis:6.0.9 "docker-entrypoint.s…" 5 days ago Up 5 seconds 0.0.0.0:6379->6379/tcp redis
b62b8d8363c3 wurstmeister/kafka:2.13-2.6.0 "start-kafka.sh" 5 days ago Up 3 seconds 0.0.0.0:9092->9092/tcp kafka
fe2ad0230ffa adminer "entrypoint.sh docke…" 5 days ago Up 12 seconds 0.0.0.0:8080->8080/tcp adminer
df80ca04755d zookeeper:3.6.2 "/docker-entrypoint.…" 5 days ago Up 3 seconds 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp, 8080/tcp zookeeper
解释下各容器的作用:
PS,为了访问安全,在 docker-compose.yml
文件中可以看到我为一些组件设置了密码:
很简单地,我们完成了环境的搭建。
另外,停止命令如下:
# 停止
docker-compose stop
# 停止并删除
docker-compose down
如果遇到某个容器启动失败的话,一个简单的方法就是先删掉该容器,然后重新构建,以 kafka 为例:
docker rm kafka
docker-compose up -d --build
PyFlink 要求 python 版本为 3.5、3.6 或 3.7,否则会出错。
推荐使用 miniconda 来搭建 python 环境,优点是体积小、与系统环境隔离、便于管理多个 python 虚拟环境……
网上很容易找到 python3 安装教程 。
先确保以下环节是否走通:
一切 ready 后,就完成本地 PyFilnk 开发与测试环境的搭建,让我们开始正题。
教程正文: PyFlink 从入门到精通,代码在 examples
目录下可以看到。
本教程目前提供了 5 个案例,如果是新手的话,建议按顺序来学习:
批处理 Word Count
:
自定义函数 UDF
:
实时 CDC
:
实时排行榜
:
在线机器学习 Online Machine Learning
:
运行的方法也很简单,对于每个案例,cd 到案例目录下后,运行下面的脚本(xx 换成对应的脚本名称)即可运行。
flink run -m localhost:8081 -py xxx.py
接下来,请前往 PyFlink 从入门到精通 吧。