Johnwickdev / Hightable

0 stars 0 forks source link

Logic monhdbsink #58

Open Johnwickdev opened 2 months ago

Johnwickdev commented 2 months ago

import org.apache.flink.streaming.api.functions.sink.SinkFunction; import org.bson.Document; import com.mongodb.client.MongoClients; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import com.mongodb.client.model.InsertManyOptions;

import java.util.ArrayList; import java.util.List;

public class MongoDBSink extends RichSinkFunction {

private final MongoDBConfig mongoConfig;
private transient MongoClient mongoClient;
private transient MongoCollection<Document> collection;
private List<Document> buffer;
private static final int BATCH_SIZE = 100; // Define your batch size

public MongoDBSink(MongoDBConfig config) {
    this.mongoConfig = config;
    this.buffer = new ArrayList<>();
}

@Override
public void open(Configuration parameters) throws Exception {
    this.mongoClient = MongoClients.create(mongoConfig.getUri());
    MongoDatabase database = mongoClient.getDatabase(mongoConfig.getDatabase());
    this.collection = database.getCollection(mongoConfig.getCollection());
}

@Override
public void invoke(Document value, Context context) {
    buffer.add(value);

    if (buffer.size() >= BATCH_SIZE) {
        insertBufferedDocuments();
    }
}

@Override
public void close() {
    // Insert any remaining documents when closing the sink
    if (!buffer.isEmpty()) {
        insertBufferedDocuments();
    }

    if (mongoClient != null) {
        mongoClient.close();
    }
}

private void insertBufferedDocuments() {
    try {
        collection.insertMany(new ArrayList<>(buffer), new InsertManyOptions().ordered(false));
        buffer.clear();
    } catch (Exception e) {
        e.printStackTrace();
        // Handle the exception or retry logic here
    }
}

}