wix-incubator / kafka-connect-s3

A Kafka-Connect Sink for S3 with no Hadoop dependencies.

Kafka 0.10.0.0+ support #14

Open suizman opened 8 years ago

suizman commented 8 years ago

I'm trying to run the plugin against Kafka 0.10.0.1 and I'm getting this error:

$ bin/connect-standalone.sh config/connect-standalone.properties example-connect-s3-sink.properties
[2016-10-03 17:30:26,809] INFO StandaloneConfig values: 
    cluster = connect
    rest.advertised.host.name = null
    task.shutdown.graceful.timeout.ms = 5000
    rest.host.name = null
    rest.advertised.port = null
    bootstrap.servers = [localhost:9092]
    offset.flush.timeout.ms = 5000
    offset.flush.interval.ms = 10000
    rest.port = 8083
    internal.key.converter = class org.apache.kafka.connect.json.JsonConverter
    access.control.allow.methods = 
    access.control.allow.origin = 
    offset.storage.file.filename = /tmp/connect.offsets
    internal.value.converter = class org.apache.kafka.connect.json.JsonConverter
    value.converter = class org.apache.kafka.connect.json.JsonConverter
    key.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.standalone.StandaloneConfig:165)
[2016-10-03 17:30:26,935] INFO Logging initialized @345ms (org.eclipse.jetty.util.log:186)
[2016-10-03 17:30:27,113] INFO Kafka Connect starting (org.apache.kafka.connect.runtime.Connect:52)
[2016-10-03 17:30:27,114] INFO Herder starting (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:71)
[2016-10-03 17:30:27,114] INFO Worker starting (org.apache.kafka.connect.runtime.Worker:102)
[2016-10-03 17:30:27,120] INFO ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [localhost:9092]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 9223372036854775807
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = null
    max.in.flight.requests.per.connection = 1
    metrics.num.samples = 2
    client.id = 
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLS
    request.timeout.ms = 2147483647
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    acks = all
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = PLAINTEXT
    retries = 2147483647
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 0
 (org.apache.kafka.clients.producer.ProducerConfig:165)
[2016-10-03 17:30:27,140] INFO Kafka version : 0.9.0.1 (org.apache.kafka.common.utils.AppInfoParser:82)
[2016-10-03 17:30:27,140] INFO Kafka commitId : 23c69d62a0cabf06 (org.apache.kafka.common.utils.AppInfoParser:83)
[2016-10-03 17:30:27,141] INFO Starting FileOffsetBackingStore with file /tmp/connect.offsets (org.apache.kafka.connect.storage.FileOffsetBackingStore:60)
[2016-10-03 17:30:27,144] INFO Worker started (org.apache.kafka.connect.runtime.Worker:124)
[2016-10-03 17:30:27,144] INFO Herder started (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:73)
[2016-10-03 17:30:27,145] INFO Starting REST server (org.apache.kafka.connect.runtime.rest.RestServer:98)
[2016-10-03 17:30:27,235] INFO jetty-9.2.15.v20160210 (org.eclipse.jetty.server.Server:327)
Oct 03, 2016 5:30:28 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2016-10-03 17:30:28,069] INFO Started o.e.j.s.ServletContextHandler@5829e4f4{/,null,AVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:744)
[2016-10-03 17:30:28,079] INFO Started ServerConnector@655ef322{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:266)
[2016-10-03 17:30:28,079] INFO Started @1492ms (org.eclipse.jetty.server.Server:379)
[2016-10-03 17:30:28,081] INFO REST server listening at http://127.0.0.1:8083/, advertising URL http://127.0.0.1:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:150)
[2016-10-03 17:30:28,081] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:58)
[2016-10-03 17:30:28,083] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:100)
java.lang.NoSuchMethodError: org.apache.kafka.common.config.ConfigDef.define(Ljava/lang/String;Lorg/apache/kafka/common/config/ConfigDef$Type;Lorg/apache/kafka/common/config/ConfigDef$Importance;Ljava/lang/String;Ljava/lang/String;ILorg/apache/kafka/common/config/ConfigDef$Width;Ljava/lang/String;)Lorg/apache/kafka/common/config/ConfigDef;
    at org.apache.kafka.connect.runtime.ConnectorConfig.configDef(ConnectorConfig.java:64)
    at org.apache.kafka.connect.runtime.ConnectorConfig.<init>(ConnectorConfig.java:75)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.startConnector(StandaloneHerder.java:246)
    at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
    at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:94)
[2016-10-03 17:30:28,087] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:68)
[2016-10-03 17:30:28,088] INFO Stopping REST server (org.apache.kafka.connect.runtime.rest.RestServer:154)
[2016-10-03 17:30:28,092] INFO Stopped ServerConnector@655ef322{HTTP/1.1}{0.0.0.0:8083} (org.eclipse.jetty.server.ServerConnector:306)
[2016-10-03 17:30:28,101] INFO Stopped o.e.j.s.ServletContextHandler@5829e4f4{/,null,UNAVAILABLE} (org.eclipse.jetty.server.handler.ContextHandler:865)
[2016-10-03 17:30:28,103] INFO REST server stopped (org.apache.kafka.connect.runtime.rest.RestServer:165)
[2016-10-03 17:30:28,103] INFO Herder stopping (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:77)
[2016-10-03 17:30:28,104] INFO Worker stopping (org.apache.kafka.connect.runtime.Worker:128)
[2016-10-03 17:30:28,104] WARN Shutting down tasks [] uncleanly; herder should have shut down tasks before the Worker is stopped. (org.apache.kafka.connect.runtime.Worker:141)
[2016-10-03 17:30:28,104] INFO Stopped FileOffsetBackingStore (org.apache.kafka.connect.storage.FileOffsetBackingStore:68)
[2016-10-03 17:30:28,104] INFO Worker stopped (org.apache.kafka.connect.runtime.Worker:151)
[2016-10-03 17:30:29,093] INFO Reflections took 1910 ms to scan 61 urls, producing 3338 keys and 24145 values  (org.reflections.Reflections:229)
[2016-10-03 17:30:29,100] INFO Herder stopped (org.apache.kafka.connect.runtime.standalone.StandaloneHerder:91)
[2016-10-03 17:30:29,100] INFO Kafka Connect stopped (org.apache.kafka.connect.runtime.Connect:73)

It seems that the "org.apache.kafka.common.config.ConfigDef.define" method is missing in the new version.
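A quick way to check this kind of NoSuchMethodError is to ask the JVM which overloads of a method it can actually see and which jar the class was loaded from. Note that the log above reports "Kafka version : 0.9.0.1", which may mean an older kafka-clients jar is on the classpath. The following is only a minimal sketch: the class name OverloadInspector and its helper methods are hypothetical, not part of Kafka or this plugin, and it assumes you run it with the same classpath as the Connect worker.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper; not part of Kafka or kafka-connect-s3.
public class OverloadInspector {

    /** Lists every public method called methodName that is visible on className. */
    public static List<String> overloads(String className, String methodName)
            throws ClassNotFoundException {
        Class<?> cls = Class.forName(className);
        List<String> found = new ArrayList<>();
        for (Method m : cls.getMethods()) {
            if (m.getName().equals(methodName)) {
                found.add(m.toString());
            }
        }
        return found;
    }

    /** Reports where the class was loaded from (usually a jar path). */
    public static String origin(String className) throws ClassNotFoundException {
        CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
        return src == null ? "(bootstrap class loader)" : String.valueOf(src.getLocation());
    }

    public static void main(String[] args) {
        // Defaults match the class and method named in the stack trace above.
        String cls = args.length > 0 ? args[0] : "org.apache.kafka.common.config.ConfigDef";
        String method = args.length > 1 ? args[1] : "define";
        try {
            System.out.println("Loaded from: " + origin(cls));
            for (String signature : overloads(cls, method)) {
                System.out.println("  " + signature);
            }
        } catch (ClassNotFoundException e) {
            System.out.println("Class not on the classpath: " + cls);
        }
    }
}
```

If the overload from the error message is not in the printed list, the jar shown in "Loaded from" is likely a different Kafka version from the one the Connect runtime was compiled against.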

RobinDaugherty commented 7 years ago

It looks like the solution is as simple as changing kafka_version in pom.xml to be the version of Kafka you're using. I changed it to "0.10.1.1" and the next build worked without error.

guptaanamika commented 7 years ago

I am using Confluent 2.0.0 and getting the following error while starting the service:

java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/List;Lorg/apache/kafka/clients/consumer/ConsumerRebalanceListener;)V
    at org.apache.kafka.connect.runtime.WorkerSinkTask.joinConsumerGroupAndStart(WorkerSinkTask.java:143)
    at org.apache.kafka.connect.runtime.WorkerSinkTaskThread.execute(WorkerSinkTaskThread.java:54)
    at org.apache.kafka.connect.util.ShutdownableThread.run(ShutdownableThread.java:82)

The Kafka version used in pom.xml is 0.10.1.1.

RobinDaugherty commented 7 years ago

Sorry, my previous comment here was completely incorrect. I don't know how I could possibly have thought it worked when there are definitely SDK changes between 0.9 and 0.10.

However, Kafka 0.10.1.1 is now supported by the master branch of the project.

RobinDaugherty commented 7 years ago

Also, please keep in mind that Kafka 0.10.0 and 0.10.1 have incompatible protocols. If you are using Kafka 0.10.0 on your server, you need to run a Kafka 0.10.0 client as well.

suizman commented 7 years ago

@RobinDaugherty you can force the Kafka server to use the old protocols with these two parameters in "server.properties":

inter.broker.protocol.version=0.10.0
log.message.format.version=0.10.0

Anyway, I'll try master this weekend :D
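To confirm that a broker's "server.properties" really carries both settings, a small check along these lines may help. This is a sketch only: the class name BrokerVersionPins and its methods are hypothetical, and it only reads the two keys quoted above. It says nothing about how clients negotiate protocol versions.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

// Hypothetical helper; it only inspects the two keys mentioned in this thread.
public class BrokerVersionPins {

    /** Parses properties-file text, such as the contents of server.properties. */
    public static Properties parse(String text) throws IOException {
        Properties props = new Properties();
        props.load(new StringReader(text));
        return props;
    }

    /** True only if both settings are present and equal to the given version. */
    public static boolean isPinnedTo(Properties props, String version) {
        return version.equals(props.getProperty("inter.broker.protocol.version"))
            && version.equals(props.getProperty("log.message.format.version"));
    }

    public static void main(String[] args) throws IOException {
        // Inline sample; in practice you would read the broker's real file.
        String sample = "inter.broker.protocol.version=0.10.0\n"
                      + "log.message.format.version=0.10.0\n";
        System.out.println("pinned to 0.10.0: " + isPinnedTo(parse(sample), "0.10.0"));
    }
}
```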

RobinDaugherty commented 7 years ago

@suizman will those affect the protocol used by a producer (like Kafka Connect) connecting to a Kafka broker?

suizman commented 7 years ago

@RobinDaugherty yes, but the Kafka Connect producer should be capable of writing messages in the new 0.10.0+ format.