wuchong / flink-sql-demo

Demo: Build End-to-End Streaming Application using Flink SQL
254 stars 172 forks source link

Demo:基于 Flink SQL 构建流式应用 连接不上Elasticsearch #1

Closed zgq25302111 closed 4 years ago

zgq25302111 commented 4 years ago

我用docker练习这个demo,docker启动的服务有datagen,kafka,zookeeper,elasticsearch,mysql,kibana,datagen,kafka,zookeeper,elasticsearch,mysql,kibana,这几个服务都是正常的。flink单独启动,insert时报错,推测是因为flink连不上Elasticsearch。 另外,在我的环境下,建表语句'connector.version' = '7' 此处必须为7,否则报错。

CREATE TABLE buy_cnt_per_hour ( hour_of_day BIGINT, buy_cnt BIGINT ) WITH ( 'connector.type' = 'elasticsearch', -- 使用 elasticsearch connector 'connector.version' = '7', -- elasticsearch 版本,6 能支持 es 6+ 以及 7+ 的版本 'connector.hosts' = 'http://localhost:9200', -- elasticsearch 地址 'connector.index' = 'buy_cnt_per_hour', -- elasticsearch 索引名,相当于数据库的表名 'connector.document-type' = 'user_behavior', -- elasticsearch 的 type,相当于数据库的库名 'connector.bulk-flush.max-actions' = '1', -- 每条数据都刷新 'format.type' = 'json', -- 输出数据格式 json 'update-mode' = 'append' );

wuchong commented 4 years ago

博客中(直播中)是特意使用 'connector.version' = '6' 的,因为7版本的包有问题。博客中下载 es connector 的链接错了,应该是下载6的,写了成下载7的,现在已修改。 你可以下载 es6 connector jar 到 lib 下,再试下。

zgq25302111 commented 4 years ago

问题已经解决了。谢谢。 一、我一开始用的flink版本是flink-1.10.0-bin-scala_2.12.tgz,但是博客(直播)中是要下载flink-1.10.0-bin-scala_2.11.tgz,修改成博客(直播)中的flink包。 二、将flink-sql-connector-elasticsearch6_2.11-1.10.0.jar放在lib目录。注意flink的Scala版本是2.11,那么这个elasticsearch包也需要是2.11。 三、docker-compose.yml 中指明image: docker.elastic.co/elasticsearch/elasticsearch:7.6.0 用的是elasticsearch 7版本,也就是可以用es6 connector jar链接elasticsearch 7,就是“6 能支持 es 6+ 以及 7+ 的版本”,您说的没错。

wuchong commented 4 years ago

Yes. You are right.