Johnwickdev / Hightable

0 stars 0 forks source link

MongoDB sink #57

Open Johnwickdev opened 2 months ago

Johnwickdev commented 2 months ago

/**
 * Flink sink that writes each incoming delta {@link Document} into the MongoDB
 * collection described by the supplied {@link MongoDBConfig}.
 *
 * <p>A MongoDB client is opened and closed for every record passed to
 * {@link #invoke(Document, Context)}.
 *
 * <p>NOTE(review): opening a client per record is likely expensive; consider moving
 * to a rich sink function with open/close lifecycle hooks so that one client is
 * shared per sink instance. Left unchanged here to keep the class's type unchanged.
 *
 * <p>NOTE(review): {@code MongoDBConfig} presumably needs to be serializable, since
 * Flink serializes sink instances when distributing them - confirm against its
 * declaration.
 */
public class MongoDBDeltaSink implements SinkFunction<Document> {

    private static final long serialVersionUID = 1L;

    /** Connection settings (URI, database name, collection name) for the target collection. */
    private final MongoDBConfig mongoConfig;

    /**
     * Creates a sink that writes to the collection described by {@code config}.
     *
     * @param config MongoDB connection settings used on every {@code invoke} call
     */
    public MongoDBDeltaSink(MongoDBConfig config) {
        this.mongoConfig = config;
    }

    /**
     * Inserts one delta document into the configured MongoDB collection.
     *
     * @param deltaDoc the document to insert
     * @param context  sink context supplied by Flink; not used by this sink
     * @throws IllegalStateException if connecting to MongoDB or inserting the document
     *         fails; the original failure is attached as the cause
     */
    @Override
    public void invoke(Document deltaDoc, Context context) {
        System.out.println("Invoking MongoDBDeltaSink with Document: " + deltaDoc.toJson());
        // try-with-resources closes the client even when the insert fails.
        try (var mongoClient = MongoClients.create(mongoConfig.getUri())) {
            MongoDatabase database = mongoClient.getDatabase(mongoConfig.getDatabase());
            MongoCollection<Document> deltaCollection =
                    database.getCollection(mongoConfig.getCollection());
            deltaCollection.insertOne(deltaDoc);
        } catch (Exception e) {
            // Propagate instead of printing and continuing: a swallowed failure would
            // silently drop the record while the job keeps reporting success.
            // The connection URI is deliberately left out of the message because it
            // may contain credentials.
            throw new IllegalStateException(
                    "Failed to insert delta document into MongoDB", e);
        }
    }

}