DTStack / chunjun

A data integration framework
Apache License 2.0
3.98k stars 1.69k forks source link

[Feature][hbase] sql模式下新增特性 #1886

Closed libailin closed 4 months ago

libailin commented 7 months ago

Search before asking


sql模式下新增特性: 1、写入时支持配置rowkeyExpress 2、写入时支持配置hbase的时间戳,支持选项:当前时间(默认)、指定时间列名(例如:列簇:字段名)、指定具体时间 3、读取时支持配置客户端每次 rpc 从服务器端读取的列数,默认不限制。 4、读取时支持配置 rowKey起始点、rowKey结束点、rowkey是否是BytesBinary 5、解决sql模式下缺少Hbase2DynamicTableFactory 6、hbase 支持 multiVersionFixedColumn模式(竖表读取) 当是竖表读取时,声明 source table只能有四个字段,并且为固定的字段类型,参考下面声明:

CREATE TABLE source_hbase
    `rowkey` VARCHAR,
    `family_qualifier` VARCHAR,
    `timestamp` bigint,
    `value` VARCHAR

7、支持配置hadoop用户名,解决读写hbase权限问题 8、支持配置字段值为空时写入模式,SKIP:跳过,此字段不写入,EMPTY:空字节数组代替 9、优化当下游没有声明使用 rowkey 字段时报错问题

Caused by: java.lang.UnsupportedOperationException: No implementation provided for SupportsProjectionPushDown. Please implement SupportsProjectionPushDown#applyProjection(int[][], DataType)

Use case

-- hbase 命令:
-- 输入 `hbase shell` 进入hbase shell
-- list 查看全部表
-- describe 'tableName' 查看表结构
-- scan 'tableName' 查看表所有记录(全面扫描)
-- count 'tableName' 统计行数
-- alter '表名',{NAME=>'cf列族名',VERSIONS=>3} 修改表结构,让Hbase表支持存储3个VERSIONS的版本列数据
-- scan '表名',{VERSIONS=>5}

CREATE TABLE source_stream
    rowkey VARCHAR,
    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'stream-x'
    ,'number-of-rows' = '1'

CREATE TABLE source_hbase
    rowkey VARCHAR,
    cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3)),
) WITH (
      'connector' = 'hbase2-x'
      ,'zookeeper.quorum' = 'xxx:2181'
      ,'zookeeper.znode.parent' = '/hbase'
      -- 空值字符串代替,默认值:"null"
      ,'null-string-literal' = 'null'
      -- 表名,支持带命名空间的表名,格式 namespace:table , 命名空间与表名之间是冒号连接的
      ,'table-name' = 'test_hbase'
      -- rowKey起始点, 默认值:无
      ,'start-row-key' = 'a'
      -- rowKey结束点, 默认值:无
      ,'end-row-key' = 'c'
      -- rowkey是否是BytesBinary, 默认值:false
      ,'is-binary-row-key' = 'false'
      -- 客户端rpc每次fetch最大行数, 默认值:1000
      ,'scan-cache-size' = '1000'
      -- 客户端每次rpc从服务器端读取的列数, 默认值:不限制-1
--       ,'scan-batch-size' = '2'

     -- 传入hadoop账号读取\写入无权限问题
     ,'properties.hadoop.user.name' = 'hdp-test'
      -- 读取HBase的模式,支持normal模式和multiVersionFixedColumn模式。默认:normal
      ,'mode' = 'multiVersionFixedColumn'
       -- 指定在多版本模式下的HBase Reader读取的版本数,取值只能为-1或大于1的数字,-1表示读取所有版本。
      ,'max-version' = '5'

      -- hbase kerberos 配置
      , 'properties.hbase.security.authorization' = 'Kerberos'
      , 'properties.hbase.security.authentication' = 'Kerberos'
      , 'properties.hbase.security.auth.enable' = 'true'
      -- kerberos 使用sftp远程文件
      ,'properties.remoteDir' = '/data/kerberos'
      ,'properties.java.security.krb5.conf' = 'krb5.conf'
      ,'properties.principalFile' = 'hbase.keytab'
      ,'properties.sftpConf' = '{"username":"xxx", "password":"xx", "host":"xxx", "port":"22"}'
      ,'properties.principal' = 'xx/xxx@EXAMPLE.COM'
      -- 以下参数 解决 Can't get Master Kerberos principal for use as renewer
      ,'properties.yarn.resourcemanager.principal' = 'xx/xxx@EXAMPLE.COM'
      -- 以下参数 解决 org.apache.hadoop.hdfs.DFSClient - Failed to connect to /xxxx for block, add to deadNodes and continue. java.io.IOException: 远程主机强迫关闭了一个现有的连接。
      -- 以下参数 解决 INFO org.apache.hadoop.hdfs.server.datanode.DataNode: Failed to read expected SASL data transfer protection handshake from client at /xxxxx. Perhaps the client is running an older version of Hadoop which does not support SASL data transfer protection
      ,'properties.dfs.data.transfer.protection' = 'authentication'

CREATE TABLE sink_hbase
    rowkey VARCHAR,
    cf ROW(item_id VARCHAR, category_id string, behavior VARCHAR, ts TIMESTAMP(3))
) WITH (
    'connector' = 'hbase2-x'
    -- zk地址
    ,'zookeeper.quorum' = 'xxxx:2181'
    -- hbase在zk的路径
    ,'zookeeper.znode.parent' = '/hbase'
    -- 描述:每个写请求缓冲行的最大内存大小。这样可以提高HBase写数据的性能,但可能会增加时延。可以设置为'0'来禁用它。默认值:2mb
--     ,'sink.buffer-flush.max-size' = '1000'
    -- 描述:每个写入请求要缓冲的最大行数。这样可以提高HBase写数据的性能,但可能会增加时延。可以设置为'0'来禁用它。默认值:1000
    ,'sink.buffer-flush.max-rows' = '1000'
    -- 描述:批量写时间间隔,单位:毫秒, 默认值:10000
    ,'sink.buffer-flush.interval' = '2000'
    -- 表名
    ,'table-name' = 'test_hbase'

    -- 用于构造rowkey的描述信息,采用字符串格式,形式如下 字符串格式为:$(cf:col),
    -- 可以多个字段组合:$(cf:col1)_$(cf:col2), 可以使用md5函数:md5($(cf:col))
    ,'rowkey-express' = 'md5($(cf:item_id)_$(cf:category_id))'
    --  描述:指定写入hbase的时间戳。支持:当前时间、指定时间列,指定时间,三者选一。
    -- 指定时间列簇:列名
    ,'version-column-name' = 'cf:behavior'
    -- 指定具体时间
--     ,'version-column-value' = '2024-02-23 10:10:10'
     -- 字段值为空时写入模式,可选:SKIP:跳过,此字段不写入,EMPTY:空字节数组代替,默认值:SKIP
    ,'null-mode' = 'SKIP'

CREATE TABLE sink_stream
    rowkey VARCHAR,
--     cf ROW(item_id VARCHAR, category_id VARCHAR, behavior VARCHAR, ts TIMESTAMP(3))

    item_id VARCHAR,
    category_id VARCHAR,
    behavior VARCHAR,
    ts TIMESTAMP(3)

) WITH (
      'connector' = 'stream-x'

-- 测试向hbase写入数据
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from source_stream;

-- 指定rowkey具体值
-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from (select
--           CAST('a' as string) as rowkey,
--           CAST('a-item_id' as string) as item_id,
--           CAST('a-category_id' as string) as category_id,
--           CAST('a-behavior' as string) as behavior,
--           CAST('2024-02-21 17:10:10' as timestamp(3)) as ts);

-- insert into sink_hbase
-- SELECT rowkey, ROW(item_id, category_id, behavior, ts ) as cf
-- from (select
--           CAST('a' as string) as rowkey,
--           CAST('a-item_id-7' as string) as item_id,
-- --           CAST('1708654215000' as string) as category_id,
--           CAST('a-category_id-7' as string) as category_id,
--           CAST('2024-02-23 10:10:15' as string) as behavior,
--           CAST('2024-02-23 10:10:13' as timestamp(3)) as ts);

-- 测试从hbase读取数据
-- insert into sink_stream select * from source_hbase;
insert into sink_stream select rowkey, cf.item_id, cf.category_id, cf.behavior, cf.ts from source_hbase;

Related issues

No response

Are you willing to submit a PR?

Code of Conduct