Johnwickdev / Hightable

0 stars 0 forks source link

Mongosyncsink #83

Open Johnwickdev opened 2 months ago

Johnwickdev commented 2 months ago

package com.example.flink.source;

import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.MongoDatabase; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.bson.Document;

/**
 * Flink source that repeatedly scans a MongoDB collection and emits every
 * document it finds.
 *
 * <p>Each polling round issues an unfiltered {@code find()} over the whole
 * collection, so a document that is still present in the collection is emitted
 * again on every round. Downstream operators must tolerate (or de-duplicate)
 * these repeated records.
 *
 * <p>The source keeps running until {@link #cancel()} is called or an error
 * occurs; errors are propagated out of {@link #run(SourceContext)} instead of
 * being swallowed.
 */
public class MongoSyncSource implements SourceFunction<Document> {

    private static final long serialVersionUID = 1L;

    /** Pause between two full scans of the collection, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 5000L;

    private final String mongoUri;
    private final String databaseName;
    private final String collectionName;

    /** Cleared by {@link #cancel()}; volatile so the polling thread sees the change. */
    private volatile boolean isRunning = true;

    /**
     * Creates a source for one MongoDB collection.
     *
     * @param mongoUri       connection string handed to {@code MongoClients.create}
     * @param databaseName   name of the database to read from
     * @param collectionName name of the collection to scan
     */
    public MongoSyncSource(String mongoUri, String databaseName, String collectionName) {
        this.mongoUri = mongoUri;
        this.databaseName = databaseName;
        this.collectionName = collectionName;
    }

    /**
     * Opens a MongoDB client and, until cancelled, scans the collection and
     * emits every document, pausing {@value #POLL_INTERVAL_MS} ms between scans.
     *
     * @param ctx context used to emit documents; each emit happens while
     *            holding the context's checkpoint lock
     * @throws Exception if connecting to, or reading from, MongoDB fails, or if
     *                   the thread is interrupted while the source has not been
     *                   cancelled
     */
    @Override
    public void run(SourceContext<Document> ctx) throws Exception {
        // try-with-resources closes the client on both normal exit and failure.
        try (MongoClient mongoClient = MongoClients.create(mongoUri)) {
            MongoDatabase database = mongoClient.getDatabase(databaseName);
            MongoCollection<Document> collection = database.getCollection(collectionName);

            while (isRunning) {
                try (MongoCursor<Document> cursor = collection.find().iterator()) {
                    // Check the flag first so a cancelled source does not
                    // trigger another fetch from the server.
                    while (isRunning && cursor.hasNext()) {
                        Document document = cursor.next();
                        synchronized (ctx.getCheckpointLock()) {
                            ctx.collect(document);
                        }
                    }
                }

                if (!isRunning) {
                    // Cancelled during the scan: no point in sleeping.
                    break;
                }

                try {
                    // Sleep between scans to prevent tight looping.
                    Thread.sleep(POLL_INTERVAL_MS);
                } catch (InterruptedException e) {
                    // Preserve the interrupt status for code further up the stack.
                    Thread.currentThread().interrupt();
                    if (isRunning) {
                        // Interrupted without cancel(): report it as a failure.
                        throw e;
                    }
                    // Interrupted after cancel(): normal shutdown, the loop
                    // condition ends the method.
                }
            }
        }
    }

    /** Requests that {@link #run(SourceContext)} stop at the next flag check. */
    @Override
    public void cancel() {
        isRunning = false;
    }

}