uncleguanghui / pyflink_learn

基于 PyFlink 的学习文档,通过一个个小实践,便于大家快速入手 PyFlink
269 stars 97 forks source link

请问json格式的文件读取(Source)怎么添加 jar 包 #11

Closed Sober-Chen closed 3 years ago

Sober-Chen commented 3 years ago

①使用 t_env.connect(FileSystem())获取 json 文件的数据如何添加 json 的 jar 包呢,我看官方文档都是使用 maven 添加字段来使用依赖,PyFlink 的话该怎么处理呢?

② Source 读取数据以后,该怎么标准化输出 我用了 print 和 print_schema() 都是输出表的属性之类的,不是标准化输出里面的数据

感谢回复!!!

uncleguanghui commented 3 years ago
  1. PyFlink内置了对于 json 格式的文件支持,不需要额外导入 jar 包,详见官方文档file-formats
  2. t_env.from_path('source').to_pandas() # 转为 pandas.DataFrame
Sober-Chen commented 3 years ago

谢谢解答 发现用 execute_sql 创建的表使用 print 或者 to_pandas() 或者 print_schema() 都会报错,请问博主知道原因吗 但我看官方是有相关代码,以下是官方的写法 (在官方文档有有说 SQL查询不支持部分数据类型 ,不知道是否是这个原因) `env = StreamExecutionEnvironment.get_execution_environment() table_env = StreamTableEnvironment.create(env, settings)

enable checkpointing

table_env.get_config().get_configuration().set_string("execution.checkpointing.mode", "EXACTLY_ONCE") table_env.get_config().get_configuration().set_string("execution.checkpointing.interval", "10s")

table_env.execute_sql("CREATE TABLE Orders (user BIGINT, product STRING, amount INT) WITH (...)")

execute SELECT statement

table_result1 = table_env.execute_sql("SELECT * FROM Orders") table_result1.print()

execute Table

table_result2 = table_env.sql_query("SELECT * FROM Orders").execute() table_result2.print()`

我的建表语句: t_env.execute_sql(f""" CREATE TABLE source( line STRING )WITH( 'connector' = 'filesystem', 'path' = '{dir_log}', 'format' = 'csv') """)

查询语句: t_env.execute_sql("SELECT line FROM source LIMIT 1").print()

但结果会报错说无法读取下一个字段,但如果我的建表语句换成 table API 就可以正常运行: Table API 语句: t_env.connect(FileSystem().path(dir_log)) \ .with_format(OldCsv() .line_delimiter('\n') .field('line', DataTypes.STRING())) \ .with_schema(Schema() .field('line', DataTypes.STRING())) \ .create_temporary_table('source')