StarRocks / starrocks-connector-for-apache-flink

Apache License 2.0

Is there currently a multi-table sink solution other than SMT? #226

Closed TurnoffyourTV closed 1 year ago

TurnoffyourTV commented 1 year ago

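As the title says: when syncing from MySQL to StarRocks, is it possible to sync an entire database to StarRocks from code, using the connector as a library? If so, could you provide a demo?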

zhongshao666 commented 1 year ago

Look at the source code; there is a way to do it.

-------------process:

    // Wrap each record with its own target database and table.
    StarRocksSinkRowDataWithMeta starRocksSinkRowDataWithMeta = new StarRocksSinkRowDataWithMeta();
    starRocksSinkRowDataWithMeta.setDatabase(database); // target database
    starRocksSinkRowDataWithMeta.setTable(value.getTableName()); // target table
    starRocksSinkRowDataWithMeta.addDataRow(jsonData.toJSONString()); // row data as JSON
    out.collect(starRocksSinkRowDataWithMeta);

------------main sink:

    StarRocksSinkOptions options = StarRocksSinkOptions.builder()
            .withProperty("jdbc-url", parameterTool.get("jdbc-url"))
            .withProperty("load-url", parameterTool.get("load-url"))
            .withProperty("username", parameterTool.get("username"))
            .withProperty("password", parameterTool.get("password"))
            .withProperty("database-name", parameterTool.get("database-name"))
            .withProperty("table-name", "flink_cdc") // this parameter does not take effect
            .withProperty("sink.properties.format", "json")
            .withProperty("sink.properties.strip_outer_array", "true")
            .withProperty("sink.parallelism", "1")
            .withProperty("sink.version", "V2")
            .build();

    StarRocksDynamicSinkFunctionV2<StarRocksSinkRowDataWithMeta> sinkFunctionV2 = new StarRocksDynamicSinkFunctionV2<>(options);

    endStream.addSink(sinkFunctionV2).name("StarRocks sink");
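
To illustrate why the fixed "table-name" option stops mattering once every record carries its own target, here is a minimal, connector-free sketch of the routing idea. `RowWithMeta` and `groupByTarget` are hypothetical names made up for this example; `RowWithMeta` only imitates the three setters used above and is not the connector's `StarRocksSinkRowDataWithMeta`. The grouping by "database.table" is an assumption about how such a sink could buffer rows, not a description of the connector's internals.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MultiTableRoutingSketch {

    /** Hypothetical stand-in for a record that carries its own target database and table. */
    static final class RowWithMeta {
        final String database;
        final String table;
        final List<String> jsonRows = new ArrayList<>();

        RowWithMeta(String database, String table) {
            this.database = database;
            this.table = table;
        }

        void addDataRow(String json) {
            jsonRows.add(json);
        }
    }

    /** Groups JSON rows by "database.table", keeping first-seen order of the targets. */
    static Map<String, List<String>> groupByTarget(List<RowWithMeta> records) {
        Map<String, List<String>> buffers = new LinkedHashMap<>();
        for (RowWithMeta record : records) {
            String target = record.database + "." + record.table;
            buffers.computeIfAbsent(target, key -> new ArrayList<>()).addAll(record.jsonRows);
        }
        return buffers;
    }

    public static void main(String[] args) {
        // Example data only: two source tables arriving interleaved on one stream.
        RowWithMeta order1 = new RowWithMeta("shop", "orders");
        order1.addDataRow("{\"id\":1}");
        RowWithMeta user1 = new RowWithMeta("shop", "users");
        user1.addDataRow("{\"id\":7}");
        RowWithMeta order2 = new RowWithMeta("shop", "orders");
        order2.addDataRow("{\"id\":2}");

        Map<String, List<String>> buffers = groupByTarget(Arrays.asList(order1, user1, order2));
        buffers.forEach((target, rows) -> System.out.println(target + " <- " + rows));
    }
}
```

In a real job the grouping is left to the sink; the process function only needs to fill in the database, table, and JSON row for each change event, as in the reply above.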
TurnoffyourTV commented 1 year ago

Thanks for the answer, much appreciated.