StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0
194 stars 162 forks source link

flink sink 到starrocks,必须设置table-name属性吗? #149

Open zhaoxuezhi888 opened 1 year ago

zhaoxuezhi888 commented 1 year ago

StarRocksSinkOptions.builder() .withProperty("jdbc-url", "jdbc:mysql://192.168.0.126:9030/starrocks_demo") .withProperty("load-url", "192.168.0.126:8030") .withProperty("username", "root") .withProperty("password", "") .withProperty("table-name", "detail") .withProperty("database-name", "test") .withProperty("sink.properties.format", "json") .withProperty("sink.properties.strip_outer_array", "true") // 设置并行度,多并行度情况下需要考虑如何保证数据有序性 .withProperty("sink.parallelism", "1") .build()); flink sink 到starrocks,必须设置table-name属性吗?如果必须设置,意味着每一个starrocks的table,对应flink就要有一个sink,因此也必须对应一个独立的job,那如果我要同步20个msyql表到starrocks,就要建20个job,这很占资源,也不合理啊,有什么思路解决这个问题啊?

xlfjcg commented 1 year ago

可以使用stream的方式,将数据包装成StarRocksSinkRowDataWithMeta后写入

zhaoxuezhi888 commented 1 year ago

谢谢,请问StarRocksSinkRowDataWithMeta写法有范例吗?网上找不到范例啊

zhaoxuezhi888 commented 1 year ago

`DataStream source = env .addSource(new MySourceJava_Json())//返回map数据,里面有两个key,database,table_name .uid("sourceStream-uid").name("sourceStream") .setParallelism(1) .map(new RichMapFunction<Map<String, Object>, StarRocksSinkRowDataWithMeta>() {
@Override public StarRocksSinkRowDataWithMeta map(Map<String, Object> value) throws Exception { JSONObject data = JSON.parseObject(JSON.toJSONString(value)); StarRocksSinkRowDataWithMeta starRocksSinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta(); starRocksSinkRowDataWithMeta.setDatabase(data.getString("database")); starRocksSinkRowDataWithMeta.setTable(data.getString("table_name")); starRocksSinkRowDataWithMeta.addDataRow(JSON.toJSONString(value)); return starRocksSinkRowDataWithMeta; } }).uid("sourceStreamMap-uid").name("sourceStreamMap");

source .addSink( StarRocksSink.sink( //这里没有StarRocksSink.SinkRowDataWithMeta语法,只有sink,里面必须指定.withProperty("table-name", "orders") //所以不知道怎么写了,我没有用flink sql,用的是DataStream ) )`

IT-XiaoYang commented 2 months ago

可以使用stream的方式,将数据包装成StarRocksSinkRowDataWithMeta后写入 这种模式,starrocks 官方后续是否支持呢,

Level1Accelerator commented 3 weeks ago

可以使用stream的方式,将数据包装成StarRocksSinkRowDataWithMeta后写入 这种模式,starrocks 官方后续是否支持呢,

算鸟算鸟,自己改下StarRocksSink,再打个包吧。。