itinycheng / flink-connector-clickhouse

Flink SQL connector for ClickHouse. Supports ClickHouseCatalog and reading/writing primitive data, maps, and arrays to ClickHouse.
Apache License 2.0

How do I use the sink in flink-connector-clickhouse? Is there an example? #116

Open huotianyu opened 5 months ago

huotianyu commented 5 months ago

I see that ClickHouseBatchExecutor in flink-connector-clickhouse does batched writes to ClickHouse. How do I use it? Is there an example, or can I only use Flink's native sink?

itinycheng commented 5 months ago

> I see that ClickHouseBatchExecutor in flink-connector-clickhouse does batched writes to ClickHouse. How do I use it? Is there an example, or can I only use Flink's native sink?

Hi: the project README is itself the usage demo, and it uses SQL. If you want to use the API, you need to combine it with Flink's Table API.
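For concreteness, here is a minimal sketch of that route (not copied verbatim from the README): register the ClickHouse table with SQL DDL, then write to it through the Table API. The host, credentials, and columns are placeholders; per the README, 'sink.batch-size' is the option behind the batched writes that ClickHouseBatchExecutor performs.

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    public class ClickHouseSinkExample {
        public static void main(String[] args) {
            TableEnvironment tEnv = TableEnvironment.create(
                    EnvironmentSettings.newInstance().inStreamingMode().build());

            // DDL goes through executeSql; sqlQuery only parses SELECT statements.
            tEnv.executeSql(
                    "CREATE TABLE ck_sink ("
                            + "  sku_id STRING,"
                            + "  total_amount DECIMAL(10, 2)"
                            + ") WITH ("
                            + "  'connector' = 'clickhouse',"
                            + "  'url' = 'clickhouse://127.0.0.1:8123',"
                            + "  'database-name' = 'default',"
                            + "  'table-name' = 'ck_sink',"
                            + "  'username' = 'default',"
                            + "  'password' = '',"
                            // Buffered rows are flushed in batches of this size.
                            + "  'sink.batch-size' = '1000'"
                            + ")");

            // Any Table (e.g. the result of a query) can then be written to the sink.
            Table source = tEnv.sqlQuery(
                    "SELECT 'sku-1' AS sku_id, CAST(9.99 AS DECIMAL(10, 2)) AS total_amount");
            source.executeInsert("ck_sink");
        }
    }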

huotianyu commented 5 months ago
    Table test_product_order_item = tEnv.sqlQuery(sql);

    // TableResult dsSql = tEnv.executeSql(dsSQL);
    test_product_order_item.execute().print();
    // tEnv.createTemporaryView("test_product_order_item_bk", test_product_order_item);

    String sinkDDL =  "" +
            "CREATE TABLE if not exists test_product_order_item( " +
            "create_time DateTime, " +
            "sku_id String, " +
            "product_id String, " +
            "product_name String, " +
            "total_amount Decimal(10, 2) default 0. " +
            "with ( " +
            "'connector' = 'clickhouse', " +
            "'url' = 'clickhouse://192.168.110.109:8124/ads', " +
            "'database-name' = 'test_product_order_item', " +
            "'username' = 'default', " +
            "'password' = '123456' " +
            "'lookup.cache.max-rows' = '100', " +
            "'lookup.cache.ttl' = '10', " +
            "'lookup.max-retries' = '3' " +
            "" ;
    tEnv.sqlQuery(sinkDDL);
    test_product_order_item.insertInto("test_product_order_item");

test_product_order_item already holds data as a Flink table. When I try to create the sink, tEnv.sqlQuery keeps throwing errors. Is it because I'm using the API incorrectly?
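A likely fix, sketched under the assumption that the DDL string itself is also repaired (a ')' closing the column list before WITH, a 'table-name' option, and a comma after every option): sqlQuery only parses SELECT statements, so DDL has to go through executeSql, and the write itself is issued with executeInsert.

    // Assumes sinkDDL has been fixed up as described above.
    tEnv.executeSql(sinkDDL);  // DDL: executeSql, not sqlQuery
    test_product_order_item.executeInsert("test_product_order_item");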