RADAR-base / MongoDb-Sink-Connector

Kafka MongoDb sink connector
Apache License 2.0

MongoDB connection support with replication #51

Open aashishkumar0102 opened 5 years ago

aashishkumar0102 commented 5 years ago

Hi,

How can I connect to a MongoDB cluster that uses a replica set?

fracasula commented 5 years ago

Hey guys, I'm having the same issue. I noticed that you added URI support here, but it doesn't seem to be documented, so for now I'm just trying to get it to work.

Here's what I was doing before with a single node (and it was working fine):

mongodb:
  host: foodchain-mongodb.foodchain.svc.cluster.local
  port: 27017
  user: admin
  db: foodchain_test

This is what I need to do now:

mongodb:
  uri: mongodb://foodchain-backend-mongodb-replicaset,foodchain-backend-mongodb-replicaset-client/foodchain?replicaSet=foodchain_rs

Beware of the trailing ?replicaSet=foodchain_rs; that's needed too 🙂
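For reference, a minimal sketch of how such a replica set URI is put together: a comma-separated seed list of hosts, then the database, then the replicaSet option naming the set the driver should connect to. The host, database and replica set names are the ones from the YAML above; the ReplicaSetUri helper itself is hypothetical and not part of the connector.

```java
import java.util.List;

/**
 * Hypothetical helper: assembles a MongoDB replica set connection string.
 * Not part of the connector; it only illustrates the URI shape used above.
 */
final class ReplicaSetUri {
    private ReplicaSetUri() {}

    static String build(List<String> hosts, String database, String replicaSet) {
        if (hosts.isEmpty()) {
            throw new IllegalArgumentException("at least one host is required");
        }
        // Seed list: hosts separated by commas; a host without ":port" uses 27017.
        String seedList = String.join(",", hosts);
        // replicaSet names the replica set the driver should connect to.
        return "mongodb://" + seedList + "/" + database + "?replicaSet=" + replicaSet;
    }

    public static void main(String[] args) {
        System.out.println(build(
                List.of("foodchain-backend-mongodb-replicaset",
                        "foodchain-backend-mongodb-replicaset-client"),
                "foodchain",
                "foodchain_rs"));
    }
}
```

Running main prints the same URI as the `uri:` value above.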

Unfortunately, that doesn't seem to work, because the configuration doesn't validate:

[2019-03-14 14:56:39,770] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 4 error(s):
Invalid value for configuration mongo.host: String may not be empty
Invalid value for configuration mongo.port: Not a number of type INT
Invalid value null for configuration mongo.port: Value must be non-null
Invalid value for configuration mongo.database: String may not be empty

Is there any way to get this to work with a replica set? Thanks!
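The four errors suggest that the released configuration requires mongo.host, mongo.port and mongo.database unconditionally, so a config that only sets mongo.uri cannot pass validation. Below is a hypothetical sketch of validation that would accept either form. It is NOT the connector's actual ConfigDef; the class name and the fallback logic are assumptions for illustration only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical validation: accept mongo.uri, or else the discrete
 * mongo.host / mongo.port / mongo.database keys. Not the connector's real code.
 */
final class MongoSinkConfigCheck {
    private MongoSinkConfigCheck() {}

    static List<String> validate(Map<String, String> props) {
        List<String> errors = new ArrayList<>();
        String uri = props.get("mongo.uri");
        if (!isBlank(uri)) {
            // A connection string already carries hosts, database and replicaSet.
            if (!uri.startsWith("mongodb://") && !uri.startsWith("mongodb+srv://")) {
                errors.add("mongo.uri: must start with mongodb:// or mongodb+srv://");
            }
            return errors;
        }
        // Fallback: single-node settings, as in the original configuration.
        if (isBlank(props.get("mongo.host"))) {
            errors.add("mongo.host: String may not be empty");
        }
        String port = props.get("mongo.port");
        if (port == null) {
            errors.add("mongo.port: Value must be non-null");
        } else {
            try {
                Integer.parseInt(port.trim());
            } catch (NumberFormatException e) {
                errors.add("mongo.port: Not a number of type INT");
            }
        }
        if (isBlank(props.get("mongo.database"))) {
            errors.add("mongo.database: String may not be empty");
        }
        return errors;
    }

    private static boolean isBlank(String s) {
        return s == null || s.trim().isEmpty();
    }
}
```

With this shape, a config containing only mongo.uri validates cleanly, while an empty config still reports the missing host, port and database.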

fracasula commented 5 years ago

I hadn't noticed that those changes haven't been merged to master yet. I'm now using the dev image with this configuration:

# Kafka consumer configuration
name=kafka-sink-connectors-mongodb-sink

# Kafka connector configuration
connector.class=org.radarcns.connect.mongodb.MongoDbSinkConnector
tasks.max=1

# Topics that will be consumed
topics=topic-1,topic-2,topic-3

# MongoDB server
mongo.uri=mongodb://foodchain-backend-mongodb-replicaset,foodchain-backend-mongodb-replicaset-client/foodchain?replicaSet=foodchain_rs

# Batch configuration
batch.flush.ms=500
batch.size=2500
buffer.capacity=20000

It still doesn't work, though; now I'm getting this error:

[2019-03-14 15:48:24,309] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {log4j.loggers=org.reflections=ERROR, config.storage.topic=mongodb-sink.config, rest.advertised.host.name=radar-mongodb-connector, status.storage.topic=mongodb-sink.status, group.id=mongodb-sink-consumer-group-id, plugin.path=/usr/share/java/kafka-connect/plugins, bootstrap.servers=PLAINTEXT://kafka-kafka.kafka.svc.cluster.local:9092, internal.key.converter.schemas.enable=false, rest.port=8083, internal.key.converter=org.apache.kafka.connect.json.JsonConverter, value.converter.schema.registry.url=http://schema-registry-schema-registry.schema-registry.svc.cluster.local:8000, offset.storage.file.filename=/tmp/mongodb-sink.offset, zookeeper.connect=kafka-zookeeper.kafka.svc.cluster.local:2181, internal.value.converter.schemas.enable=false, internal.value.converter=org.apache.kafka.connect.json.JsonConverter, offset.storage.topic=mongodb-sink.offsets, value.converter=io.confluent.connect.avro.AvroConverter, key.converter=org.apache.kafka.connect.converters.ByteArrayConverter, key.converter.schema.registry.url=http://schema-registry-schema-registry.schema-registry.svc.cluster.local:8000} contains no connector type
    at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
    at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {log4j.loggers=org.reflections=ERROR, config.storage.topic=mongodb-sink.config, rest.advertised.host.name=radar-mongodb-connector, status.storage.topic=mongodb-sink.status, group.id=mongodb-sink-consumer-group-id, plugin.path=/usr/share/java/kafka-connect/plugins, bootstrap.servers=PLAINTEXT://kafka-kafka.kafka.svc.cluster.local:9092, internal.key.converter.schemas.enable=false, rest.port=8083, internal.key.converter=org.apache.kafka.connect.json.JsonConverter, value.converter.schema.registry.url=http://schema-registry-schema-registry.schema-registry.svc.cluster.local:8000, offset.storage.file.filename=/tmp/mongodb-sink.offset, zookeeper.connect=kafka-zookeeper.kafka.svc.cluster.local:2181, internal.value.converter.schemas.enable=false, internal.value.converter=org.apache.kafka.connect.json.JsonConverter, offset.storage.topic=mongodb-sink.offsets, value.converter=io.confluent.connect.avro.AvroConverter, key.converter=org.apache.kafka.connect.converters.ByteArrayConverter, key.converter.schema.registry.url=http://schema-registry-schema-registry.schema-registry.svc.cluster.local:8000} contains no connector type
    at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:250)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
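The dumped config contains worker settings (bootstrap.servers, group.id, offset.storage.topic, plugin.path) but no connector.class, which looks like the worker properties were passed where the connector properties were expected. The sketch below mirrors the kind of check that produces "contains no connector type": look up connector.class and reject the config if it is absent. The class and method names are illustrative, not Kafka's own.

```java
import java.util.Map;

/**
 * Illustrative sketch of the connector.class check behind
 * "contains no connector type". Names are hypothetical, not Kafka's classes.
 */
final class ConnectorTypeCheck {
    private ConnectorTypeCheck() {}

    static final String CONNECTOR_CLASS = "connector.class";

    static void requireConnectorType(Map<String, String> connectorProps) {
        String connType = connectorProps.get(CONNECTOR_CLASS);
        if (connType == null) {
            // A worker config (bootstrap.servers, group.id, ...) lands here.
            throw new IllegalArgumentException(
                    "Connector config " + connectorProps + " contains no connector type");
        }
    }
}
```

So the fix is less about the MongoDB settings and more about making sure the properties file that contains connector.class is the one handed to the standalone worker as the connector config.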

alexjg commented 5 years ago

I've done some investigating and managed to fix the above issue by making sure the following environment variable is set in the image:

CONNECTOR_PROPERTY_FILE_PREFIX=<whatever your config filename (without the .properties extension) is>

If you're upgrading from 0.2.2, setting it to sink should do the trick.
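Going by the description above, the prefix is the config filename without its .properties extension. A hypothetical sketch of that mapping, plus an early check that the resolved file really is a connector config: the config directory, the ConnectorPropertiesLocator class and its methods are all assumptions, and the image's actual startup script may resolve the file differently.

```java
import java.io.IOException;
import java.io.Reader;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

/**
 * Hypothetical sketch: map a CONNECTOR_PROPERTY_FILE_PREFIX value to
 * <configDir>/<prefix>.properties and load it. Not the image's real logic.
 */
final class ConnectorPropertiesLocator {
    private ConnectorPropertiesLocator() {}

    static Path resolve(Path configDir, String prefix) {
        if (prefix == null || prefix.isEmpty()) {
            throw new IllegalStateException("CONNECTOR_PROPERTY_FILE_PREFIX is not set");
        }
        // e.g. prefix "sink" -> sink.properties
        return configDir.resolve(prefix + ".properties");
    }

    static Properties load(Path file) throws IOException {
        Properties props = new Properties();
        try (Reader reader = Files.newBufferedReader(file)) {
            props.load(reader);
        }
        // Fail early with a clearer message than "contains no connector type".
        if (props.getProperty("connector.class") == null) {
            throw new IllegalStateException(file + " has no connector.class; is it a worker config?");
        }
        return props;
    }
}
```

In this sketch, calling resolve(configDir, System.getenv("CONNECTOR_PROPERTY_FILE_PREFIX")) fails fast when the variable is missing instead of silently picking up the wrong file.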

Once this is done, replication works like a charm.

This environment variable is undocumented, though, and it's not ideal to depend on the dev version for replication support. Is there anything blocking the work on dev from being merged?