allwefantasy / spark-binlog

A library for querying MySQL Binlog with Apache Spark Structured Streaming, for Spark SQL, DataFrames and [MLSQL](https://www.mlsql.tech).
Apache License 2.0

Why can't the program start after I change the consumed database? #15

Closed zhengqiangtan closed 4 years ago

zhengqiangtan commented 4 years ago

1. My MySQL binlog variable settings are as follows:

```
binlog_format      ROW
log_bin            ON
log_slave_updates  ON
```

2. The binlog events are as follows:

```
XidEventData{xid=756741036}
GtidEventData{flags=1, gtid='45d35feb-dea1-11e9-8e97-7cd30a5180f6:78862619'}
QueryEventData{threadId=1985594, executionTime=0, errorCode=0, database='teacher', sql='BEGIN'}
TableMapEventData{tableId=1065904, database='teacher', table='t_log_event', columnTypes=3, 15, 15, 15, 15, 18, 18, columnMetadata=0, 200, 200, 8, 4096, 0, 0, columnNullability={5, 6}}
WriteRowsEventData{tableId=1065904, includedColumns={0, 1, 2, 3, 4, 5, 6}, rows=[ [128794196, [B@7c7bdcfc, [B@185039b1, [B@76b4b962, [B@697f6522, 1576582567000, 1576582567000] ]}
XidEventData{xid=756740970}
GtidEventData{flags=1, gtid='45d35feb-dea1-11e9-8e97-7cd30a5180f6:78862613'}
QueryEventData{threadId=1968857, executionTime=0, errorCode=0, database='xxl-job-admin', sql='BEGIN'}
TableMapEventData{tableId=1070495, database='xxl-job-admin', table='xxl_job_qrtz_trigger_registry', columnTypes=3, 15, 15, 15, 17, 1, columnMetadata=0, 765, 765, 765, 0, 0, columnNullability={}}
UpdateRowsEventData{tableId=1070495, includedColumnsBeforeUpdate={0, 1, 2, 3, 4, 5}, includedColumns={0, 1, 2, 3, 4, 5}, rows=[ {before=[41432, [B@2d7a05f9, [B@574d38a0, [B@54b34eab, 1576553737000, 0], after=[41432, [B@20b7c820, [B@5d1f5402, [B@6329ff39, 1576553767000, 0]} ]}
```
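As an aside on reading dumps like the one above: the `[B@7c7bdcfc`-style entries are Java's default `toString()` for byte arrays, so string columns appear as opaque references rather than text, and the event class names look like those of a MySQL binlog connector library's event objects (an assumption on my part, not confirmed in this thread). A small Python sketch, separate from spark-binlog itself, can at least list which event types a dump contains:

```python
import re

# Assumption: the dump is a concatenation of "SomeEventData{...}" blocks,
# as in the paste above. We only extract the event-type names; the payloads
# (including the opaque "[B@..." byte-array references) are left alone.
EVENT_RE = re.compile(r"([A-Za-z]+EventData)\{")

def event_types(dump: str) -> list:
    """Return the event-type names in the order they appear in the dump."""
    return EVENT_RE.findall(dump)

# Shortened sample in the same shape as the dump above.
sample = ("XidEventData{xid=756741036} "
          "GtidEventData{flags=1, gtid='45d35feb:78862619'} "
          "WriteRowsEventData{tableId=1065904, rows=[...]}")
print(event_types(sample))
```

Seeing the sequence of event types (Gtid, Query, TableMap, WriteRows/UpdateRows, Xid) is often enough to spot a transaction boundary or an unexpected event kind.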

3. Sometimes when I start the stream I see a warning like this:

```
2019-12-17 13:43:32 WARN JettyUtils:87 - GET /jobs/ failed: java.util.NoSuchElementException
java.util.NoSuchElementException
	at java.util.Collections$EmptyIterator.next(Collections.java:4189)
```

Then the stream terminates; the other error is as follows:

```
2019-12-17 14:07:38 INFO CodeGenerator:54 - Generated method too long to be JIT compiled: org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$ is 14018 bytes
2019-12-17 14:07:38 INFO CodeGenerator:54 - Code generated in 180.791597 ms
Traceback (most recent call last):
  File "/home/zmbigdata/ztan/delta/test.py", line 34, in
    outputMode('append').trigger(processingTime='30 seconds').start()
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/streaming.py", line 1105, in start
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/opt/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o73.start.
: java.util.concurrent.ExecutionException: org.apache.spark.sql.catalyst.errors.package$TreeNodeException: execute, tree:
Exchange hashpartitioning(coalesce(add#59.path, remove#60.path), 50)
+- (1) Project [txn#58, add#59, remove#60, metaData#61, protocol#62, commitInfo#63, UDF(input_file_name()) AS file#64]
   +- (1) SerializeFromObject [if (isnull(assertnotnull(input[0,
```

So: I had already initialized the Delta table and the checkpoint data before this run. What should I do now?
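For what it's worth, one common way to get a clean restart after switching the consumed database is to remove both the old Structured Streaming checkpoint and the Delta output directory, since the checkpoint still holds offsets and schema for the previous source. A minimal sketch with placeholder paths (not the issue author's actual paths, and it irreversibly deletes data):

```python
import shutil
from pathlib import Path

def reset_stream_state(checkpoint_dir: str, delta_table_dir: str) -> None:
    """Delete and recreate the checkpoint and Delta output directories.

    Hedged sketch: assumes both directories are on the local filesystem;
    for HDFS/S3 you would use the corresponding filesystem API instead.
    """
    for d in (checkpoint_dir, delta_table_dir):
        p = Path(d)
        if p.exists():
            shutil.rmtree(p)  # irreversible: removes all prior state/data
        p.mkdir(parents=True, exist_ok=True)
```

After this, restarting the query with the new database lets Spark rebuild the checkpoint from scratch instead of resuming against stale offsets.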

allwefantasy commented 4 years ago

Can you paste your script? Remember to replace your password with "xxxxx".

allwefantasy commented 4 years ago

This issue is caused by the binary log format of RDS not being fully compatible with MySQL's.

zhengqiangtan commented 4 years ago

Different databases may produce different event types, so parts of the source code may need to be modified.
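To illustrate that point: a binlog consumer that dispatches on known event types must either skip or fail on a type it has never seen, which is how a provider-specific (e.g. RDS) event can stop the stream until a handler is added. A hedged Python sketch with illustrative names only, not spark-binlog's actual internals:

```python
# Map of event-type name -> handler. Only row-change events are "known"
# here; everything else falls through to the default branch.
KNOWN_HANDLERS = {
    "WriteRowsEventData":  lambda payload: ("insert", payload),
    "UpdateRowsEventData": lambda payload: ("update", payload),
    "DeleteRowsEventData": lambda payload: ("delete", payload),
}

def handle(event_type: str, payload: dict):
    """Dispatch a binlog event; skip types we have no handler for."""
    handler = KNOWN_HANDLERS.get(event_type)
    if handler is None:
        # An unknown type (e.g. a provider-specific event) would need a new
        # handler added in the source code; here we just mark it skipped.
        return ("skipped", event_type)
    return handler(payload)

print(handle("WriteRowsEventData", {"tableId": 1065904}))
print(handle("RdsSpecificEventData", {}))
```

Whether to skip unknown events or fail fast is a design choice: skipping keeps the stream alive but can silently drop changes, while failing surfaces the incompatibility immediately, as happened in this issue.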