confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
8 stars 954 forks source link

JDBC Sink fails to alter MySQL table #592

Open rmoff opened 5 years ago

rmoff commented 5 years ago

Streaming Avro data to MySQL with the JDBC Sink, connector aborts if switching from "pk.mode": "none" to "pk.mode": "kafka" with the error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true}, as it is not optional and does not have a default value

Connector config:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "dev-dsb-errors-mysql-sink6",
  "config": { 
   "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
   "tasks.max": "1",
   "topics": "PAGEVIEWS_REGIONS", 
   "connection.url": "jdbc:mysql://localhost:3306/demo?user=rmoff&password=pw",
   "auto.create": "true",
   "auto.evolve":"true",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://localhost:8081",
   "pk.mode": "kafka"
  }    
}'

Logs:

[2019-02-06 15:29:13,235] INFO Using MySql dialect table "PAGEVIEWS_REGIONS" present (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-02-06 15:29:13,236] DEBUG Querying MySql dialect column metadata for catalog:null schema:null table:PAGEVIEWS_REGIONS (io.confluent.connect.jdbc.dialect.MySqlDatabaseDialect)
[2019-02-06 15:29:13,252] INFO Setting metadata for table "PAGEVIEWS_REGIONS" to Table{name='"PAGEVIEWS_REGIONS"', columns=[Column{'GENDER', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'REGIONID', isPrimaryKey=false, allowsNull=true, sqlType=VARCHAR}, Column{'NUMUSERS', isPrimaryKey=false, allowsNull=true, sqlType=BIGINT}]} (io.confluent.connect.jdbc.util.TableDefinitions)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] DEBUG Found missing field: SinkRecordField{schema=Schema{INT64}, name='__connect_offset', isPrimaryKey=true} (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] INFO Unable to find fields [SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true}, SinkRecordField{schema=Schema{INT64}, name='__connect_offset', isPrimaryKey=true}, SinkRecordField{schema=Schema{STRING}, name='__connect_topic', isPrimaryKey=true}] among column names [GENDER, REGIONID, NUMUSERS] (io.confluent.connect.jdbc.sink.DbStructure)
[2019-02-06 15:29:13,253] ERROR WorkerSinkTask{id=dev-dsb-errors-mysql-sink5-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. (org.apache.kafka.connect.runtime.WorkerSinkTask)
org.apache.kafka.connect.errors.ConnectException: Cannot ALTER to add missing field SinkRecordField{schema=Schema{INT32}, name='__connect_partition', isPrimaryKey=true}, as it is not optional and does not have a default value
    at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:132)
    at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:73)
    at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:84)
    at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:65)
    at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:73)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:565)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

If the table is dropped first, the connector then succeeds, creating the table from scratch with the additional columns.

blcksrx commented 4 years ago

Damn it! I got this

suikast42 commented 4 years ago

Damn it! I got this

The problem is still present

gavnyx commented 3 years ago

I got this error as well. DDL on my field is not null and doesn't have default value. How to ignore or set default value for this field?

fizwan commented 2 years ago

Still got this as well

nguyentructran commented 1 year ago

still got this.

0xSumitBanik commented 1 year ago

Faced the same issue today.

gokhansahin477 commented 1 year ago

same is today mysql,

ghkdqhrbals commented 1 year ago

Still present

ihsan-96 commented 4 months ago

I am facing this as well. This seems to be a common case. I could find that this is a design choice from the docs here

For backward-compatible table schema evolution, new fields in record schemas must be optional or have a default value. If you need to delete a field, the table schema should be manually altered to either drop the corresponding column, assign it a default value, or make it nullable.

But, manually changing the schema is a dirty solution and is not scalable. Do we have some workaround here? Or can we expect some fixes?

phamvanlinh20111993 commented 3 weeks ago

i have faced the same problem, any suggestion here. thanks. Here is my error log: Cannot ALTER TABLE "sink_connector_example"."test" to add missing field SinkRecordField{schema=Schema{io.debezium.connector.postgresql.Source:STRUCT}, name='source', isPrimaryKey=false}, as the field is not optional and does not have a default value at io.confluent.connect.jdbc.sink.DbStructure.amendIfNecessary(DbStructure.java:182) at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:83) at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:122) at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:74) at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:90) at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:601)