apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [Clickhouse-File] clickhouse-file connector-v2 generated data part name conflict #3242

Closed Z1Wu closed 1 year ago

Z1Wu commented 2 years ago


What happened

When the clickhouse-file connector v2 loads data into ClickHouse and the sink receives multiple data partitions, the clickhouse-local processes on different executors may generate data parts with the same name. In that case the subsequent steps, transferring the local files to the ClickHouse cluster node and altering the table to attach the data part, may fail because of the name conflict.
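
To make the failure mode concrete, here is a minimal Python sketch of the conflict. It is a simplified model, not the connector's code: `generate_local_part`, `transfer_to_detached`, and `simulate` are hypothetical helpers, and the file name `data.bin` is only a placeholder. It assumes, as the log below shows, that each independent clickhouse-local run names its first part `all_1_1_0` and that all writers copy into the same `detached/` directory. In the real run the collision surfaces later, when the part is attached (ATOMIC_RENAME_FAIL); the sketch raises at copy time to keep things short.

```python
import shutil
import tempfile
from pathlib import Path

# Name seen in the log for the part produced by each clickhouse-local run.
PART_NAME = "all_1_1_0"


def generate_local_part(work_dir: Path, rows: list[str]) -> Path:
    """Stand-in for one clickhouse-local run writing a part into its own directory."""
    part_dir = work_dir / "data" / "_local" / "test_clickhouse_table" / PART_NAME
    part_dir.mkdir(parents=True)
    (part_dir / "data.bin").write_text("\n".join(rows))  # placeholder payload
    return part_dir


def transfer_to_detached(part_dir: Path, detached_dir: Path) -> Path:
    """Stand-in for the scp step: copy a part into the shared detached/ directory."""
    target = detached_dir / part_dir.name
    if target.exists():
        raise FileExistsError(f"part name conflict: {target.name} already in detached/")
    shutil.copytree(part_dir, target)
    return target


def simulate(num_writers: int) -> list[str]:
    """Run independent writers against one detached/ directory and record each outcome."""
    outcomes = []
    with tempfile.TemporaryDirectory() as root:
        detached = Path(root) / "detached"
        detached.mkdir()
        for writer_id in range(num_writers):
            work_dir = Path(root) / f"writer_{writer_id}"
            part = generate_local_part(work_dir, [f"row-{writer_id}"])
            try:
                transfer_to_detached(part, detached)
                outcomes.append("ok")
            except FileExistsError:
                outcomes.append("conflict")
    return outcomes


if __name__ == "__main__":
    print(simulate(2))  # prints ['ok', 'conflict']
```

With a single writer the transfer succeeds; with two, the second writer's part has the same name as the first, which matches the two `scp send ... all_1_1_0` lines in the log.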

Sample configuration file to reproduce the error:

env {
  job.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 2
  spark.executor.memory = "1g"
  spark.master = "local[*]"
}

source {

  FakeSource {
    row.num = 20
    split.num = 2 
    parallelism = 2 # make the source produce an RDD with 2 partitions
    schema = {
      fields {
        c_string = string
        c_int = int
      }
    }
    result_table_name = "fake"
  }
}

transform {
}

sink {
  // for standalone server
  ClickhouseFile {
    host = "localhost"
    database = "default"
    table = "test_clickhouse_table"
    username = ""
    password = ""
    clickhouse_local_path = "<path to clickhouse local>"
    sharding_key = ""
    node_free_password = false
    node_pass = [
      {
        node_address = "localhost:8123" 
        username = "***"
        password = "***"
      }
    ]
  }
  # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}

SeaTunnel Version

2.2.0-beta-release

SeaTunnel Config

env {
  job.name = "SeaTunnel"
  spark.executor.instances = 2
  spark.executor.cores = 2
  spark.executor.memory = "1g"
  spark.master = "local[*]"
}

source {

  FakeSource {
    row.num = 20
    split.num = 2
    parallelism = 2
    schema = {
      fields {
        c_string = string
        c_int = int
      }
    }
    result_table_name = "fake"
  }
}

transform {
}

sink {
  // for standalone server
  ClickhouseFile {
    host = "localhost:8123"
    database = "default"
    table = "test_clickhouse_table"
    username = ""
    password = ""
    clickhouse_local_path = "<path to clickhouse local>"

    sharding_key = ""

    node_free_password = false
    node_pass = [
      {
        node_address = "localhost:8123" 
        username = "***"
        password = "***"
      }
    ]
  }
  # If you would like to get more information about how to configure seatunnel and see full list of output plugins,
  # please go to https://seatunnel.apache.org/docs/category/sink-v2
}

Running Command

./bin/start-seatunnel-spark-connector-v2.sh \
--master local[*] \
--deploy-mode client \
--config ./config/ck.file.conf

Error Exception

Pushing operators to class org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
Pushed Filters: 
Post-Scan Filters: 
Output: c_string#0, c_int#1

22/10/31 16:29:04 INFO CodeGenerator: Code generated in 124.463292 ms
[before] total partition number : 2
22/10/31 16:29:04 INFO DataSourceV2Strategy: 
Pushing operators to class org.apache.seatunnel.translation.spark.source.SeaTunnelSourceSupport
Pushed Filters: 
Post-Scan Filters: 
Output: c_string#0, c_int#1

22/10/31 16:29:04 INFO WriteToDataSourceV2Exec: Start processing data source writer: org.apache.seatunnel.translation.spark.sink.SparkDataSourceWriter@5ec06e97. The input RDD has 2 partitions.
22/10/31 16:29:04 INFO SparkContext: Starting job: save at SinkExecuteProcessor.java:88
22/10/31 16:29:04 INFO DAGScheduler: Got job 0 (save at SinkExecuteProcessor.java:88) with 2 output partitions
22/10/31 16:29:04 INFO DAGScheduler: Final stage: ResultStage 0 (save at SinkExecuteProcessor.java:88)
22/10/31 16:29:04 INFO DAGScheduler: Parents of final stage: List()
22/10/31 16:29:04 INFO DAGScheduler: Missing parents: List()
22/10/31 16:29:04 INFO DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[5] at save at SinkExecuteProcessor.java:88), which has no missing parents
22/10/31 16:29:04 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 8.1 KB, free 2.2 GB)
22/10/31 16:29:04 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 4.4 KB, free 2.2 GB)
22/10/31 16:29:04 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on ***:54427 (size: 4.4 KB, free: 2.2 GB)
22/10/31 16:29:04 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1161
22/10/31 16:29:04 INFO DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[5] at save at SinkExecuteProcessor.java:88) (first 15 tasks are for partitions Vector(0, 1))
22/10/31 16:29:04 INFO TaskSchedulerImpl: Adding task set 0.0 with 2 tasks
22/10/31 16:29:04 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, executor driver, partition 0, PROCESS_LOCAL, 10516 bytes)
22/10/31 16:29:04 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 10521 bytes)
22/10/31 16:29:04 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
22/10/31 16:29:04 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
create parallel source
create parallel source
create Enumerator
create Enumerator
22/10/31 16:29:04 INFO FakeSourceSplitEnumerator: Starting to calculate splits.
22/10/31 16:29:04 INFO FakeSourceSplitEnumerator: Starting to calculate splits.
22/10/31 16:29:04 INFO FakeSourceReader: wait split!
22/10/31 16:29:04 INFO FakeSourceReader: wait split!
22/10/31 16:29:04 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 4.
22/10/31 16:29:04 INFO FakeSourceSplitEnumerator: Assigning splits to readers [FakeSourceSplit(splitId=3, rowNum=10), FakeSourceSplit(splitId=1, rowNum=10)]
22/10/31 16:29:04 INFO FakeSourceSplitEnumerator: Calculated splits successfully, the size of splits is 4.
22/10/31 16:29:04 INFO FakeSourceSplitEnumerator: Assigning splits to readers [FakeSourceSplit(splitId=0, rowNum=10), FakeSourceSplit(splitId=2, rowNum=10)]
22/10/31 16:29:05 INFO FakeSourceReader: 10 rows of data have been generated in split(0). Generation time: 1667204945976
22/10/31 16:29:05 INFO FakeSourceReader: 10 rows of data have been generated in split(3). Generation time: 1667204945973
22/10/31 16:29:06 INFO FakeSourceReader: 10 rows of data have been generated in split(2). Generation time: 1667204946994
22/10/31 16:29:06 INFO FakeSourceReader: Closed the bounded fake source
22/10/31 16:29:06 INFO FakeSourceReader: 10 rows of data have been generated in split(1). Generation time: 1667204946995
22/10/31 16:29:06 INFO FakeSourceReader: Closed the bounded fake source
22/10/31 16:29:07 INFO DataWritingSparkTask: Commit authorized for partition 1 (task 1, attempt 0stage 0.0)
22/10/31 16:29:07 INFO DataWritingSparkTask: Commit authorized for partition 0 (task 0, attempt 0stage 0.0)
22/10/31 16:29:07 INFO ClickhouseFileSinkWriter: Generate clickhouse local file command: /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_local/clickhouse local --file /tmp/clickhouse-local/seatunnel-file/c51fadc5_2/local_data.log -S "c_string String,c_int Int32" -N "temp_tablec51fadc5_2" -q "CREATE TABLE test_clickhouse_table (c_string String, c_int Int32) ENGINE = MergeTree PRIMARY KEY (c_string, c_int) ORDER BY (c_string, c_int) SETTINGS index_granularity = 8192; INSERT INTO TABLE test_clickhouse_table SELECT c_string,c_int FROM temp_tablec51fadc5_2;" --path "/tmp/clickhouse-local/seatunnel-file/c51fadc5_2"
22/10/31 16:29:07 INFO ClickhouseFileSinkWriter: Generate clickhouse local file command: /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_local/clickhouse local --file /tmp/clickhouse-local/seatunnel-file/35ba1038_5/local_data.log -S "c_string String,c_int Int32" -N "temp_table35ba1038_5" -q "CREATE TABLE test_clickhouse_table (c_string String, c_int Int32) ENGINE = MergeTree PRIMARY KEY (c_string, c_int) ORDER BY (c_string, c_int) SETTINGS index_granularity = 8192; INSERT INTO TABLE test_clickhouse_table SELECT c_string,c_int FROM temp_table35ba1038_5;" --path "/tmp/clickhouse-local/seatunnel-file/35ba1038_5"
try attach file to remote server, <my_user>:**@localhost:22 
try attach file to remote server, <my_user>:**@localhost:22 
22/10/31 16:29:07 INFO DefaultIoServiceFactoryFactory: No detected/configured IoServiceFactoryFactory using Nio2ServiceFactoryFactory
22/10/31 16:29:07 WARN AcceptAllServerKeyVerifier: Server at localhost/127.0.0.1:22 presented unverified EC key: SHA256:GUzMr0YoT3SJn/eqEoHy+iHNGRfC6HYkP1BKkDE/RlI
22/10/31 16:29:07 WARN AcceptAllServerKeyVerifier: Server at localhost/127.0.0.1:22 presented unverified EC key: SHA256:GUzMr0YoT3SJn/eqEoHy+iHNGRfC6HYkP1BKkDE/RlI
scp send from /tmp/clickhouse-local/seatunnel-file/35ba1038_5/data/_local/test_clickhouse_table/all_1_1_0 to /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/
scp send from /tmp/clickhouse-local/seatunnel-file/c51fadc5_2/data/_local/test_clickhouse_table/all_1_1_0 to /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/
22/10/31 16:29:07 WARN ClientConnectionService: globalRequest(ClientConnectionService[ClientSessionImpl[<my_user>@localhost/127.0.0.1:22]])[hostkeys-00@openssh.com, want-reply=false] failed (SshException) to process: EdDSA provider not supported
22/10/31 16:29:07 WARN ClientConnectionService: globalRequest(ClientConnectionService[ClientSessionImpl[<my_user>@localhost/127.0.0.1:22]])[hostkeys-00@openssh.com, want-reply=false] failed (SshException) to process: EdDSA provider not supported
22/10/31 16:29:07 INFO ScpFileTransfer: execute remote command: ls -l /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/
22/10/31 16:29:07 INFO ScpFileTransfer: execute remote command: ls -l /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/ | tail -n 1 | awk '{print $3}' | xargs -t -i chown -R {}:{} /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/
22/10/31 16:29:07 ERROR Utils: Aborting task
java.lang.RuntimeException: Flush data into clickhouse file error
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.flush(ClickhouseFileSinkWriter.java:135)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.close(ClickhouseFileSinkWriter.java:122)
    at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.commit(SparkDataWriter.java:81)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.clickhouse.client.ClickHouseException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))
, server ClickHouseNode(addr=http:localhost:8123, db=default)@-1345290604
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:113)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.io.IOException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:160)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:251)
    at com.clickhouse.client.http.ClickHouseHttpClient.postRequest(ClickHouseHttpClient.java:88)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:107)
    ... 4 more
22/10/31 16:29:07 ERROR DataWritingSparkTask: Aborting commit for partition 1 (task 1, attempt 0stage 0.0)
22/10/31 16:29:07 ERROR DataWritingSparkTask: Aborted commit for partition 1 (task 1, attempt 0stage 0.0)
22/10/31 16:29:07 ERROR Executor: Exception in task 1.0 in stage 0.0 (TID 1)
java.lang.RuntimeException: Flush data into clickhouse file error
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.flush(ClickhouseFileSinkWriter.java:135)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.close(ClickhouseFileSinkWriter.java:122)
    at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.commit(SparkDataWriter.java:81)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.clickhouse.client.ClickHouseException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))
, server ClickHouseNode(addr=http:localhost:8123, db=default)@-1345290604
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:113)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.io.IOException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:160)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:251)
    at com.clickhouse.client.http.ClickHouseHttpClient.postRequest(ClickHouseHttpClient.java:88)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:107)
    ... 4 more
22/10/31 16:29:07 INFO DataWritingSparkTask: Committed partition 0 (task 0, attempt 0stage 0.0)
22/10/31 16:29:07 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 1141 bytes result sent to driver
22/10/31 16:29:07 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 2967 ms on localhost (executor driver) (1/2)
22/10/31 16:29:07 WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.RuntimeException: Flush data into clickhouse file error
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.flush(ClickhouseFileSinkWriter.java:135)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.close(ClickhouseFileSinkWriter.java:122)
    at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.commit(SparkDataWriter.java:81)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.clickhouse.client.ClickHouseException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))
, server ClickHouseNode(addr=http:localhost:8123, db=default)@-1345290604
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:113)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.io.IOException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:160)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:251)
    at com.clickhouse.client.http.ClickHouseHttpClient.postRequest(ClickHouseHttpClient.java:88)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:107)
    ... 4 more

22/10/31 16:29:07 ERROR TaskSetManager: Task 1 in stage 0.0 failed 1 times; aborting job
22/10/31 16:29:07 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
22/10/31 16:29:07 INFO TaskSchedulerImpl: Cancelling stage 0
22/10/31 16:29:07 INFO TaskSchedulerImpl: Killing all running tasks in stage 0: Stage cancelled
22/10/31 16:29:07 INFO DAGScheduler: ResultStage 0 (save at SinkExecuteProcessor.java:88) failed in 3.064 s due to Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.RuntimeException: Flush data into clickhouse file error
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.flush(ClickhouseFileSinkWriter.java:135)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.close(ClickhouseFileSinkWriter.java:122)
    at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.commit(SparkDataWriter.java:81)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.clickhouse.client.ClickHouseException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))
, server ClickHouseNode(addr=http:localhost:8123, db=default)@-1345290604
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:113)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.io.IOException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:160)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:251)
    at com.clickhouse.client.http.ClickHouseHttpClient.postRequest(ClickHouseHttpClient.java:88)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:107)
    ... 4 more

Driver stacktrace:
22/10/31 16:29:07 INFO DAGScheduler: Job 0 failed: save at SinkExecuteProcessor.java:88, took 3.089511 s
22/10/31 16:29:07 ERROR WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.SparkDataSourceWriter@5ec06e97 is aborting.
22/10/31 16:29:07 ERROR WriteToDataSourceV2Exec: Data source writer org.apache.seatunnel.translation.spark.sink.SparkDataSourceWriter@5ec06e97 aborted.
22/10/31 16:29:07 ERROR SparkApiTaskExecuteCommand: Run SeaTunnel on spark failed.
org.apache.spark.SparkException: Writing job aborted.
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668)
    at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:259)
    at org.apache.seatunnel.core.starter.spark.execution.SinkExecuteProcessor.execute(SinkExecuteProcessor.java:88)
    at org.apache.seatunnel.core.starter.spark.execution.SparkExecution.execute(SparkExecution.java:56)
    at org.apache.seatunnel.core.starter.spark.command.SparkApiTaskExecuteCommand.execute(SparkApiTaskExecuteCommand.java:52)
    at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39)
    at org.apache.seatunnel.example.spark.v2.ExampleUtils.builder(ExampleUtils.java:43)
    at org.apache.seatunnel.example.spark.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:32)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1 in stage 0.0 failed 1 times, most recent failure: Lost task 1.0 in stage 0.0 (TID 1, localhost, executor driver): java.lang.RuntimeException: Flush data into clickhouse file error
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.flush(ClickhouseFileSinkWriter.java:135)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.close(ClickhouseFileSinkWriter.java:122)
    at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.commit(SparkDataWriter.java:81)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.clickhouse.client.ClickHouseException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))
, server ClickHouseNode(addr=http:localhost:8123, db=default)@-1345290604
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:113)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.io.IOException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:160)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:251)
    at com.clickhouse.client.http.ClickHouseHttpClient.postRequest(ClickHouseHttpClient.java:88)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:107)
    ... 4 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1887)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1875)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1874)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1874)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:926)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2108)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2057)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2046)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
    ... 21 more
Caused by: java.lang.RuntimeException: Flush data into clickhouse file error
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.flush(ClickhouseFileSinkWriter.java:135)
    at java.base/java.util.HashMap.forEach(HashMap.java:1337)
    at org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseFileSinkWriter.close(ClickhouseFileSinkWriter.java:122)
    at org.apache.seatunnel.translation.spark.sink.SparkDataWriter.commit(SparkDataWriter.java:81)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:127)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$$anonfun$run$3.apply(WriteToDataSourceV2Exec.scala:116)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.v2.DataWritingSparkTask$.run(WriteToDataSourceV2Exec.scala:146)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:67)
    at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec$$anonfun$doExecute$2.apply(WriteToDataSourceV2Exec.scala:66)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:121)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: com.clickhouse.client.ClickHouseException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))
, server ClickHouseNode(addr=http:localhost:8123, db=default)@-1345290604
    at com.clickhouse.client.ClickHouseException.of(ClickHouseException.java:113)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:109)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
    ... 3 more
Caused by: java.io.IOException: Code: 521. DB::ErrnoException: Paths cannot be exchanged because /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/all_1_1_0 or /Users/<my_user>/my_code/clickhouse_dev/clickhouse_tutorial/ck_bin/new_dir/store/9db/9dbf86b2-f1aa-450f-bece-6f7ee4aa28e8/detached/attaching_all_1_1_0 does not exist, errno: 2, strerror: No such file or directory. (ATOMIC_RENAME_FAIL) (version 22.7.1.2246 (official build))

    at com.clickhouse.client.http.HttpUrlConnectionImpl.checkResponse(HttpUrlConnectionImpl.java:160)
    at com.clickhouse.client.http.HttpUrlConnectionImpl.post(HttpUrlConnectionImpl.java:251)
    at com.clickhouse.client.http.ClickHouseHttpClient.postRequest(ClickHouseHttpClient.java:88)
    at com.clickhouse.client.http.ClickHouseHttpClient.lambda$execute$0(ClickHouseHttpClient.java:107)
    ... 4 more
22/10/31 16:29:07 ERROR Seatunnel: 

===============================================================================

22/10/31 16:29:07 ERROR Seatunnel: Fatal Error, 

22/10/31 16:29:07 ERROR Seatunnel: Please submit bug report in https://github.com/apache/incubator-seatunnel/issues

22/10/31 16:29:07 ERROR Seatunnel: Reason:Writing job aborted. 

22/10/31 16:29:07 ERROR Seatunnel: Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException: Writing job aborted.
    at org.apache.seatunnel.core.starter.spark.command.SparkApiTaskExecuteCommand.execute(SparkApiTaskExecuteCommand.java:55)
    at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39)
    at org.apache.seatunnel.example.spark.v2.ExampleUtils.builder(ExampleUtils.java:43)
    at org.apache.seatunnel.example.spark.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:32)

22/10/31 16:29:07 ERROR Seatunnel: 
===============================================================================

Exception in thread "main" org.apache.seatunnel.core.starter.exception.CommandExecuteException: Writing job aborted.
    at org.apache.seatunnel.core.starter.spark.command.SparkApiTaskExecuteCommand.execute(SparkApiTaskExecuteCommand.java:55)
    at org.apache.seatunnel.core.starter.Seatunnel.run(Seatunnel.java:39)
    at org.apache.seatunnel.example.spark.v2.ExampleUtils.builder(ExampleUtils.java:43)
    at org.apache.seatunnel.example.spark.v2.SeaTunnelApiExample.main(SeaTunnelApiExample.java:32)
22/10/31 16:29:07 INFO SparkContext: Invoking stop() from shutdown hook
22/10/31 16:29:07 INFO AbstractConnector: Stopped Spark@414f2431{HTTP/1.1,[http/1.1]}{0.0.0.0:4040}
22/10/31 16:29:07 INFO SparkUI: Stopped Spark web UI at http://***:4040
22/10/31 16:29:07 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
22/10/31 16:29:07 INFO MemoryStore: MemoryStore cleared
22/10/31 16:29:07 INFO BlockManager: BlockManager stopped
22/10/31 16:29:07 INFO BlockManagerMaster: BlockManagerMaster stopped
22/10/31 16:29:07 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
22/10/31 16:29:07 INFO SparkContext: Successfully stopped SparkContext
22/10/31 16:29:07 INFO ShutdownHookManager: Shutdown hook called
22/10/31 16:29:07 INFO ShutdownHookManager: Deleting directory /private/var/folders/r0/nmtq7_pd2jz0hysd2t5t6tm00000gn/T/spark-9f74b145-5a5a-47da-b947-d2250523384b
Process finished with exit code 1

Flink or Spark Version

No response

Java or Scala Version

No response

Screenshots

No response

Are you willing to submit PR?

Code of Conduct

Hisoka-X commented 1 year ago

I added a random number to the directory name to fix it.

Z1Wu commented 1 year ago

We may not need to generate a random number. I currently use the task ID obtained from SinkWriter.Context as a workaround.

this.taskIndex = context.getIndexOfSubtask();

hailin0 commented 1 year ago

> We may not need to generate a random number. I currently use the task ID obtained from SinkWriter.Context as a workaround.
>
> this.taskIndex = context.getIndexOfSubtask();

@Hisoka-X

Hisoka-X commented 1 year ago

> We may not need to generate a random number. I currently use the task ID obtained from SinkWriter.Context as a workaround.
>
> this.taskIndex = context.getIndexOfSubtask();

This is a good approach; I will change the code. Please help me review, @Z1Wu.
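
For readers following the thread, here is a minimal sketch of the idea discussed above: deriving a per-writer working directory from the subtask index, so that parallel writers do not collide on the same path. It is an illustration only, not the connector's actual code. `WriterContext` is a hypothetical stand-in for `SinkWriter.Context` that models only the `getIndexOfSubtask()` call quoted in the thread, and the `PartDirNaming` class, the `workDirFor` helper, the base directory, and the job id are all assumptions made for the example.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

final class PartDirNaming {

    /** Hypothetical stand-in for SinkWriter.Context; only the method quoted in the thread is modeled. */
    interface WriterContext {
        int getIndexOfSubtask();
    }

    /**
     * Builds a working directory that is unique per (job, subtask) pair.
     * Unlike a random suffix, the result is deterministic for a given subtask.
     */
    static Path workDirFor(String baseDir, String jobId, int subtaskIndex) {
        if (subtaskIndex < 0) {
            throw new IllegalArgumentException("subtaskIndex must be non-negative: " + subtaskIndex);
        }
        return Paths.get(baseDir, jobId, "subtask-" + subtaskIndex);
    }

    public static void main(String[] args) {
        // Two parallel writers, as in the reproduction config (parallelism = 2).
        WriterContext first = () -> 0;
        WriterContext second = () -> 1;

        // Base directory and job id are placeholder values for the example.
        Path a = workDirFor("/tmp/seatunnel-ck", "job-1", first.getIndexOfSubtask());
        Path b = workDirFor("/tmp/seatunnel-ck", "job-1", second.getIndexOfSubtask());

        System.out.println(a);
        System.out.println(b);
        System.out.println("same directory: " + a.equals(b));
    }
}
```

Compared with a random suffix, an index-based name is reproducible, which can make leftover directories easier to trace back to a writer. Whether it is unique enough also depends on the job-level component, since two jobs writing to the same table would share subtask indexes.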