lensesio / stream-reactor

A collection of open source Apache 2.0 Kafka Connectors maintained by Lenses.io.
https://lenses.io
Apache License 2.0

Different Google guava versions #262

Closed hleb-albau closed 7 years ago

hleb-albau commented 7 years ago

I want to use your Kafka Cassandra connector with the docker image confluentinc/cp-kafka-connect:3.3.0. When I run this image without your connector, it starts successfully. If I add kafka-connect-cassandra-0.3.0-3.3.0-all.jar to its classpath, it throws this error:

Exception in thread "main" java.lang.NoSuchMethodError: com.google.common.collect.Sets$SetView.iterator()Lcom/google/common/collect/UnmodifiableIterator;
    at org.reflections.Reflections.expandSuperTypes(Reflections.java:380)
    at org.reflections.Reflections.<init>(Reflections.java:126)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:221)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:198)
    at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:159)
    at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:47)
    at org.apache.kafka.connect.cli.ConnectDistributed.main(ConnectDistributed.java:63)

I completely deleted guava from this jar, repacked it, and provided it to Kafka Connect. Then it starts OK, but if I start your tasks it fails with this error:


java.lang.NoSuchMethodError: com.google.common.util.concurrent.Futures.transform(Lcom/google/common/util/concurrent/ListenableFuture;Lcom/google/common/util/concurrent/AsyncFunction;Ljava/util/concurrent/Executor;)Lcom/google/common/util/concurrent/ListenableFuture;
    at com.datastax.driver.core.Connection.initAsync(Connection.java:173)
    at com.datastax.driver.core.Connection$Factory.open(Connection.java:731)
    at com.datastax.driver.core.ControlConnection.tryConnect(ControlConnection.java:251)
    at com.datastax.driver.core.ControlConnection.reconnectInternal(ControlConnection.java:199)
    at com.datastax.driver.core.ControlConnection.connect(ControlConnection.java:77)
    at com.datastax.driver.core.Cluster$Manager.init(Cluster.java:1414)
    at com.datastax.driver.core.Cluster.init(Cluster.java:162)
    at com.datastax.driver.core.Cluster.connectAsync(Cluster.java:333)
    at com.datastax.driver.core.Cluster.connect(Cluster.java:283)
    at com.datamountaineer.streamreactor.connect.cassandra.CassandraConnection$.getSession(CassandraConnection.scala:65)
    at com.datamountaineer.streamreactor.connect.cassandra.CassandraConnection$.apply(CassandraConnection.scala:35)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$$anonfun$1.apply(CassandraWriter.scala:32)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$$anonfun$1.apply(CassandraWriter.scala:32)
    at scala.util.Try$.apply(Try.scala:192)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraWriter$.apply(CassandraWriter.scala:32)
    at com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkTask.start(CassandraSinkTask.scala:58)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.initializeAndStart(WorkerSinkTask.java:232)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:145)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:146)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:190)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

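Deleting guava outright breaks the Cassandra driver, which needs its own guava at runtime. Before classloader isolation, the usual workaround was instead to relocate (shade) guava inside the connector jar so it cannot clash with the guava on Connect's classpath. A minimal Gradle Shadow sketch (the plugin version and relocated package name are illustrative, not this project's actual build):

```groovy
// build.gradle (sketch): bundle guava under a private package name
plugins {
    id 'com.github.johnrengelman.shadow' version '2.0.1'
}

shadowJar {
    // Rewrites com.google.common.* references in the fat jar's bytecode,
    // so the driver uses the relocated copy and Connect keeps its own guava.
    relocate 'com.google.common', 'com.datamountaineer.shaded.com.google.common'
}
```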
andrewstevenson commented 7 years ago

We have a release for 3.3 pending. Confluent 3.3 adds classloader isolation: try setting plugin.dir in connect-avro-distributed.properties to a folder, build the latest from master, and drop the jar in there. Then restart Connect. Our integration tests are passing. Alternatively, do export CLASSPATH=cassandra.jar.
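For reference, the worker property is plugin.path per the Kafka Connect docs. A minimal sketch of the worker config, assuming the rebuilt jar is dropped into /opt/connectors (the path is illustrative):

```properties
# connect-avro-distributed.properties (sketch)
# Comma-separated directories Connect scans for plugins;
# each plugin found is loaded in an isolated classloader.
plugin.path=/opt/connectors
```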

hleb-albau commented 7 years ago

I don't get the point. What should I rebuild, and where can I find this property file?

I start Confluent Kafka Connect using Docker Compose:

  kafka-connect:
    network_mode: host
    image: confluentinc/cp-kafka-connect:3.3.0
    environment:

      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
      CONNECT_REST_ADVERTISED_PORT: "8083"
      CONNECT_REST_PORT: 8083

      CONNECT_BOOTSTRAP_SERVERS: localhost:9092
      CONNECT_ZOOKEEPER_CONNECT: localhost:2181

      CONNECT_GROUP_ID: "cybernode"
      CONNECT_CONFIG_STORAGE_TOPIC: "cybernode.config"
      CONNECT_OFFSET_STORAGE_TOPIC: "cybernode.offsets"
      CONNECT_STATUS_STORAGE_TOPIC: "cybernode.status"
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 5000

      CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_KEY_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"
      CONNECT_INTERNAL_VALUE_CONVERTER: "org.apache.kafka.connect.json.JsonConverter"

      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"

      CONNECT_ACCESS_CONTROL_ALLOW_ORIGIN: '*'
      CONNECT_ACCESS_CONTROL_ALLOW_METHODS: 'GET,OPTIONS,HEAD,POST,PUT,DELETE'
    volumes:
      - ../connectors/ethereum/build/libs/connect/:/etc/kafka-connect/jars

Also, I already built your master using ./gradlew. I just copied kafka-connect-cassandra-0.3.0-3.3.0-all.jar to this location: ../connectors/ethereum/build/libs/connect/

Thanks, Hleb Albau

andrewstevenson commented 7 years ago

Kafka Connect in Kafka 0.11 has classloader isolation to resolve these issues. Set CONNECT_PLUGINS_DIR to this volume.

hleb-albau commented 7 years ago

@andrewstevenson I want to say a big thanks for your fast answers; you saved me a lot of time.

Finally, I managed it. I found in the Kafka Connect docs that we should use the plugin.path property. So I just added it to my compose file and it started!

    environment:
      CONNECT_PLUGIN_PATH: /etc/kafka-connect/custom-plugins
    volumes:
      - ../connectors/ethereum/build/libs/connect/:/etc/kafka-connect/custom-plugins

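One way to confirm the worker actually discovered the connector from that path is the Connect REST API (this assumes the worker is up and listening on localhost:8083, as in the compose file above):

```shell
# List the plugins the worker discovered at startup;
# the Cassandra sink class should appear in the response.
curl -s http://localhost:8083/connector-plugins
```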
andrewstevenson commented 7 years ago

May we ask your use case?

hleb-albau commented 7 years ago

Yes, sure.

I have written my own source connectors for some blockchain systems. After that I need the Cassandra sink connector to drop some information from Kafka topics (raw blocks, some analytics, etc.).

Here you can find our repo; all our development is open.

andrewstevenson commented 7 years ago

Nice, we have a connector for BlockChain.info, and it's Kotlin. One of our developers, @stheppi, likes Kotlin; he even wrote a book: https://www.amazon.com/Programming-Kotlin-Stephen-Samuel/dp/1787126366

If you're interested in a joint blog, let us know. Send me a mail at andrew@datamountaineer.com or join our Slack: https://slackpass.io/datamountaineers

andrewstevenson commented 7 years ago

Also, check out http://kafka-lenses.io/#

Antwnis commented 7 years ago

Also, we are happy with PRs, if you feel like bringing the Ethereum connector to this Apache 2.0 project. We also make sure we do integration tests, package everything, and make it available to kafka-lenses and https://github.com/Landoop/fast-data-dev. Just a suggestion.

hleb-albau commented 7 years ago

Thanks for your project. Yes, we will contribute later, after stabilization. Our plan is to define a common architecture for blockchain connectors, and after that write concrete connectors using this 'base connector'.

hleb-albau commented 7 years ago

Also, I suggest renaming the blockchain connector to something more specific, like 'blockchain_bitcoinblockchaininfo'. The template is blockchain_.

Antwnis commented 7 years ago

Thanks, I've filed #263 to rename the connector to the appropriate template. This will be done in the next release, as we are in the process of releasing the entire collection for Kafka 0.10.2 and Kafka 0.11 right now :)