Johnwickdev / Hightable

0 stars 0 forks source link

Checksum #73

Open Johnwickdev opened 2 months ago

Johnwickdev commented 2 months ago

package com.example.flink;

import com.mongodb.client.MongoCollection;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.bson.Document;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/**
 * Two-input keyed function that reconciles documents arriving on the second input against the
 * most recent document seen on the first input for the same key.
 *
 * <p>Input 1 documents are stored in keyed {@link ValueState}. For each input 2 document, the
 * fields that differ from the stored document are written to {@code deltaCollection} and applied
 * to {@code mongoCollection} with a {@code $set}. When nothing is stored for the key, the input 2
 * document is inserted into both collections. Nothing is ever emitted to the {@link Collector}.
 *
 * <p>NOTE(review): this function keeps {@code MongoCollection} instances in non-transient fields.
 * Flink serializes user functions when the job is submitted; confirm these handles survive that,
 * or create them in {@code open(...)} from serializable connection settings instead.
 */
public class MongoDBJsonCoProcessFunction extends KeyedCoProcessFunction<String, Map<String, Object>, Map<String, Object>, Void> {

    private final MongoCollection<Document> deltaCollection;
    private final MongoCollection<Document> mongoCollection;

    /** Last document received on input 1 for the current key; {@code null} until one arrives. */
    private transient ValueState<Map<String, Object>> state;

    /**
     * @param deltaCollection collection that receives one document per detected change set
     *                        (or the full document for a previously unseen key)
     * @param mongoCollection collection that is updated or inserted into to mirror input 2
     */
    public MongoDBJsonCoProcessFunction(MongoCollection<Document> deltaCollection, MongoCollection<Document> mongoCollection) {
        this.deltaCollection = deltaCollection;
        this.mongoCollection = mongoCollection;
    }

    /**
     * Registers the keyed state handle that holds the last input 1 document.
     *
     * @param parameters Flink configuration; not used here
     */
    @Override
    public void open(org.apache.flink.configuration.Configuration parameters) {
        // Map.class is a raw Class<Map>; the descriptor needs Class<Map<String, Object>>.
        // The double cast is safe because generics are erased: the runtime class is the same.
        @SuppressWarnings("unchecked")
        Class<Map<String, Object>> mapType = (Class<Map<String, Object>>) (Class<?>) Map.class;
        ValueStateDescriptor<Map<String, Object>> descriptor = new ValueStateDescriptor<>("state", mapType);
        state = getRuntimeContext().getState(descriptor);
    }

    /**
     * Remembers the input 1 document for the current key, replacing any earlier one.
     *
     * @param mongoDoc document from input 1
     * @param ctx      Flink context; not used here
     * @param out      collector; nothing is emitted
     */
    @Override
    public void processElement1(Map<String, Object> mongoDoc, Context ctx, Collector<Void> out) throws Exception {
        state.update(mongoDoc);
    }

    /**
     * Compares the input 2 document with the stored input 1 document and writes the outcome.
     *
     * <p>NOTE(review): the stored state is not modified here, so a second input 2 document for the
     * same key is compared against the same stored document again (or inserted again when none is
     * stored). Presumably input 1 feeds the resulting writes back into state; confirm.
     *
     * @param localDoc document from input 2
     * @param ctx      Flink context; not used here
     * @param out      collector; nothing is emitted
     */
    @Override
    public void processElement2(Map<String, Object> localDoc, Context ctx, Collector<Void> out) throws Exception {
        Map<String, Object> mongoDoc = state.value();

        if (mongoDoc == null) {
            // Nothing stored for this key: treat the document as new in both collections.
            // Separate Document instances are used so each insert works on its own copy.
            mongoCollection.insertOne(new Document(localDoc));
            deltaCollection.insertOne(new Document(localDoc));
            return;
        }

        Map<String, Object> changes = compareDocuments(mongoDoc, localDoc);
        if (changes.isEmpty()) {
            return;
        }

        // Record the change set first, then apply it to the document matched by the stored _id.
        deltaCollection.insertOne(new Document(changes));
        mongoCollection.updateOne(new Document("_id", mongoDoc.get("_id")), new Document("$set", new Document(changes)));
    }

    /**
     * Collects the entries of {@code doc2} whose values differ from {@code doc1}.
     *
     * <p>Only keys present in {@code doc2} are examined; keys that exist solely in {@code doc1}
     * are never reported. The comparison is null-safe: a {@code null} value in {@code doc2} counts
     * as a change only when {@code doc1} holds a non-null value for that key.
     *
     * @param doc1 baseline document
     * @param doc2 document whose entries are checked against the baseline
     * @return a new map of the differing {@code doc2} entries; empty when there are none
     */
    private Map<String, Object> compareDocuments(Map<String, Object> doc1, Map<String, Object> doc2) {
        Map<String, Object> changes = new HashMap<>();
        for (Map.Entry<String, Object> entry : doc2.entrySet()) {
            // Objects.equals avoids the NullPointerException a null doc2 value used to trigger.
            if (!Objects.equals(entry.getValue(), doc1.get(entry.getKey()))) {
                changes.put(entry.getKey(), entry.getValue());
            }
        }
        return changes;
    }

}