lw-lin / CoolplaySpark

酷玩 Spark: Spark 源代码解析、Spark 类库等
3.46k stars 1.41k forks source link

《0.1 Spark Streaming 实现思路与模块概述.md》讨论区 #1

Open lw-lin opened 8 years ago

lw-lin commented 8 years ago

这里是 《0.1 Spark Streaming 实现思路与模块概述.md》 讨论区。

如需要贴代码,请复制以下内容并修改:

public static final thisIsJavaCode;
val thisIsScalaCode

谢谢!

fengshenwu commented 8 years ago

Spark Streaming 的长时容错特性,能够提供不重、不丢,exactly-once 的处理语义。 不丢 可以通过记录标签及wal做到,怎么做到不重及exactly-once呢,此点本文没有说。

lw-lin commented 8 years ago

@lwwcl1314

考虑三点:

  1. 给定 HDFS 上的输入文件 f,那么 MapReduce 无论失败几次、重做几次,最终的结果 r 是一致的;
  2. Spark Streaming 就是保证了每条源头数据都唯一的划分到一个块数据里,每个块数据都唯一的划分到一个 batch 里;
  3. 然后对于一个 batch,失败几次就对着源头数据再重做几次(就像 MapReduce 对着 f 多次重做一样),就可以保证本 batch 的结果与本 batch 的源头数据是一一对应的。

不重复,是因为每条源头数据都唯一划分一个 batch 里;不丢 + 不重复,就等于 exactly-once 了。

Storm 的 Trident 层,也是通过类似划分 batch 的方式做到的 exactly-once —— 不过那个实现有点不太自然。。。

fengshenwu commented 8 years ago

我感觉 http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ 这个文件讲的还是很好的,呵呵。

lw-lin commented 8 years ago

cloudera 的这篇文章是针对 Apache Kafka 的情况作了具体的讲解,也推荐大家都看看。

赞 @lwwcl1314 ~

jacksu commented 8 years ago

里面的图是什么画得呢?

luphappy commented 8 years ago

我用spark streaming,但时间长了之后,经常会自己就退出了,是为什么呢

jacksu commented 8 years ago

https://github.com/jacksu/utils4s/tree/master/sparkstreaming-demo

lw-lin commented 8 years ago

@jacksu 基本都是用 Visio 画的,:-)

lw-lin commented 8 years ago

@luphappy 这个需要具体看一下 driver 端打印的日志,退出一般是报 Exception 了,可以根据具体的 Exception 来看;有可能还需要到报错的 executor 端去看日志。

可否贴一下报错信息?

winwill2012 commented 8 years ago

大神,你这一系列文章可以转载吗?转载会附上原文链接与作者名。

lw-lin commented 8 years ago

@winwill2012 OK 的。附上原文链接与作者名,然后把链接 post 到这里就可以了。

zwzm85 commented 8 years ago

想问一下,热备和冷备都是针对块数据来的,那些还没成为块数据的缓存中的细小数据怎么处理?

lw-lin commented 8 years ago

@zwzm85 思考的很细致,赞。确实这里是有些问题的,这些细小数据就没有保障了;最主要的原因还是在于上游不能支持重放。

luphappy commented 8 years ago

您好,请问,我的spark-streaming程序以yarn-client的方式运行了一段时间后,就退出了,但driver还在,yarn日志如下: 15/09/19 14:32:34 ERROR util.Utils: Uncaught exception in thread Thread-1 org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1437371132890_10529_000002 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:526) at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53) at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:94) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:606) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy18.finishApplicationMaster(Unknown Source) at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.unregisterApplicationMaster(AMRMClientImpl.java:378) at org.apache.spark.deploy.yarn.YarnRMClient.unregister(YarnRMClient.scala:86) at org.apache.spark.deploy.yarn.ApplicationMaster.unregister(ApplicationMaster.scala:184) at org.apache.spark.deploy.yarn.ApplicationMaster$$anonfun$run$1.apply$mcV$sp(ApplicationMaster.scala:123) at org.apache.spark.util.SparkShutdownHook.run(Utils.scala:2308) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1$$anonfun$apply$mcV$sp$1.apply(Utils.scala:2278) at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1772) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply$mcV$sp(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anonfun$runAll$1.apply(Utils.scala:2278) at scala.util.Try$.apply(Try.scala:161) at org.apache.spark.util.SparkShutdownHookManager.runAll(Utils.scala:2278) at org.apache.spark.util.SparkShutdownHookManager$$anon$6.run(Utils.scala:2260) at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:54) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1437371132890_10529_000002 at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy17.finishApplicationMaster(Unknown Source) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.finishApplicationMaster(ApplicationMasterProtocolPBClientImpl.java:91) ... 23 more

这个是什么原因造成的呢

lw-lin commented 8 years ago

@luphappy

你好,请问你的 Yarn 是什么版本?有可能跟这个有关: YARN-3103: AMRMClientImpl does not update AMRM token properly

另外,用 yarn cluster 方式部署试试看;通常 Streaming 的程序不会部署为 yarn client。

pzz2011 commented 8 years ago

问个Spark-Core的问题,只是我自己一直没搞明白。看源码的时候生成FinalStage的时候,那个ActiveJob是啥? 话说一个Stage应该就是对应一个job吧,然后一个Stage可以含有多个RDD,但是对于FinalStage而言,应该就只有一个job吧。 而非FinalStage可以对应多个RDD,那么非FinalStage是不是可以对应多个job呢? 求解... 看源码看晕了T_T~~~~

pzz2011 commented 8 years ago

@lw-lin

luckuan commented 8 years ago

对于job如何提交的,如何运行的,你可以看看这篇博客 http://www.cnblogs.com/luckuan/p/5250258.html 。 一个job可以生成多个stage,每个stage中间会有一个shuffle过程。@pzz2011

pzz2011 commented 8 years ago

@luckuan @lw-lin 1.5.0 version spark-1.5.0\spark-1.5.0\core\src\main\scala\org\apache\spark\scheduler\DAGScheduler.scala

private[scheduler] val jobIdToStageIds = new HashMap[Int, HashSet[Int]]
private[scheduler] val stageIdToStage = new HashMap[Int, Stage]
private[scheduler] val shuffleToMapStage = new HashMap[Int, ShuffleMapStage]
private[scheduler] val jobIdToActiveJob = new HashMap[Int, ActiveJob]

// Stages we need to run whose parents aren't done
private[scheduler] val waitingStages = new HashSet[Stage]

// Stages we are running right now
private[scheduler] val runningStages = new HashSet[Stage]

private[scheduler] val activeJobs = new HashSet[ActiveJob]

感觉还是这几个Collection没搞懂jobIdToActiveJob shuffleToMapStage stageIdToStage jobIdToStageIds 没搞懂它们和Stage啊 job啊task的关系....

weibin0516 commented 8 years ago

@pzz2011 可以看看这篇文章 [Spark源码剖析] DAGScheduler划分stage http://www.jianshu.com/p/50c5c1032206

wangwenting commented 8 years ago

我想问下WAL 默认配置是false的, 这个你们目前线上是开启的吗? 如果在保存WAL的时候程序强制退出,做了checkpoint ,这个没有保存在WAL的数据是不是丢失了。

在保存数据的时候exactly-once 这个怎么做到,比如写到非事务的存储里,写了一半挂掉了。

还有个问题,对于输出数据到hdfs中,如何做到一个小时对应partition个文件。 设置 batchDuration 为一个小时,感觉这种做法不靠谱, 能够append 到之前的文件这种做法吗?

lw-lin commented 8 years ago

@wangwenting

(1.a) 我们线上还是分应用,对于一点不能丢的都是从可重放的数据源读数据(如 HDFS,Kafka)等,可以丢一点的就开 WAL 或 RAM_2 双机热备; (1.b) 是的,不可重放的数据源不论是 WAL 还是 RAM_2 双机热备都有丢数据的风险,保险的还是靠可重 放数据源。

(2) exactly-once 主要是指 Streaming 内部的处理,end-to-end 的 exactly-once 还需要上游数据源支持(可重放)、下游数据接收支持(事务)。如果写到非事务的存储里,则保证 at-lease-once;其实多加一列 batch-id,其实也能保证 exactly-once,比如 batch-id =10, count = 2000 这样的输出多写几次,最终取出来还是 count = 2000 这样一条。

(3) 这个需求还不好直接支持,可以尝试间接:(a) 自己起一个合并任务,每小时把 p × n 个文件合并为 p 个;或者 (b) 自己写个输出方式为 append 到已有文件,只不过这种要注意设置 concurrentJobs = 1,以防同一个小时内多个 batch 同时 append 到同一个 partition 对应的文件。

希望有帮助!

wangwenting commented 8 years ago

非常感谢,很有帮助,前期准备就用kafka direct 方式读,在线的就foreachRDD 的方式写入mysql 离线的就kafka direct 方式读 写 hdfs中,spark目前好像不支持小文件combine, 只能自己合并了。

wangwenting commented 8 years ago

在streaming checkpoing 的情况下用broadcast 会出现raise Exception("Broadcast variable '%s' not loaded!" % bid) 不知道你可预见过

romantic123 commented 8 years ago

请问: 一个inputStream对应一个Receiver对吧,一个Receiver会分配到一个Executor上,那么这个Receiver接收到的数据都会放在这个Executor上吧,这样会不会造成数据倾斜呢?

lw-lin commented 8 years ago

@wangwenting

没有遇到过。以下代码是 checkpoint + broadcast,起停了几次都是正常运行的,请参考:……

Update: 代码已删除,原来的代码在 local 运行有效、但放到 Yarn 上跑确实有问题。

正确的代码请见 Streaming 官方 Programming Guide

lw-lin commented 8 years ago

@romantic123

是的,如果整个 app 只有 1 个 receiver,那数据就会只收到 1 个或 2 个 Executor 上(看数据冗余是配了总共 1 份还是 2 份),数据会倾斜很多。

这种情况下一般是起多个 receiver,分别消费数据的不同部分,而且 Spark Streaming 会保证 receiver 均分到不同的 Executor 上,这样数据就可以均分到不同的 Executor 上了,同时计算时又会有很好的 data locality。

wangwenting commented 8 years ago

Accumulators and Broadcast Variables Accumulators and Broadcast variables cannot be recovered from checkpoint in Spark Streaming. If you enable checkpointing and use Accumulators or Broadcast variables as well, you’ll have to create lazily instantiated singleton instances for Accumulators and Broadcast variables so that they can be re-instantiated after the driver restarts on failure. This is shown in the following example.

when I use lazy --no problem def getWordBlacklist(sparkContext): if ('wordBlacklist' not in globals()): globals()['wordBlacklist'] = sparkContext.broadcast(["a", "b", "c"]) return globals()['wordBlacklist']

when I use bc = sc.broadcast(["a", "b", "c"]) in main func and use in foreachRDD has error

File "/home/bfd_hz/spark/python/lib/pyspark.zip/pyspark/broadcast.py", line 39, in _from_id raise Exception("Broadcast variable '%s' not loaded!" % bid) Exception: (Exception("Broadcast variable '0' not loaded!",)

lw-lin commented 8 years ago

@wangwenting

之前贴的代码确实有问题,从你上面的解释里也学习到了 accumulators & broadcast 在 Streaming 里的正确用法,感谢感谢!

luphappy commented 8 years ago

@lw-lin 2.6.0,应该是那个原因

luphappy commented 8 years ago

@lw-lin 您好,我想问一下,以yarn-cluster运行的程序,如果需要更新,重启,要怎么做呢

lw-lin commented 8 years ago

@luphappy 2.6.1 应该是可以修复了。 你是说如何停止 yarn-cluster 下的 Streaming 程序对吧。

image

huduan-D commented 8 years ago

@lw-lin 您好,非常感谢您的分享,写得特别棒!赞赞赞 我有一段没看明白: “(2) 要求 DStreamGraph 复制出一套新的 RDD DAG 的实例,具体过程是:DStreamGraph 将要求图里的尾 DStream 节点生成具体的 RDD 实例,并递归的调用尾 DStream 的上游 DStream 节点……以此遍历整个 DStreamGraph,遍历结束也就正好生成了 RDD DAG 的实例;” DAG不是已经静态生成了吗?即DStreamGraph从头到尾的操作已经固定了,为什么还要如此从尾部遍历呢?

spanf commented 8 years ago

你好,我也遇到一个类似的问题,但是我的hadoop版本是2.7.2,YARN-3103这个bug也是修复了的,但是还是运行一个星期后报了下面这个错误,麻烦你看看这里还可能会有什么错误呢....

WARN 16/08/18 00:10:19 Client: Exception encountered while connecting to the server : org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1470817219719_0004_000001 WARN 16/08/18 00:10:19 ApplicationMaster: Reporter thread fails 2 time(s) in a row. org.apache.hadoop.security.token.SecretManager$InvalidToken: Invalid AMRMToken from appattempt_1470817219719_0004_000001 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method) at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at org.apache.hadoop.yarn.ipc.RPCUtil.instantiateException(RPCUtil.java:53) at org.apache.hadoop.yarn.ipc.RPCUtil.unwrapAndThrowException(RPCUtil.java:104) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:79) at sun.reflect.GeneratedMethodAccessor20.invoke(Unknown Source) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:497) at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187) at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102) at com.sun.proxy.$Proxy16.allocate(Unknown Source) at org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl.allocate(AMRMClientImpl.java:278) at org.apache.spark.deploy.yarn.YarnAllocator.allocateResources(YarnAllocator.scala:225) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$1.run(ApplicationMaster.scala:368) Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.token.SecretManager$InvalidToken): Invalid AMRMToken from appattempt_1470817219719_0004_000001 at org.apache.hadoop.ipc.Client.call(Client.java:1468) at org.apache.hadoop.ipc.Client.call(Client.java:1399) at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232) at com.sun.proxy.$Proxy15.allocate(Unknown Source) at org.apache.hadoop.yarn.api.impl.pb.client.ApplicationMasterProtocolPBClientImpl.allocate(ApplicationMasterProtocolPBClientImpl.java:77) ... 9 more

teeyog commented 7 years ago

第一次看源码解析完全看进去了,如果有spark core的源码解析就更完美了!

lw-lin commented 7 years ago

@SOBIGUFO

Glad it helped! :-)

Nicksxs commented 7 years ago

@lw-lin 上面@wangweting 提到的那个问题还是不太懂怎么使用lazy,能具体解释下吗,感谢

ly13131wyq commented 7 years ago

Greet!

jinchenga commented 7 years ago

我刚开始研究Spark Streaming,这一节看到每个DStream instance维护着每个batch的指针,我想还是用引用比较好?毕竟JAVA类语言没有了指针的概念?不知道对不对啊?

bjkonglu commented 6 years ago

@lw-lin 感谢你的分享,写的非常好。我很是受益,但是文章有一段没看明白,请大神赐教。

具体的,Spark Streaming 在程序刚开始运行时: (1) 由 Receiver 的总指挥 ReceiverTracker 分发多个 job(每个 job 有 1 个 task),到多个 executor 上分别启动 ReceiverSupervisor 实例; (2) 每个 ReceiverSupervisor 启动后将马上生成一个用户提供的 Receiver 实现的实例 —— 该 Receiver 实现可以持续产生或者持续接收系统外数据,比如 TwitterReceiver 可以实时爬取 twitter 数据 —— 并在 Receiver 实例生成后调用 Receiver.onStart();

上述描述的意思是下面哪个:

  1. 用户在业务代码里面创建一个Receiver实例,而Spark集群上会在多个executor上启动多个Receiver;
  2. 用户在业务代码里面创建多个Receiver实例,Spark集群会在多个executor上启动多个Receiver;
lw-lin commented 6 years ago

@jinchenga 是的,我来修改下,还是用“引用”比较好。

lw-lin commented 6 years ago

@bjkonglu

是提供一个 Receiver 的具体类实现(系统内置的,或用户自定义的)、并提供并行度 x,然后 Spark 会在 x 个 executor 上启动 x 个Receiver。

duanjianmin commented 6 years ago

有个问题请教一下: SparkStreaming 会把连续不断的 streaming data 进行多次切片,就会形成多个 batch 然后提交各个batch,进而由spark core 去执行。 问题:这多个batch可以并行处理吗,还是等一个batch执行完毕才会执行下一个? 我自己感觉是这两种方式都支持。如:updateStateBykey这种调用是有状态的,所以batch的执行应该是顺序执行。但是如果只是单纯的统计每个batch中的wordcount,那么就可以并行执行多个batch,提高效率。

lw-lin commented 6 years ago

@duanjianmin

是的,你的理解是正确的。对于有状态的,需要顺序执行。对于无状态的,可以并行执行。是由参数 spark.streaming.concurrentJobs 控制的。进一步可以看这个系列的《2.1 JobScheduler, Job, JobSet 详解》,里面有详细的解释。

duanjianmin commented 6 years ago

@lw-lin 谢谢你的回复。另外一个问题: 对于Spark Streaming , 我的理解是他的优点主要是

  1. 可以将Streaming data 分片
  2. 使用Spark core的API

但是对于通常的架构,服务器开启一个consumer,一条一条的从MQ中读取数据然后处理,个人感觉一条一条的数据也不会很大,因此不用“Spark core的API”性能也不会有多大影响。因为流处理其实每一个分片都不会大,所以很大程度Spark streaming是不是不适用(取而代之直接用一条一条的接受数据然后处理,这样也不会引入Spark依赖)?我想问一下Spark streaming 到底适用的场景是什么呢 ,想对于通常的架构有什么好处?

lw-lin commented 6 years ago

@duanjianmin

对于一般的数据量,区别不大。但是对于大数据量,batch processing 的吞吐量就比 one record at a time 的流式吞吐量大很多了,也就是节省很多机器,比如从 50 台节省到 30 台的话,就有 40% 的节约。

另外 Spark Core 的好处是自带节点故障恢复、stage/task 失败重做、以及推测执行。这些都是不间断处理、快速处理的保障。即使不用 Spark Core,也要有好的办法应对这些问题才行。

duanjianmin commented 6 years ago

@lw-lin Thanks .

lw-lin commented 6 years ago

@duanjianmin

可以加一下讨论群:https://github.com/lw-lin/CoolplaySpark/tree/master/Spark%20%E8%B5%84%E6%BA%90%E9%9B%86%E5%90%88

bjkonglu commented 6 years ago

@lw-lin 在Job动态生成这个模块中,获取ReceiverTracker分配给本batch的源头数据的meta信息,图中是箭头是从JobGenerator指向ReceiverTracker,但我觉得这个过程应该是获取的过程,图中的箭头应该从ReceiverTracker到JobGenerator,不知道对不对! 此外,在executor长时间容错模块中,有个笔误“关联是保障收到的块数据的安全”,应该是“关键是保障收到的块数据的安全”吧

joe2016 commented 6 years ago

@spanf 请问你找到原因了吗,我在2.7.5上面也会出现这个异常,每隔一天就出现。能分享一下你怎么解决的吗?