@Override
public void processElement2(Document localDoc, Context ctx, Collector out) throws Exception {
// Retrieve the state stored earlier
Document mongoDoc = mongoState.value();
if (mongoDoc != null) {
// Case 1: Update Operation
if (mongoDoc.getString("CSPA").equals(localDoc.getString("CSPA")) &&
mongoDoc.getString("layout_name").equals(localDoc.getString("layout_name"))) {
// Initialize a new Document to store only changed fields
Document changedFields = new Document();
// Compare each key in the local document with the corresponding key in the MongoDB document
for (String key : localDoc.get("document", Document.class).keySet()) {
Object localValue = localDoc.get("document", Document.class).get(key);
Object mongoValue = mongoDoc.get("document", Document.class).get(key);
// If the values are different, add the key-value pair to changedFields
if (localValue != null && !localValue.equals(mongoValue)) {
changedFields.append(key, localValue);
}
}
// If there are any changes, prepare the delta document
if (!changedFields.isEmpty()) {
Document deltaDoc = new Document("operation", "update")
.append("timestamp", new Date())
.append("CSPA", localDoc.getString("CSPA"))
.append("layout_name", localDoc.getString("layout_name"))
.append("document", changedFields);
out.collect(deltaDoc);
}
}
// Case 2: Insert Operation
else if (!mongoDoc.getString("CSPA").equals(localDoc.getString("CSPA")) &&
mongoDoc.getString("layout_name").equals(localDoc.getString("layout_name"))) {
// New CSPA under the same layout_name
out.collect(new Document("operation", "insert")
.append("timestamp", new Date())
.append("CSPA", localDoc.getString("CSPA"))
.append("layout_name", localDoc.getString("layout_name"))
.append("document", localDoc.get("document", Document.class)));
}
} else {
// No matching document in MongoDB, treat as new record
out.collect(new Document("operation", "insert")
.append("timestamp", new Date())
.append("CSPA", localDoc.getString("CSPA"))
.append("layout_name", localDoc.getString("layout_name"))
.append("document", localDoc.get("document", Document.class)));
}
// Update the state
mongoState.update(localDoc);
@Override public void processElement2(Document localDoc, Context ctx, Collector out) throws Exception {
// Retrieve the state stored earlier
Document mongoDoc = mongoState.value();
}