uncleguanghui / pyflink_learn

基于 PyFlink 的学习文档,通过一个个小实践,便于大家快速入手 PyFlink
269 stars 97 forks source link

版本1.12.0 运行的问题 #3

Closed helxsz closed 3 years ago

helxsz commented 3 years ago

在运行第一个example wordcount的时候,flink 1.12.0 报说

t_env.execute_sql(f"""
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_word}',
        'format' = 'csv'
    )
""")  // syntax error 

报错

  File "batch1.py", line 24
    """)
      ^
SyntaxError: invalid syntax

请问这是什么问题

uncleguanghui commented 3 years ago

SyntaxError: invalid syntax 一般都是因为默认的 Python 版本错误,请检查默认的 Python 版本是否是 3.4/3.5、3.6。

helxsz commented 3 years ago

我之前运行时是 python3 3.6.9, 后面改成 python3 到 3.5.10. 但是 flink run -m localhost:8081 -py batch.py 还是报出同样的问题。换了台18.04的ubuntu系统也是一样。 我看了下 /usr/bin 里面的python版本,

/usr/bin/python /usr/bin/python2.7 /usr/bin/python3 /usr/bin/python3.5m /usr/bin/python3.6-config /usr/bin/python3.6m-config /usr/bin/python3m /usr/bin/python2 /usr/bin/python2-wsdump /usr/bin/python3.5 /usr/bin/python3.6 /usr/bin/python3.6m /usr/bin/python3-config /usr/bin/python3m-config

虽然我想你说的syntax问题是对的,但是不知道怎么还是会报出这种问题来。

我后面用 python3 直接运行 python3 batch.py.

WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/usr/local/lib/python3.6/dist-packages/pyflink/lib/flink-dist_2.11-1.12.0.jar) to field java.lang.Class.ANNOTATION WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release

不知道是什么问题

helxsz commented 3 years ago
  1. 设置了环境变量 export PYFLINK_CLIENT_EXECUTABLE=/usr/local/python3

  2. 同时 原代码中

    t_env.execute_sql(f"""
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_word}',
        'format' = 'csv'
    )
    """)  // syntax error 

    换成了

abc = """
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{dir_word}',
        'format' = 'csv'
    )
""".format(dir_word=dir_word)
t_env.execute_sql(abc)

syntax的问题就不存在了

但是有一个新的问题。

dir_result = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'result.csv')

cde = """
    CREATE TABLE sink (
        word STRING,   -- word
        cnt BIGINT     -- cnt
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{dir_result}',
        'format' = 'csv'
    )
""".format(dir_result=dir_result)

t_env.execute_sql(cde)

t_env.execute_sql("""
    INSERT INTO sink
    SELECT word
           , count(1) AS cnt
    FROM source
    GROUP BY word
""")

最后生成的文件不是 result.csv, 而是一个叫 result.csv 的文件夹,我检查了代码 sink DDL里面有'format' = 'csv',但为什么还会有文件夹生成

helxsz commented 3 years ago

关于例子3 mysql cdc的问题是

在连接source的代码我改成了我这里的mysql的数据库和DDL

定义DDL和连接

source_ddl = """
    CREATE TABLE source (
        id INT,                            -- ID
        first_name STRING,                       -- 姓名
        last_name STRING,                       -- 姓名
        email STRING                   -- 姓名
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://127.0.0.1:3307/inventory',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'table-name' = 'customers',
        'username' = 'mysqluser',
        'password' = 'mysqlpw'
    )
"""
t_env.execute_sql(source_ddl)

打印table的数据

my_table = t_env.from_path("source")
my_table.execute().print()

代码到这里都是成功的,可以打印出数据库的数据。

在sink的部分,我又一个新的mysql数据库(端口和第一个不一样),DDL中定义了一个没有的表格名 customers2

sink_ddl = """
    CREATE TABLE sink (
        id INT,                            -- ID
        first_name STRING,                       -- 姓名
        last_name STRING,                       -- 姓名
        email STRING                      -- 姓名
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://127.0.0.1:3308/inventory',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'table-name' = 'customers2',
        'username' = 'mysqluser',
        'password' = 'mysqlpw'
    )
"""
t_env.execute_sql(sink_ddl)
my_table2 = t_env.from_path("sink")

用方式1 和 方式2 都执行了

方式1

t_env.from_path('source').insert_into('sink')

方式2

t_env.sql_query(""" SELECT id, first_name, last_name, email FROM source """).insert_into('sink') t_env.execute('sync by binlog')

但是最后我通过工具查看第二个数据库中,两个方式都没有找到新建的 customer2的表,不知道是哪里出的问题。

P.S. 我的执行方式目前就是直接通过 python3 文件名.py 的方式来执行。

我看了官方1.12的文档,好像 insert_into 这个方法要取消,所以我用了

t_env.execute_sql("""
    INSERT INTO sink SELECT id, first_name,last_name,email
    FROM source
""")

但是运行的时候也报错了,

Traceback (most recent call last): File "stream.py", line 105, in t_env.execute('sync by binlog') File "/usr/local/lib/python3.6/dist-packages/pyflink/table/table_environment.py", line 1276, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "/usr/local/lib/python3.6/dist-packages/pyflink/util/exceptions.py", line 147, in deco return f(*a, kw) File "/usr/local/lib/python3.6/dist-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute. : java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.** at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:46) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1216)

uncleguanghui commented 3 years ago

我之前运行时是 python3 3.6.9, 后面改成 python3 到 3.5.10. 但是 flink run -m localhost:8081 -py batch.py 还是报出同样的问题。换了台18.04的ubuntu系统也是一样。 我看了下 /usr/bin 里面的python版本,

/usr/bin/python /usr/bin/python2.7 /usr/bin/python3 /usr/bin/python3.5m /usr/bin/python3.6-config /usr/bin/python3.6m-config /usr/bin/python3m /usr/bin/python2 /usr/bin/python2-wsdump /usr/bin/python3.5 /usr/bin/python3.6 /usr/bin/python3.6m /usr/bin/python3-config /usr/bin/python3m-config

虽然我想你说的syntax问题是对的,但是不知道怎么还是会报出这种问题来。

我后面用 python3 直接运行 python3 batch.py.

WARNING: An illegal reflective access operation has occurred WARNING: Illegal reflective access by org.apache.flink.api.java.ClosureCleaner (file:/usr/local/lib/python3.6/dist-packages/pyflink/lib/flink-dist_2.11-1.12.0.jar) to field java.lang.Class.ANNOTATION WARNING: Please consider reporting this to the maintainers of org.apache.flink.api.java.ClosureCleaner WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations WARNING: All illegal access operations will be denied in a future release

不知道是什么问题

告警产生的原因貌似是因为 flink 版本不一致,你用的是 1.12,我的 demo 代码是基于 1.11.2 的,可能我的有些写法已经过时了

uncleguanghui commented 3 years ago

关于例子3 mysql cdc的问题是

在连接source的代码我改成了我这里的mysql的数据库和DDL

定义DDL和连接

source_ddl = """
    CREATE TABLE source (
        id INT,                            -- ID
        first_name STRING,                       -- 姓名
        last_name STRING,                       -- 姓名
        email STRING                   -- 姓名
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://127.0.0.1:3307/inventory',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'table-name' = 'customers',
        'username' = 'mysqluser',
        'password' = 'mysqlpw'
    )
"""
t_env.execute_sql(source_ddl)

打印table的数据

my_table = t_env.from_path("source")
my_table.execute().print()

代码到这里都是成功的,可以打印出数据库的数据。

在sink的部分,我又一个新的mysql数据库(端口和第一个不一样),DDL中定义了一个没有的表格名 customers2

sink_ddl = """
    CREATE TABLE sink (
        id INT,                            -- ID
        first_name STRING,                       -- 姓名
        last_name STRING,                       -- 姓名
        email STRING                      -- 姓名
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://127.0.0.1:3308/inventory',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'table-name' = 'customers2',
        'username' = 'mysqluser',
        'password' = 'mysqlpw'
    )
"""
t_env.execute_sql(sink_ddl)
my_table2 = t_env.from_path("sink")

用方式1 和 方式2 都执行了

方式1

t_env.from_path('source').insert_into('sink')

方式2

t_env.sql_query(""" SELECT id, first_name, last_name, email FROM source """).insert_into('sink') t_env.execute('sync by binlog')

但是最后我通过工具查看第二个数据库中,两个方式都没有找到新建的 customer2的表,不知道是哪里出的问题。

P.S. 我的执行方式目前就是直接通过 python3 文件名.py 的方式来执行。

我看了官方1.12的文档,好像 insert_into 这个方法要取消,所以我用了

t_env.execute_sql("""
    INSERT INTO sink SELECT id, first_name,last_name,email
    FROM source
""")

但是运行的时候也报错了,

Traceback (most recent call last): File "stream.py", line 105, in t_env.execute('sync by binlog') File "/usr/local/lib/python3.6/dist-packages/pyflink/table/table_environment.py", line 1276, in execute return JobExecutionResult(self._j_tenv.execute(job_name)) File "/usr/local/lib/python3.6/dist-packages/py4j/java_gateway.py", line 1286, in call answer, self.gateway_client, self.target_id, self.name) File "/usr/local/lib/python3.6/dist-packages/pyflink/util/exceptions.py", line 147, in deco return f(*a, kw) File "/usr/local/lib/python3.6/dist-packages/py4j/protocol.py", line 328, in get_return_value format(target_id, ".", name), value) py4j.protocol.Py4JJavaError: An error occurred while calling o4.execute. : java.lang.IllegalStateException: No operators defined in streaming topology. Cannot generate StreamGraph.** at org.apache.flink.table.planner.utils.ExecutorUtils.generateStreamGraph(ExecutorUtils.java:46) at org.apache.flink.table.planner.delegation.StreamExecutor.createPipeline(StreamExecutor.java:47) at org.apache.flink.table.api.internal.TableEnvironmentImpl.execute(TableEnvironmentImpl.java:1216)

现在 'format' = 'csv' 只能保证每行的结果是类似于 csv 的(也就是用半角逗号 , 作为分隔符),但是数据文件不是 .csv 格式。我的 demo 中的描述需要更新一下~

uncleguanghui commented 3 years ago

@helxsz sink 表都没创建,怎么做到数据同步呢,当然要创建后才可以看到同步的结果了。

另外,我不太清楚在 1.12 版本里,你修改 source 表的 connector 为 jdbc 是否可以实现 cdc 实时同步的功能,后续有时间了试验一下。在 1.11 版本里,是需要引入第三方的 flink-sql-connector-mysql-cdc-1.1.0.jar 才能实现 cdc 的功能。看你的代码里报错是 Cannot generate StreamGraph,也就是无法生成流处理的拓扑图,可能也是因为这个原因。

建议 source 部分先改成我的写法,然后再创建 sink 表,最后再运行一遍代码看看

helxsz commented 3 years ago

之前我以为定义DDL,并且 执行 execute_sql 就相当于建表了。

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/sql/create.html#create-database

CREATE statements can be executed with the execute_sql() method of the TableEnvironment. The execute_sql() method returns ‘OK’ for a successful CREATE operation, otherwise will throw an exception.

这里说到用execute_sql()可以执行 flink sql的命令,我也像下面代码一样执行了,结果是 既没有生成新的表,也没有返回 ok 这个结果,而是 <pyflink.table.table_result.TableResult object at 0x7fd1f35edbe0> 这样的结果。

sink_ddl = """
    CREATE TABLE sink (
        id INT,                            -- ID
        first_name STRING,                       -- 姓名
        last_name STRING,                       -- 姓名
        email STRING                      -- 姓名
    ) WITH (
        'connector' = 'jdbc',
        'url' = 'jdbc:mysql://127.0.0.1:3308/inventory',
        'driver' = 'com.mysql.cj.jdbc.Driver',
        'table-name' = 'customers2',
        'username' = 'mysqluser',
        'password' = 'mysqlpw'
    )
"""
abc = t_env.execute_sql(sink_ddl)
print('执行结果',abc)

但是如你所说,我是手动建立了sink表, sink表建立之后,确实可以做到数据同步了。

当然关于数据同步,我还有一个问题就是 当sink表是空的时候,数据同步成功。而当我把同步好的sink表里面内容改变一些后,我再次同步,我本来期望同步的数据会覆盖老的数据,但结果是 依然显示的是老的数据。

假设 source 表 的数据是 abc, sink 表的数据是 abc1. 再次同步之后,sink表的数据还是abc1,没有被abc覆盖。

这是什么原因

uncleguanghui commented 3 years ago

@helxsz 你可能没太明白同步的目的是什么,一般同步后的数据是不允许修改的(读写分离,从库设置为只读),不然数据的一致性无法得到保证。

另外你还要明白同步的原理是什么,有基于 SQL 语句的同步,也有基于 binlog 的同步,但无论是哪种类型 or 原理的同步,你都可以简单地认为是把主库的变更给执行了一遍,至于从库本身是否中途发生了修改,这不是“同步”要保证的事。

MaleicAcid commented 3 years ago
  1. 设置了环境变量 export PYFLINK_CLIENT_EXECUTABLE=/usr/local/python3
  2. 同时 原代码中
t_env.execute_sql(f"""
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = 'file://{dir_word}',
        'format' = 'csv'
    )
""")  // syntax error 

换成了

abc = """
    CREATE TABLE source (
        id BIGINT,     -- ID
        word STRING    -- word
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{dir_word}',
        'format' = 'csv'
    )
""".format(dir_word=dir_word)
t_env.execute_sql(abc)

syntax的问题就不存在了

但是有一个新的问题。

dir_result = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'result.csv')

cde = """
    CREATE TABLE sink (
        word STRING,   -- word
        cnt BIGINT     -- cnt
    ) WITH (
        'connector' = 'filesystem',
        'path' = '{dir_result}',
        'format' = 'csv'
    )
""".format(dir_result=dir_result)

t_env.execute_sql(cde)

t_env.execute_sql("""
    INSERT INTO sink
    SELECT word
           , count(1) AS cnt
    FROM source
    GROUP BY word
""")

最后生成的文件不是 result.csv, 而是一个叫 result.csv 的文件夹,我检查了代码 sink DDL里面有'format' = 'csv',但为什么还会有文件夹生成

我也是这个样子,生成的都是文件夹而不是文件,也看不到输出结果。我pip instal 指定==1.11.1 也还是有那些警告。我是在windows上运行的