Open qingfei1994 opened 8 months ago

Currently the Kafka source only supports StringDeserializer, so we have to use some special character to identify a vertex or edge, something like this:

INSERT INTO dy_modern.person(id, name)
SELECT cast(trim(split_ex(t1, ',', 0)) as bigint), split_ex(trim(t1), ',', 1)
FROM (
    SELECT trim(substr(text, 2)) as t1
    FROM tbl_source
    WHERE substr(text, 1, 1) = '.'
);

But we want to use a more structured format in the Kafka topic. Could we support deserializing JSON in the Kafka source?

I'm willing to work on this if you think it's necessary.
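For illustration only (the message layout and field mapping here are assumptions about how the feature might work): if each Kafka message were a JSON object such as {"id": 1, "name": "Tom"} whose fields map onto the table schema by name, the ingest above could reduce to something like:

INSERT INTO dy_modern.person(id, name)
SELECT id, name FROM tbl_source;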
@qingfei1994 Hi there, thank you for highlighting this important point. The current limitation of having only StringDeserializer does constrain the flexibility of processing data from Kafka topics, and directly supporting JSON deserialization in the Kafka source would be a valuable enhancement.
The use case you mentioned, where strings need to be parsed to identify vertices or edges, is quite common, and having structured data upstream could greatly simplify the processing workflow. Not only would this make data ingestion more robust, it would also align with the broad adoption of JSON as the data interchange format across systems and services.
If you're interested in contributing this feature, I believe it would be a very welcome improvement. Should you have any design ideas, feel free to outline the design and implementation details, and engage with the community to discuss and refine the concept.
Once again, thank you for your initiative and for contributing to the TuGraph project. If you need any further information or have additional questions, please do not hesitate to reach out.
Looking forward to your proposal! Best wishes!
Thanks @Leomrlin! My rough idea is to add some options for the Kafka source.
1. "geaflow.dsl.kafka.format": it can be configured as "text" or "json". If it is configured as "json", each Kafka message will be deserialized as JSON and a collection of Rows matching the table schema will be returned from the fetch function of KafkaSourceTable:
@Override
public <T> FetchData<T> fetch(Partition partition, Optional<Offset> startOffset,
                              long newWindowSize) throws IOException {
    // for the "json" format, parse each message and emit Rows matching the table schema
}
2. Errors may occur when deserializing JSON, so we also need options such as "geaflow.dsl.kafka.format.json.fail-on-missing-field" (true/false) and "geaflow.dsl.kafka.format.json.ignore-parse-error" (true/false).
3. In KafkaSourceTable, getDeserializer() will return a RowTableDeserializer in the "json" case, so that it can return Rows (see the sketch after the snippet below):
@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
    // today this always returns the text deserializer; in the "json" case
    // it would return a RowTableDeserializer instead:
    // return (TableDeserializer<IN>) new TextDeserializer();
}
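To make the dispatch concrete, here is a minimal sketch. It assumes Configuration exposes a getString(key, default) accessor and that RowTableDeserializer is the JSON-aware implementation described above; the option name and its default are part of this proposal, not existing API:

@Override
public <IN> TableDeserializer<IN> getDeserializer(Configuration conf) {
    // option name and default value are proposed, not existing API
    String format = conf.getString("geaflow.dsl.kafka.format", "text");
    if ("json".equalsIgnoreCase(format)) {
        // parses each message as JSON and maps fields onto the table schema
        return (TableDeserializer<IN>) new RowTableDeserializer();
    }
    // default: current behavior, the whole message as a single string
    return (TableDeserializer<IN>) new TextDeserializer();
}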
@qingfei1994 Hello,
Your proposal for enhancing the JSON deserialization capabilities within our Kafka source is superb! I see it as not just a solution to the immediate needs but also as laying the groundwork for a more robust deserialization framework.
Our existing system's TableDeserializer interface is perfectly suited to integrate different parsers, similar to the current TextDeserializer. By incorporating the JSON deserializer at this level, we ensure that it can be utilized across different connectors within the TuGraph project, not just limited to Kafka.
Regarding the configuration options, I concur that the deserializer may have numerous parameters in the future that could further define its functionalities. However, I suggest that adjusting the configuration to a more general level, such as geaflow.dsl.connector.format.json, could be a strategic approach. This would enable future JSON parsers to maintain configuration consistency across different connectors and simplify the management of settings related to JSON deserialization. Perhaps in the future, we can provide a uniform and flexible JSON deserialization error handling strategy for the entire TuGraph system.
Once again, thank you for your proactive attitude and for contributing such thoughtful ideas to the TuGraph project. We are very much looking forward to your detailed design and subsequent implementation. Should you need any assistance or have further questions as you develop this feature, please do not hesitate to reach out to the community.
Thanks @Leomrlin! As you said, there may be numerous parameters to further define the deserialization configuration. So my idea is that the configuration for the JSON format itself could sit at a more general level, since other systems like Pulsar may also need a JSON deserializer configuration. Something like this:
geaflow.dsl.connector.format: json/text
geaflow.dsl.connector.format.json.ignore-parse-error: true/false
geaflow.dsl.connector.format.json.fail-on-missing-field: true/false
What do you think? (The sketch below illustrates how a JSON deserializer might honor these two flags.)
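As a sketch only, assuming Jackson (com.fasterxml.jackson) is available for JSON parsing; the class JsonRowParser and its shape are hypothetical, not existing GeaFlow code:

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JsonRowParser {
    private final ObjectMapper mapper = new ObjectMapper();
    private final boolean ignoreParseError;
    private final boolean failOnMissingField;

    public JsonRowParser(boolean ignoreParseError, boolean failOnMissingField) {
        this.ignoreParseError = ignoreParseError;
        this.failOnMissingField = failOnMissingField;
    }

    // Returns field values in schema order, or null to signal "skip this record".
    public Object[] parse(String message, String[] fieldNames) {
        final JsonNode root;
        try {
            root = mapper.readTree(message);
        } catch (Exception e) {
            if (ignoreParseError) {
                return null; // ignore-parse-error=true: drop the malformed record
            }
            throw new IllegalArgumentException("Cannot parse JSON: " + message, e);
        }
        Object[] values = new Object[fieldNames.length];
        for (int i = 0; i < fieldNames.length; i++) {
            JsonNode field = root.get(fieldNames[i]);
            if (field == null) {
                if (failOnMissingField) {
                    // fail-on-missing-field=true: treat a missing field as an error
                    throw new IllegalArgumentException("Missing field: " + fieldNames[i]);
                }
                values[i] = null; // otherwise missing fields become nulls
                continue;
            }
            values[i] = field.isNumber() ? field.numberValue() : field.asText();
        }
        return values;
    }
}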
Good!
I fully agree with your view on abstracting the configuration to a more general level; a prefix such as geaflow.dsl.connector.format.json is a strategic move. It would ensure that future JSON parsers can maintain consistent configuration across different connectors and make it easier to manage settings related to JSON deserialization.
Such a design would allow us to integrate these configuration options into each of the connectors within TuGraph, not just Kafka or Pulsar.
I support your continued efforts to advance this proposal. If you need more feedback or encounter challenges in implementing these features, please do not hesitate to share with us immediately. The TuGraph community is always eager to help and move forward together.
Looking forward to seeing your further progress!