lw-lin / CoolplaySpark

酷玩 Spark: Spark 源代码解析、Spark 类库等
3.46k stars 1.41k forks source link

[SS]《1.1 Structured Streaming 实现思路与实现概述》讨论区 #29

Open lw-lin opened 7 years ago

lw-lin commented 7 years ago

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

ar5art commented 7 years ago

@lw-lin 你好,请教一个问题。 structured streaming的数据源为kafka,并设置任务triggers为1小时。streaming会每间隔一小时确认一次新到达的数据,这里是每间隔一小时去kafka中取新到达的数据,还是隔一小时确认一次streaming的input table中新追加的数据?kafka中的数据会实时追加到input table中吗?如果我想设置任务每一小时跑一次,但是要实时把kafka拉取到input table中并做一次预处理,可以做到吗? 谢谢!

lw-lin commented 7 years ago

@ar5art

是每间隔一小时去kafka中取新到达的数据,还是隔一小时确认一次streaming的input table中新追加的数据?

是前者

kafka中的数据会实时追加到input table中吗?

呃,不会

如果我想设置任务每一小时跑一次,但是要实时把kafka拉取到input table中并做一次预处理,可以做到吗?

用 1min 做 trigger,实时从 kafka 中拉数据、并写出到 HDFS dir x 另起一个任务,用 1hour 做 trigger,从 HDFS dir x 读入,并另外写出?

zhouyan8603 commented 7 years ago

您好,我有个问题想请教下,我当前用的是spark streaming处理每个页面每小时的新增用户数问题的,具体逻辑是:批次读取stream,然后解析stream中的日志事件,用每个事件的pageid和uid拼接为key到数据库表user里查找用户是否存在,如果不存在(说明是该pageid的新用户),则更新user.new表,将对应的pageid下的时间段下的用户数+1。这个流程会有个严重的缺点,就是每条log解析出来的事件和uid都需要去到表里查找,所以表的请求量很大。现在想用structured streaming优化这个方案,目前的想法是先group by pageid,uid agg: floor(min(time)/3600)*3600 as event_time as min_table创建一个memory 表,然后再在这个表的基础上group by pageid,event_time,agg: count(distinct uid)创建一个writestream,然后输出到外部表hbase。但是感觉这种方案不可行,起码memory表会越来越大,另外如何保证尽可能少的对hbase的输出也是个问题?谢谢

lw-lin commented 7 years ago

@zhouyan8603

不管是 spark streaming, 还是 structured streaming, 都可以先做一步 pageid, uid 的聚合,再往外写。structured streaming 的 memory table 确实需要有个过期机制(比如只记录最近三天、或一周的所有用户),否则 oom。

上面说的是精确记录 distinct(id) 的做法。如果不需要精确记录(比如可以接受误差在 5% 以内),那么可以考虑用基于概率的算法,占用空间非常小。比如一些概率算法的索引:http://blog.csdn.net/bagba/article/details/51822189

zhouyan8603 commented 7 years ago

@lw-lin 感谢回复,另外想了解下structured streaming的应用场景,比如我的问题里提到用memory table这种做法是否适合生产环境?structured streaming的面世是不是不需要以前的那种ETL的批处理了呢?还是说只是实时消费的一种方案?

lw-lin commented 7 years ago

@zhouyan8603 (1) Structured Steaming 基于 Dataset/DataFrame API, Spark Streaming 基于 RDD API,所以 Structured Steaming 能用 SQL 操作实时的 streaming 数据,而且性能会高些,这些都是 Dataset/DataFrame 带来的收益。 (2) 所以应用场景的话,如果需要 SQL 支持,或者比现有 Spark Streaming 更高的性能,可尝试 Structured Streaming。 (3) 你这个场景,Structured Streaming 当然可以用于生产环境,但是如前面我所说,一定要设置一个 memory table 的过期清除机制;额外多说一句,你这个场景,Spark Streaming 也都能做。 (4) Spark 1.x 的批处理是基于 RDD 的,现在 Spark 2.x 的批处理是基于 Dataset/DataFrame 的;Spark 1.x 的 streaming 是基于 RDD 的,Spark 2.x 的批处理是基于 Dataset/DataFrame 的。Spark 2.x 还是需要批处理 + streaming,只不过都是基于 Dataset/DataFrame 的。

zhouyan8603 commented 7 years ago

@lw-lin 万分感谢!我自己再好好考虑下

cxzdy commented 5 years ago

博主,好想看你出个CoolPlayFlink系列,讲解方式解决了我很多思考,非常受用。

kuncle commented 5 years ago

博主你好, 这里第 (5) 步需要分两种情况讨论 (i) 如果上次执行在 (5) 结束前即失效,那么本次执行里 sink 应该完整写出计算结果 (ii) 如果上次执行在 (5) 结束后才失效,那么本次执行里 sink 可以重新写出计算结果(覆盖上次结果),也可以跳过写出计算结果(因为上次执行已经完整写出过计算结果了) 需要如何判断第五步是否执行成功?这个需要下游系统支持还是?