allwefantasy / delta-plus

A library based on delta for Spark and MLSQL
option("mergeSchema", "true") #15

Open liangrui1988 opened 4 years ago

liangrui1988 commented 4 years ago

Caused by: org.apache.spark.sql.AnalysisException: A schema mismatch detected when writing to the Delta table. To enable schema migration, please set: '.option("mergeSchema", "true")'.

I already set this option on the writer, but the error is still thrown when a column is added to the table. Is this extra option actually passed through and applied when the data is written?

    var df = spark.readStream.
      format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
      option("host", xlPropertis.getProperty("xulie_host")).
      option("port", "3306").
      option("userName", xlPropertis.getProperty("xulie_user")).
      option("password", xlPropertis.getProperty("xulie_passwd")).
      option("bingLogNamePrefix", "mysql-bin").
      option("databaseNamePattern", databaseNamePattern).
      option("tableNamePattern", tableNames).
      option("tableNamePatternType", "stringList").
      option("binlogIndex", "1").
      option("binlogFileOffset", "14783737").
      load()

    df.printSchema()

    val query = df.writeStream.
      option("mergeSchema", "true").
      // option("spark.databricks.delta.schema.autoMerge", "true").
      outputMode(OutputMode.Append()).
      format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
      option("path", "/apps/delta/{db}/{table}").
      option("mode", "Append"). // Append Overwrite
      option("idCols", "id").
      option("duration", "5").
      option("syncType", "binlog").
      option("checkpointLocation", xlPropertis.getProperty("xulie_checkpoint")).
      option("path", "{db}/{table}").
      start()

    query.awaitTermination()

zhupishiye commented 4 years ago

I ran into the same problem: adding the option still has no effect. Has this been resolved?