Johnwickdev / Hightable

0 stars 0 forks source link

Sub #69

Open Johnwickdev opened 2 months ago

Johnwickdev commented 2 months ago

public class FlinkCompareAndProcess extends KeyedCoProcessFunction<String, Document, Document, Document> {

private transient ValueState<Document> mongoState;

@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor<Document> descriptor = new ValueStateDescriptor<>(
        "mongoState", 
        Document.class);
    mongoState = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement2(Document localDoc, Context ctx, Collector<Document> out) throws Exception {
    // Retrieve the state stored earlier
    Document mongoDoc = mongoState.value();

    if (mongoDoc != null) {
        // Check if CSPA matches and if layout_name exists
        if (!mongoDoc.getString("CSPA").equals(localDoc.getString("CSPA")) 
            && mongoDoc.getString("layout_name").equals(localDoc.getString("layout_name"))) {

            Document deltaDoc = new Document("operation", "insert")
                .append("timestamp", new Date())  // Add timestamp
                .append("CSPA", localDoc.getString("CSPA"))
                .append("layout_name", localDoc.getString("layout_name"));

            // Only add new key-value pairs under layout_name
            Document newValues = new Document();
            for (String key : localDoc.get("document", Document.class).keySet()) {
                if (!mongoDoc.get("document", Document.class).containsKey(key)) {
                    newValues.append(key, localDoc.get("document", Document.class).get(key));
                }
            }

            if (!newValues.isEmpty()) {
                deltaDoc.append("document", newValues);
                out.collect(deltaDoc);  // Collect the delta document
            }

        } else {
            // Perform usual comparison and update if CSPA matches or handle differently if needed
            boolean hasChanges = compareDocuments(mongoDoc, localDoc);
            if (hasChanges) {
                Document deltaDoc = new Document("operation", "update")
                    .append("before", mongoDoc)
                    .append("after", localDoc);
                out.collect(deltaDoc);
            }
        }

        mongoState.update(mongoDoc);

    } else {
        // Handle the case when there is no matching document in MongoDB
        Document deltaDoc = new Document("operation", "insert")
            .append("new", localDoc);
        out.collect(deltaDoc);
    }
}

private boolean compareDocuments(Document mongoDoc, Document localDoc) {
    for (String key : localDoc.keySet()) {
        if (!localDoc.get(key).equals(mongoDoc.get(key))) {
            return true;
        }
    }
    return false;
}

}