confluentinc / kafka-connect-jdbc

Kafka Connect connector for JDBC-compatible databases
Other
1.01k stars 953 forks source link

`org.postgresql.util.PSQLException` should cause the task to fail #1374

Open yeikel opened 6 months ago

yeikel commented 6 months ago

I am currently seeing the following error in the logs :


[2023-12-08 14:33:07,008] ERROR [event-log\|task-0] Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT * FROM (SELECT * FROM event and action_layer_tx='service' AND event_status_tx='COMPLETED_SUCCESS') as elv1', topicPrefix='topic', incrementingColumn='', timestampColumns=[create_epoch_ts]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:386)
  | org.postgresql.util.PSQLException: ERROR: syntax error at or near "and"
  | Position: 46
  | at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2676)
  | at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2366)
  | at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:356)
  | at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:496)
  | at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:413)
  | at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:190)
 

But the task is still active. I was only able to find out that something is wrong because I checked the logs manually. This should not be necessary

Sample configuration :


{
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "timestamp.column.name": "create_epoch_ts",
    "connection.password": "redacted",
    "tasks.max": "1",
    "query": "SELECT * FROM (SELECT * FROM event and action_layer_tx='service' AND event_status_tx='COMPLETED_SUCCESS') as elv1",
    "connection.attempts": "10",
    "batch.max.rows": "100",
    "connection.backoff.ms": "1000",
    "mode": "timestamp",
    "topic.prefix": "redacted",
    "connection.user": "redacted",
    "db.timezone": "America/Denver",
    "poll.interval.ms": "100",
    "name": "test",
    "errors.tolerance": "none",
    "connection.url": "jdbc:postgresql://redacted:5432/redacted"
}

In this case, my query was invalid, but I should think that the task should fail rather than to just log it and keep the task active