Johnwickdev / Hightable

0 stars 0 forks source link

Side output #79

Open Johnwickdev opened 2 months ago

Johnwickdev commented 2 months ago

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import java.util.Objects;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
import org.bson.Document;

/**
 * Keyed two-input function that reconciles "local" documents (input 2) against the last
 * MongoDB document seen for the same key (input 1).
 *
 * <p>For every local document:
 * <ul>
 *   <li>if no MongoDB document is held in state for the key, the local document is inserted
 *       into the collection, an {@code insert} delta is sent to {@link #DELTA_TAG}, and the
 *       document is emitted on the main output;</li>
 *   <li>otherwise the fields whose values differ are written with {@code $set}, an
 *       {@code update} delta is sent to {@link #DELTA_TAG}, and the changed fields are emitted
 *       on the main output. Nothing is written or emitted when no field differs.</li>
 * </ul>
 *
 * <p>MongoDB write failures are not swallowed: they propagate out of
 * {@link #processElement2} so that no delta is emitted for a write that did not happen.
 */
public class FlinkCompareAndProcess extends KeyedCoProcessFunction<String, Document, Document, Document> {

    /**
     * Side-output tag carrying the delta documents. Obtain the stream with
     * {@code result.getSideOutput(FlinkCompareAndProcess.DELTA_TAG)}.
     *
     * <p>Declared static so the anonymous subclass does not capture a function instance.
     */
    public static final OutputTag<Document> DELTA_TAG = new OutputTag<Document>("delta-output") {};

    private static final String DEFAULT_MONGO_URI = "mongodb://your_mongodb_uri_here";
    private static final String DEFAULT_DATABASE_NAME = "your_database_name";
    private static final String DEFAULT_COLLECTION_NAME = "your_collection_name";

    /** Field used to locate the target document when applying an update. */
    private static final String KEY_FIELD = "CSPA";

    private final String mongoUri;
    private final String databaseName;
    private final String collectionName;

    /** Last MongoDB document known for the current key; {@code null} if none seen yet. */
    private transient ValueState<Document> mongoState;

    // Created in open() and released in close(); transient because the function is serialized.
    private transient MongoClient mongoClient;
    private transient MongoCollection<Document> collection;

    /** Creates the function with the placeholder connection settings. */
    public FlinkCompareAndProcess() {
        this(DEFAULT_MONGO_URI, DEFAULT_DATABASE_NAME, DEFAULT_COLLECTION_NAME);
    }

    /**
     * Creates the function for a specific MongoDB target.
     *
     * @param mongoUri       MongoDB connection string
     * @param databaseName   database that holds the target collection
     * @param collectionName collection that receives inserts and updates
     * @throws NullPointerException if any argument is {@code null}
     */
    public FlinkCompareAndProcess(String mongoUri, String databaseName, String collectionName) {
        this.mongoUri = Objects.requireNonNull(mongoUri, "mongoUri");
        this.databaseName = Objects.requireNonNull(databaseName, "databaseName");
        this.collectionName = Objects.requireNonNull(collectionName, "collectionName");
    }

    /**
     * Registers the keyed state and opens one MongoDB client for the lifetime of this
     * function instance, instead of one client per processed element.
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        ValueStateDescriptor<Document> descriptor = new ValueStateDescriptor<>(
                "mongoState", TypeInformation.of(new TypeHint<Document>() {}));
        mongoState = getRuntimeContext().getState(descriptor);

        mongoClient = MongoClients.create(mongoUri);
        MongoDatabase database = mongoClient.getDatabase(databaseName);
        collection = database.getCollection(collectionName);
    }

    /** Releases the MongoDB client opened in {@link #open(Configuration)}. */
    @Override
    public void close() throws Exception {
        try {
            if (mongoClient != null) {
                mongoClient.close();
            }
        } finally {
            mongoClient = null;
            collection = null;
            super.close();
        }
    }

    /** Stores the incoming MongoDB document as the reference state for the current key. */
    @Override
    public void processElement1(Document mongoDoc, Context ctx, Collector<Document> out) throws Exception {
        mongoState.update(mongoDoc);
    }

    /**
     * Reconciles a local document against the stored MongoDB document for the same key.
     *
     * @param localDoc document arriving on the second input
     * @param ctx      context used to emit the delta side output
     * @param out      main output collector
     * @throws Exception if state access or the MongoDB write fails
     */
    @Override
    public void processElement2(Document localDoc, Context ctx, Collector<Document> out) throws Exception {
        Document mongoDoc = mongoState.value();

        if (mongoDoc == null) {
            // No MongoDB document known for this key: treat the local document as a new insert.
            // Write first so that a failed write never produces a delta.
            collection.insertOne(localDoc);

            // Remember what was written; otherwise the next local document for this key
            // would be inserted a second time. Copy so downstream mutation cannot leak in.
            mongoState.update(new Document(localDoc));

            ctx.output(DELTA_TAG, new Document("operation", "insert").append("new", localDoc));
            out.collect(localDoc);
            return;
        }

        Document changes = extractChanges(mongoDoc, localDoc);
        if (changes.isEmpty()) {
            return; // Identical on every field the local document carries.
        }

        // Update the MongoDB document with only the changed fields.
        collection.updateOne(
                new Document(KEY_FIELD, mongoDoc.getString(KEY_FIELD)),
                new Document("$set", changes));

        // Fold the applied changes into state without touching mongoDoc, which is still
        // referenced as the "before" image below.
        Document merged = new Document(mongoDoc);
        merged.putAll(changes);
        mongoState.update(merged);

        Document deltaDoc = new Document("operation", "update")
                .append("before", mongoDoc)
                .append("after", changes);
        ctx.output(DELTA_TAG, deltaDoc);

        out.collect(changes);
    }

    /**
     * Returns the fields of {@code localDoc} whose values differ from {@code mongoDoc}.
     * The comparison is null-safe: an explicit {@code null} in the local document is equal
     * to a missing or {@code null} field in the MongoDB document.
     *
     * @return a new document holding only the differing fields with their local values;
     *         empty when nothing differs
     */
    private static Document extractChanges(Document mongoDoc, Document localDoc) {
        Document changes = new Document();
        for (String key : localDoc.keySet()) {
            Object localValue = localDoc.get(key);
            if (!Objects.equals(localValue, mongoDoc.get(key))) {
                changes.append(key, localValue);
            }
        }
        return changes;
    }
}