Open Johnwickdev opened 2 months ago
@Override public void processElement2(Document localDoc, Context ctx, Collector out) throws Exception { // Retrieve the state stored earlier Document mongoDoc = mongoState.value();
if (mongoDoc != null) { Document deltaDoc = compareDocuments(mongoDoc, localDoc); if (!deltaDoc.isEmpty()) { // Only if there are changes Document updateDoc = new Document("operation", "update") .append("timestamp", System.currentTimeMillis()) .append("CSPA", localDoc.getString("CSPA")) .append("date", localDoc.getString("date")) .append("version", localDoc.getString("version")) .append("layout_name", localDoc.getString("layout_name")) .append("document", deltaDoc); out.collect(updateDoc); } } else { // Handle new insert case Document insertDoc = new Document("operation", "insert") .append("timestamp", System.currentTimeMillis()) .append("CSPA", localDoc.getString("CSPA")) .append("date", localDoc.getString("date")) .append("version", localDoc.getString("version")) .append("layout_name", localDoc.getString("layout_name")) .append("document", localDoc); out.collect(insertDoc); }
}
@Override public void processElement2(Document localDoc, Context ctx, Collector out) throws Exception {
// Retrieve the state stored earlier
Document mongoDoc = mongoState.value();
}