hpgrahsl / kafka-connect-mongodb

**Unofficial / Community** Kafka Connect MongoDB Sink Connector -> integrated in 2019 into the official MongoDB Kafka Connector: https://www.mongodb.com/kafka-connector
Apache License 2.0

Is there "update"(NOT "replace") as operation type of the result from MongoSinkConnector? #129

Closed HyunSangHan closed 2 years ago

HyunSangHan commented 3 years ago

@hpgrahsl

I am sinking data from Postgres to MongoDB with the MongoSinkConnector via Kafka (Postgres --> Kafka --> MongoDB). When I consume the Kafka messages produced by an update in Postgres, I can see u (= update) in the op (= operationType) field.

However, after updating Postgres records, all the operations appear in MongoDB's oplog as replace rather than update. Is replace the default operation type when the MongoSinkConnector updates a document? If so, is there any option to change it to update?

Thank you.

hpgrahsl commented 3 years ago

Hi @HyunSangHan!

Thanks for reaching out. You are right that the MongoDB sink connector is implemented such that it applies a replace operation on the matched document. This is by design: it makes it possible to employ upsert semantics for both the insert and update operations it processes from the Kafka topic. Also, a CDC message from Kafka always results in writing the full document state on the MongoDB side, so partial updates aren't supported, because you don't really need them for a CDC pipeline use case.
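
To illustrate, the effect is roughly that of a replace-with-upsert in the MongoDB Java driver (a minimal sketch for clarity, not the connector's actual code; database, collection and field names here are made up):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.ReplaceOptions;
import org.bson.Document;

public class ReplaceUpsertSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("exampledb").getCollection("examplecoll");

            // Full document state, e.g. taken from the sink record's value
            Document newState = new Document("_id", 42)
                    .append("name", "Alice")
                    .append("status", "active");

            // Replace the matched document as a whole; insert it if it doesn't exist yet.
            // This is why MongoDB's oplog / change stream reports "replace", not "update".
            coll.replaceOne(Filters.eq("_id", 42), newState, new ReplaceOptions().upsert(true));
        }
    }
}
```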

So maybe you can elaborate a bit: why would you need to have a differentiation in that regard? The end result after processing the CDC records with the currently imposed write model semantics should give you the correct document state on the MongoDB side. But it might be that I misunderstood your question.

HyunSangHan commented 3 years ago

@hpgrahsl

I'm so glad to get your quick reply!! Thank you :)

> So maybe you can elaborate a bit: why would you need to have a differentiation in that regard?

I am planning to build a longer pipeline after consuming the messages. Let me explain below.

I told you "Postgres --> Kafka --> MongoDB", which exactly means

Postgres --> Kafka --> (MongoSinkConnector) --> MongoDB

but one part was omitted. My whole plan is:

Postgres --> Kafka --> (MongoSinkConnector) --> MongoDB --> (Mongo change streams) --> (Real-time processing application) --> (Kafka) --> (Many applications as consumers) --> ...

As a result, I will use the document changes to produce Kafka messages again via Mongo change streams. When using Mongo change streams, I can get updateDescription.updatedFields, which shows which fields were updated, if the event has update as its operationType. However, if the operationType is replace, there is no such field, as you can see in the MongoDB docs.
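
For illustration, here is a minimal sketch with the MongoDB Java driver of the difference I mean (the database and collection names are made up; only update events carry the per-field diff):

```java
import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.changestream.ChangeStreamDocument;
import com.mongodb.client.model.changestream.OperationType;
import org.bson.Document;

public class ChangeStreamSketch {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> coll =
                    client.getDatabase("exampledb").getCollection("examplecoll");

            // Iterate over the change stream events for this collection
            for (ChangeStreamDocument<Document> event : coll.watch()) {
                if (event.getOperationType() == OperationType.UPDATE) {
                    // Only "update" events carry the per-field diff I need downstream
                    System.out.println("updated fields: "
                            + event.getUpdateDescription().getUpdatedFields());
                } else if (event.getOperationType() == OperationType.REPLACE) {
                    // "replace" events come without any updateDescription
                    System.out.println("replace event, no updateDescription available");
                }
            }
        }
    }
}
```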

Finally, I need to get updateDescription.updatedFields from the Mongo change streams, and that's why I want the MongoSinkConnector to update documents with the update operationType. Is there any way to achieve an update operationType with the MongoSinkConnector?

hpgrahsl commented 3 years ago

Ok, I see. Well, the short answer is no, not out-of-the-box in the current version. However, many parts of the sink connector implementation were written with customizability in mind. That said, you could come up with your own write model behaviour by implementing the corresponding interfaces and/or extending existing classes with custom ones that override the default behaviour.
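
To give you a rough idea, a custom write model would essentially need to produce an UpdateOneModel with a $set of the changed fields instead of the default replace. A minimal sketch with plain Java driver types (how you wire this into the connector's strategy interfaces is up to your implementation, and the key/field names are made up):

```java
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOneModel;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.WriteModel;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
import org.bson.BsonString;

public class UpdateWriteModelSketch {

    // Build an update (not replace) write model from a key and the changed fields only.
    static WriteModel<BsonDocument> updateModel(BsonDocument keyDoc, BsonDocument changedFields) {
        return new UpdateOneModel<>(
                Filters.eq("_id", keyDoc.get("_id")),
                new BsonDocument("$set", changedFields),
                new UpdateOptions().upsert(true));
    }

    public static void main(String[] args) {
        BsonDocument key = new BsonDocument("_id", new BsonInt32(42));
        BsonDocument changed = new BsonDocument("status", new BsonString("inactive"));
        // Applying this model makes MongoDB log an "update", so a change stream
        // would expose updateDescription.updatedFields for it.
        System.out.println(updateModel(key, changed));
    }
}
```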

One more question, just to make sure I fully understand you. Instead of sourcing your real-time processing application from the change streams topic that results after sinking the data into MongoDB, why can't you source it directly from the original Kafka topic produced by the Postgres CDC?

HyunSangHan commented 3 years ago

@hpgrahsl That is a good question! I think my explanation was not sufficient.

There are a few reasons:

  1. First of all, I need to sink the data from Postgres to MongoDB anyway so that I can reuse it later from MongoDB. (So I cannot skip this sink step.)
  2. If I sourced the data directly from the original Kafka topic, I couldn't guarantee that the data had also been saved to MongoDB successfully, because that would split the pipeline in two (Kafka --> MongoDB as well as Kafka --> real-time processing application). That's why I want to consume the data only after it has been successfully saved (Kafka --> MongoDB --> real-time processing application).
  3. The consuming application would no longer need to depend on something like Schema Registry.