DataReply / kafka-connect-mongodb

Apache License 2.0

Copying connect-mongodb-1.0.jar vs. connect-mongodb-1.0-jar-with-dependencies.jar to $CONFLUENT_HOME/share/java/confluent-common - different issues #11

Open · camelia-c opened 8 years ago

camelia-c commented 8 years ago

Hello,

I am using kafka-connect-mongodb with Confluent 3.0.0 and MongoDB 3.2.8.

CASE 1:

If I copy connect-mongodb-1.0.jar to $CONFLUENT_HOME/share/java/confluent-common, then ZooKeeper, the Kafka server, and the Schema Registry all run fine. But:

$CONFLUENT_HOME/bin/connect-standalone $CONFLUENT_HOME/etc/kafka/connect-standalone.properties $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties

gives the error "java.lang.NoClassDefFoundError: org/bson/conversions/Bson"

Here is the extended output:

...
[2016-08-25 14:31:24,095] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:58)
[2016-08-25 14:31:24,099] INFO ConnectorConfig values:
    connector.class = org.apache.kafka.connect.mongodb.MongodbSinkConnector
    tasks.max = 3
    name = mongodb-sink-connector
 (org.apache.kafka.connect.runtime.ConnectorConfig:178)
[2016-08-25 14:31:24,105] INFO Creating connector mongodb-sink-connector of type org.apache.kafka.connect.mongodb.MongodbSinkConnector (org.apache.kafka.connect.runtime.Worker:168)
[2016-08-25 14:31:24,107] INFO Instantiated connector mongodb-sink-connector with version 0.10.0.0-cp1 of type org.apache.kafka.connect.mongodb.MongodbSinkConnector (org.apache.kafka.connect.runtime.Worker:176)
[2016-08-25 14:31:24,111] INFO Finished creating connector mongodb-sink-connector (org.apache.kafka.connect.runtime.Worker:181)
[2016-08-25 14:31:24,111] INFO SinkConnectorConfig values:
    connector.class = org.apache.kafka.connect.mongodb.MongodbSinkConnector
    tasks.max = 3
    topics = [railstations_topic2]
    name = mongodb-sink-connector
 (org.apache.kafka.connect.runtime.SinkConnectorConfig:178)
[2016-08-25 14:31:24,117] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.NoClassDefFoundError: org/bson/conversions/Bson
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.kafka.common.config.ConfigDef.parseType(ConfigDef.java:663)
    at org.apache.kafka.common.config.ConfigDef.parse(ConfigDef.java:418)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:55)
    at org.apache.kafka.common.config.AbstractConfig.<init>(AbstractConfig.java:62)
    at org.apache.kafka.connect.runtime.TaskConfig.<init>(TaskConfig.java:52)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.createConnectorTasks(StandaloneHerder.java:275)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.updateConnectorTasks(StandaloneHerder.java:308)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:165)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
Caused by: java.lang.ClassNotFoundException: org.bson.conversions.Bson
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 11 more
[2016-08-25 14:31:24,120] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
...
[2016-08-25 14:31:24,168] ERROR Task not found: mongodb-sink-connector-0 (org.apache.kafka.connect.runtime.Worker:416)
Exception in thread "main" org.apache.kafka.connect.errors.ConnectException: Task not found: mongodb-sink-connector-0
    at org.apache.kafka.connect.runtime.Worker.getTask(Worker.java:417)
    at org.apache.kafka.connect.runtime.Worker.stopTasks(Worker.java:373)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.removeConnectorTasks(StandaloneHerder.java:290)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.stop(StandaloneHerder.java:83)
    at org.apache.kafka.connect.runtime.Connect.stop(Connect.java:71)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:101)
[2016-08-25 14:32:03,114] INFO Reflections took 40002 ms to scan 261 urls, producing 12948 keys and 85634 values (org.reflections.Reflections:229)
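
The NoClassDefFoundError indicates that the MongoDB Java driver, which provides org.bson.conversions.Bson, is not on the Connect worker's classpath: the plain connect-mongodb-1.0.jar ships without its dependencies. A minimal sketch of one way to fix this, assuming the standard Confluent 3.0 layout (the kafka-connect-mongodb directory name and the driver version numbers below are illustrative):

# Keep the connector and its dependencies in their own directory rather than
# in confluent-common, so they cannot shadow the platform's own JARs.
mkdir -p $CONFLUENT_HOME/share/java/kafka-connect-mongodb
cp connect-mongodb-1.0.jar bson-3.3.0.jar \
   mongodb-driver-3.3.0.jar mongodb-driver-core-3.3.0.jar \
   $CONFLUENT_HOME/share/java/kafka-connect-mongodb/

# If the Confluent startup scripts do not pick up the new directory on their
# own, put it on the classpath explicitly before starting the worker.
export CLASSPATH="$CONFLUENT_HOME/share/java/kafka-connect-mongodb/*"
$CONFLUENT_HOME/bin/connect-standalone \
    $CONFLUENT_HOME/etc/kafka/connect-standalone.properties \
    $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties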

CASE 2:

If I copy connect-mongodb-1.0-jar-with-dependencies.jar to $CONFLUENT_HOME/share/java/confluent-common, then ZooKeeper and the Kafka server run fine, but the Schema Registry fails to start:

[2016-08-25 13:44:43,184] ERROR Server died unexpectedly: (io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain:51)
kafka.common.KafkaException: Failed to parse the broker info from zookeeper: {"jmx_port":-1,"timestamp":"1472124710146","endpoints":["PLAINTEXT://localhost:9092"],"host":"localhost","version":3,"port":9092}
    at kafka.cluster.Broker$.createBroker(Broker.scala:101)
    at kafka.utils.ZkUtils.getBrokerInfo(ZkUtils.scala:787)
    at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
    at kafka.utils.ZkUtils$$anonfun$getAllBrokersInCluster$2.apply(ZkUtils.scala:162)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.utils.ZkUtils.getAllBrokersInCluster(ZkUtils.scala:162)
    at io.confluent.kafka.schemaregistry.storage.KafkaStore.<init>(KafkaStore.java:109)
    at io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry.<init>(KafkaSchemaRegistry.java:136)
    at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:53)
    at io.confluent.kafka.schemaregistry.rest.SchemaRegistryRestApplication.setupResources(SchemaRegistryRestApplication.java:37)
    at io.confluent.rest.Application.createServer(Application.java:117)
    at io.confluent.kafka.schemaregistry.rest.SchemaRegistryMain.main(SchemaRegistryMain.java:43)
Caused by: java.lang.NoSuchMethodError: org.apache.kafka.common.protocol.SecurityProtocol.forName(Ljava/lang/String;)Lorg/apache/kafka/common/protocol/SecurityProtocol;
    at kafka.cluster.EndPoint$.createEndPoint(EndPoint.scala:49)
    at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:90)
    at kafka.cluster.Broker$$anonfun$1.apply(Broker.scala:89)
    at scala.collection.immutable.List.map(List.scala:273)
    at kafka.cluster.Broker$.createBroker(Broker.scala:89)
    ... 16 more
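
The NoSuchMethodError in CASE 2 is a classpath clash rather than a missing class: the jar-with-dependencies bundles its own copy of the org.apache.kafka client classes, and because everything in share/java/confluent-common is shared by every Confluent service, the bundled classes shadow the platform's own Kafka JARs when the Schema Registry starts. A sketch of the cleanup (paths are assumptions, as above):

# Take the fat JAR out of the shared directory so the Schema Registry resolves
# the platform's own Kafka classes again.
rm $CONFLUENT_HOME/share/java/confluent-common/connect-mongodb-1.0-jar-with-dependencies.jar

# If you prefer the fat JAR to the plain JAR plus driver JARs, keep it outside
# share/java entirely and expose it only to the Connect worker (the directory
# below is hypothetical).
mkdir -p /opt/connectors/mongodb
mv connect-mongodb-1.0-jar-with-dependencies.jar /opt/connectors/mongodb/
export CLASSPATH="/opt/connectors/mongodb/*"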

Please give me some advice on how to complete this task successfully.

Best regards, Camelia

camelia-c commented 8 years ago

Update:

For CASE 1:

I manually copied the JARs bson-3.3.0.jar, mongodb-driver-3.3.0.jar, and mongodb-driver-core-3.3.0.jar into $CONFLUENT_HOME/share/java/confluent-common, so the first error has now been replaced by the following:

[2016-08-25 15:30:58,050] INFO Successfully joined group connect-mongodb-sink-connector with generation 1 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator:434)
[2016-08-25 15:30:58,050] INFO Setting newly assigned partitions [railstations_topic2-0] for group connect-mongodb-sink-connector (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:219)
[2016-08-25 15:30:58,127] INFO WorkerSinkTask{id=mongodb-sink-connector-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSinkTask:244)
[2016-08-25 15:30:58,143] ERROR Task mongodb-sink-connector-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142)
org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error:
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:345)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: [B@7f9cf23c; line: 1, column: 2]
Caused by: com.fasterxml.jackson.core.JsonParseException: Illegal character ((CTRL-CHAR, code 0)): only regular white space (\r, \n, \t) is allowed between tokens
 at [Source: [B@7f9cf23c; line: 1, column: 2]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1487)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:518)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._throwInvalidSpace(ParserMinimalBase.java:469)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipWSOrEnd(ReaderBasedJsonParser.java:1873)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextToken(ReaderBasedJsonParser.java:571)
    at com.fasterxml.jackson.databind.ObjectMapper._initForReading(ObjectMapper.java:3604)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3549)
    at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2161)
    at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
    at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:326)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:345)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
[2016-08-25 15:30:58,149] ERROR Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:143)
[2016-08-25 15:31:09,312] INFO Reflections took 12868 ms to scan 264 urls, producing 13153 keys and 86677 values (org.reflections.Reflections:229)
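
This last failure is no longer a classpath problem: the JsonConverter is being handed bytes that are not JSON. Jackson's "Illegal character (CTRL-CHAR, code 0) ... column 2" is characteristic of Confluent's Avro wire format, whose messages start with a 0x00 magic byte followed by a four-byte schema ID. If the topic was produced with Avro (for example via kafka-avro-console-producer), the worker's converters should match; a sketch of the relevant lines in connect-standalone.properties, assuming the Schema Registry runs at its default address:

# Converter settings for Avro-encoded topics (Schema Registry URL assumed).
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

Conversely, if the topic really does contain plain JSON, the JsonConverter can stay, but the producer must not prefix messages with the Avro magic byte.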

I hope I'm getting closer to solving the problem, Camelia