lw-lin / CoolplaySpark

酷玩 Spark: Spark 源代码解析、Spark 类库等
3.46k stars 1.41k forks source link

Executor运行一段时间以后Streaming程序失败 #17

Open tsface opened 8 years ago

tsface commented 8 years ago

你好 @lw-lin : 我们在使用Streaming的时候,发现Executor运行一段时间(1小时左右)后,整个程序就会失败,查看CPU,内存,网络,GC情况,都处于安全状态。

error: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

最开始的Storage策略配置的是Memory_ONLY,当数据量激增的时候,会报这个错误,所以调整Storage的策略是Memory_And_DIsk,但是程序运行一段时间还是会报这个错误。同时,会抛出Executor和ReceiverTracker的通信超时(120s)。

请问这个有什么好的排查方法吗,谢谢。

ps:部署模式yarn-cluster

lw-lin commented 8 years ago

@tsface

block input-x-xxxxx not found 这个错误,就是数据已经找不到了:

通常解决方法是几种:

不过还是建议先看下 Executor 打出来的具体日志,看看需要加资源还是说能够容忍部分数据损失,再酌情选择解决方法。可在本帖后随时反馈;希望有帮助!

lw-lin commented 8 years ago

@tsface

上面提到的 try-catch 代码:

val inputDStream = ssc.fileStream("")
inputDStream.foreachRDD(rdd => {
  try {
    // do something
  } catch {
    case e => e.printStackTrace()
  }
})
tsface commented 8 years ago

@lw-lin 谢谢你的解答

try-catch的代码还没有试过,测试了下MEMORY_AND_DISK_2,性能比MEMORY_AND_DISK差很多,目前测试业务下数据处理性能差不多是这样的关系 :4 * MEMORY_AND_DISK_2 = MEMORY_AND_DISK 。

Executor被kill的原因,是Active Job队列里面任务开始积压,处理时延增加。Job的提交周期是1秒,由于CPU平均使用率到95%左右,Receiver接收速率不变,每个Job处理时延增加到了5到10s,目前Job的提交Interval能动态指定吗?

lw-lin commented 8 years ago

@tsface

现在有几个 receiver?几个 Executor、每个 Executor 几个 core?

block interval 是多大?batch interval(即 batch duration) 呢?每个 batch 处理多少 records?

tsface commented 8 years ago

@lw-lin

每个Block差不多6M左右,batch duration: 1s, 每个batch处理的events没有注意,应该是12000多个吧。

测试了下try-catch,可以解决Executor被kill的情况:+1:

lw-lin commented 8 years ago

@tsface 好的,try-catch 只是个应急手段,看起来还是建议调整下 block interval 和每个 executor 的 core 数~

tsface commented 8 years ago

@lw-lin 关于这个问题我跟踪了下Driver和Executor端的debug日志,日志中有些问题暂时不明白,想请教下,下面信息是从Driver端Akka消息的角度整理的,完成的日志太大,不方便post

Driver:node4 Executors:node2,node3

  1. node2的上报数据更新请求:UpdateBlockInfo(input-0-1460509204597),处理成功
  2. node2发送AddBlock请求,处理成功,数据块存储在node2,块大小5.6MB
  3. sparkDriver端产生GetLocations消息,处理成功
  4. sparkDriver端发出RemoveBlock消息,此时node2上的BlockManager执行了remove操作
  5. node2发送UpdateBlockInfo消息,此时块的存储级别变为None
  6. sparkDriver端产生GetLocations,产生java.lang.Exception: Could not compute split, block input-0-1460509204597 not found (4次)(7~11的日志发生了两分钟后)
  7. node3上报更新请求:UpdateBlockInfo,存储块input-0-1460509204597到node3,块大小4.8MB
  8. node3产生AddBlock消息
  9. sparkDriver端产生GetLocations消息,处理成功
  10. sparkDriver端发出RemoveBlock消息,此时node3上的BlockManager执行了remove操作
  11. node3发送UpdateBlockInfo消息,Driver的blockid信息被删除

问题:

  1. 在4中,为什么sparkDriver的Akka端会产生RemoveBlock的消息,这个消息到底是怎么产生的?
  2. 在数据被删除的情况下JobScheduler将Tasks分发给node3,导致node3 getRemote的时候找不到数据,这个在任务调度的时序上是怎么样的一个过程?
  3. 既然数据已经被删除,为什么在node3上这个块数据又出现了,而且块的大小改变了?

谢谢!

lw-lin commented 8 years ago

@tsface 收到。这个应用是跑在 Spark 1.? 的环境上的?Receiver 的 StorageLevel 是怎么设置的?整个 DAG 拓扑中有 window 操作吗?

tsface commented 8 years ago

@lw-lin spark 1.5.2 StorageLevel : MEMORY_AND_DISK 没有window操作

weibin0516 commented 8 years ago

@tsface 有没有调用过 StreamingContext#remember ?

tsface commented 8 years ago

@keepsimplefocus 没有

weibin0516 commented 8 years ago

@tsface 我怀疑和 spark streaming 的清理机制有关。在 jobSet 完成和 checkpoint 的时候都会触发清理操作,这个时候可能会把需要用到的 blocks 删掉。http://www.jianshu.com/p/5e096df2618d 可能会给你一些启发,希望有帮助~

airphoto commented 8 years ago

@tsface 请问,你的问题解决了吗?我也遇到了相同的问题 我的运行模式是 standalone client模式,同时在网上请教了一下,说是也有可能driver的内存设置的太小,导致gc时间太长,我的driver内存设置是默认的(应该是1g吧),请问有可能是这个原因吗?

proflin commented 8 years ago

@tsface 能把完整日志(driver+相关 executors)发给我吗?lwlin7#gmail.com 之前没遇到过这种情况

tsface commented 8 years ago

@proflin 已发

tsface commented 8 years ago

@keepsimplefocus 谢谢,源码中加了日志,在跟踪

abetterme commented 5 years ago

请问这个问题解决了吗,我也遇到了类似的问题