vesoft-inc / nebula-flink-connector

Flink Connector for Nebula Graph

Full insert/update/delete support for dynamic table sinks #77

Closed by linhr 1 year ago

linhr commented 1 year ago

I have an upstream data source and I'd like to replicate the data to NebulaGraph in real time. In my current setup I have one dynamic table A powered by the Kafka connector with the Debezium format, which captures data change events of the upstream MySQL database. I'd like to use Flink SQL INSERT statements to write to another dynamic table B powered by the Nebula Flink connector. If everything is working, any insert/update/delete operation on dynamic table A will trigger a write to dynamic table B, where the Nebula Flink connector is responsible for understanding the changelog and performing the corresponding insert/update/delete operations on the underlying graph database. Does the Nebula Flink connector support such a use case?
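To make the intended pipeline concrete, here is a rough sketch of the kind of job I have in mind, using the Table API from Java. The Kafka/Debezium options are the standard ones; the WITH options for the Nebula sink table are placeholders only, since I am not sure of the connector's actual option names:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlToNebulaJob {
    public static void main(String[] args) {
        TableEnvironment tableEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Dynamic table A: changelog produced by Debezium and read from Kafka.
        tableEnv.executeSql(
            "CREATE TABLE users_cdc (" +
            "  user_id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'kafka'," +
            "  'topic' = 'mysql.mydb.users'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'properties.group.id' = 'nebula-sync'," +
            "  'scan.startup.mode' = 'earliest-offset'," +
            "  'format' = 'debezium-json'" +
            ")");

        // Dynamic table B: NebulaGraph vertex sink.
        // NOTE: these WITH options are placeholders; the real option names need
        // to be checked against the nebula-flink-connector documentation.
        tableEnv.executeSql(
            "CREATE TABLE users_nebula (" +
            "  user_id BIGINT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'nebula'," +
            "  'meta-address' = 'metad0:9559'," +
            "  'graph-address' = 'graphd0:9669'," +
            "  'username' = 'root'," +
            "  'password' = 'nebula'," +
            "  'graph-space' = 'my_space'" +
            ")");

        // Replicate the changelog: every insert/update/delete on table A should
        // become the corresponding operation on the graph.
        tableEnv.executeSql("INSERT INTO users_nebula SELECT user_id, name FROM users_cdc");
    }
}
```

The hope is that the planner forwards the full changelog (+I/-U/+U/-D rows) from table A to the Nebula sink, and the sink maps each row kind to the right graph operation.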

I was looking at the source code and it seems to me that the Nebula Flink connector does support Flink SQL, but I cannot find examples of how this could be used for streaming applications. Any guidance would be much appreciated.

Thanks a lot!

wey-gu commented 1 year ago

Ping @Nicole00

BTW. Have you checked this project?

https://github.com/nebula-contrib/nebula-real-time-exchange

linhr commented 1 year ago

Thanks for the prompt reply! Let me take a look at the real time exchange project as well.

linhr commented 1 year ago

I took a brief look at the real time exchange project, and it seems that it reads the MySQL binlog directly (using the Flink CDC connector which runs Debezium embedded inside the connector). It is definitely a viable solution for replicating MySQL databases to NebulaGraph. (We were aware of the Flink CDC project, and it is really exciting to learn that the NebulaGraph ecosystem already has something built on top of it!)

However, in our current production architecture, we already have Debezium running separately, which writes database change events to Kafka. In this way our Flink applications do not access MySQL binlog directly. In the future we may also have other Kafka-based streaming events (not from MySQL data changes) that we'd like to store in NebulaGraph, so the full dynamic table support would be very beneficial.

wey-gu commented 1 year ago

Sure. Also, nebula-exchange can be wired to Kafka. As for the Flink SQL part, I am not familiar with it, so let's wait for @Nicole00 or others to help.

https://github.com/vesoft-inc/nebula-exchange

linhr commented 1 year ago

Thanks @wey-gu for sharing the nebula-exchange project as well. It seems to support Spark use cases. Right now I'm working on Flink applications, but I'll definitely take a look at it when I need to work with Kafka and NebulaGraph in Spark.

linhr commented 1 year ago

I looked into the source code in more detail, and it seems there is a per-executor WriteModeEnum configuration for the vertex/edge batch executor classes (NebulaVertexTableBatchExecutor and NebulaEdgeTableBatchExecutor). However, this does not take into account the RowKind value of each RowData object when processing them in batches. Therefore, when a changelog contains a mix of insert/update/delete events, the dynamic table sink does not behave correctly.

I think the proper way is to buffer the rows (and deduplicate them by primary keys) and delegate the execution to three executors (for insert, update, and delete, respectively) when committing the batch. The official JDBC connector can serve as an example for such an implementation (link).
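As a rough illustration of the idea (all class and interface names below are made up; the real change would live inside the existing table batch executors):

```java
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;

public class NebulaRowBuffer {

    // Keyed by primary key so that a later change to the same row overrides an
    // earlier one within the batch; LinkedHashMap keeps arrival order.
    private final Map<String, RowData> reduceBuffer = new LinkedHashMap<>();

    public void addRow(String primaryKey, RowData row) {
        reduceBuffer.put(primaryKey, row);
    }

    /** On commit, route each buffered row to the executor matching its RowKind. */
    public void flush(Executor insertExecutor, Executor updateExecutor, Executor deleteExecutor) {
        for (RowData row : reduceBuffer.values()) {
            RowKind kind = row.getRowKind();
            if (kind == RowKind.INSERT) {
                insertExecutor.addToBatch(row);
            } else if (kind == RowKind.UPDATE_AFTER) {
                updateExecutor.addToBatch(row);
            } else if (kind == RowKind.DELETE) {
                deleteExecutor.addToBatch(row);
            }
            // UPDATE_BEFORE rows can be ignored: the UPDATE_AFTER row for the
            // same key already carries the full new state.
        }
        insertExecutor.executeBatch();
        updateExecutor.executeBatch();
        deleteExecutor.executeBatch();
        reduceBuffer.clear();
    }

    /** Minimal executor abstraction for this sketch. */
    public interface Executor {
        void addToBatch(RowData row);
        void executeBatch();
    }
}
```

With this shape, a later change to the same key within a batch overrides an earlier one before anything is sent to NebulaGraph, which roughly mirrors how the JDBC upsert sink reduces its buffer before flushing.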

I'm happy to contribute a PR if the above makes sense.

liuxiaocs7 commented 1 year ago

> I have an upstream data source and I'd like to replicate the data to NebulaGraph in real time. In my current setup I have one dynamic table A powered by the Kafka connector with the Debezium format, which captures data change events of the upstream MySQL database. I'd like to use Flink SQL INSERT statements to write to another dynamic table B powered by the Nebula Flink connector. If everything is working, any insert/update/delete operation on dynamic table A will trigger a write to dynamic table B, where the Nebula Flink connector is responsible for understanding the changelog and performing the corresponding insert/update/delete operations on the underlying graph database. Does the Nebula Flink connector support such a use case?
>
> I was looking at the source code and it seems to me that the Nebula Flink connector does support Flink SQL, but I cannot find examples of how this could be used for streaming applications. Any guidance would be much appreciated.
>
> Thanks a lot!

Hi @linhr, you can find examples in sink and source now, and I will try to enrich the documentation later.

liuxiaocs7 commented 1 year ago

> I looked into the source code in more detail, and it seems there is a per-executor WriteModeEnum configuration for the vertex/edge batch executor classes (NebulaVertexTableBatchExecutor and NebulaEdgeTableBatchExecutor). However, this does not take into account the RowKind value of each RowData object when processing them in batches. Therefore, when a changelog contains a mix of insert/update/delete events, the dynamic table sink does not behave correctly.
>
> I think the proper way is to buffer the rows (and deduplicate them by primary keys) and delegate the execution to three executors (for insert, update, and delete, respectively) when committing the batch. The official JDBC connector can serve as an example for such an implementation (link).
>
> I'm happy to contribute a PR if the above makes sense.

Yes, mixed insert/update/delete operations are not supported yet. Your suggestion sounds good to me. Looking forward to your contribution and to improving the Nebula Flink Connector together. What's your opinion, @Nicole00?

Nicole00 commented 1 year ago

Hi @linhr, sorry for the late reply. I carefully considered your requirements. Yes, the Flink connector does not take the RowKind into account; it just performs the insert/update/delete operation according to the user's WriteMode config, which may be unreasonable for streaming delete operations.

  1. For now, to meet your requirement, can we try Flink side-output to filter the insert data, update data, and delete data and sink them into Nebula separately? (See the sketch after this list.)
  2. Thanks for your continuous participation @liuxiaocs7. I'm so glad to hear from you and look forward to your PR for handling dynamic data operations with different row kinds.
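
A rough sketch of the side-output idea with the DataStream API (nebulaSinkFor below is only a stand-in, not an actual connector API; a real job would plug in a NebulaSinkFunction configured with the matching WriteModeEnum for each branch):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class ChangelogSplitter {

    static final OutputTag<RowData> UPDATES = new OutputTag<RowData>("updates") {};
    static final OutputTag<RowData> DELETES = new OutputTag<RowData>("deletes") {};

    public static void wire(DataStream<RowData> changelog) {
        // Route each row by its RowKind: inserts stay on the main output,
        // updates and deletes go to side outputs.
        SingleOutputStreamOperator<RowData> inserts =
                changelog.process(new ProcessFunction<RowData, RowData>() {
                    @Override
                    public void processElement(RowData row, Context ctx, Collector<RowData> out) {
                        RowKind kind = row.getRowKind();
                        if (kind == RowKind.INSERT) {
                            out.collect(row);
                        } else if (kind == RowKind.UPDATE_AFTER) {
                            ctx.output(UPDATES, row);
                        } else if (kind == RowKind.DELETE) {
                            ctx.output(DELETES, row);
                        }
                        // UPDATE_BEFORE is dropped; UPDATE_AFTER carries the new state.
                    }
                });

        // Attach a separately configured Nebula sink to each branch.
        inserts.addSink(nebulaSinkFor("INSERT"));
        inserts.getSideOutput(UPDATES).addSink(nebulaSinkFor("UPDATE"));
        inserts.getSideOutput(DELETES).addSink(nebulaSinkFor("DELETE"));
    }

    // Stand-in only: in a real job this would build a NebulaSinkFunction whose
    // execution options use the WriteModeEnum matching the given operation.
    private static SinkFunction<RowData> nebulaSinkFor(String writeMode) {
        return new DiscardingSink<>();
    }
}
```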

linhr commented 1 year ago

@Nicole00 Thanks for your reply! Side output seems like a reasonable workaround for this issue (assuming we switch to the DataStream API).

Actually I have been working on this issue in the past few days. I've got something working locally. I'll send out a PR once my code change is in good shape (probably next week). (cc @liuxiaocs7)

Nicole00 commented 1 year ago

> @Nicole00 Thanks for your reply! Side output seems like a reasonable workaround for this issue (assuming we switch to the DataStream API).
>
> Actually I have been working on this issue in the past few days. I've got something working locally. I'll send out a PR once my code change is in good shape (probably next week). (cc @liuxiaocs7)

Wow, looking forward to your PR 🎉. Don't feel pressured, and work on it at your own pace.

liuxiaocs7 commented 1 year ago

> Hi @linhr, sorry for the late reply. I carefully considered your requirements. Yes, the Flink connector does not take the RowKind into account; it just performs the insert/update/delete operation according to the user's WriteMode config, which may be unreasonable for streaming delete operations.
>
> 1. For now, to meet your requirement, can we try Flink side-output to filter the insert data, update data, and delete data and sink them into Nebula separately?
> 2. Thanks for your continuous participation @liuxiaocs7. I'm so glad to hear from you and look forward to your PR for handling dynamic data operations with different row kinds.

My pleasure, I'll think about it carefully. :)

linhr commented 1 year ago

The PR is here: #81. It took a bit longer than I expected to finish writing the unit/integration tests, but I think my code change is now ready for review. @Nicole00 Looking forward to your feedback!