allwefantasy / spark-binlog

A library for querying MySQL binlogs with Apache Spark Structured Streaming, for Spark SQL, DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0

Why did my stream fail after running for a while? #20

Open · zhengqiangtan opened this issue 4 years ago

zhengqiangtan commented 4 years ago

I have no idea why my streaming program failed after running for a while.

Yesterday, a few hours after the program started and was running steadily, I received an email alert that it had failed. Fortunately, I had set up automatic recovery on failure. The detailed error information is below.

```
2019-12-25 17:35:03 INFO ContextCleaner:54 - Cleaned accumulator 103188
2019-12-25 17:35:03 INFO ContextCleaner:54 - Cleaned accumulator 101683
2019-12-25 17:40:00 ERROR MicroBatchExecution:91 - Query [id = 0665d063-69a7-494b-9e48-3dc5dab9fcdc, runId = a6b0095c-dd63-4652-9718-3dbe9b53dbf4] terminated with error
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readResponse(servers.scala:163)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.readResponse(MLSQLBinLogDataSource.scala:239)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.getLatestOffset(MLSQLBinLogDataSource.scala:320)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogSource.getOffset(MLSQLBinLogDataSource.scala:330)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5$$anonfun$apply$9.apply(MicroBatchExecution.scala:345)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:344)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1$$anonfun$5.apply(MicroBatchExecution.scala:341)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcZ$sp(MicroBatchExecution.scala:341)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withProgressLocked(MicroBatchExecution.scala:554)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:337)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:183)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
2019-12-25 17:40:00 INFO DAGScheduler:54 - Asked to cancel job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
2019-12-25 17:40:00 INFO YarnScheduler:54 - Cancelling stage 3
2019-12-25 17:40:00 INFO YarnScheduler:54 - Killing all running tasks in stage 3: Stage cancelled
2019-12-25 17:40:00 INFO YarnScheduler:54 - Stage 3 was cancelled
2019-12-25 17:40:00 INFO DAGScheduler:54 - ResultStage 3 (start at NativeMethodAccessorImpl.java:0) failed in 86674.120 s due to Job 1 cancelled part of cancelled job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
2019-12-25 17:40:00 INFO DAGScheduler:54 - Job 1 failed: start at NativeMethodAccessorImpl.java:0, took 86674.125588 s
Exception in thread "launch-binlog-socket-server-in-spark-job" org.apache.spark.SparkException: Job 1 cancelled part of cancelled job group 499fdc52-d16e-4cd3-88ba-964fea21cf0d
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1822)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:906)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:906)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2077)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource.org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(MLSQLBinLogDataSource.scala:203)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4.run(MLSQLBinLogDataSource.scala:210)
Traceback (most recent call last):
  File "/data/sync/stream2hdfs.py", line 38, in <module>
    query.awaitTermination()
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 103, in awaitTermination
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 75, in deco
pyspark.sql.utils.StreamingQueryException: u'null
=== Streaming Query ===
Identifier: [id = 0665d063-69a7-494b-9e48-3dc5dab9fcdc, runId = a6b0095c-dd63-4652-9718-3dbe9b53dbf4]
Current Committed Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin, tablenamepattern -> xxx, databasenamepattern -> xxx, binlogindex -> 9724, username -> xxx, host -> xxx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xxx)): 97790000376934283}
Current Available Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> xx, binlogindex -> 9724, username -> xx, host -> 172.xx.xx.xx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xxx)): 97790000376934283}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
MLSQLBinLogSource(ExecutorBinlogServer(zmbd-pm26,37103),org.apache.spark.sql.SparkSession@65f394ba,hdfs://nameservice1/user/zmbigdata/checkpoint-binlog/sources/0,Some(97250000386023094),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> xx, binlogindex -> 9724, username -> xx, host -> xx, port -> 3306, binlogserverid -> 499fdc52-d16e-4cd3-88ba-964fea21cf0d, binlogfileoffset -> 88952842, password -> xx))'
2019-12-25 17:40:00 INFO SparkContext:54 - Invoking stop() from shutdown hook
2019-12-25 17:40:00 INFO AbstractConnector:318 - Stopped Spark@9a4dd2b{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
2019-12-25 17:40:00 INFO SparkUI:54 - Stopped Spark web UI at http://zmbd-vpc-wk04:4040
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Interrupting monitor thread
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Shutting down all executors
2019-12-25 17:40:00 INFO YarnSchedulerBackend$YarnDriverEndpoint:54 - Asking each executor to shut down
2019-12-25 17:40:00 INFO SchedulerExtensionServices:54 - Stopping SchedulerExtensionServices (serviceOption=None, services=List(), started=false)
2019-12-25 17:40:00 INFO YarnClientSchedulerBackend:54 - Stopped
2019-12-25 17:40:00 ERROR TransportRequestHandler:277 - Error while invoking RpcHandler#receive() for one-way message.
org.apache.spark.SparkException: Could not find CoarseGrainedScheduler.
    at org.apache.spark.rpc.netty.Dispatcher.postMessage(Dispatcher.scala:160)
    at org.apache.spark.rpc.netty.Dispatcher.postOneWayMessage(Dispatcher.scala:140)
    at org.apache.spark.rpc.netty.NettyRpcHandler.receive(NettyRpcEnv.scala:655)
    at org.apache.spark.network.server.TransportRequestHandler.processOneWayMessage(TransportRequestHandler.java:275)
    at org.apache.spark.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:105)
    at org.apache.spark.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:118)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at org.apache.spark.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:85)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1359)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:935)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:138)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:858)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    at java.lang.Thread.run(Thread.java:748)
2019-12-25 17:40:00 INFO MapOutputTrackerMasterEndpoint:54 - MapOutputTrackerMasterEndpoint stopped!
2019-12-25 17:40:00 INFO MemoryStore:54 - MemoryStore cleared
2019-12-25 17:40:00 INFO BlockManager:54 - BlockManager stopped
2019-12-25 17:40:00 INFO BlockManagerMaster:54 - BlockManagerMaster stopped
2019-12-25 17:40:00 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint:54 - OutputCommitCoordinator stopped!
2019-12-25 17:40:00 INFO SparkContext:54 - Successfully stopped SparkContext
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Shutdown hook called
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /dfs/data1/sparkdata/spark-8d3cdd89-85d5-4ef4-8fc8-754a1a9add98
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /dfs/data1/sparkdata/spark-8d3cdd89-85d5-4ef4-8fc8-754a1a9add98/pyspark-63924722-b7ee-43fc-ae3e-75f1d2df5bd4
2019-12-25 17:40:00 INFO ShutdownHookManager:54 - Deleting directory /tmp/spark-9d33e5d5-df54-4a64-9518-52553eb4ec06
```

@allwefantasy Can you give me some advice to avoid such problems? thanks~

allwefantasy commented 4 years ago

Network jitter. On every trigger interval the streaming application requests the latest offset from the binlog server, and the two communicate over a socket. When the network is unstable, that connection may break, and the streaming application can no longer read data from it; this is the `java.io.EOFException` in `readResponse` in your log.
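Today the library fails the query on the first broken read. To illustrate the retry idea, here is a minimal Scala sketch of re-attempting a socket-based offset request with backoff; `RetryingOffsetClient`, `requestLatestOffset`, and the single-long wire format are hypothetical assumptions for illustration, not the library's actual API:

```scala
import java.io.{DataInputStream, IOException}
import java.net.Socket

object RetryingOffsetClient {
  // Hypothetical sketch: retry the socket-based "latest offset" request
  // a few times before giving up, instead of failing the whole query on
  // the first broken connection.
  def requestLatestOffset(host: String, port: Int,
                          maxRetries: Int = 3,
                          backoffMs: Long = 1000L): Long = {
    var attempt = 0
    while (true) {
      var socket: Socket = null
      try {
        socket = new Socket(host, port)
        val in = new DataInputStream(socket.getInputStream)
        // Assumption: the server replies with a single long offset.
        return in.readLong()
      } catch {
        // EOFException is a subclass of IOException, so this also covers
        // the failure in the stack trace above.
        case e: IOException =>
          attempt += 1
          if (attempt > maxRetries) throw e
          // Back off before reconnecting; transient jitter often clears.
          Thread.sleep(backoffMs * attempt)
      } finally {
        if (socket != null) socket.close()
      }
    }
    throw new IllegalStateException("unreachable")
  }
}
```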

Make sure the application is restarted automatically when it fails. We will also try to add a retry mechanism to the socket communication in the future. Thanks for your feedback.
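Until such a retry mechanism lands, a driver-side supervise loop is one way to get automatic restarts. A minimal Scala sketch, assuming a `buildQuery` function (hypothetical) that recreates the stream against the same `checkpointLocation` so the committed offsets are recovered:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Sketch of a driver-side supervise loop. `buildQuery` is a placeholder
// for whatever creates the binlog stream; restarting against the same
// checkpointLocation lets the source resume from the committed offsets.
object SupervisedStream {
  def runForever(spark: SparkSession)(buildQuery: SparkSession => StreamingQuery): Unit = {
    while (true) {
      val query = buildQuery(spark)
      try {
        query.awaitTermination() // blocks until the query stops or fails
        return                   // clean stop: exit the loop
      } catch {
        case e: StreamingQueryException =>
          // e.g. the EOFException above; log and restart after a pause
          Console.err.println(s"Query failed, restarting: ${e.getMessage}")
          Thread.sleep(5000)
      }
    }
  }
}
```

On YARN, submitting in cluster mode with `spark.yarn.maxAppAttempts` set above 1 gives a similar application-level restart.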

zhengqiangtan commented 4 years ago

We are looking forward to a retry mechanism for the socket communication, because it is very necessary. Thanks for the answer!