Open Johnwickdev opened 2 months ago
import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; import org.bson.Document;
public class FlinkCompareAndProcess extends KeyedCoProcessFunction<String, Document, Document, Document> {
private transient ValueState<Document> mongoState; @Override public void open(Configuration parameters) throws Exception { ValueStateDescriptor<Document> descriptor = new ValueStateDescriptor<>( "mongoState", TypeInformation.of(new TypeHint<Document>() {})); mongoState = getRuntimeContext().getState(descriptor); } @Override public void processElement1(Document mongoDoc, Context ctx, Collector<Document> out) throws Exception { // Save MongoDB document in state mongoState.update(mongoDoc); } @Override public void processElement2(Document localDoc, Context ctx, Collector<Document> out) throws Exception { Document mongoDoc = mongoState.value(); // Retrieve the state stored earlier if (mongoDoc != null) { boolean hasChanges = compareDocuments(mongoDoc, localDoc); if (hasChanges) { Document changes = extractChanges(mongoDoc, localDoc); // Extract only the changed fields updateMongoDBCollection(mongoDoc, changes); // Update MongoDB with changes Document deltaDoc = new Document("operation", "update") .append("before", mongoDoc) .append("after", changes); out.collect(deltaDoc); } } else { Document deltaDoc = new Document("operation", "insert") .append("new", localDoc); out.collect(deltaDoc); } } private boolean compareDocuments(Document mongoDoc, Document localDoc) { for (String key : localDoc.keySet()) { if (!localDoc.get(key).equals(mongoDoc.get(key))) { return true; } } return false; } private Document extractChanges(Document mongoDoc, Document localDoc) { Document changes = new Document(); for (String key : localDoc.keySet()) { if (!localDoc.get(key).equals(mongoDoc.get(key))) { changes.append(key, localDoc.get(key)); } } return changes; } private void updateMongoDBCollection(Document mongoDoc, Document changes) { try (var mongoClient = MongoClients.create("mongodb://your_mongodb_uri_here")) { MongoDatabase database = mongoClient.getDatabase("your_database_name"); MongoCollection<Document> collection = database.getCollection("your_collection_name"); // Update the MongoDB document with only the changed fields Document updateDoc = new Document("$set", changes); collection.updateOne(new Document("CSPA", mongoDoc.getString("CSPA")), updateDoc); } catch (Exception e) { e.printStackTrace(); // Handle exception (logging or other mechanisms can be used) } }
}
import com.mongodb.client.MongoClients; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction; import org.apache.flink.util.Collector; import org.bson.Document;
public class FlinkCompareAndProcess extends KeyedCoProcessFunction<String, Document, Document, Document> {
}