Open AAbhardwaj opened 6 years ago
$ curl localhost:8083/connectors/mongodb-source-connector/tasks
[
  {
    "id": { "connector": "mongodb-source-connector", "task": 0 },
    "config": {
      "databases": "db.movies,mydb.ratings,db.usersAndMovies",
      "topic.prefix": "abhishek",
      "task.class": "org.apache.kafka.connect.mongodb.MongodbSourceTask",
      "schema.name": "test",
      "batch.size": "100",
      "converter.class": "org.apache.kafka.connect.mongodb.converter.JsonStructConverter",
      "uri": "mongodb://127.0.0.1:27017"
    }
  }
]
$ curl localhost:8083/connectors/mongodb-source-connector/status
{
  "name": "mongodb-source-connector",
  "connector": { "state": "RUNNING", "worker_id": "1xx.2x.xx.xx:8083" },
  "tasks": [
    { "state": "RUNNING", "id": 0, "worker_id": "1xx.2x.xx.xx:8083" }
  ],
  "type": "source"
}
I do not get any messages in my Kafka topic. Any ideas?
As the documentation of this connector says: "When the connector is run as a Source Connector, it reads data from Mongodb oplog and publishes it on Kafka". See: https://docs.mongodb.com/manual/core/replica-set-oplog/
Check that your MongoDB instance has the collection "local.oplog.rs". If it does not, MongoDB is not set up as a replica set. As soon as you set that up, the collection is created and the connector can start creating and sending messages to Kafka.
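For anyone who prefers to script that check, here is a minimal sketch in Python. It assumes the pymongo driver is installed and that MongoDB listens on the URI from the connector config above; the function names are made up for illustration and are not part of the connector.

```python
def has_oplog(collection_names):
    """Return True if the replica-set oplog collection is among the names."""
    return "oplog.rs" in collection_names


def list_local_collections(uri="mongodb://127.0.0.1:27017"):
    """List the collections of the `local` database (needs pymongo and a live server)."""
    # Imported lazily so has_oplog() can be used without pymongo installed.
    from pymongo import MongoClient

    client = MongoClient(uri, serverSelectionTimeoutMS=5000)
    try:
        return client["local"].list_collection_names()
    finally:
        client.close()
```

Calling has_oplog(list_local_collections()) against the server should return False on a standalone instance and True once the replica set has been initiated.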
As @ruileal said, MongoDB has to be configured as a replica set. In short, to make it work:
1. In the MongoDB configuration file (/etc/mongod.conf), uncomment the replication line and add a replSetName: rs line under it;
2. Restart mongod so the change takes effect, then run rs.initiate() from its command line (the mongo shell).
Then, you should start seeing the oplog messages.
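To confirm that the replica set is actually active after these steps, one option is to look at the server's isMaster reply, which carries a setName field only when the server is a replica set member. The sketch below assumes pymongo is installed and uses the URI from the original post; the helper names are hypothetical.

```python
def replica_set_name(is_master_reply):
    """Return the replica set name from an isMaster reply, or None for a standalone."""
    return is_master_reply.get("setName")


def fetch_is_master(uri="mongodb://127.0.0.1:27017"):
    """Run the isMaster command against the server (needs pymongo and a live server)."""
    # Imported lazily so replica_set_name() can be used without pymongo installed.
    from pymongo import MongoClient

    client = MongoClient(uri, serverSelectionTimeoutMS=5000)
    try:
        return client.admin.command("isMaster")
    finally:
        client.close()
```

With the configuration described above, replica_set_name(fetch_is_master()) would be expected to return "rs"; a result of None suggests the server is still running as a standalone.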
@system103:~$ rs.initiate()
I am not getting oplog messages
Is MongoDB configured as a replica set? See https://docs.mongodb.com/manual/core/replica-set-oplog/ for details. Also, it might be easier to just clone the source and run the test case testInsertWithNullOffsets with your own connection string. You have to comment out a number of things, but it's easier to debug issues that way.
Hi, the source connector is connected but is not fetching data from MongoDB. Please advise.
[2017-12-05 16:59:58,009] INFO Cluster created with settings {hosts=[127.0.0.1:27017], mode=SINGLE, requiredClusterType=UNKNOWN, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500} (org.mongodb.driver.cluster:71)
[2017-12-05 16:59:58,016] INFO Cluster description not yet available. Waiting for 30000 ms before timing out (org.mongodb.driver.cluster:71)
[2017-12-05 16:59:58,019] INFO Opened connection [connectionId{localValue:3, serverValue:282}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 16:59:58,019] INFO Monitor thread successfully connected to server with description ServerDescription{address=127.0.0.1:27017, type=STANDALONE, state=CONNECTED, ok=true, version=ServerVersion{versionList=[3, 4, 10]}, minWireVersion=0, maxWireVersion=5, maxDocumentSize=16777216, roundTripTimeNanos=280047} (org.mongodb.driver.cluster:71)
[2017-12-05 16:59:58,020] INFO WorkerSourceTask{id=mongodb-source-connector-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2017-12-05 16:59:58,024] INFO Opened connection [connectionId{localValue:4, serverValue:283}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 16:59:58,024] INFO Opened connection [connectionId{localValue:5, serverValue:284}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 16:59:58,024] INFO Opened connection [connectionId{localValue:6, serverValue:285}] to 127.0.0.1:27017 (org.mongodb.driver.connection:71)
[2017-12-05 17:00:57,903] INFO WorkerSourceTask{id=mongodb-source-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:306)
[2017-12-05 17:00:57,904] INFO WorkerSourceTask{id=mongodb-source-connector-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:323)
Best regards,
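The driver log above reports the server as type=STANDALONE (with mode=SINGLE), which suggests this MongoDB instance is not running as a replica set, so the replica-set setup described earlier in the thread likely applies here too. As a rough aid for spotting this in a longer worker log, here is a small sketch that pulls the reported server type out of the driver's ServerDescription lines. The regular expression is based only on the log format shown above and may need adjusting for other driver versions; the function name is made up for illustration.

```python
import re

# Matches the address and type fields of a ServerDescription{...} entry,
# as they appear in the MongoDB Java driver log lines quoted above.
SERVER_TYPE = re.compile(r"ServerDescription\{address=([^,]+), type=([A-Z_]+)")


def server_types(log_text):
    """Map each server address found in the driver log to its reported type."""
    return {address: kind for address, kind in SERVER_TYPE.findall(log_text)}
```

If the returned type for your server is STANDALONE, the oplog-based source connector has nothing to read from, which would be consistent with the "flushing 0 outstanding messages" line.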