damianiandrea / mongodb-nats-connector

A connector that uses MongoDB's change streams to capture data changes and publishes those changes to NATS JetStream.
MIT License
49 stars 7 forks source link

Connector publishes unwanted change events such as drop, rename, etc. #15

Closed damianiandrea closed 9 months ago

damianiandrea commented 9 months ago

Currently, the connector publishes any kind of watched change event to NATS JetStream, regardless of the operation type (invalidate is an exception).

This is not the expected behavior as the connector's purpose is to publish events when records are inserted, updated/replaced, or deleted, and nothing else.

Furthermore, most users are not interested in publishing events such as drop, rename, etc. (see the full list here), it would be unreasonable to let the connector publish those events by default.

Examples:

connector  | {"time":"2024-01-26T21:05:27.072001992Z","level":"DEBUG","msg":"received change event","changeEvent":"{\"_id\":{\"_data\":\"8265B41E97000000012B022C0100296E5A1004002FB9BB2E124BFCAF935B8C8F64417104\"},\"operationType\":\"drop\",\"clusterTime\":{\"$timestamp\":{\"t\":1706303127,\"i\":1}},\"wallTime\":{\"$date\":\"2024-01-26T21:05:27.061Z\"},\"ns\":{\"db\":\"test-connector\",\"coll\":\"coll1\"}}"}
connector  | {"time":"2024-01-26T21:05:27.073064092Z","level":"DEBUG","msg":"published message","subj":"COLL1.drop","data":"{\"_id\":{\"_data\":\"8265B41E97000000012B022C0100296E5A1004002FB9BB2E124BFCAF935B8C8F64417104\"},\"operationType\":\"drop\",\"clusterTime\":{\"$timestamp\":{\"t\":1706303127,\"i\":1}},\"wallTime\":{\"$date\":\"2024-01-26T21:05:27.061Z\"},\"ns\":{\"db\":\"test-connector\",\"coll\":\"coll1\"}}"}
connector  | {"time":"2024-01-27T21:41:12.536828176Z","level":"DEBUG","msg":"received change event","changeEvent":"{\"to\":{\"db\":\"test-connector\",\"coll\":\"coll3\"},\"_id\":{\"_data\":\"8265B57878000000012B022C0100296E5A10047EE166F4EC144D78AD8F063AE917581D04\"},\"operationType\":\"rename\",\"clusterTime\":{\"$timestamp\":{\"t\":1706391672,\"i\":1}},\"wallTime\":{\"$date\":\"2024-01-27T21:41:12.53Z\"},\"ns\":{\"db\":\"test-connector\",\"coll\":\"coll1\"}}"}
connector  | {"time":"2024-01-27T21:41:12.537909286Z","level":"DEBUG","msg":"published message","subj":"COLL1.rename","data":"{\"to\":{\"db\":\"test-connector\",\"coll\":\"coll3\"},\"_id\":{\"_data\":\"8265B57878000000012B022C0100296E5A10047EE166F4EC144D78AD8F063AE917581D04\"},\"operationType\":\"rename\",\"clusterTime\":{\"$timestamp\":{\"t\":1706391672,\"i\":1}},\"wallTime\":{\"$date\":\"2024-01-27T21:41:12.53Z\"},\"ns\":{\"db\":\"test-connector\",\"coll\":\"coll1\"}}"}