import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.Updates;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.mongodb.sink.MongoSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.bson.BsonDocument;

// Stream of JSON strings to be written to MongoDB (placeholder: supply the real stream).
DataStream<String> stream = ...;

// Build a MongoDB sink that turns each JSON record into an update keyed on its 'id' field.
// NOTE(review): the URI below embeds example credentials; load real ones from configuration.
MongoSink<String> sink = MongoSink.<String>builder()
        .setUri("mongodb://user:password@127.0.0.1:27017")
        .setDatabase("my_db")
        .setCollection("my_coll")
        .setBatchSize(1000)
        .setBatchIntervalMs(1000)
        .setMaxRetries(3)
        .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
        .setSerializationSchema((input, context) -> {
            // Parse input JSON to a BsonDocument
            BsonDocument document = BsonDocument.parse(input);

            // Fail with a clear message instead of an opaque error from the
            // BsonDocument constructor when the record has no 'id' to match on.
            if (!document.containsKey("id")) {
                throw new IllegalArgumentException(
                        "Input document has no 'id' field to build the update filter from");
            }

            // Create the filter to find the document by 'id'
            BsonDocument filter = new BsonDocument("id", document.get("id"));

            // Define the update document (updating the whole document here; customize as needed)
            BsonDocument update = new BsonDocument("$set", document);

            // Return the UpdateOneModel with filter and update
            return new UpdateOneModel<>(filter, update);
        })
        .build();

// Attach the sink to the data stream
stream.sinkTo(sink);