Json split by primary key #4

Johnwickdev commented 1 month ago

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.connectors.mongodb.MongoSource; import org.apache.flink.streaming.connectors.mongodb.source.MongoSourceBuilder; import org.apache.flink.api.common.functions.MapFunction; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper;

public class MongoDBFlinkExample {

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    MongoSource<String> mongoSource = MongoSource.<String>builder()
            .setDeserializationSchema(new SimpleStringSchema())

    DataStream<String> mongoStream = env.addSource(mongoSource);

    // Parse JSON and extract custom primary key
    KeyedStream<JsonNode, String> keyedStream = mongoStream
            .map(new MapFunction<String, JsonNode>() {
                private static final ObjectMapper objectMapper = new ObjectMapper();

                public JsonNode map(String value) throws Exception {
                    JsonNode jsonNode = objectMapper.readTree(value);
                    return jsonNode;
            .keyBy((JsonNode node) -> node.get("yourPrimaryKeyField").asText());

    // Further processing of the keyed stream
            .map(new MapFunction<JsonNode, String>() {
                public String map(JsonNode value) throws Exception {
                    // Process the JSON object
                    return value.toString();

    env.execute("MongoDB Flink Example");


Johnwickdev commented 1 month ago
org.apache.flink flink-streaming-java_2.11 1.14.0 org.apache.flink flink-connector-mongodb 1.2.1 com.fasterxml.jackson.core jackson-databind 2.13.1
Johnwickdev commented 1 month ago

New splitter

import org.apache.flink.api.common.functions.MapFunction; import; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;

public class FlinkMongoDBSplit {

public static void main(String[] args) throws Exception {

    // Initialize the Flink execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Read parameters, you can provide MongoDB connection details here
    final ParameterTool params = ParameterTool.fromArgs(args);

    // Create a DataStream from MongoDB (this part depends on your MongoDB Flink connector)
    DataStream<String> jsonStream = env.addSource(new MongoDBSourceFunction(params));

    // Deserialize JSON to your document class
    DataStream<MyDocumentClass> documentStream = MapFunction<String, MyDocumentClass>() {
        public MyDocumentClass map(String value) throws Exception {
            // Use your preferred JSON library to convert JSON string to MyDocumentClass
            return MyDocumentClass.fromJson(value);

    // Key the stream by the primary key
    SingleOutputStreamOperator<MyDocumentClass> keyedStream = documentStream
            .keyBy(document -> document.getPrimaryKey());

    // Here you can process each partition independently MapFunction<MyDocumentClass, MyDocumentClass>() {
        public MyDocumentClass map(MyDocumentClass value) throws Exception {
            // Process the document
            return processDocument(value);

    // Sink the processed stream to another MongoDB collection or other sinks
    keyedStream.addSink(new MongoDBSinkFunction(params));

    // Execute the Flink job
    env.execute("Flink MongoDB Split by Primary Key");


Johnwickdev commented 1 month ago

Split json by Java8

import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map; import;

public class JsonSplitter {

public static void main(String[] args) throws Exception {
    String json = "{ \"key1\": {\"name\": \"John\", \"age\": 30}, " +
            "\"key2\": {\"name\": \"Jane\", \"age\": 25}, " +
            "\"key3\": {\"name\": \"Doe\", \"age\": 22}}";

    // Parse JSON to Map
    ObjectMapper mapper = new ObjectMapper();
    Map<String, Map<String, Object>> map = mapper.readValue(json, Map.class);

    // Split the map by primary key
    Map<String, Map<String, Object>> key1Map = map.entrySet().stream()
            .filter(entry -> "key1".equals(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    Map<String, Map<String, Object>> key2Map = map.entrySet().stream()
            .filter(entry -> "key2".equals(entry.getKey()))
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Output the results
    System.out.println("Key1 Map: " + mapper.writeValueAsString(key1Map));
    System.out.println("Key2 Map: " + mapper.writeValueAsString(key2Map));


Johnwickdev commented 1 month ago

Convert do to json object

import com.mongodb.client.FindIterable; import org.bson.Document; import; import;

public class MongoToJsonExample {

public static JsonArray convertToJSON(FindIterable<Document> documents) {
    JsonArray jsonArray = new JsonArray();

    // Iterate over the documents and add them to the JSON array
    for (Document document : documents) {
        JsonObject jsonObject = new JsonObject();

        // Convert each BSON Document to a JSON string
        document.forEach((key, value) -> jsonObject.addProperty(key, value.toString()));


    return jsonArray;

public static void main(String[] args) {
    // Assuming 'documents' is your FindIterable<Document> object
    FindIterable<Document> documents = /* your MongoDB query here */;

    JsonArray jsonArray = convertToJSON(documents);

    // Convert the JsonArray to a JSON string
    String jsonString = jsonArray.toString();

