uncleguanghui / pyflink_learn

基于 PyFlink 的学习文档,通过一个个小实践,便于大家快速入手 PyFlink
269 stars 97 forks source link

关于example 5中的问题想请教一下大佬 #2

Closed yifanfish233 closed 3 years ago

yifanfish233 commented 3 years ago

您好,我最近在通过您的示例学习pyflink, 收益匪浅,但是示例5中stream.py 的udf 函数不是很理解,因为我只看到了定义但是没看到调用,我在run的时候没有数据写入到我自己的redis 中,导致报错,想请教一下

  1. model class 中函数是怎么调用的?我只在最后的sink 看到中调用了udf(train_and_predict), 但是eval ,open 这些方法是怎么调用的呢?
  2. pyflink 因为是调用py4j run 的flink, 怎么debug比较好 ?(i.e 打断点这种)

感谢!

uncleguanghui commented 3 years ago
  1. model 是继承自 pyflink.table.udf 中的基类 ScalarFunction ,每来一条流数据,就隐式地调用 model.eval() ,所以代码里看不到 eval 的调用。另外 load_model 在 model.init() 里调用,而 model 的初始化是由 flink 在流处理逻辑执行之前,自动帮我们完成的;dump_model 在 model.eval() 里调用。
  2. 我也没有找到特别好的打断点方法,但可以考虑: 1)编写单元测试,保证每个环节的输出与期望的一致,也算是间接地获取到变量的值了。 2)先修改结果表的表结构,然后将中间结果 sink 出来观察,最后再把表结构改回去。
yifanfish233 commented 3 years ago

非常感谢您的及时回复,我最后还想请教一下:

model 中用的python 自己的set (),get()方法给redis, 而不是走pyflink table/SQL 那种sink方法,还能够这样的?

2)先修改结果表的表结构,然后将中间结果 sink 出来观察,最后再把表结构改回去。

因为我还在排查redis/kafka sink 都没有数据的问题,并且没有看到logging info 出现在terminal, 所以我怀疑是不是udf 整个没有被call到?udf 的中间结果也是可以sink 看的?

再次感谢!

uncleguanghui commented 3 years ago

模型是一种对象,没办法走 sink 方法。案例 5 只是提供了一种解决的思路,具有一定的可行性,但是并没有考虑性能问题 ~

没有数据的话,先检查一下 kafka 是否持续有数据写入,即是否有运行 kafka_producer.py 。另外,你可以把 UDF 的中间结果和 UDF 的最终结果整合到一个字典中,然后调用 json.dumps 输出为一个字符串,就可以 sink 看了。