qingzhongli opened this issue 6 years ago
The pipeline is probably full. Are you sure the results of the operations written through the pipeline are being read back promptly? If the pipeline is full, further operations will block.
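For reference, a minimal sketch of what "reading the results promptly" can look like with plain Jedis (the host, port, and the batch size of 1000 are placeholder values, not something from this thread): calling sync() every fixed number of commands flushes the buffered writes and reads all pending replies back, so the buffer never keeps growing.

```scala
import redis.clients.jedis.Jedis

object PeriodicSyncSketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("127.0.0.1", 6379) // e.g. the Codis proxy address
    val pipeline = jedis.pipelined()
    try {
      for (i <- 0 until 100000) {
        pipeline.set(s"key:$i", i.toString)
        // sync() sends any buffered writes and reads every pending reply,
        // which is what keeps the pipeline from filling up.
        if ((i + 1) % 1000 == 0) pipeline.sync()
      }
      pipeline.sync() // drain whatever is left
    } finally {
      jedis.close()
    }
  }
}
```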
@spinlock Thanks for the reply. A few more questions:
1. How can I verify that the results of the operations written through the pipeline are being read promptly? Could you describe the method in detail?
2. If the pipeline is full, how can I check whether it is full?
3. I have now added exception-catching logic around pipeline.sync(); is that reasonable?
Normally I would write two threads, one responsible for reading and one for writing, but in your code I only see the pipeline writes. So two questions: 1. do the results of the pipelined operations really not matter at all? 2. are errors during the pipelined writes simply not checked?
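As a hedged illustration of those two points (not necessarily the exact code spinlock has in mind): if you keep the Response handles that the pipelined calls return, then after sync() you can inspect both the results and the errors, because Response.get() re-throws a server error reply as a JedisDataException. The keys and the connection address below are placeholders.

```scala
import redis.clients.jedis.{Jedis, Response}
import redis.clients.jedis.exceptions.JedisDataException
import scala.collection.mutable.ArrayBuffer

object PipelineErrorCheckSketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("127.0.0.1", 6379)
    val pipeline = jedis.pipelined()
    val pending = ArrayBuffer.empty[Response[String]]
    try {
      for (i <- 0 until 1000) {
        // Keep the Response handle for every command we send.
        pending += pipeline.set(s"key:$i", i.toString)
      }
      pipeline.sync() // flush the writes and read every reply back
      pending.foreach { r =>
        try r.get() // returns "OK", or throws if the server replied with an error
        catch {
          case e: JedisDataException => println(s"write failed: ${e.getMessage}")
        }
      }
    } finally {
      jedis.close()
    }
  }
}
```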
Written this way, it is very easy to blow out the buffer.
@spinlock Thanks for the pointers. I'm just starting to learn this, so two more questions: 1. could you give a simple reference for the two-thread approach? 2. how do I check for errors during the pipelined writes?
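One hedged way to sketch the "separate the writing from the reading" idea with Jedis: a Jedis Pipeline object is not thread-safe, so rather than sharing one pipeline between a reader thread and a writer thread, the producer thread only fills a queue, and a single worker thread owns the connection, writes the pipeline, and calls sync() (which reads the replies back) every batchSize commands. The host, port, queue capacity, batch size, and the Stop sentinel are all made up for this sketch.

```scala
import java.util.concurrent.ArrayBlockingQueue
import redis.clients.jedis.Jedis

object TwoThreadPipelineSketch {
  // Made-up sentinel that tells the worker there is no more work.
  private val Stop = ("__stop__", "")

  def main(args: Array[String]): Unit = {
    val queue = new ArrayBlockingQueue[(String, String)](10000)
    val batchSize = 500 // placeholder

    // Worker thread: the only thread that touches the connection. It writes
    // the pipeline and syncs every batchSize commands so replies are read promptly.
    val worker = new Thread(new Runnable {
      override def run(): Unit = {
        val jedis = new Jedis("127.0.0.1", 6379) // placeholder address
        val pipeline = jedis.pipelined()
        var inFlight = 0
        try {
          var item = queue.take()
          while (item != Stop) {
            pipeline.set(item._1, item._2)
            inFlight += 1
            if (inFlight >= batchSize) {
              pipeline.sync() // drain replies so the buffer cannot fill up
              inFlight = 0
            }
            item = queue.take()
          }
          pipeline.sync() // drain whatever is still in flight
        } finally {
          jedis.close()
        }
      }
    })
    worker.start()

    // Producer side: only hands work to the queue, never touches the connection.
    for (i <- 0 until 5000) queue.put((s"key:$i", i.toString))
    queue.put(Stop)
    worker.join()
  }
}
```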
With Jedis pipeline operations, the sync() method does not look at the return values. Does Codis require separately getting the results? If they are not fetched, will the buffer end up blocked?
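On the Jedis side, sync() does read every reply off the socket (that reading is what empties the buffer); it just returns them through the Response handles instead of as a return value. If you want the values directly, syncAndReturnAll() performs the same read and also hands the raw replies back as a list. A small sketch with placeholder keys and address:

```scala
import redis.clients.jedis.Jedis
import scala.collection.JavaConverters._

object SyncReturnSketch {
  def main(args: Array[String]): Unit = {
    val jedis = new Jedis("127.0.0.1", 6379)
    val pipeline = jedis.pipelined()
    try {
      val counter = pipeline.incr("visits") // Response[java.lang.Long], filled after the sync
      pipeline.set("last", "now")
      // Reads all replies off the connection and also returns them as a list.
      val raw = pipeline.syncAndReturnAll()
      println(s"incr result via Response: ${counter.get()}")
      raw.asScala.foreach(r => println(s"raw reply: $r"))
    } finally {
      jedis.close()
    }
  }
}
```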
@yangzhe1991 @elvuel @spinlock @fancy-rabbit A question: a Spark Streaming job consumes messages from Kafka and writes them to Codis. After running for a while it fails, and the Spark Streaming log shows the following error:
User class threw exception: org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 7813.0 failed 4 times, most recent failure: Lost task 17.3 in stage 7813.0 (TID 468911, DataNode-01, executor 77): redis.clients.jedis.exceptions.JedisConnectionException: java.net.SocketTimeoutException: Read timed out
    at redis.clients.util.RedisInputStream.ensureFill(RedisInputStream.java:201)
    at redis.clients.util.RedisInputStream.readByte(RedisInputStream.java:40)
    at redis.clients.jedis.Protocol.process(Protocol.java:141)
    at redis.clients.jedis.Protocol.read(Protocol.java:205)
    at redis.clients.jedis.Connection.readProtocolWithCheckingBroken(Connection.java:297)
    at redis.clients.jedis.Connection.getAll(Connection.java:267)
    at redis.clients.jedis.Connection.getAll(Connection.java:259)
    at redis.clients.jedis.Pipeline.sync(Pipeline.java:99)
    at com.lqz.sparkStreaming.statics.Statics$$anonfun$doCount$1$$anonfun$apply$1.apply(Statics.scala:131)
    at com.lqz.sparkStreaming.statics.Statics$$anonfun$doCount$1$$anonfun$apply$1.apply(Statics.scala:94)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$33.apply(RDD.scala:920)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1870)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1870)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:229)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:114
The corresponding code snippet is:
static.foreachRDD(rdd => {
  rdd.foreachPartition(partitionOfRecords => {
    lazy val pipelineObj = CodisClient.pipelined
    lazy val pipeline = pipelineObj.pipeline
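The stack trace above shows the timeout happening inside Pipeline.sync() (Connection.getAll), i.e. while reading the replies back, which matches the buffer discussion earlier in this thread. A hedged way to restructure the partition loop, using plain Jedis instead of your CodisClient wrapper (the proxy host/port, the 30-second socket timeout, the batch size, and the incr call are all placeholders for your real logic; `static` is the DStream from your snippet): open and close the connection per partition and sync() in small batches, so no single sync() has to read back a huge backlog of replies.

```scala
import redis.clients.jedis.Jedis

static.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // Placeholder proxy address with an explicit socket timeout (ms).
    val jedis = new Jedis("codis-proxy-host", 19000, 30000)
    val pipeline = jedis.pipelined()
    val batchSize = 500 // placeholder
    try {
      var n = 0
      partitionOfRecords.foreach { record =>
        pipeline.incr(record.toString) // stand-in for your real write logic
        n += 1
        // Flush and read replies every batchSize records instead of once per partition.
        if (n % batchSize == 0) pipeline.sync()
      }
      pipeline.sync() // drain whatever is left for this partition
    } finally {
      jedis.close()
    }
  }
}
```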