Open Johnwickdev opened 3 months ago
Code with the error fixed:
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.util.Collector; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException; import java.util.Iterator; import java.util.Map;
/**
 * Flink batch job that compares the JSON records of two text inputs and writes out,
 * for every pair of records (one from each input), the top-level fields whose key and
 * value are equal in both records.
 *
 * <p>Each element returned by {@code readTextFile} is parsed as one JSON document.
 * Pairs that share no equal field produce no output.
 *
 * <p>Optional program arguments override the default paths:
 * {@code args[0]} = first input, {@code args[1]} = second input, {@code args[2]} = output.
 */
public class CompareJsonFlinkJob {

    private static final String DEFAULT_FIRST_INPUT = "path/to/json1.json";
    private static final String DEFAULT_SECOND_INPUT = "path/to/json2.json";
    private static final String DEFAULT_OUTPUT = "path/to/matching.json";

    /** Shared mapper; static, so it is never serialized together with a user function. */
    private static final ObjectMapper MAPPER = new ObjectMapper();

    /**
     * Builds and runs the comparison job.
     *
     * @param args optional overrides for the first input path, second input path and output path
     * @throws Exception if the Flink job cannot be built or executed
     */
    public static void main(String[] args) throws Exception {
        final String firstInput = argOrDefault(args, 0, DEFAULT_FIRST_INPUT);
        final String secondInput = argOrDefault(args, 1, DEFAULT_SECOND_INPUT);
        final String output = argOrDefault(args, 2, DEFAULT_OUTPUT);

        // Set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Read the inputs and parse every element into a JSON tree
        DataSet<JsonNode> jsonNode1 = env.readTextFile(firstInput).map(new JsonToNodeMapper());
        DataSet<JsonNode> jsonNode2 = env.readTextFile(secondInput).map(new JsonToNodeMapper());

        // Pair every record of the first input with every record of the second one.
        // A DataSet handle must not be referenced (or collect()-ed) from inside a user
        // function: it is part of the client-side plan and is not serializable, so the
        // pairing has to be expressed as a dataflow operator instead.
        DataSet<JsonNode> matchingData = jsonNode1
                .cross(jsonNode2)
                .flatMap(new MatchingFieldsExtractor());

        // Write the matching data to the output path
        matchingData.writeAsText(output);

        // Execute the Flink job
        env.execute("Compare JSON with Apache Flink");
    }

    /** Returns {@code args[index]} when present, otherwise {@code fallback}. */
    private static String argOrDefault(String[] args, int index, String fallback) {
        return (args != null && args.length > index) ? args[index] : fallback;
    }

    /** Maps a JSON string to its {@link JsonNode} tree. */
    public static class JsonToNodeMapper implements MapFunction<String, JsonNode> {
        @Override
        public JsonNode map(String value) throws IOException {
            return MAPPER.readTree(value);
        }
    }

    /**
     * Emits the matching key-value pairs of a pair of JSON records; emits nothing when
     * {@link #compareJsonNodes(JsonNode, JsonNode)} finds no match.
     */
    public static class MatchingFieldsExtractor
            implements FlatMapFunction<Tuple2<JsonNode, JsonNode>, JsonNode> {
        @Override
        public void flatMap(Tuple2<JsonNode, JsonNode> pair, Collector<JsonNode> collector) {
            JsonNode matchingNode = compareJsonNodes(pair.f0, pair.f1);
            if (matchingNode != null) {
                collector.collect(matchingNode);
            }
        }
    }

    /**
     * Compares two JSON nodes and collects the top-level fields of {@code node1} that are
     * present in {@code node2} with an equal value.
     *
     * @param node1 node whose fields are iterated
     * @param node2 node the fields are looked up in
     * @return an object node holding the matching key-value pairs, or {@code null} when
     *         there is no match or either argument is {@code null}
     */
    public static JsonNode compareJsonNodes(JsonNode node1, JsonNode node2) {
        if (node1 == null || node2 == null) {
            return null;
        }

        // New JSON object that receives the matching key-value pairs
        ObjectNode matchingNode = MAPPER.createObjectNode();

        Iterator<Map.Entry<String, JsonNode>> fields = node1.fields();
        while (fields.hasNext()) {
            Map.Entry<String, JsonNode> field = fields.next();
            String key = field.getKey();
            JsonNode value1 = field.getValue();
            JsonNode value2 = node2.get(key);
            if (value2 != null && value1.equals(value2)) {
                matchingNode.set(key, value1);
            }
        }

        return matchingNode.size() > 0 ? matchingNode : null;
    }
}
import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.tuple.Tuple2; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException; import java.util.Iterator; import java.util.Map;
// Empty class: declares no fields, constructors or methods.
// NOTE(review): this repeats the name of the class declared earlier in this paste, and a
// single Java source file cannot contain two top-level classes with the same name.
// Presumably a leftover stub from an earlier draft — TODO confirm and remove.
public class CompareJsonFlinkJob {
}