upstash / issues

Issue Tracker for Upstash
https://upstash.com

Debezium MongoDB Connector Source 2.0.0 #79

Open altoning opened 7 months ago

altoning commented 7 months ago

Can you upgrade your Debezium MongoDB Connector Source from 2.0.0 to 2.4 ?

Otherwise, we will need to find an alternative to sync MongoDB Atlas to Snowflake.

Debezium MongoDB Connector 2.0 does not work with our MongoDB Atlas instance - especially, connection strings containing +srv. For example, mongodb+srv://

sancar commented 7 months ago

Hi @altoning, we have a long-term plan for how to handle upgrades. Also note that the Debezium MongoDB connector is actually at version 2.2.1; the console seems to be showing stale info. I will update it asap.

Until then, let me try to help you with your current problem.

We have tested connection strings with +srv and they are working. If you are using Atlas, be sure to add "mongodb.ssl.enabled": true to your config, which is commonly overlooked.

You can follow our guide for the Debezium MongoDB connector: https://upstash.com/docs/kafka/connect/debeziummongo

If there is any config option from 2.4 that you specifically need, please let us know so that we can prioritize the issue. And if our guide does not help, please share any extra config or error log you have so we can find what the problem might be.
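For reference, the Atlas-related part of a connector config would look roughly like this (a minimal sketch; the cluster host and credentials are placeholders):

{
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "mongodb.connection.string": "mongodb+srv://USER:PASS@YOUR_CLUSTER.mongodb.net",
  "mongodb.ssl.enabled": true,
  "topic.prefix": "mongodb"
}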

altoning commented 7 months ago

hi @sancar i found the problem.

after removing the property "collection.include.list", the mongodb debezium connector starts creating the topics.

{
  "collection.include.list": "customers",
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "database.include.list": "dev-alton",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": true,
  "mongodb.connection.string": "mongodb+srv://USER:PASS@cluster0.XXX.mongodb.net",
  "mongodb.ssl.enabled": true,
  "topic.prefix": "mongodb",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true
}

however, i get the following error message:

org.apache.kafka.connect.util.TopicAdmin.createOrFindTopics(TopicAdmin.java:383) ... 11 more Caused by: 
org.apache.kafka.common.errors.InvalidPartitionsException: 
Exceeding max allowed partitions: 10 for topic 'mongodb.dev-alton.settings'. 
Current: 10, requested: 1

how do you recommend i specify the exact collections / topics i want ... to keep it under the limit of 10?

altoning commented 7 months ago

hi @sancar, i got it working.

it turns out that the property "collection.include.list" needs to be fully qualified (i.e. prefixed with the database name).

looking good now.

{
    "collection.include.list": "dev-alton.customers",
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "database.include.list": "dev-alton",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "key.converter.schemas.enable": true,
    "mongodb.connection.string": "mongodb+srv://USER:PASS@cluster0.XXXX.mongodb.net",
    "mongodb.ssl.enabled": true,
    "topic.prefix": "mongodb",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": true
}

P.S. i notice you upgraded MongoDb Debezium from 2.0.0 to 2.2.1

altoning commented 7 months ago

hi @sancar

with MongoDb Debezium 2.2.1, i see the snapshot complete into my kafka topic as expected.

however, document inserts and deletes are not produced in the kafka topic.

we encountered the same issue with https://artie.so until they upgraded to MongoDb Debezium 2.4.

can you also upgrade to v2.4?

sancar commented 7 months ago

The version is not upgraded; we just updated the stale documentation. Let's try to find the root cause first. As I said, it should work out of the box, as we have tested and verified it.

Do you see any errors in the console? Can you share the Kafka cluster name and the connector name so that we can check it internally as well?

I can also suggest changing the connector name after a few failed attempts, as the Connect framework keeps data about the last offsets, which might interfere with the new connector.

altoning commented 7 months ago

hi @sancar

there are no visible error messages. it appears the initial snapshot performed correctly but subsequent deletes or inserts to the mongodb collection do not produce any new messages.

at nov 14 11:27 UTC i performed a delete and insert and my kafka consumer shows no new messages.

the cluster i am testing is named "snowpipe" with a topic named "mongodb.dev-alton.customers" created by a Debezium 2.2.1 MongoDB Source Connector ("dev-alton").

this is the same issue we encountered with https://artie.so until they upgraded from Debezium 2.2.1 to 2.4.

FYI ... i am running a test in parallel using Redpanda + Debezium 2.4, and the delete and insert correctly produce messages containing "op":"d" and "op":"c" as expected.
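for reference, the event payloads i expect to see look roughly like this (a trimmed sketch of the Debezium change event envelope, with the schema wrapper and most source fields omitted and placeholder values filled in):

{
  "op": "c",
  "after": "{\"_id\": {\"$oid\": \"...\"}, ...}",
  "source": { "connector": "mongodb", "db": "dev-alton", "collection": "customers" },
  "ts_ms": 1699999999999
}

{
  "op": "d",
  "after": null,
  "source": { "connector": "mongodb", "db": "dev-alton", "collection": "customers" },
  "ts_ms": 1699999999999
}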

sancar commented 7 months ago

I see the following error internally. I will try to investigate what it is. Have you seen this error with https://artie.so/ on 2.2.1 before?

"exception" : "com.mongodb.MongoCommandException",
            "message" : "Command failed with error 280 (ChangeStreamFatalError): 'PlanExecutor error during aggregation :: caused by :: cannot resume stream; the resume token was not found. {_data: \"826552CD02000018F32B042C0100296E5A10040AAE3750059A4BFD95E84158946FA342463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B657900461E5F6964002C07FC000004\", _typeBits: BinData(0, \"820001\")}' on server cluster0-shard-00-04.zcgba.mongodb.net:27017. The full response is {\"errorLabels\": [\"NonResumableChangeStreamError\"], \"ok\": 0.0, \"errmsg\": \"PlanExecutor error during aggregation :: caused by :: cannot resume stream; the resume token was not found. {_data: \\\"826552CD02000018F32B042C0100296E5A10040AAE3750059A4BFD95E84158946FA342463C6F7065726174696F6E54797065003C64656C6574650046646F63756D656E744B657900461E5F6964002C07FC000004\\\", _typeBits: BinData(0, \\\"820001\\\")}\", \"code\": 280, \"codeName\": \"ChangeStreamFatalError\", \"$clusterTime\": {\"clusterTime\": {\"$timestamp\": {\"t\": 1699950189, \"i\": 152}}, \"signature\": {\"hash\": {\"$binary\": {\"base64\": \"huYYcwZ7uKvK+vdVU/vx4SYYcSI=\", \"subType\": \"00\"}}, \"keyId\": 7264886204217163780}}, \"operationTime\": {\"$timestamp\": {\"t\": 1699950189, \"i\": 145}}}"

altoning commented 7 months ago

hi @sancar correct - this is the same error message we got with artie using 2.2.1 before they upgraded to 2.4

sancar commented 7 months ago

Your config on the Upstash side is pretty simple. That means you are doing something different from us on the Atlas side. Can you help me reproduce the problem by describing what you are doing in as much detail as possible?

sancar commented 7 months ago

Hi @altoning, I think I found the root cause: https://issues.redhat.com/browse/DBZ-6522?jql=project%20%3D%20DBZ%20AND%20resolution%20%3D%20Unresolved%20AND%20text%20~%20%22NonResumableChangeStreamError%22%20ORDER%20BY%20priority%20DESC%2C%20updated%20DESC

To confirm, what is the type of the ID in the related collection? In the meantime, I will try to reproduce it as well.

altoning commented 7 months ago

@sancar thank you. i'm testing with ID of type Long.

however, in our production collections we actually use type ObjectId or UUID.

today, i will test using ID of type ObjectId / UUID and let you know how it goes.

altoning commented 7 months ago

@sancar still no luck with 2.2.1.

i created a new collection "customers2" using ID of type ObjectId as well as a new connector named "customers2".

the snapshot was successful but following delete and insert operations did not produce any new records in kafka.

FYI ... i tried the same using Redpanda with the same results on v2.2.1 ... but v2.4 successfully produced the snapshot as well as the subsequent delete and insert records in kafka

sancar commented 7 months ago

@altoning Can you give me specific instructions to reproduce the issue, please? Include as many details as possible: What do you enter as items in Atlas? Do you have any specific collection type, like time series collections? Do you enter some data into Atlas first and then start the connector? Anything that might be relevant?

altoning commented 7 months ago

STEP 1. initialize the mongodb collection named "customers2"

db.customers2.insert([{ _id : ObjectId(), first_name : 'Robin', last_name : 'Robin', email : 'robin@gmail.com', unique_id : UUID(), test_bool_false: false, test_bool_true: true, new_id: ObjectId(), test_decimal: NumberDecimal("13.37"), test_int: NumberInt("1337"), test_decimal_2: 13.37, test_list: [1, 2, 3, 4, "hello"], test_null: null, test_ts: Timestamp(42, 1), test_nested_object: {a: { b: { c: "hello"}}}}]);
db.customers2.insert([{ _id : ObjectId(), first_name : 'Alton', last_name : 'Alton', email : 'alton@gmail.com', unique_id : UUID(), test_bool_false: false, test_bool_true: true, new_id: ObjectId(), test_decimal: NumberDecimal("13.37"), test_int: NumberInt("1337"), test_decimal_2: 13.37, test_list: [1, 2, 3, 4, "hello"], test_null: null, test_ts: Timestamp(42, 1), test_nested_object: {a: { b: { c: "hello"}}}}]);
db.customers2.insert([{ _id : ObjectId(), first_name : 'Ehsan', last_name : 'Ehsan', email : 'ehsan@gmail.com', unique_id : UUID(), test_bool_false: false, test_bool_true: true, new_id: ObjectId(), test_decimal: NumberDecimal("13.37"), test_int: NumberInt("1337"), test_decimal_2: 13.37, test_list: [1, 2, 3, 4, "hello"], test_null: null, test_ts: Timestamp(42, 1), test_nested_object: {a: { b: { c: "hello"}}}}]);

STEP 2. create the mongodb source connector (v2.2.1)

{
  "collection.include.list": "dev-alton.customers2",
  "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
  "database.include.list": "dev-alton",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": true,
  "mongodb.connection.string": "mongodb+srv://USER:PASSWD@cluster0.XXXX.mongodb.net",
  "mongodb.ssl.enabled": true,
  "topic.prefix": "mongodb",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": true
}

STEP 3. watch kafka messages produced by debezium snapshot

from kafka import KafkaConsumer

# consume the topic created by the Debezium connector, starting from the earliest offset
consumer = KafkaConsumer('mongodb.dev-alton.customers2',
    bootstrap_servers=['full-caiman-11606-eu2-kafka.upstash.io:9092'],
    sasl_mechanism='SCRAM-SHA-256',
    security_protocol='SASL_SSL',
    sasl_plain_username='USERNAME',
    sasl_plain_password='PASSWORD',
    group_id='$GROUP_NAME_XXXX',
    auto_offset_reset='earliest',
)

# print each snapshot/change event as it arrives
i = 1
for msg in consumer:
    print(i)
    print(msg)
    i += 1

consumer.close()

STEP 4. test debezium for "op":"d" event

db.customers2.deleteOne({first_name:'Ehsan'})

STEP 5. test debezium for "op":"c" event

db.customers2.insert([{ _id : ObjectId(), first_name : 'Ehsan', last_name : 'Ehsan', email : 'ehsan@gmail.com', unique_id : UUID(), test_bool_false: false, test_bool_true: true, new_id: ObjectId(), test_decimal: NumberDecimal("13.37"), test_int: NumberInt("1337"), test_decimal_2: 13.37, test_list: [1, 2, 3, 4, "hello"], test_null: null, test_ts: Timestamp(42, 1), test_nested_object: {a: { b: { c: "hello"}}}}]);

i see messages from step 3 (snapshot) but i do not see messages from step 4 (delete) or step 5 (insert)

sancar commented 7 months ago

@altoning Thanks for the steps. We have reproduced the problem and verified that 2.4.0 fixes the issue. The version upgrade is on the roadmap.