pyflink / playgrounds

Provide docker environment and examples for PyFlink
Apache License 2.0
186 stars 84 forks source link

Created a new mysql job, and it doesn't work。There could be sth wrong with the IntSerializer according to the bug report。 #3

Open yyz940922 opened 4 years ago

yyz940922 commented 4 years ago

container Info:

(base) [root@iz8vb6evwfagx3tyjx4fl8z data]# docker ps
CONTAINER ID        IMAGE                                                     COMMAND                  CREATED             STATUS              PORTS                                                NAMES
efc1d310eb14        pyflink/playgrounds:1.10.0                                "/docker-entrypoint.…"   3 weeks ago         Up 3 weeks          6121-6123/tcp, 8081/tcp                              playgrounds_taskmanager_1
af466f98e914        wurstmeister/kafka:2.12-2.2.1                             "start-kafka.sh"         3 weeks ago         Up 3 weeks          0.0.0.0:32768->9092/tcp                              playgrounds_kafka_1
637b020df406        mysql                                                     "docker-entrypoint.s…"   3 weeks ago         Up 2 days           0.0.0.0:3306->3306/tcp, 33060/tcp                    playgrounds_db_1
6cc0b28bb45a        pyflink/playgrounds:1.10.0                                "/docker-entrypoint.…"   3 weeks ago         Up 3 weeks          6123/tcp, 8081/tcp, 0.0.0.0:8003->8088/tcp           playgrounds_jobmanager_1
2ef57cfa9494        docker.elastic.co/elasticsearch/elasticsearch-oss:6.3.1   "/usr/local/bin/dock…"   3 weeks ago         Up 3 weeks          0.0.0.0:9200->9200/tcp, 0.0.0.0:9300->9300/tcp       playgrounds_elasticsearch_1
8087b3554674        wurstmeister/zookeeper:3.4.6                              "/bin/sh -c '/usr/sb…"   3 weeks ago         Up 3 weeks          22/tcp, 2888/tcp, 3888/tcp, 0.0.0.0:2181->2181/tcp   playgrounds_zookeeper_1
cfbfb51770b8        adminer                                                   "entrypoint.sh docke…"   3 weeks ago         Up 3 weeks          0.0.0.0:8080->8080/tcp                               playgrounds_adminer_1
42a9f050933b        puckel/docker-airflow                                     "/entrypoint.sh webs…"   4 weeks ago         Up 4 weeks          5555/tcp, 8793/tcp, 0.0.0.0:8004->8080/tcp           gallant_villani

sql for table and data:

SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;

-- ----------------------------
-- Table structure for stu
-- ----------------------------
DROP TABLE IF EXISTS `stu`;
CREATE TABLE `stu`  (
  `id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT COMMENT '自增id',
  `name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '学生名字',
  `school` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '学校名字',
  `nickname` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL COMMENT '学生小名',
  `age` int(11) NOT NULL COMMENT '学生年龄',
  `class_num` int(11) NOT NULL COMMENT '班级人数',
  `score` decimal(4, 2) NOT NULL COMMENT '成绩',
  `phone` bigint(20) NOT NULL COMMENT '电话号码',
  `email` varchar(64) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT '家庭网络邮箱',
  `ip` varchar(32) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL COMMENT 'IP地址',
  `address` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL COMMENT '家庭地址',
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 1000001 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;

-- ----------------------------
-- Records of stu
-- ----------------------------
INSERT INTO `stu` VALUES (1, 'Ukq', '复旦附中', '敌法师', 12, 59, 38.89, 15573293938, 'mojing@hotmail.com', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (2, 'Ukq', '上海中学', '幽鬼', 12, 59, 38.89, 15573293938, 'mojing@hotmail.com', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (3, 'Ukq', '人和中心', '敌法师', 12, 59, 38.89, 15573293938, 'mojing@hotmail.com', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (4, 'Ukq', '广东中学', '影魔', 12, 59, 38.89, 15573293938, 'mojing@hotmail.com', '203.0.117.157', '宁夏回族自治区帆县静安何街f座 126696');
INSERT INTO `stu` VALUES (5, 'kulwWkwsZ', '广东中学', '鬼泣', 12, 55, 99.59, 15789193091, 'namo@yahoo.com', '192.4.223.99', '新疆维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (6, 'kulwWkwsZ', '猪场', '影魔', 12, 55, 99.59, 15789193091, 'namo@yahoo.com', '192.4.223.99', '新疆维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (7, 'kulwWkwsZ', '鹅厂', '影魔', 12, 55, 99.59, 15789193091, 'namo@yahoo.com', '192.4.223.99', '新疆维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (8, 'kulwWkwsZ', '上海中学', '影魔', 12, 55, 99.59, 15789193091, 'namo@yahoo.com', '192.4.223.99', '新疆维吾尔自治区郑州市梁平丁路a座 750587');
INSERT INTO `stu` VALUES (9, 'eHFOyHtIGfiduV', '旧大院', '高小王子', 82, 43, 90.96, 18506504233, 'taowan@ye.cn', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (10, 'eHFOyHtIGfiduV', '人和中心', '影魔', 82, 43, 90.96, 18506504233, 'taowan@ye.cn', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (11, 'eHFOyHtIGfiduV', '上海中学', '歌神', 82, 43, 90.96, 18506504233, 'taowan@ye.cn', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (12, 'eHFOyHtIGfiduV', '华师大附中', '影魔', 82, 43, 90.96, 18506504233, 'taowan@ye.cn', '198.51.173.171', '湖南省辛集市海港王路a座 115439');
INSERT INTO `stu` VALUES (13, 'kWZjrT', '猪场', '逗比', 94, 63, 94.24, 18870125400, 'jing57@gmail.com', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (14, 'kWZjrT', '华师大二附中', '鬼泣', 94, 63, 94.24, 18870125400, 'jing57@gmail.com', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (15, 'kWZjrT', '清华中学', '影魔', 94, 63, 94.24, 18870125400, 'jing57@gmail.com', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');
INSERT INTO `stu` VALUES (16, 'Jec', '华师大二附中', '高小王子', 38, 68, 51.16, 13248136245, 'lhe@yahoo.com', '198.51.103.171', '云南省西安市西峰张家港路j座 882348');
INSERT INTO `stu` VALUES (17, 'kWZjrT', '猪场', '高小王子', 94, 63, 94.24, 18870125400, 'jing57@gmail.com', '192.88.93.128', '宁夏回族自治区南京县涪城辛集街C座 115529');

mysql_transfer.py:

from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, EnvironmentSettings

s_env = StreamExecutionEnvironment.get_execution_environment()
s_env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
s_env.set_parallelism(1)

# use blink table planner
st_env = StreamTableEnvironment \
    .create(s_env, environment_settings=EnvironmentSettings
            .new_instance()
            .in_streaming_mode()
            .use_blink_planner().build())

source_ddl = """CREATE TABLE StuSourceTable (id int , name varchar, school varchar, nickname varchar, age int, class_num int, score decimal, phone int, email varchar, ip varchar) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://db:3306/flink-test',
        'connector.table' = 'stu',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'example')
"""

sink_ddl = """CREATE TABLE StuSinkTable (
    id int,
    name varchar, 
    school varchar,
    age int
    ) WITH (
        'connector.type' = 'jdbc',
        'connector.url' = 'jdbc:mysql://db:3306/flink-test',
        'connector.table' = 'stu_result',
        'connector.driver' = 'com.mysql.jdbc.Driver',
        'connector.username' = 'root',
        'connector.password' = 'example')
"""

st_env.sql_update(source_ddl)
st_env.sql_update(sink_ddl)

t = st_env.from_path('StuSourceTable')
data = t.select("id,name,school,age")
data.insert_into('StuSinkTable')

st_env.execute("mysql_transfer")

call function:

docker-compose exec jobmanager ./bin/flink run -py /opt/examples/mysql_transfer.py

logs for bug report:

2020-07-08 09:54:41
org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:110)
    at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:76)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:192)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:186)
    at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:180)
    at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:484)
    at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:380)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:279)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:194)
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:74)
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
    at akka.actor.ActorCell.invoke(ActorCell.scala:561)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
    at akka.dispatch.Mailbox.run(Mailbox.scala:225)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to java.lang.Integer
    at org.apache.flink.api.common.typeutils.base.IntSerializer.copy(IntSerializer.java:32)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
    at org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
    at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
    at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
    at org.apache.flink.streaming.api.functions.source.InputFormatSourceFunction.run(InputFormatSourceFunction.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:196)
yyz940922 commented 3 years ago

No matter i changed the int type to bigint type, the ClassCastException remains. It seems like sth goes wrong when the IntSerializer called the copy(Integer from) method.

yyz940922 commented 3 years ago

When i removed all the int related paramters in table creating sql, the job works, so there must be sth wrong with the source code.

yyz940922 commented 3 years ago

@hequn8128 取表的时候只要有int相关类型的字段就会有类型转换的报错, long转Integer的报错, 用tinyint的时候会有boolean转byte的报错, 能否检查下底层代码? 一把int相关类型字段抽掉数据就进去了.....