Environment:
TDengine version: TDengine-server-2.6.0.8-Linux-x64
taosdata-kafka-connect-TDengine version: 1.0.1
The log is as follows:
[2022-07-21 14:45:29,154] WARN [TDengineSinkConnector|task-0] Write of 5 records failed, remainingRetries=2 (com.taosdata.kafka.connect.sink.TDengineSinkTask:110)
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
at com.taosdata.jdbc.TSDBJNIConnector.insertLines(TSDBJNIConnector.java:376)
at com.taosdata.jdbc.SchemalessWriter.write(SchemalessWriter.java:35)
at com.taosdata.kafka.connect.db.CacheProcessor.schemalessInsert(CacheProcessor.java:86)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:108)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:29,156] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:29,557] ERROR [TDengineSinkConnector|task-0] WorkerSinkTask{id=TDengineSinkConnector-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:600)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:121)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:116)
... 12 more
[2022-07-21 14:45:32,558] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:32,559] DEBUG [TDengineSinkConnector|task-0] Received 5 records. First record kafka coordinates:(meters-0-48594). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-07-21 14:45:32,560] WARN [TDengineSinkConnector|task-0] Write of 5 records failed, remainingRetries=1 (com.taosdata.kafka.connect.sink.TDengineSinkTask:110)
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
at com.taosdata.jdbc.TSDBJNIConnector.insertLines(TSDBJNIConnector.java:376)
at com.taosdata.jdbc.SchemalessWriter.write(SchemalessWriter.java:35)
at com.taosdata.kafka.connect.db.CacheProcessor.schemalessInsert(CacheProcessor.java:86)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:108)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:32,560] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:33,011] ERROR [TDengineSinkConnector|task-0] WorkerSinkTask{id=TDengineSinkConnector-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:600)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:121)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:116)
... 12 more
[2022-07-21 14:45:34,917] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:34,919] DEBUG [TDengineSinkConnector|task-0] Received 5 records. First record kafka coordinates:(meters-0-48594). Writing them to the database... (com.taosdata.kafka.connect.sink.TDengineSinkTask:101)
[2022-07-21 14:45:34,920] WARN [TDengineSinkConnector|task-0] Write of 5 records failed, remainingRetries=0 (com.taosdata.kafka.connect.sink.TDengineSinkTask:110)
java.sql.SQLException: TDengine ERROR (80000362): Table does not exist
at com.taosdata.jdbc.TSDBError.createSQLException(TSDBError.java:76)
at com.taosdata.jdbc.TSDBJNIConnector.insertLines(TSDBJNIConnector.java:376)
at com.taosdata.jdbc.SchemalessWriter.write(SchemalessWriter.java:35)
at com.taosdata.kafka.connect.db.CacheProcessor.schemalessInsert(CacheProcessor.java:86)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:108)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:34,921] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:35,021] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:35,024] ERROR [TDengineSinkConnector|task-0] Error encountered in task TDengineSinkConnector-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000217): Database not specified or available
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.unrollAndRetry(TDengineSinkTask.java:153)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:124)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:35,042] INFO [TDengineSinkConnector|task-0] [Producer clientId=connector-dlq-producer-TDengineSinkConnector-0] Resetting the last seen epoch of partition my-connector-errors-0 to 0 since the associated topicId changed from null to OpZ4KkyLQm6cesUeJh8GlA (org.apache.kafka.clients.Metadata:402)
[2022-07-21 14:45:35,062] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:35,531] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:35,532] ERROR [TDengineSinkConnector|task-0] Error encountered in task TDengineSinkConnector-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000217): Database not specified or available
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.unrollAndRetry(TDengineSinkTask.java:153)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:124)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:35,533] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:36,034] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:36,035] ERROR [TDengineSinkConnector|task-0] Error encountered in task TDengineSinkConnector-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000217): Database not specified or available
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.unrollAndRetry(TDengineSinkTask.java:153)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:124)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:36,037] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:36,538] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:36,539] ERROR [TDengineSinkConnector|task-0] Error encountered in task TDengineSinkConnector-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000217): Database not specified or available
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.unrollAndRetry(TDengineSinkTask.java:153)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:124)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
[2022-07-21 14:45:36,542] INFO [TDengineSinkConnector|task-0] Try closing connection jdbc:TAOS://127.0.0.1:6030 (com.taosdata.kafka.connect.db.CacheProcessor:94)
[2022-07-21 14:45:37,023] INFO [TDengineSinkConnector|task-0] create TDengine Connection, Attempt 0 of 3 (com.taosdata.kafka.connect.db.TSDBConnectionProvider:34)
[2022-07-21 14:45:37,024] ERROR [TDengineSinkConnector|task-0] Error encountered in task TDengineSinkConnector-0. Executing stage 'TASK_PUT' with class 'org.apache.kafka.connect.sink.SinkTask'. (org.apache.kafka.connect.runtime.errors.LogReporter:66)
java.sql.SQLException: Exception chain:
java.sql.SQLException: TDengine ERROR (80000217): Database not specified or available
at com.taosdata.kafka.connect.sink.TDengineSinkTask.getAllMessagesException(TDengineSinkTask.java:141)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.unrollAndRetry(TDengineSinkTask.java:153)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.bulkWriteBatch(TDengineSinkTask.java:124)
at com.taosdata.kafka.connect.sink.TDengineSinkTask.put(TDengineSinkTask.java:85)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:581)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:333)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:234)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:203)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
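Bug Description
When the Kafka connector encounters data that does not conform to the expected format, it discards the entire batch instead of inserting the records that can be inserted and dropping only the invalid ones.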
To Reproduce
Following the official example at https://docs.taosdata.com/third-party/kafka, change the data file test-data.txt to:
meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000
meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000
errorline
meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000
meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000
This reproduces the issue.
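Additional Context
After the whole-batch retries are exhausted, the connector falls back to iterating over each record in the batch. At that point the connection has already been closed and the database name has been lost, so even the valid records cannot be inserted and the entire batch is discarded. For details, see the bold part of the log; in particular, the per-record retries in `unrollAndRetry` fail with `TDengine ERROR (80000217): Database not specified or available`.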
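The behavior described above can be illustrated with a small, self-contained simulation. This is only a sketch under stated assumptions, not the connector's actual code: `FakeConnection`, `writeWithFallback`, the `reselectDatabase` flag, the three-part shape check, and the database name `power` are all hypothetical and exist only for illustration. No real TDengine or Kafka Connect classes are used. It contrasts a per-record fallback whose reopened connection has lost the database name (the reported behavior) with one that reopens the connection with the database selected (the expected behavior).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Simulation only: every type here is hypothetical, not part of the real connector. */
class PerRecordFallbackSketch {

    /** Stand-in for a database connection; a null database models a lost database name. */
    static class FakeConnection {
        private final String database;

        FakeConnection(String database) {
            this.database = database;
        }

        /** All-or-nothing write: one bad line makes the whole call fail. */
        void write(List<String> lines) {
            if (database == null) {
                throw new IllegalStateException("Database not specified or available");
            }
            for (String line : lines) {
                // Crude shape check (an assumption): "<measurement,tags> <fields> <timestamp>".
                if (line.trim().split("\\s+").length != 3) {
                    throw new IllegalArgumentException("Rejected line: " + line);
                }
            }
        }
    }

    /** Outcome of one attempt: which lines were written and which were dropped. */
    static class Result {
        final List<String> written = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
    }

    static Result writeWithFallback(List<String> batch, String database, boolean reselectDatabase) {
        Result result = new Result();
        FakeConnection conn = new FakeConnection(database);
        try {
            conn.write(batch);
            result.written.addAll(batch);
            return result;
        } catch (RuntimeException batchFailure) {
            // The failed batch closes the connection, so a new one is opened.
            // reselectDatabase = false models the reported case, where the new
            // connection no longer carries the database name.
            conn = new FakeConnection(reselectDatabase ? database : null);
        }
        // Fall back to writing the records one at a time.
        for (String line : batch) {
            try {
                conn.write(Collections.singletonList(line));
                result.written.add(line);
            } catch (RuntimeException recordFailure) {
                result.failed.add(line);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList(
            "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=0.28 1648432611249000000",
            "meters,location=California.LosAngeles,groupid=2 current=13.4,voltage=223,phase=0.29 1648432611250000000",
            "errorline",
            "meters,location=California.LosAngeles,groupid=3 current=10.8,voltage=223,phase=0.29 1648432611249000000",
            "meters,location=California.LosAngeles,groupid=3 current=11.3,voltage=221,phase=0.35 1648432611250000000");

        Result reported = writeWithFallback(batch, "power", false);
        Result expected = writeWithFallback(batch, "power", true);
        // prints "database lost:     written=0 failed=5"
        System.out.println("database lost:     written=" + reported.written.size()
            + " failed=" + reported.failed.size());
        // prints "database selected: written=4 failed=1"
        System.out.println("database selected: written=" + expected.written.size()
            + " failed=" + expected.failed.size());
    }
}
```

In this simulation, only the variant that reopens the connection with the database selected keeps the four valid records and drops just `errorline`, which is the outcome the report asks for.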