DTStack / chunjun

A data integration framework
https://dtstack.github.io/chunjun/
Apache License 2.0

[Feature] [ftp] Support compression and write modes when writing, and fix a bug in fetching the sftp config #1842

Closed: libailin closed this issue 10 months ago

libailin commented 11 months ago

Search before asking

What happened

1. Running a SQL-mode job locally in IDEA on Windows 10 to read from a Kerberos-enabled HDFS fails with an error.

In SQL mode, loading the Kerberos-related configuration from sftp fails with "The config of sftp is null":

2023-11-07 17:36:33,256 ERROR com.dtstack.chunjun.util.RetryUtil - Exception when calling callable, exception msg: java.lang.IllegalArgumentException: The config of sftp is null
    at com.dtstack.chunjun.security.SftpHandler.checkConfig(SftpHandler.java:104)
    at com.dtstack.chunjun.security.SftpHandler.getInstance(SftpHandler.java:68)
    at com.dtstack.chunjun.security.SftpHandler.lambda$getInstanceWithRetry$0(SftpHandler.java:61)
    at com.dtstack.chunjun.util.RetryUtil$Retry.call(RetryUtil.java:129)
    at com.dtstack.chunjun.util.RetryUtil$Retry.doRetry(RetryUtil.java:71)
    at com.dtstack.chunjun.util.RetryUtil.executeWithRetry(RetryUtil.java:47)
    at com.dtstack.chunjun.security.SftpHandler.getInstanceWithRetry(SftpHandler.java:61)
    at com.dtstack.chunjun.security.KerberosUtil.loadFromSftp(KerberosUtil.java:282)
    at com.dtstack.chunjun.security.KerberosUtil.loadFile(KerberosUtil.java:229)
    at com.dtstack.chunjun.util.FileSystemUtil.getUGI(FileSystemUtil.java:129)
    at com.dtstack.chunjun.connector.hdfs.source.BaseHdfsInputFormat.createInputSplitsInternal(BaseHdfsInputFormat.java:72)
    at com.dtstack.chunjun.source.format.BaseRichInputFormat.createInputSplits(BaseRichInputFormat.java:129)
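
The stack trace suggests that in SQL mode the sftp settings, which arrive as a JSON string under the 'properties.sftpConf' option (see the reproduce script below), are never parsed into the config map that SftpHandler.checkConfig expects, so the handler sees a null config. A minimal sketch of the missing parsing step, assuming Gson for the JSON-to-map conversion (the helper name and library choice are illustrative, not chunjun's actual fix):

import java.util.HashMap;
import java.util.Map;

import com.google.gson.Gson;

public class SftpConfParser {

    /** Parses the 'properties.sftpConf' value into the map SftpHandler expects. */
    @SuppressWarnings("unchecked")
    public static Map<String, Object> parse(Map<String, Object> hadoopProps) {
        Object raw = hadoopProps.get("sftpConf");
        if (raw == null) {
            // Without this value, SftpHandler.checkConfig fails with
            // "The config of sftp is null", which is the error reported above.
            throw new IllegalArgumentException("The config of sftp is null");
        }
        if (raw instanceof Map) {
            return (Map<String, Object>) raw; // already parsed into a map
        }
        // SQL mode passes the raw JSON string, e.g.
        // {"username":"xxx","password":"xx","host":"192.168.56.xx","port":"22"}
        return new Gson().fromJson(raw.toString(), HashMap.class);
    }
}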

What you expected to happen

--

How to reproduce

CREATE TABLE hdfs_source
(
    `raw_message`     string
) WITH (
      'connector' = 'hdfs-x'
      ,'path' = 'hdfs://test.net:9000/home/test/kerberos-test'
      ,'default-fs' = 'hdfs://test.net:9000'
      ,'field-delimiter' = ''
      ,'encoding' = 'utf-8'
      ,'file-type' = 'text'
      ,'properties.hadoop.user.name' = 'test'
      ,'properties.fs.defaultFS' = 'hdfs://test.net:9000'
      ,'properties.dfs.client.failover.proxy.provider.ns' = 'org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider'
      ,'properties.fs.hdfs.impl.disable.cache' = 'true'
      ,'properties.fs.hdfs.impl' = 'org.apache.hadoop.hdfs.DistributedFileSystem'

--       -- Kerberos configuration
--       ,'properties.hadoop.security.authorization' = 'true'
--       ,'properties.hadoop.security.authentication' = 'Kerberos'
--      -- Kerberos: use local files
--       ,'properties.java.security.krb5.conf' = 'E:/data/java/study/github.com/libailin/chunjun/chunjun-dev/data/kerberos/krb5.conf'
--       ,'properties.principalFile' =           'E:/data/java/study/github.com/libailin/chunjun/chunjun-dev/data/kerberos/hdfs.keytab'
--       ,'properties.useLocalFile' = 'true'
--       ,'properties.principal' = 'hdfs/test.net@EXAMPLE.COM'
--       -- The following parameter fixes: Can't get Master Kerberos principal for use as renewer
--       ,'properties.yarn.resourcemanager.principal' = 'hdfs/test.net@EXAMPLE.COM'
--       -- The following parameter fixes: org.apache.hadoop.hdfs.DFSClient - Failed to connect to /xx.xx.x.xx:61004 for block, add to deadNodes and continue. java.io.IOException: An existing connection was forcibly closed by the remote host.
--       -- It also fixes: INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Failed to read expected SASL data transfer protection handshake from client at /10.xx.xx.xx:57113. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection
--       ,'properties.dfs.data.transfer.protection' = 'authentication'

      -- Kerberos configuration
      ,'properties.hadoop.security.authorization' = 'true'
      ,'properties.hadoop.security.authentication' = 'Kerberos'
      -- Kerberos: load the files from a remote sftp server
      ,'properties.remoteDir' = '/data/kerberos'
      ,'properties.java.security.krb5.conf' = 'krb5.conf'
      ,'properties.principalFile' = 'hdfs.keytab'
      ,'properties.sftpConf' = '{"username":"xxx", "password":"xx", "host":"192.168.56.xx", "port":"22"}'
      ,'properties.principal' = 'hdfs/test.net@EXAMPLE.COM'
      -- The following parameter fixes: Can't get Master Kerberos principal for use as renewer
      ,'properties.yarn.resourcemanager.principal' = 'hdfs/test.net@EXAMPLE.COM'
      -- The following parameter fixes: org.apache.hadoop.hdfs.DFSClient - Failed to connect to /xx.xx.xx.xx:61004 for block, add to deadNodes and continue. java.io.IOException: An existing connection was forcibly closed by the remote host.
      -- It also fixes: INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Failed to read expected SASL data transfer protection handshake from client at /10.xx.xx.xx:57113. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection
      ,'properties.dfs.data.transfer.protection' = 'authentication'

      );

CREATE TABLE stream_sink
(
    `raw_message`     string
) WITH (
    'connector' = 'stream-x'
    ,'print' = 'true'
);

insert into stream_sink select * from hdfs_source;

2. Add a write-mode option (see the sketch after the snippet):

    -- Write mode
    ,'write-mode' = 'OVERWRITE'
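
A minimal sketch of how an OVERWRITE write mode could behave for the ftp writer, using Apache Commons Net (the client library, enum, and helper names are assumptions, not chunjun's actual implementation): OVERWRITE clears the remote directory before writing, while APPEND leaves existing files in place.

import java.io.IOException;

import org.apache.commons.net.ftp.FTPClient;
import org.apache.commons.net.ftp.FTPFile;

public class FtpWriteMode {

    public enum WriteMode { APPEND, OVERWRITE }

    /** For OVERWRITE, delete existing remote files so the new run replaces them. */
    public static void prepareDir(FTPClient client, String dir, WriteMode mode)
            throws IOException {
        if (mode != WriteMode.OVERWRITE) {
            return; // APPEND: keep existing files; new files get unique names
        }
        for (FTPFile file : client.listFiles(dir)) {
            if (file.isFile()) {
                client.deleteFile(dir + "/" + file.getName());
            }
        }
    }
}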

3. Add compression support when writing (see the sketch after the snippet):

    -- File compression type; gzip and bzip2 are supported. Default: none
    ,'compress-type' = 'gzip'
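
A minimal sketch of the stream wrapping this option implies, assuming gzip from the JDK and bzip2 from Apache Commons Compress (the factory name is illustrative; the raw stream could be the one returned by FTPClient.storeFileStream):

import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.commons.compress.compressors.bzip2.BZip2CompressorOutputStream;

public class CompressStreamFactory {

    /** Wraps the raw output stream according to the 'compress-type' option. */
    public static OutputStream wrap(OutputStream raw, String compressType) throws IOException {
        if (compressType == null || compressType.isEmpty()) {
            return raw; // default: write uncompressed
        }
        switch (compressType.toLowerCase()) {
            case "gzip":
                return new GZIPOutputStream(raw);
            case "bzip2":
                return new BZip2CompressorOutputStream(raw);
            default:
                throw new IllegalArgumentException("Unsupported compress-type: " + compressType);
        }
    }
}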

Anything else

No response

Version

master

Are you willing to submit PR?

Code of Conduct