apache / seatunnel

SeaTunnel is a next-generation super high-performance, distributed, massive data integration tool.
https://seatunnel.apache.org/
Apache License 2.0

[Bug] [sink-sft] Renaming the file fails when using SFTP as a sink #5318

Open fuckingcodingisshit opened 1 year ago

fuckingcodingisshit commented 1 year ago

What happened

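When using SFTP as the sink, the upstream source data has already been written to the temporary file, but an error is raised when the file is renamed. I would also like to know whether my job script configuration is correct.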

SeaTunnel Version

2.3.2

SeaTunnel Config

{
    "transform" : [
        {
            "query" : "SELECT name AS name FROM source_temp_table",
            "result_table_name" : "sink_temp_table",
            "source_table_name" : "source_temp_table",
            "plugin_name" : "Sql"
        }
    ],
    "sink" : [
        {
            "sink_columns" : [
                "name"
            ],
            "host" : "10.XX.XX.226",
            "port" : 22,
            "user" : "axxx",
            "password" : "toxxxx023",
            "path" : "/data01/sync_test",
            "file_format_type" : "text",
            "row_delimiter" : "\n",
            "field_delimiter" : ",",
            "skip_header_row_number" : 1,
            "source_table_name" : "sink_temp_table",
            "plugin_name" : "SftpFile",
            "type" : "text"
        }
    ],
    "source" : [
        {
            "driver" : "com.aliyun.odps.jdbc.OdpsDriver",
            "query" : "SELECT * FROM sync_sink_test",
            "url" : "jdbc:odps:http://service.cn-hangzhou.maxcompute.aliyun.com/api?project=dm_v2",
            "plugin_name" : "Jdbc",
            "user" : "LTAI5g39PcdZUosH8iS9",
            "password" : "c6eniXBeSQVK9Rh95eXuUBmvLf",
            "connection_check_timeout_sec" : 100,
            "result_table_name" : "source_temp_table",
            "type" : "odps"
        }
    ],
    "env" : {
        "job.name" : "b94e94275b994304857f9b94e6b6ad19"
    }
}

Running Command


Error Exception

2023-08-16 15:36:54,872 ERROR org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/tmp/seatunnel/seatunnel/744097110248390657/6473b95eda/T_744097110248390657_6473b95eda_0_1={/tmp/seatunnel/seatunnel/744097110248390657/6473b95eda/T_744097110248390657_6473b95eda_0_1/NON_PARTITION/T_744097110248390657_6473b95eda_0_1_0.txt=/data01/sync_test/T_744097110248390657_6473b95eda_0_1_0.txt}}, partitionDirAndValuesMap={}) 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[COMMON-01], ErrorDescription:[File operation failed, such as (read,list,write,move,copy,sync) etc...] - rename file :[/tmp/seatunnel/seatunnel/744097110248390657/6473b95eda/T_744097110248390657_6473b95eda_0_1/NON_PARTITION/T_744097110248390657_6473b95eda_0_1_0.txt] to [/data01/sync_test/T_744097110248390657_6473b95eda_0_1_0.txt] error
    at org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils.renameFile(FileSystemUtils.java:170) ~[connector-file-sftp-2.3.1.jar:2.3.1]
    at org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter.lambda$commit$0(FileSinkAggregatedCommitter.java:52) ~[connector-file-sftp-2.3.1.jar:2.3.1]
    at java.util.ArrayList.forEach(ArrayList.java:1249) ~[?:1.8.0_60]
    at org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter.commit(FileSinkAggregatedCommitter.java:44) ~[connector-file-sftp-2.3.1.jar:2.3.1]
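
When triaging this kind of failure, it can help to pull the source and target paths of the failed rename out of the exception message. The following is a minimal sketch that assumes the message format shown in the log above (`rename file :[<src>] to [<dst>] error`). `parse_rename_error` is a hypothetical helper written for illustration, not part of SeaTunnel, and the sample paths are shortened placeholders rather than the real ones.

```python
import re

# Message format assumed from the log above:
#   rename file :[<src>] to [<dst>] error
RENAME_ERROR = re.compile(
    r"rename file :\[(?P<src>[^\]]+)\] to \[(?P<dst>[^\]]+)\] error"
)


def parse_rename_error(line):
    """Return (src, dst) from a rename failure message, or None if absent."""
    match = RENAME_ERROR.search(line)
    if match is None:
        return None
    return match.group("src"), match.group("dst")


if __name__ == "__main__":
    # Shortened, illustrative message; real paths are much longer.
    sample = (
        "ErrorCode:[COMMON-01], ErrorDescription:[File operation failed, such as "
        "(read,list,write,move,copy,sync) etc...] - rename file :"
        "[/tmp/seatunnel/job/T_0_1/NON_PARTITION/T_0_1_0.txt] to "
        "[/data01/sync_test/T_0_1_0.txt] error"
    )
    src, dst = parse_rename_error(sample)
    print("source:", src)
    print("target:", dst)
```

Comparing the two paths shows at a glance that the temporary file lives under `/tmp/seatunnel` while the target lives under the configured `path`.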

Zeta or Flink or Spark Version

spark2

Java or Scala Version

No response

lightzhao commented 1 year ago

Check whether you have rename permission.
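
One way to check this is sketched below. It relies on the POSIX rule that a rename needs write and search (execute) permission on both the directory the file leaves and the directory it enters. `can_rename_between` is a hypothetical helper, and the check only means something when run on the SFTP host as the same user the sink logs in with. It does not cover other causes such as a missing target directory parent or server-side restrictions.

```python
import os
import tempfile


def can_rename_between(src_dir, dst_dir):
    """Report whether the current user may rename a file from src_dir to dst_dir.

    Both directories must grant write and search (execute) permission.
    A directory that does not exist counts as not permitted.
    """
    needed = os.W_OK | os.X_OK
    return os.access(src_dir, needed) and os.access(dst_dir, needed)


if __name__ == "__main__":
    # Demo on a throwaway directory. On a real setup, pass the temporary
    # directory and the sink's target directory instead.
    with tempfile.TemporaryDirectory() as workdir:
        print(can_rename_between(workdir, workdir))
```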

CheneyYin commented 1 year ago

I also encountered a similar problem.

version
  • 2.3.2
Engine
  • zeta
Config
env {
  # You can set flink configuration here
  execution.parallelism = 1
  job.mode = "BATCH"

  # You can set spark configuration here
  spark.app.name = "SeaTunnel"
  spark.executor.instances = 1
  spark.executor.cores = 1
  spark.executor.memory = "1g"
  spark.master = local
}

source {
  FakeSource {
    result_table_name = "sftp"
    schema = {
      fields {
        c_map = "map<string, string>"
        c_array = "array<int>"
        c_string = string
        c_boolean = boolean
        c_tinyint = tinyint
        c_smallint = smallint
        c_int = int
        c_bigint = bigint
        c_float = float
        c_double = double
        c_bytes = bytes
        c_date = date
        c_decimal = "decimal(38, 18)"
        c_timestamp = timestamp
        c_row = {
          c_map = "map<string, string>"
          c_array = "array<int>"
          c_string = string
          c_boolean = boolean
          c_tinyint = tinyint
          c_smallint = smallint
          c_int = int
          c_bigint = bigint
          c_float = float
          c_double = double
          c_bytes = bytes
          c_date = date
          c_decimal = "decimal(38, 18)"
          c_timestamp = timestamp
        }
      }
    }
  }
}

sink {
  SftpFile {
    host = "localhost"
    port = 22
    user = cheney
    password = xxxxxx
    path = "/home/cheney/tmp/seatunnel/json"
    source_table_name = "sftp"
    row_delimiter = "\n"
    partition_dir_expression = "${k0}=${v0}"
    is_partition_field_write_in_file = true
    file_name_expression = "${transactionId}_${now}"
    file_format_type = "json"
    filename_time_format = "yyyy.MM.dd"
    is_enable_transaction = true
  }
}
Command
./bin/seatunnel.sh --config ./config/json/fake_to_sftp_file_json.conf -e local  
Error Exception
2023-08-18 16:04:46,600 INFO  org.apache.seatunnel.engine.server.dag.physical.PhysicalVertex - Job SeaTunnel_Job (744829028610867201), Pipeline: [(1/1)], task: [pipeline-1 [Source[0]-FakeSource-sftp]-SourceTask (1/1)] end with state FINISHED
2023-08-18 16:04:47,282 ERROR org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter - commit aggregatedCommitInfo error, aggregatedCommitInfo = FileAggregatedCommitInfo(transactionMap={/tmp/seatunnel/seatunnel/744829028610867201/db1540c7c1/T_744829028610867201_db1540c7c1_0_1={/tmp/seatunnel/seatunnel/744829028610867201/db1540c7c1/T_744829028610867201_db1540c7c1_0_1/NON_PARTITION/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json=/home/cheney/tmp/seatunnel/json/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json}}, partitionDirAndValuesMap={})
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException: ErrorCode:[COMMON-01], ErrorDescription:[File operation failed, such as (read,list,write,move,copy,sync) etc...] - rename file :[/tmp/seatunnel/seatunnel/744829028610867201/db1540c7c1/T_744829028610867201_db1540c7c1_0_1/NON_PARTITION/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json] to [/home/cheney/tmp/seatunnel/json/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json] error
    at org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils.renameFile(FileSystemUtils.java:170) ~[connector-file-sftp-2.3.2.jar:2.3.2]
    at org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter.lambda$commit$0(FileSinkAggregatedCommitter.java:52) ~[connector-file-sftp-2.3.2.jar:2.3.2]
    at java.util.ArrayList.forEach(ArrayList.java:1541) ~[?:?]
    at org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter.commit(FileSinkAggregatedCommitter.java:44) ~[connector-file-sftp-2.3.2.jar:2.3.2]
    at org.apache.seatunnel.engine.server.task.SinkAggregatedCommitterTask.notifyCheckpointComplete(SinkAggregatedCommitterTask.java:276) ~[seatunnel-starter.jar:2.3.2]
    at org.apache.seatunnel.engine.server.checkpoint.operation.CheckpointFinishedOperation.lambda$run$0(CheckpointFinishedOperation.java:91) ~[seatunnel-starter.jar:2.3.2]
    at org.apache.seatunnel.common.utils.RetryUtils.retryWithException(RetryUtils.java:48) [seatunnel-starter.jar:2.3.2]

I am sure I have permission to rename in this directory.

❯  ll /home/cheney/tmp/ 
total 20K
40937109 4.0K drwxr-xr-x   3 cheney cheney 4.0K  8月17日 14:33 .
37748738  12K drwx------ 116 cheney cheney  12K  8月18日 15:59 ..
40937110 4.0K drwxr-xr-x   3 cheney cheney 4.0K  8月17日 14:33 seatunnel
❯ ssh cheney@localhost -C mv /tmp/seatunnel/seatunnel/744829028610867201/db1540c7c1/T_744829028610867201_db1540c7c1_0_1/NON_PARTITION/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json /home/cheney/tmp/seatunnel/json/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json
cheney@localhost's password:
❯ ls /home/cheney/tmp/seatunnel/json/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json
/home/cheney/tmp/seatunnel/json/T_744829028610867201_db1540c7c1_0_1_2023.08.18_0.json 
github-actions[bot] commented 1 year ago

This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.

ruanwenjun commented 1 year ago

I also hit this problem. I am not sure whether it is caused by the rename command.

ruanwenjun commented 1 year ago

It seems you used the mv command rather than rename.
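
The distinction can matter: a plain rename fails with `EXDEV` when source and target are on different file systems, while `mv` falls back to copy and delete in that case. The sketch below shows the same contrast in Python, using `os.rename` for the plain rename and `shutil.move` for the `mv`-like behaviour. Whether `/tmp` and the target directory are on different file systems in this thread is not known, and the sketch assumes the SFTP server maps a rename request to a plain rename call, so treat this as one possible explanation rather than a diagnosis. The function names are hypothetical.

```python
import errno
import os
import shutil
import tempfile


def rename_or_report(src, dst):
    """Try a plain rename and report a cross-device failure explicitly."""
    try:
        os.rename(src, dst)
        return "renamed"
    except OSError as exc:
        if exc.errno == errno.EXDEV:
            return "cross-device: rename cannot move between file systems"
        raise


def move_like_mv(src, dst):
    """Mimic mv: rename when possible, otherwise copy and then delete."""
    shutil.move(src, dst)
    return "moved"


if __name__ == "__main__":
    # Both paths share one temporary directory here, so both calls succeed.
    with tempfile.TemporaryDirectory() as workdir:
        src = os.path.join(workdir, "T_0_1_0.json")
        dst = os.path.join(workdir, "final.json")
        with open(src, "w") as handle:
            handle.write("{}\n")
        print(rename_or_report(src, dst))
        print(move_like_mv(dst, src))
```

Running `rename_or_report` on the SFTP host with the real temporary and target paths would show whether a plain rename works where `mv` did.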