vesoft-inc / nebula-exchange

NebulaGraph Exchange is an Apache Spark application that parses data from different sources and imports it into NebulaGraph in a distributed environment. It supports both batch and streaming data in various formats and from various sources, including other graph databases, RDBMS, data warehouses, NoSQL stores, message buses, file systems, etc.
Apache License 2.0

[WIP] Fix the error "Field “xxx” does not exist." when data is imported from Kafka to Nebula Graph via Nebula Exchange #8

Closed: jamieliu1023 closed this issue 2 years ago

jamieliu1023 commented 3 years ago

The bug was reported by a user on the Chinese forum: https://discuss.nebula-graph.com.cn/t/topic/2623/

For those who cannot understand Chinese well, please refer to the title of the issue for a basic background. The reason why the error occurs is that Nebula Exchange is not able to parse the "value" field of the data in Kafka.

Committer @guojun85 is working on this now. Thanks in advance for his contribution!

guojun85 commented 3 years ago

We are working on this issue. The fix plan is:

  1. value should be in JSON format: { "field1": "value1", "field2": "value2", "field3": "value3" }
  2. We will parse value and get all of the fields to match the fields in the configuration file (see the sketch below).
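
A minimal sketch of that parsing step, assuming a flat JSON value and using fastjson purely for illustration (the library choice, field names, and error message are assumptions, not the actual Exchange code):

    import com.alibaba.fastjson.JSON

    // Fields declared in the tag/edge config, e.g. fields: [field1, field2, field3]
    val configFields = List("field1", "field2", "field3")

    // The Kafka record's value, expected to be a flat JSON object
    val value = """{ "field1": "value1", "field2": "value2", "field3": "value3" }"""

    val obj = JSON.parseObject(value)
    // Keep only the configured fields; a missing field is exactly the
    // "Field xxx does not exist" case reported in this issue
    val row = configFields.map { f =>
      require(obj.containsKey(f), s"Field $f does not exist in Kafka value")
      f -> obj.getString(f)
    }.toMap
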
jamieliu1023 commented 3 years ago

@Nicole00

Nicole00 commented 3 years ago

We are working on this issue. The fix plan is:

  1. value should be in JSON format: { "items": [ { "field1": "value1"}, { "field2": "value2"}, { "field3": "value3"} ] }
  2. We will parse value and get all of the fields to match the fields in the configuration file.

Thanks very much for your solution. There are two small questions we want to discuss with you:

  1. How do we configure the fields in the configuration file in your plan?
  2. Can we still get the Kafka metadata fields (key, offset, topic, and so on)?
guojun85 commented 3 years ago

  1. How do we configure the fields in the configuration file in your plan?

     【guojun】: Change the configuration file https://github.com/vesoft-inc/nebula-spark-utils/blob/master/nebula-exchange/src/main/resources/application.conf, for example:

        # KAFKA
        {
          name: tag-name-7
          type: {
            source: kafka
            sink: client
          }
          service: "kafka.service.address"
          topic: "topic-name"
          fields: [field1, field2, field3]
          nebula.fields: [field1, field2, field3]
          vertex: {
            field: field1
          }
          partition: 10
          batch: 10
          interval.seconds: 10
        }

  2. Can we still get the Kafka metadata fields (key, offset, topic, and so on)?

     【guojun】: We don't need the other Kafka keywords (key, offset, topic, and so on); we only parse value and import the fields in value into Nebula fields.

Nicole00 commented 3 years ago

That's reasonable, looking forward to your pr ~

sworduo commented 3 years ago

I am sorry that I cannot reply in that PR (vesoft-inc/nebula-spark-utils#157), and sorry about my poor English. I am not very sure what "can we just modify the StreamingReader to parse the kafka's value to DataFrame" means. Does it mean we only modify the input logic and try to transform the Kafka data into a DataFrame while keeping the processing logic as before, i.e. get a DataFrame from the Kafka source and parse it in the vertex/edge processor separately?

If we implement it that way, the first question is how to switch between the vertex and edge processors. For a Kafka source, according to the code, we cannot switch to another tag/edge once a tag config is chosen, so we can only parse the one tag defined in the config into Nebula. Let's assume the number of tags and edges defined in the config is N. One possible solution is to create N SparkContexts and N Kafka streaming readers, so that with multi-threading each tag/edge is processed in its own thread with its own Kafka consumer. However, I don't think that is a good idea: assuming the partition number is P, there would be N*P partitions and N*P Nebula writers, which may throw socket exceptions.

In addition, we found a bug in the PR. Currently only string-type vid/source/target is supported, while an int-type vid/source/target may throw an exception. This is because we always call jsonObj.getString() for vid/source/target; for an int-type vid/source/target we should call jsonObj.getLong() instead (sketched below). This can easily be fixed in KafkaProcessor.scala everywhere getAndCheckVID is called.
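
A rough sketch of that int/string vid handling, assuming a fastjson-style JSONObject; the method name and parameters here are hypothetical, not the actual Exchange code:

    import com.alibaba.fastjson.JSONObject

    // Pick the accessor based on the configured vid type instead of always calling getString:
    // getLong for an INT vid/source/target, getString for a STRING one.
    def extractVid(jsonObj: JSONObject, vidField: String, vidIsInt: Boolean): String =
      if (vidIsInt) jsonObj.getLong(vidField).toString
      else jsonObj.getString(vidField)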

Nicole00 commented 3 years ago

Yes, my point is that we only modify the input logic and try to transform the Kafka data into a DataFrame while keeping the processing logic as before, i.e. get a DataFrame from the Kafka source and parse it in the vertex/edge processor separately.

  1. How to switch vertex/edge? In one Exchange application, we only support one tag or edge type for a streaming data source. I understand your concern when the number of tags/edges and partitions is large. Multiple tags or edges configured in one configuration file are executed in order, and there is no parallelism between them, so we don't suggest submitting applications for all tags/edges at the same time (if we did, the applications might wait for Spark or YARN resources, whose default scheduling policy is FIFO). In practice, N is usually less than 10.
sworduo commented 3 years ago

That's OK. However, since a Kafka producer typically produces data all the time, the Kafka consumer is expected to consume data all the time as well. Hence, when the data source of one tag/edge defined in the configuration is Kafka, the nebula-exchange application will process that tag/edge forever and never switch to any other tag/edge defined in the same configuration.

Hence, in the new pr, we will make the following restrictions:

  1. One application for one Kafka source. So if more than one tag/edge needs to be parsed from Kafka, they should be processed in different applications.
  2. In order to ensure 1., we will check the configuration at the beginning. If the data source of a tag/edge in the configuration is Kafka and the number of tags/edges is more than 1, an exception will be thrown. In other words, for Kafka, one configuration should define only one tag or edge (a possible check is sketched after this list).
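
A possible shape for that start-up check; the type and names are made up for illustration, not the real implementation:

    // Reject a config that mixes a Kafka source with any other tag/edge:
    // the streaming job never finishes, so the remaining configs would never run.
    def checkStreamingConfig(tagAndEdgeSources: Seq[String]): Unit = {
      val hasKafka = tagAndEdgeSources.exists(_.equalsIgnoreCase("kafka"))
      if (hasKafka && tagAndEdgeSources.size > 1) {
        throw new IllegalArgumentException(
          "When a Kafka source is configured, only one tag or edge is allowed per application.")
      }
    }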

Implementation summary: we don't need to modify the input logic any more. All we need is to add new logic in the vertex/edge processor that parses data from the value field when the data source is Kafka.

Expected effect: if someone needs to parse several tags/edges from Kafka, they have to process those tags/edges in different applications and re-consume the same Kafka data number(tag+edge) times, and more Nebula writers are needed at the same time. I still don't think it's a good idea, but in order to keep the current architecture, I think it's OK to do it this way.

Nicole00 commented 3 years ago

So sorry for the late reply. It looks good to me, just two questions:

  1. Can we process all of the Kafka data in the Reader stage? The architecture requires that a DataFrame that has entered VertexProcess/EdgeProcess needs no further parsing logic.
  2. Checking the configuration at the beginning is a great idea. However, if a user's tag config order is csv, kafka, would that be disallowed? In fact, both the csv and kafka data sources can be imported successfully. Maybe we just need to ensure that there is no other tag/edge config after kafka?
sworduo commented 3 years ago
  1. I am not very sure. Since the JSON string is recorded in the value field of Kafka, we have to get the Kafka data before parsing the JSON. Maybe we can map the data in the reader, but I have no idea whether that would work. If not, we can parse the JSON in the tag/edge processor.
  2. Yes, you're right. We just need to ensure that there is no config after Kafka, which will be detected at the beginning.
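
Following that refinement, the earlier check could be relaxed to only require that the Kafka tag/edge is the last entry (again, the names are illustrative, not the real code):

    // Allow batch sources (e.g. csv) before Kafka, but nothing after it,
    // since the streaming job never hands control back.
    def checkKafkaIsLast(tagAndEdgeSources: Seq[String]): Unit = {
      val firstKafka = tagAndEdgeSources.indexWhere(_.equalsIgnoreCase("kafka"))
      if (firstKafka >= 0 && firstKafka != tagAndEdgeSources.length - 1) {
        throw new IllegalArgumentException(
          "A Kafka tag/edge must be the last entry in the configuration.")
      }
    }
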
Nicole00 commented 3 years ago

In the Reader process, we can use the configured fields to parse Kafka's JSON value, like this:

Assume the fields config is [id, name, age] (all fields must exist in Kafka's value data):

    import org.apache.spark.sql.functions.{col, get_json_object}

    val df = session.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaConfig.server)
      .option("subscribe", kafkaConfig.topic)
      .load()
    // Kafka's value column is binary, so cast it to string before extracting the JSON fields
    df.select(
      get_json_object(col("value").cast("string"), "$.id").alias("id"),
      get_json_object(col("value").cast("string"), "$.name").alias("name"),
      get_json_object(col("value").cast("string"), "$.age").alias("age"))

Then we get the needed DataFrame from Kafka, and the rest of the process stays the same as the current logic.

sworduo commented 3 years ago

I get your point. I will try something like df.select(value).map(v => parseJson).
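
A rough sketch of that alternative, assuming the streaming df and session from the snippet above and the same illustrative fields (fastjson is again just an assumed JSON library):

    import org.apache.spark.sql.functions.col
    import com.alibaba.fastjson.JSON
    import session.implicits._

    // Cast the Kafka value to string, then parse the JSON in a map,
    // producing one column per configured field.
    val parsed = df
      .select(col("value").cast("string")).as[String]
      .map { v =>
        val obj = JSON.parseObject(v)
        (obj.getString("id"), obj.getString("name"), obj.getString("age"))
      }
      .toDF("id", "name", "age")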

Nicole00 commented 2 years ago

https://github.com/vesoft-inc/nebula-exchange/pull/29 resolved it.