apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] Cannot operate on a closed statement #5352

Closed ZanebonoAlter closed 1 year ago

ZanebonoAlter commented 1 year ago

What happened

I upgraded SeaTunnel from 2.3.1 to 2.3.3 because I want ClickHouse writes to be triggered when a checkpoint is reached (https://github.com/apache/seatunnel/pull/4999). However, SeaTunnel now fails when a checkpoint is reached, and the shorter the checkpoint interval I configure (for example, checkpoint.interval = 20000), the more likely the failure is to occur.

My guess is that the old ClickHouse JDBC connection gets closed because threads switch too quickly, while newly inserted data still uses the old channel, so flush throws an exception.

This config works in 2.3.1
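The hypothesis above can be illustrated with a small, self-contained sketch. It does not use SeaTunnel or the ClickHouse JDBC driver: `FakeStatement` and `Writer` are hypothetical stand-ins, and the `reopenAfterFlush` flag is an assumption introduced only to contrast the two behaviours. It shows how a writer that closes its statement at one checkpoint and reuses the same object at the next one would fail with a "closed statement" error. Whether this is the actual root cause in 2.3.3 is not confirmed in this issue.

```java
import java.util.ArrayList;
import java.util.List;

public class ClosedStatementSketch {

    /** Hypothetical stand-in for a JDBC PreparedStatement; not a ClickHouse class. */
    public static class FakeStatement implements AutoCloseable {
        private boolean closed;
        private final List<String> batch = new ArrayList<>();

        private void ensureOpen() {
            if (closed) {
                throw new IllegalStateException("Cannot operate on a closed statement");
            }
        }

        public void addBatch(String row) {
            ensureOpen();
            batch.add(row);
        }

        /** Returns the number of rows "written" and empties the batch. */
        public int executeBatch() {
            ensureOpen();
            int written = batch.size();
            batch.clear();
            return written;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical sink writer that buffers rows and flushes them at each checkpoint. */
    public static class Writer {
        private final boolean reopenAfterFlush;
        private final List<String> buffer = new ArrayList<>();
        private FakeStatement statement = new FakeStatement();

        public Writer(boolean reopenAfterFlush) {
            this.reopenAfterFlush = reopenAfterFlush;
        }

        public void write(String row) {
            buffer.add(row);
        }

        /** Called at a checkpoint: send buffered rows, then close the statement. */
        public int flush() {
            try {
                for (String row : buffer) {
                    statement.addBatch(row);
                }
                return statement.executeBatch();
            } finally {
                buffer.clear();
                statement.close();
                if (reopenAfterFlush) {
                    // Prepare a fresh statement so the next checkpoint has an open one.
                    statement = new FakeStatement();
                }
            }
        }
    }

    public static void main(String[] args) {
        // Reuses the closed statement: the second checkpoint fails.
        Writer reusing = new Writer(false);
        reusing.write("row-1");
        System.out.println("first flush wrote " + reusing.flush() + " row(s)");
        reusing.write("row-2");
        try {
            reusing.flush();
        } catch (IllegalStateException e) {
            System.out.println("second flush failed: " + e.getMessage());
        }

        // Re-creates the statement after every flush: both checkpoints succeed.
        Writer reopening = new Writer(true);
        reopening.write("row-1");
        reopening.flush();
        reopening.write("row-2");
        System.out.println("second flush wrote " + reopening.flush() + " row(s)");
    }
}
```

If the real writer behaves like the first case, a shorter checkpoint interval would simply reach the second flush sooner, which would be consistent with the failure showing up more often at checkpoint.interval = 20000.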

SeaTunnel Version

2.3.3 branch: dev 9e85d1228d0cb8a0433ac05422362ec2623506c5

SeaTunnel Config

env {
  execution.parallelism = 1
  job.mode = "STREAMING"
  checkpoint.interval = 60000
}

source {
  MySQL-CDC {
    result_table_name = "yc"
    parallelism = 1
    server-id = 5656
    username = "****"
    password = "****"
    table-names = ["**.****1"]
    base-url = "jdbc:mysql://***:3306/**"
    server-time-zone = "UTC+8"
    startup.mode = "initial"
  }

  MySQL-CDC {
    result_table_name = "hiswarn"
    parallelism = 1
    server-id = 6657
    username = "***"
    password = "****"
    table-names = ["**.****2"]
    base-url = "jdbc:mysql://****:3306/**"
    server-time-zone = "UTC+8"
    startup.mode = "initial"
  }
}

sink {
  Clickhouse {
    host = "****:8123"
    database = "migrate"
    table = "this_hiswarn"

    source_table_name = "hiswarn"
    # split mode options
    split_mode = true
    sharding_key = "f_time"
  }

  Clickhouse {
    host = "****:8123"
    database = "migrate"
    table = "this_yc"

    source_table_name = "yc"
    # split mode options
    split_mode = true
    sharding_key = "f_time"
  }
}

Running Command

java \
  -Dhazelcast.client.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/hazelcast-client.yaml \
  -Dseatunnel.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/seatunnel.yaml \
  -Dhazelcast.config=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/hazelcast.yaml \
  -Dlog4j2.configurationFile=/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/config/log4j2_client.properties \
  -Dseatunnel.logs.path=/home/seatunnel \
  -cp /home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/lib/*:/home/seatunnel/apache-seatunnel-2.3.3-SNAPSHOT/starter/seatunnel-starter.jar \
  org.apache.seatunnel.core.starter.seatunnel.SeaTunnelClient \
  --config ./config/test.streaming.conf -e local

Error Exception

org.apache.seatunnel.common.utils.SeaTunnelException: java.lang.InterruptedException: sleep interrupted
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:166) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:92) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.connectors.cdc.base.source.reader.IncrementalSourceReader.pollNext(IncrementalSourceReader.java:98) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.engine.server.task.flow.SourceFlowLifeCycle.collect(SourceFlowLifeCycle.java:150) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.collect(SourceSeaTunnelTask.java:95) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.engine.server.task.SeaTunnelTask.stateProcess(SeaTunnelTask.java:167) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.engine.server.task.SourceSeaTunnelTask.call(SourceSeaTunnelTask.java:100) ~[seatunnel-starter.jar:2.3.3-SNAPSHOT]
        at org.apache.seatunnel.engine.server.TaskExecutionService$BlockingWorker.run(TaskExecutionService.java:611) [seatunnel-starter.jar:2.3.3-SNAPSHOT]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_372]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_372]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_372]
        at java.lang.Thread.run(Thread.java:750) [?:1.8.0_372]
Caused by: java.lang.InterruptedException: sleep interrupted
        at java.lang.Thread.sleep(Native Method) ~[?:1.8.0_372]
        at org.apache.seatunnel.connectors.seatunnel.common.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:164) ~[connector-cdc-mysql-2.3.3-SNAPSHOT.jar:2.3.3-SNAPSHOT]
        ... 12 more

Caused by: org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - Clickhouse execute batch statement error
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:119)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:134)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.prepareCommit(ClickhouseSinkWriter.java:93)
        at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:166)
        ... 15 more
        Suppressed: java.sql.SQLException: Cannot operate on a closed statement
                at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73)
                at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.ensureOpen(ClickHouseStatementImpl.java:114)
                at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:262)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.setString(FieldNamedPreparedStatement.java:123)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction.injectFields(StringInjectFunction.java:51)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcRowConverter.toExternal(JdbcRowConverter.java:79)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:42)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.closeStatements(BufferedBatchStatementExecutor.java:61)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcBatchStatementExecutor.close(JdbcBatchStatementExecutor.java:37)
                at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:128)
                ... 17 more

Caused by: java.sql.SQLException: Cannot operate on a closed statement
        at com.clickhouse.jdbc.SqlExceptionUtils.clientError(SqlExceptionUtils.java:73)
        at com.clickhouse.jdbc.internal.ClickHouseStatementImpl.ensureOpen(ClickHouseStatementImpl.java:114)
        at com.clickhouse.jdbc.internal.InputBasedPreparedStatement.setString(InputBasedPreparedStatement.java:262)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.FieldNamedPreparedStatement.setString(FieldNamedPreparedStatement.java:123)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.StringInjectFunction.injectFields(StringInjectFunction.java:51)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.JdbcRowConverter.toExternal(JdbcRowConverter.java:79)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.SimpleBatchStatementExecutor.addToBatch(SimpleBatchStatementExecutor.java:42)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.executor.BufferedBatchStatementExecutor.executeBatch(BufferedBatchStatementExecutor.java:51)
        at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseSinkWriter.flush(ClickhouseSinkWriter.java:117)
        ... 18 more

Zeta or Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has had no activity for 30 days. It will be closed in the next 7 days if no further activity occurs.

github-actions[bot] commented 1 year ago

This issue has been closed because it has not received a response for a long time. You can reopen it if you encounter similar problems in the future.