Johnwickdev / Hightable

0 stars 0 forks source link

Mongosource #95

Open Johnwickdev opened 1 month ago

Johnwickdev commented 1 month ago

import com.ververica.cdc.connectors.mongodb.MongoDBSource; import com.ververica.cdc.connectors.mongodb.MongoDBSourceBuilder; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkMongoDBCDCExample {

public static void main(String[] args) throws Exception {
    // Set up the Flink streaming execution environment
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Define the MongoDB connection string (e.g., a remote database)
    String connectionString = "mongodb://user:password@mongo-host:27017/database?retryWrites=true";

    // Build the MongoDBSource using the connection string
    MongoDBSource<String> mongoSource = MongoDBSource.<String>builder()
            .connectionString(connectionString)   // Use MongoDB connection string
            .database("myDatabase")               // Specify the database
            .collection("myCollection")           // Specify the collection
            .deserializer(new MyMongoDBDeserializer()) // Use your custom deserializer
            .build();

    // Create the Flink data stream
    DataStreamSource<String> mongoStream = env.fromSource(
            mongoSource,
            WatermarkStrategy.noWatermarks(),
            "MongoDBSource"
    );

    // Print the stream to the console
    mongoStream.print();

    // Execute the Flink job
    env.execute("Flink MongoDB CDC Example");
}

}