apache / flink-cdc

Flink CDC is a streaming data integration tool
https://nightlies.apache.org/flink/flink-cdc-docs-stable
Apache License 2.0

[docs] Add entire DataStream API package guidance #1729

Closed: leonardBang closed this 9 months ago

leonardBang commented 1 year ago

Users are often confused by shading and class conflicts when they use the DataStream API to build their own applications, especially when an application is designed to be deployed on Flink clusters of multiple versions.

We can provide an example or tutorial to help users understand this better.

caicancai commented 1 year ago

Hi @leonardBang, I am very interested in this and would like to contribute to flink-cdc. Can I help with it? Thank you.

leonardBang commented 1 year ago

@caicancai Sure, I have assigned this ticket to you.

caicancai commented 1 year ago

Is there a reference format for the documentation?

ruanhang1993 commented 1 year ago

Hi @caicancai, is there any progress on this issue? You could take a look at the existing documents. If I recall correctly, there is no fixed format for the documents at the moment. Thanks~

caicancai commented 1 year ago

@ruanhang1993 Sorry, I have been busy with work for a while. I have started on this, but it is difficult for me and will take some time.

caicancai commented 1 year ago

@ruanhang1993 I will finish it this time.

ruanhang1993 commented 1 year ago

Hi @caicancai, thanks for your reply. I will review it whenever you need my help.

caicancai commented 1 year ago

Hi @ruanhang1993, I want to ask whether it is acceptable to document the various DataStream APIs in this way:

Operators

Operators transform one or more DataStreams into a new DataStream. Programs can combine multiple transformations into sophisticated dataflow topologies.

Filter

DataStream → DataStream

Evaluates a boolean function for each element and retains those for which the function returns true. A filter that drops blank change records:

Java

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MySqlSourceExample2 {
    public static void main(String[] args) throws Exception {

        MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
            .hostname("yourHostname")
            .port(yourPort) // placeholder: replace with your MySQL port (an int)
            .databaseList("yourDatabaseName") // set the captured database; to synchronize the whole database, set tableList to ".*"
            .tableList("yourDatabaseName.yourTableName") // set the captured table
            .username("yourUsername")
            .password("yourPassword")
            .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to a JSON String
            .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // checkpoint every 3000 ms
        env.enableCheckpointing(3000);

        DataStream<String> stream = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source");
        DataStream<String> filtered = stream.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String value) throws Exception {
                // compare string content; `value != " "` would only compare references
                return value != null && !value.trim().isEmpty();
            }
        });
        filtered.print();
        env.execute();
    }
}

caicancai commented 1 year ago

[Screenshot 2023-04-28 161537] Is guidance written around these DataStream operations enough?

caicancai commented 1 year ago

@ruanhang1993

ruanhang1993 commented 1 year ago

@caicancai Sorry for my late reply.

I think this guide should aim to provide a demo of how to package the jar and avoid class conflicts. The DataStream API itself is already covered in the Flink documentation, so we do not need to describe it here.

We should provide the pom.xml and example code for each CDC connector, so that developers know how to package their code.
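When a class conflict is suspected, it can help to check which jar a class was actually loaded from before changing the packaging. The following is a minimal sketch using only the JDK; the class name `ClassOrigin` and the `"bootstrap/JDK"` label are hypothetical names chosen for illustration, not part of Flink or Flink CDC.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Flink or Flink CDC): reports where a class was loaded from.
public class ClassOrigin {

    /** Returns the jar or directory a class was loaded from, or "bootstrap/JDK" if no location is recorded. */
    public static String locate(Class<?> clazz) {
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // JDK classes loaded by the bootstrap loader have no code source
            return "bootstrap/JDK";
        }
        return source.getLocation().toString();
    }

    public static void main(String[] args) throws Exception {
        // Pass a fully qualified class name as the first argument; defaults to a JDK class.
        String name = args.length > 0 ? args[0] : "java.lang.String";
        Class<?> clazz = Class.forName(name);
        System.out.println(name + " <- " + locate(clazz));
    }
}
```

Running it inside the job (or with the same classpath as the job) and passing a connector or driver class name shows whether the class comes from the application jar or from a jar in the cluster's `lib/` directory.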

caicancai commented 1 year ago

@ruanhang1993 Thanks for your reply. I have started working on this. My earlier direction may have been wrong, sorry about that.

caicancai commented 1 year ago

I've started work on this

caicancai commented 1 year ago

@ruanhang1993 Hello, I was a little busy a while ago, and I have a question for you. Do I need to consider that different DataStream APIs depend on the Flink version? For example, some DataStream APIs exist only in Flink 1.16 but not in Flink 1.10.

caicancai commented 1 year ago

I think this kind of incompatibility is unavoidable, so what I can do is use the shade plugin to repackage the job and exclude the Flink classes.
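For an application that has to run on clusters with different Flink versions, one possible complement to packaging is to probe at runtime whether an API is present before relying on it. This is only a sketch of that idea using JDK reflection; `ApiProbe` and its methods are hypothetical helpers, and the Flink class and method names in `main` are taken from the example earlier in this thread.

```java
import java.lang.reflect.Method;

// Hypothetical helper: checks whether a class or public method is available on the current classpath.
public class ApiProbe {

    /** True if the named class can be found, without initializing it. */
    public static boolean hasClass(String className) {
        try {
            Class.forName(className, false, ApiProbe.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** True if the named class exists and exposes a public method with the given name. */
    public static boolean hasMethod(String className, String methodName) {
        try {
            Class<?> clazz = Class.forName(className, false, ApiProbe.class.getClassLoader());
            for (Method method : clazz.getMethods()) {
                if (method.getName().equals(methodName)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        String env = "org.apache.flink.streaming.api.environment.StreamExecutionEnvironment";
        System.out.println("StreamExecutionEnvironment present: " + hasClass(env));
        System.out.println("fromSource available: " + hasMethod(env, "fromSource"));
    }
}
```

A probe like this does not remove the incompatibility; it only lets the job fail early with a clear message instead of a `NoSuchMethodError` deep inside the pipeline.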

ruanhang1993 commented 1 year ago

Hi @caicancai, we can provide a demo for Flink 1.16 or 1.17 in the first version.

caicancai commented 1 year ago

@ruanhang1993 Thank you for your reply. I will start writing the document this weekend.

jikuanyu commented 1 year ago

Some of my experience: for example, I built a DataStream API synchronization job that combines MariaDB and OceanBase. I copied the Flink CDC jars that the job depends on into lib/:

flink-sql-connector-mysql-cdc-2.4.1.jar
flink-sql-connector-oceanbase-cdc-2.4.1.jar

After that, if one of those jars already contains the class I need, I use the class from the jar. For example:

import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Field;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Schema;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.data.Struct;
import com.ververica.cdc.connectors.shaded.org.apache.kafka.connect.source.SourceRecord;

There is also the MySQL driver problem: the flink-sql-connector-*.jar files above all bundle a MySQL driver, and at the time their versions conflicted. So I put mariadb-java-client-2.7.9.jar into flink/lib and used org.mariadb.jdbc.Driver as the driver name, and verified that this works. Hope this helps.
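The relocated imports in the comment above all follow the same pattern: the original package name prefixed with `com.ververica.cdc.connectors.shaded.`. As a rough sketch, assuming that prefix applies to the class in question (which depends on how the particular connector jar was shaded), the helper below computes the relocated name and reports which variant is loadable. `ShadedNames` is a hypothetical class written for illustration.

```java
import java.util.Optional;

// Hypothetical helper: maps a class name to its relocated (shaded) name and checks which one is loadable.
public class ShadedNames {

    // Prefix taken from the imports quoted in the thread; assumed, not guaranteed, for every class.
    static final String SHADE_PREFIX = "com.ververica.cdc.connectors.shaded.";

    /** Returns the relocated name for an original class name (idempotent for already relocated names). */
    public static String relocated(String originalName) {
        return originalName.startsWith(SHADE_PREFIX) ? originalName : SHADE_PREFIX + originalName;
    }

    private static boolean isLoadable(String className) {
        try {
            Class.forName(className, false, ShadedNames.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    /** Prefers the relocated class if present, falls back to the original, else returns empty. */
    public static Optional<String> resolve(String originalName) {
        String shaded = relocated(originalName);
        if (isLoadable(shaded)) {
            return Optional.of(shaded);
        }
        if (isLoadable(originalName)) {
            return Optional.of(originalName);
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        String name = "org.apache.kafka.connect.source.SourceRecord";
        System.out.println(name + " -> " + resolve(name).orElse("not on classpath"));
    }
}
```

Which name resolves tells you whether to import the relocated class from the connector jar or the original class from a separately packaged dependency.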

gtk96 commented 11 months ago

@caicancai Have you made any progress on this issue?

caicancai commented 11 months ago

@gtk96 Sorry, I have been busy and have made no progress on this part for the time being. You can assign this task to someone else. I'm sorry for any trouble this has caused.

gtk96 commented 10 months ago

@ruanhang1993 @GOODBOY008 @Jiabao-Sun Please assign this task to me. I have implemented a simple pom sample locally and run it successfully in a local Flink session cluster. Where should I add the pom example?

Jiabao-Sun commented 10 months ago

Thanks @gtk96, assigned to you.

gtk96 commented 10 months ago

Hi @Jiabao-Sun, where is the best place to add this user guidance example?

Jiabao-Sun commented 10 months ago

How about the Getting Started section?

MartinHou commented 9 months ago

Hi, does Flink CDC support the PyFlink DataStream API?