allwefantasy / spark-binlog

A library for querying MySQL binlog with Apache Spark Structured Streaming, for Spark SQL, DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0
154 stars 54 forks

When the binlog server is shut down, the streaming job cannot fail fast #19

Open zhengqiangtan opened 4 years ago

zhengqiangtan commented 4 years ago
Question:

When I changed a MySQL column type from datetime to timestamp, I found that the binlog server shut down, but the streaming program kept running and no data could be inserted into the Delta table. Why?

The error log is as follows:

```
19/12/24 10:53:30 ERROR MicroBatchExecution: Query [id = b11c2596-8106-40b9-a8d3-e0117ffcf23e, runId = b469978d-ecff-4518-b62b-c545215742eb] terminated with error
org.apache.spark.sql.AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
```
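For context, the same type conflict can be reproduced outside of streaming. A minimal sketch (the path and data are hypothetical, assuming a local Spark with delta-core on the classpath): Delta refuses to merge DateType with TimestampType even when schema merging is enabled.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val path = "/tmp/delta/users_demo" // hypothetical table location

// First write creates the table with `activated_at` as DateType.
Seq((1, java.sql.Date.valueOf("2019-12-24")))
  .toDF("id", "activated_at")
  .write.format("delta").mode("overwrite").save(path)

// After the MySQL column becomes timestamp, incoming rows carry
// TimestampType. Delta cannot merge DateType with TimestampType, so even
// with mergeSchema enabled this fails with the AnalysisException above.
Seq((2, java.sql.Timestamp.valueOf("2019-12-24 10:53:30")))
  .toDF("id", "activated_at")
  .write.format("delta").mode("append")
  .option("mergeSchema", "true")
  .save(path)
```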

zhengqiangtan commented 4 years ago

The detailed error log is shown below:

```
19/12/24 11:23:43 ERROR MicroBatchExecution: Query [id = b11c2596-8106-40b9-a8d3-e0117ffcf23e, runId = cea35ba0-a5bc-4c68-94d8-6223cb2cf2ed] terminated with error
org.apache.spark.sql.AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
    at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:666)
    at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:655)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.delta.schema.SchemaUtils$.org$apache$spark$sql$delta$schema$SchemaUtils$$merge$1(SchemaUtils.scala:655)
    at org.apache.spark.sql.delta.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:731)
    at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:60)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta.updateMetadata(UpsertTableInDelta.scala:17)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:58)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:55)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:388)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta._run(UpsertTableInDelta.scala:55)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$run$1.apply$mcV$sp(UpsertTableInDelta.scala:31)
    at tech.mlsql.common.DeltaJob$.runWithTry(DeltaJob.scala:17)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta.run(UpsertTableInDelta.scala:30)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:121)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:82)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.saveToSink$1(BinlogSyncToDelta.scala:82)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.run(BinlogSyncToDelta.scala:132)
    at org.apache.spark.sql.delta.sources.MLSQLDeltaSink.addBatch(MLSQLDeltaSink.scala:31)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
19/12/24 11:23:43 INFO DAGScheduler: Asked to cancel job group 18b844a4-64b3-473e-8c99-c7361931e7db
19/12/24 11:23:43 ERROR SocketServerInExecutor: The server ServerSocket[addr=/192.168.46.175,localport=60840] is closing the socket Socket[addr=/192.168.46.175,port=60844,localport=60840] connection
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readRequest(servers.scala:141)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.readRequest(BinLogSocketServerInExecutor.scala:29)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:403)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/12/24 11:23:43 INFO TaskSchedulerImpl: Cancelling stage 3
19/12/24 11:23:43 INFO TaskSchedulerImpl: Killing all running tasks in stage 3: Stage cancelled
19/12/24 11:23:43 INFO Executor: Executor is trying to kill task 0.0 in stage 3.0 (TID 51), reason: Stage cancelled
19/12/24 11:23:43 INFO SocketServerInExecutor: Received connection fromSocket[addr=/192.168.46.175,port=60899,localport=60840]
19/12/24 11:23:43 ERROR SocketServerInExecutor: The server ServerSocket[addr=/192.168.46.175,localport=60840] is closing the socket Socket[addr=/192.168.46.175,port=60899,localport=60840] connection
java.io.EOFException
    at java.io.DataInputStream.readInt(DataInputStream.java:392)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readRequest(servers.scala:141)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.readRequest(BinLogSocketServerInExecutor.scala:29)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:403)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/12/24 11:23:43 INFO TaskSchedulerImpl: Stage 3 was cancelled
19/12/24 11:23:43 INFO Executor: Executor interrupted and killed task 0.0 in stage 3.0 (TID 51), reason: Stage cancelled
19/12/24 11:23:43 INFO BinLogSocketServerInExecutor: Shutdown ServerSocket[addr=/192.168.46.175,localport=60840]. This may caused by the task is killed.
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
=== Streaming Query ===
Identifier: [id = b11c2596-8106-40b9-a8d3-e0117ffcf23e, runId = cea35ba0-a5bc-4c68-94d8-6223cb2cf2ed]
Current Committed Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(192.168.46.175,60840),org.apache.spark.sql.SparkSession@35f57010,file:///tmp/checkpoint/sources/0,Some(15000000123421051),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> bi, binlogindex -> 1500, username -> xxx, host -> xxx, port -> 3306, binlogserverid -> 18b844a4-64b3-473e-8c99-c7361931e7db, binlogfileoffset -> 48739574, password -> xxx)): 15000000123421051}
Current Available Offsets: {MLSQLBinLogSource(ExecutorBinlogServer(192.168.46.175,60840),org.apache.spark.sql.SparkSession@35f57010,file:///tmp/checkpoint/sources/0,Some(15000000123421051),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> bi, binlogindex -> 1500, username -> xxx, host -> xx, port -> 3306, binlogserverid -> 18b844a4-64b3-473e-8c99-c7361931e7db, binlogfileoffset -> 48739574, password -> xxxx)): 15000000138427733}

Current State: ACTIVE
Thread State: RUNNABLE

Logical Plan:
MLSQLBinLogSource(ExecutorBinlogServer(192.168.46.175,60840),org.apache.spark.sql.SparkSession@35f57010,file:///tmp/checkpoint/sources/0,Some(15000000123421051),Map(binglognameprefix -> mysql-bin, tablenamepattern -> users, databasenamepattern -> bi, binlogindex -> 1500, username -> xx, host -> xxx, port -> 3306, binlogserverid -> 18b844a4-64b3-473e-8c99-c7361931e7db, binlogfileoffset -> 48739574, password -> xxx))
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:297)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.sql.AnalysisException: Failed to merge fields 'activated_at' and 'activated_at'. Failed to merge incompatible data types TimestampType and DateType;;
    at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:666)
    at org.apache.spark.sql.delta.schema.SchemaUtils$$anonfun$18.apply(SchemaUtils.scala:655)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:186)
    at org.apache.spark.sql.delta.schema.SchemaUtils$.org$apache$spark$sql$delta$schema$SchemaUtils$$merge$1(SchemaUtils.scala:655)
    at org.apache.spark.sql.delta.schema.SchemaUtils$.mergeSchemas(SchemaUtils.scala:731)
    at org.apache.spark.sql.delta.schema.ImplicitMetadataOperation$class.updateMetadata(ImplicitMetadataOperation.scala:60)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta.updateMetadata(UpsertTableInDelta.scala:17)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:58)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$_run$4.apply(UpsertTableInDelta.scala:55)
    at org.apache.spark.sql.delta.DeltaLog.withNewTransaction(DeltaLog.scala:388)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta._run(UpsertTableInDelta.scala:55)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta$$anonfun$run$1.apply$mcV$sp(UpsertTableInDelta.scala:31)
    at tech.mlsql.common.DeltaJob$.runWithTry(DeltaJob.scala:17)
    at org.apache.spark.sql.delta.commands.UpsertTableInDelta.run(UpsertTableInDelta.scala:30)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:121)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$$anonfun$saveToSink$1$1.apply(BinlogSyncToDelta.scala:82)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.immutable.Map$Map1.foreach(Map.scala:116)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.saveToSink$1(BinlogSyncToDelta.scala:82)
    at org.apache.spark.sql.delta.commands.BinlogSyncToDelta$.run(BinlogSyncToDelta.scala:132)
    at org.apache.spark.sql.delta.sources.MLSQLDeltaSink.addBatch(MLSQLDeltaSink.scala:31)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5$$anonfun$apply$17.apply(MicroBatchExecution.scala:537)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$5.apply(MicroBatchExecution.scala:535)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:534)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:198)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    ... 1 more
19/12/24 11:23:43 ERROR SocketServerInExecutor: The server ServerSocket[addr=/192.168.46.175,localport=60840] is closing the socket Socket[addr=/192.168.46.175,port=60848,localport=60840] connection
java.net.SocketException: Socket closed
    at java.net.SocketInputStream.socketRead0(Native Method)
    at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
    at java.net.SocketInputStream.read(SocketInputStream.java:171)
    at java.net.SocketInputStream.read(SocketInputStream.java:141)
    at java.net.SocketInputStream.read(SocketInputStream.java:224)
    at java.io.DataInputStream.readInt(DataInputStream.java:387)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerSerDer$class.readRequest(servers.scala:141)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.readRequest(BinLogSocketServerInExecutor.scala:29)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.BinLogSocketServerInExecutor.handleConnection(BinLogSocketServerInExecutor.scala:403)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:128)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anonfun$2.apply(servers.scala:127)
    at org.apache.spark.sql.mlsql.sources.mysql.binlog.SocketServerInExecutor$$anon$4$$anon$5.run(servers.scala:101)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
19/12/24 11:23:43 INFO DAGScheduler: ResultStage 3 (start at MysqlBinLogRead.scala:44) failed in 22.634 s due to Job 1 cancelled part of cancelled job group 18b844a4-64b3-473e-8c99-c7361931e7db
19/12/24 11:23:43 INFO FileBasedWriteAheadLog_BinLogSocketServerInExecutor: Stopped write ahead log manager
19/12/24 11:23:43 INFO DAGScheduler: Job 1 failed: start at MysqlBinLogRead.scala:44, took 22.655395 s
Exception in thread "launch-binlog-socket-server-in-spark-job" org.apache.spark.SparkException: Job 1 cancelled part of cancelled job group 18b844a4-64b3-473e-8c99-c7361931e7db
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1889)
    at org.apache.spark.scheduler.DAGScheduler.handleJobCancellation(DAGScheduler.scala:1824)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply$mcVI$sp(DAGScheduler.scala:906)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleJobGroupCancelled$1.apply(DAGScheduler.scala:906)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
    at org.apache.spark.scheduler.DAGScheduler.handleJobGroupCancelled(DAGScheduler.scala:906)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2079)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126)
    at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:945)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.collect(RDD.scala:944)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource.org$apache$spark$sql$mlsql$sources$MLSQLBinLogDataSource$$launchBinlogServer$1(MLSQLBinLogDataSource.scala:203)
    at org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource$$anon$4.run(MLSQLBinLogDataSource.scala:210)
19/12/24 11:23:43 WARN TaskSetManager: Lost task 0.0 in stage 3.0 (TID 51, localhost, executor driver): TaskKilled (Stage cancelled)
19/12/24 11:23:43 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
```

zhengqiangtan commented 4 years ago

Also, if a schema change causes the binlog server to fail, the binlog server will fail again once consumption resumes from the previous consumption point, because the same inserted data is replayed.

allwefantasy commented 4 years ago

This issue is caused by Delta. By default, Delta validates the schema of the input data and does not do schema merging. The resulting exception makes the task close its connection to the binlog service, and we do not currently handle that exception. I think the best option is to quit the streaming application; another is to make the binlog service handle this exception.
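For the first option, a minimal sketch (the listener below is illustrative, not part of spark-binlog) that makes the driver exit as soon as any streaming query terminates with an error:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener.{QueryProgressEvent, QueryStartedEvent, QueryTerminatedEvent}

val spark = SparkSession.builder().getOrCreate()

// Fail fast: stop the whole application when a query dies with an error,
// instead of leaving a driver that looks alive but processes nothing.
spark.streams.addListener(new StreamingQueryListener {
  override def onQueryStarted(event: QueryStartedEvent): Unit = ()
  override def onQueryProgress(event: QueryProgressEvent): Unit = ()
  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {
    // `exception` is Some(errorMessage) only when the query failed.
    event.exception.foreach { err =>
      System.err.println(s"Query ${event.id} failed: $err. Exiting.")
      sys.exit(1) // let the cluster manager restart the application
    }
  }
})
```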

zhengqiangtan commented 4 years ago

> This issue is caused by Delta. By default, Delta validates the schema of the input data and does not do schema merging. The resulting exception makes the task close its connection to the binlog service, and we do not currently handle that exception. I think the best option is to quit the streaming application; another is to make the binlog service handle this exception.

I think it's a good idea to just throw an exception and exit, rather than leave the program in a zombie state where it appears alive but does nothing.
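A minimal sketch of that behavior, assuming the driver blocks on the query handle (the helper name is hypothetical):

```scala
import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryException}

// Block until the query stops and exit loudly on error
// instead of swallowing the failure.
def runAndFailFast(query: StreamingQuery): Unit =
  try {
    query.awaitTermination() // rethrows the error that terminated the query
  } catch {
    case e: StreamingQueryException =>
      System.err.println(s"Streaming query failed: ${e.getMessage}")
      sys.exit(1) // non-zero exit so the failure is visible to the scheduler
  }
```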