syalinbas closed this issue 4 years ago
Hi @zbrsy,
I assume that by "deleting records" you mean you delete them in the sink, i.e. the MongoDB target collection, and that they then get re-inserted by the connector right away, without any new writes to the Kafka source topic?
If what you describe is indeed happening, I would be very surprised, to say the least :) In fact, I currently cannot think of any way to get this behaviour even if I wanted to configure it on purpose.
Of course, I'm very happy to investigate further, so if possible please share more details: the full configuration and sample data from the topic in question, so that I can try to reproduce it locally.
Hi Hans, thanks for the reply. I know it is extremely weird, and I was very hesitant to report it. I was trying to understand ksqlDB's behavior and was doing a lot of stops and starts. However, this did not happen between those starts and stops. I say "happened" because I could not reproduce it the next day, but at the time I deleted records from MongoDB and the total document count always came back to 1288. I deleted them at least 5 times, with the same result. Unless I have lost my mind, there must be a reason. My hunch is ksqlDB. Thanks for confirming that there is no such behavior; we can close the ticket.
The offer still stands to help you investigate the weird behaviour you reported. I also happen to know a thing or two about ksqlDB :)
Thanks for the offer. Let me try to reproduce it first, and if I can, I will definitely let you know.
I have a connector that upserts records into the collection. However, if I delete records, the connector immediately re-inserts them. I tried removing records a couple of times and this behavior is consistent. Is this how it is supposed to work? I am sure that there are no new messages coming into the topic, since I feed them manually. I do not do any restarts either. I just delete records from the collection.
...
"mongodb.document.id.strategy": "at.grahsl.kafka.connect.mongodb.processor.id.strategy.PartialValueStrategy",
"mongodb.key.projection.type": "whitelist",
"mongodb.key.projection.list": "deviceId,timestamp",
"mongodb.replace.one.strategy": "at.grahsl.kafka.connect.mongodb.writemodel.filter.strategy.ReplaceOneBusinessKeyFilterStrategy"
...
Thanks.
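For readers trying to reason about the configuration above, here is a rough sketch of what an upsert keyed on the `deviceId,timestamp` projection would be expected to do. It is not the connector's code: `FakeCollection`, `business_key_filter` and `sink_record` are hypothetical names, the collection is an in-memory stand-in rather than a real MongoDB driver, and the sample values are made up. Under those assumptions, a deleted document only comes back if the same record is processed again, which matches the expectation stated earlier in the thread that a re-insert without new records on the topic would be surprising.

```python
from typing import Any


def business_key_filter(value: dict[str, Any], fields: list[str]) -> dict[str, Any]:
    """Build a match filter from the whitelisted value fields (hypothetical helper)."""
    return {name: value[name] for name in fields}


def matches(doc: dict[str, Any], flt: dict[str, Any]) -> bool:
    """True if every filter field is present in the document with the same value."""
    return all(doc.get(k) == v for k, v in flt.items())


class FakeCollection:
    """In-memory stand-in for a target collection (illustration only, not a driver API)."""

    def __init__(self) -> None:
        self.docs: list[dict[str, Any]] = []

    def replace_one(self, flt: dict[str, Any], replacement: dict[str, Any], upsert: bool = False) -> None:
        # Replace the first matching document; insert only when upsert is requested.
        for i, doc in enumerate(self.docs):
            if matches(doc, flt):
                self.docs[i] = dict(replacement)
                return
        if upsert:
            self.docs.append(dict(replacement))

    def delete_many(self, flt: dict[str, Any]) -> None:
        self.docs = [d for d in self.docs if not matches(d, flt)]


def sink_record(coll: FakeCollection, value: dict[str, Any], key_fields: list[str]) -> None:
    """Upsert one record, matching on the business key instead of a generated id."""
    coll.replace_one(business_key_filter(value, key_fields), value, upsert=True)


KEY_FIELDS = ["deviceId", "timestamp"]  # mirrors the projection list in the config

# Made-up sample records; the first two share the same business key.
records = [
    {"deviceId": "d1", "timestamp": 100, "temp": 20.5},
    {"deviceId": "d1", "timestamp": 100, "temp": 21.0},
    {"deviceId": "d2", "timestamp": 100, "temp": 19.0},
]

coll = FakeCollection()
for rec in records:
    sink_record(coll, rec, KEY_FIELDS)
count_after_sink = len(coll.docs)    # same business key is replaced, not duplicated

coll.delete_many({"deviceId": "d1"})
count_after_delete = len(coll.docs)  # stays reduced while no record is processed

sink_record(coll, records[1], KEY_FIELDS)
count_after_replay = len(coll.docs)  # reappears only because the record was processed again
```

If this model is right, a document count that keeps returning to the same number after deletes would point to records being delivered to the sink again (for example by something upstream re-emitting them) rather than to the upsert itself.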