Status: Closed (wandayou closed this issue 10 months ago)
UDF:
package com.test.udf;

import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

@FunctionHint(output = @DataTypeHint("ROW<type STRING, source_field STRING, val STRING>"))
public class StringToMutiRowsFunction extends TableFunction<Row> {
    public void eval(String... strs) {
        if (strs != null && strs.length > 0) {
            for (String it : strs) {
                // Example input: it = "关键词@keyword@新闻,主持人...", i.e. type@source_field@comma-separated values
                if (it != null && !it.isEmpty()) {
                    String[] arr = it.split("@");
                    if (arr.length > 2) {
                        for (String val : arr[2].split(",")) {
                            if (!val.trim().isEmpty()) {
                                collect(Row.of(arr[0], arr[1], val.trim()));
                            }
                        }
                    }
                }
            }
        }
    }
}
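For context, a minimal sketch of what this table function expands one sample input into; the inline VALUES row and the expected output are illustrative, assuming the function is registered as string_split_to_muti_rows (as in the DDL that follows):

-- Illustrative only: one sample keyword expanded by the UDF above.
select tmp.category_type, tmp.source_field, tmp.new_tag
from (values ('新闻,主持人')) as t(keyword),
     lateral table(string_split_to_muti_rows(CONCAT('关键词@keyword@', t.keyword)))
       as tmp(category_type, source_field, new_tag);
-- Expected output:
--   ('关键词', 'keyword', '新闻')
--   ('关键词', 'keyword', '主持人')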
The Flink SQL is as follows:
SET 'execution.checkpointing.interval' = '10s';
create temporary function string_split_to_muti_rows as 'com.test.udf.StringToMutiRowsFunction';
CREATE TABLE doris_dwd_content_tag_detail_sink (
`source_id` string,
`source_field` string,
`tag` string,
PRIMARY KEY(`source_id`) NOT ENFORCED
)
WITH (
'connector' = 'doris',
'fenodes' = '172.16.152.208:8030',
'table.identifier' = 'sc_scndp.dwd_content_tag_detail',
'username' = 'root',
'password' = '',
'sink.properties.format' = 'json',
'sink.properties.read_json_by_line' = 'true',
'sink.label-prefix' = 'kafka_2_dor_dwd_content_tag_terms_detail_61',
'sink.enable-2pc' = 'true'
);
-- Kafka source material info
CREATE TABLE kafka_ods_entity_source (
id string,
keyword string,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'cdc-ods-entitydata-topic',
'properties.bootstrap.servers' = '172.16.152.73:8092',
'key.format' = 'json',
'value.format' = 'json'
);
insert into doris_dwd_content_tag_detail_sink(`source_id`, `source_field`, `tag`)
select en.id, en.source_field, en.new_tag
from (
  select distinct
    t.id,
    tmp.new_tag,
    tmp.source_field
  from kafka_ods_entity_source t,
       lateral table(string_split_to_muti_rows(CONCAT('关键词@keyword@', t.keyword)))
         AS tmp(category_type, source_field, new_tag)
) en;
@wandayou Please revise your issue title; see http://www.dinky.org.cn/docs/next/developer_guide/contribution/issue
UDF:
package com.test.udf;

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.table.functions.ScalarFunction;

public class StringToArrayFunction extends ScalarFunction {
    public String[] eval(String str) {
        if (str == null || str.isEmpty()) {
            return null;
        }
        String[] strArray = str.split(",");
        List<String> nonEmptyStrings = new ArrayList<>();
        for (String s : strArray) {
            if (!s.isEmpty()) {
                nonEmptyStrings.add(s);
            }
        }
        return nonEmptyStrings.toArray(new String[nonEmptyStrings.size()]);
    }
}
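As a quick sanity check of this scalar variant, a hypothetical registration and query (the function name string_to_array and the sample input are assumptions for illustration, not from the issue):

-- Hypothetical registration; assumes the class above is packaged in the job's UDF JAR.
create temporary function string_to_array as 'com.test.udf.StringToArrayFunction';
select string_to_array(s) from (values ('新闻,,主持人')) as t(s);
-- Expected: ['新闻', '主持人'] (empty segments are dropped)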
The Flink SQL is as follows:
create temporary function string_split_to_muti_rows as 'com.test.StringToMutiRowsFunction';

CREATE TABLE doris_dwd_content_tag_detail_sink (
  `source_id` string,
  `source_field` string,
  `tag` string,
  PRIMARY KEY(`source_id`) NOT ENFORCED
) WITH (
  'connector' = 'doris',
  'fenodes' = '172.16.152.208:8030',
  'table.identifier' = 'scndp.dwd_content_tag_detail',
  'username' = 'root',
  'password' = '',
  'sink.properties.format' = 'json',
  'sink.properties.read_json_by_line' = 'true',
  'sink.label-prefix' = 'kafka_2_dor_dwd_content_tag_terms_detail_60',
  'sink.enable-2pc' = 'true'
);

-- Kafka source material info
CREATE TABLE kafka_ods_entity_source (
  id string,
  keyword string
) WITH (
  'connector' = 'upsert-kafka',
  'topic' = 'cdc-ods-entity-topic',
  'properties.bootstrap.servers' = '172.16.152.73:8092',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'json'
);

insert into doris_dwd_content_tag_detail_sink(`source_id`, `source_field`, `tag`)
select en.id, en.source_field, en.new_tag
from (
  select distinct
    t.id,
    tmp.new_tag,
    tmp.source_field
  from kafka_ods_entity_source t,
       lateral table(string_split_to_muti_rows(CONCAT('关键词@keyword@', t.keyword)))
         AS tmp(category_type, source_field, new_tag)
) en;
Please confirm the package and class: the DDL registers 'com.test.StringToMutiRowsFunction', but the class shown above is com.test.udf.StringToArrayFunction.
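If that mismatch is the culprit, the fix is only the fully qualified class name in the CREATE FUNCTION statement; a sketch assuming the packages shown in the snippets above:

create temporary function string_split_to_muti_rows as 'com.test.udf.StringToMutiRowsFunction';
-- or, for the scalar variant (function name chosen here for illustration):
-- create temporary function string_to_array as 'com.test.udf.StringToArrayFunction';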
Fixed; please verify it once rc4 is released.
Search before asking
What happened
In a Flink 1.17.2 + Dinky 1.0.0-rc3 environment, after defining a custom UDF in Dinky and using it in Flink SQL, submitting the job fails with an error.
What you expected to happen
The UDF feature should work normally.
How to reproduce
Anything else
Flink 1.17.2 + Dinky 1.0.0-rc3
Version
dev
Are you willing to submit PR?
Code of Conduct