@Override
public void processElement2(Document localDoc, Context ctx, Collector out) throws Exception {
// Retrieve the state stored earlier
Document mongoDoc = mongoState.value();
if (mongoDoc != null) {
// Create a new document to store only the changed fields
Document deltaDoc = new Document("operation", "update")
.append("CSPA", localDoc.getString("CSPA"))
.append("date", localDoc.getString("date"))
.append("version", localDoc.getString("version"))
.append("layout_name", localDoc.getString("layout_name"));
Document changedFields = new Document();
for (String key : localDoc.get("document", Document.class).keySet()) {
Object localValue = localDoc.get("document", Document.class).get(key);
Object mongoValue = mongoDoc.get("document", Document.class).get(key);
if (!localValue.equals(mongoValue)) {
changedFields.append(key, "Changed from " + mongoValue + " to " + localValue);
}
}
if (!changedFields.isEmpty()) {
deltaDoc.append("document", changedFields);
out.collect(deltaDoc); // Emit the delta document
}
} else {
// If no matching document in MongoDB, treat this as a new record
Document deltaDoc = new Document("operation", "insert")
.append("new", localDoc);
out.collect(deltaDoc); // Emit the new document
}
// Update state
mongoState.update(localDoc);
@Override public void processElement2(Document localDoc, Context ctx, Collector out) throws Exception {
// Retrieve the state stored earlier
Document mongoDoc = mongoState.value();
}