Status: Open — Johnwickdev opened this issue 2 months ago
// Retrieve the side output stream that carries the delta records.
DataStream<Document> deltaStream = processedStream.getSideOutput(deltaTag);

// Update the actual MongoDB collection with the changes.
deltaStream.addSink(new SinkFunction<Document>() {
    /**
     * Applies one delta record to the configured MongoDB collection.
     *
     * <p>The record's {@code "CSPA"} value selects the target document and its
     * {@code "document"} value is applied with {@code $set}. Records without a String
     * {@code "CSPA"} or without a {@code "document"} payload are skipped. Failures are
     * reported on stderr and do not propagate (best-effort, as before).
     *
     * @param deltaDoc delta record read from the side output
     * @param context  sink context supplied by the runtime; not used here
     */
    @Override
    public void invoke(Document deltaDoc, Context context) {
        Object key = deltaDoc.get("CSPA");
        Object changes = deltaDoc.get("document");
        if (!(key instanceof String) || changes == null) {
            // A null key would produce the filter {CSPA: null}, which also matches documents
            // that lack the field, so an unrelated document could be updated. A null $set
            // payload is rejected by the server. Skip instead of issuing either request.
            System.err.println("Skipping delta record: missing CSPA key or document payload (CSPA=" + key + ")");
            return;
        }

        // NOTE(review): a client is opened and closed for every record. Consider a
        // RichSinkFunction that creates the client once in open() and releases it in close().
        try (var mongoClient = MongoClients.create(mongoConfig.getUri())) {
            MongoDatabase database = mongoClient.getDatabase(mongoConfig.getDatabase());
            MongoCollection<Document> collection = database.getCollection(mongoConfig.getCollection());

            Document filter = new Document("CSPA", key);
            Document update = new Document("$set", changes);
            collection.updateOne(filter, update);
        } catch (Exception e) {
            // Best-effort: keep the stream running, but say which record failed.
            System.err.println("Failed to apply delta for CSPA=" + key);
            e.printStackTrace();
        }
    }
});

// Optionally, also write the delta records to a separate delta collection.
deltaStream.addSink(new MongoDBDeltaSink(mongoConfig));
// NOTE(review): the lines below repeat the side-output sink snippet shown earlier, but this
// copy is cut off after the collection lookup: the filter/update/updateOne statements, the
// catch clause, and the closing braces of the try block and of invoke() are missing.
// NOTE(review): on the three comment lines below, the statement that follows the sentence sits
// on the same line as the `//`, so it is commented out as written; presumably each statement
// was meant to be on its own line — confirm against the original post.
// Retrieve the side output stream DataStream deltaStream = processedStream.getSideOutput(deltaTag);
// Update the actual MongoDB collection with the changes deltaStream.addSink(new SinkFunction() {
@Override
public void invoke(Document deltaDoc, Context context) {
// Creates a client from mongoConfig's URI inside this call; try-with-resources closes it.
try (var mongoClient = MongoClients.create(mongoConfig.getUri())) {
// Resolve the database and collection named by mongoConfig.
MongoDatabase database = mongoClient.getDatabase(mongoConfig.getDatabase());
MongoCollection collection = database.getCollection(mongoConfig.getCollection());
});
// Optionally, also write the delta records to a separate delta collection deltaStream.addSink(new MongoDBDeltaSink(mongoConfig));