akka / alpakka-kafka

Alpakka Kafka connector - Alpakka is a Reactive Enterprise Integration library for Java and Scala, based on Reactive Streams and Akka.
https://doc.akka.io/docs/alpakka-kafka/current/home.html
Other
1.42k stars 387 forks source link

Support version 0.10.0 of Kafka #160

Closed RodrigoLimasss closed 8 years ago

RodrigoLimasss commented 8 years ago

Hi guys.

In the Readme says "New API: 0.11-M3 Supports Kafka 0.9.0.x"

This client/driver supports the new version of Kafka 0.10.0?

Thanks

kciesielski commented 8 years ago

@RodrigoLimasss 0.9 client should work with 0.10 broker, so akka-kafka 0.11 should be compatible with Kafka 0.10. After some initial checks we see that it is so.

xelax commented 8 years ago

I compiled with kakfa 0.10.0 and there were a couple of small changes needed to take care of API changes and deprecations in the tests. There is still a problem with the integration test since the dependency "net.manub" %% "scalatest-embedded-kafka" % "0.5.0" % "test" does not have a 0.10.0 version yet, so the integration test fails.

diff --git a/build.sbt b/build.sbt
index 59ccba7..06ad1f7 100644
--- a/build.sbt
+++ b/build.sbt
@@ -4,7 +4,7 @@ import de.heikoseeberger.sbtheader.HeaderPattern
 name := "akka-stream-kafka"

 val akkaVersion = "2.4.6"
-val kafkaVersion = "0.9.0.1"
+val kafkaVersion = "0.10.0.0"

 val kafkaClients = "org.apache.kafka" % "kafka-clients" % kafkaVersion

diff --git a/core/src/main/scala/akka/kafka/internal/ConsumerStage.scala b/core/src/main/scala/akka/kafka/internal/ConsumerStage.scala
index 838c83e..5e3916b 100644
--- a/core/src/main/scala/akka/kafka/internal/ConsumerStage.scala
+++ b/core/src/main/scala/akka/kafka/internal/ConsumerStage.scala
@@ -387,9 +387,9 @@ private[kafka] abstract class ConsumerStageLogic[K, V, Out](
         // note that consumer.assignment() is automatically changed when using
         // dynamic subscriptions and we must use the current value to resume/pause
         if (isAvailable(out))
-          consumer.resume(consumer.assignment().asScala.toArray: _*)
+          consumer.resume(consumer.assignment())
         else
-          consumer.pause(consumer.assignment().asScala.toArray: _*)
+          consumer.pause(consumer.assignment())
       }

       def handleResult(records: ConsumerRecords[K, V]) = {
diff --git a/core/src/test/scala/akka/kafka/internal/ProducerTest.scala b/core/src/test/scala/akka/kafka/internal/ProducerTest.scala
index 4288585..32a6539 100644
--- a/core/src/test/scala/akka/kafka/internal/ProducerTest.scala
+++ b/core/src/test/scala/akka/kafka/internal/ProducerTest.scala
@@ -30,6 +30,7 @@ import org.mockito.stubbing.Answer
 import org.mockito.verification.VerificationMode
 import org.scalatest.{FlatSpecLike, Matchers}
 import org.scalatest.BeforeAndAfterAll
+import org.apache.kafka.common.record.Record

 class ProducerTest(_system: ActorSystem)
     extends TestKit(_system)
@@ -53,7 +54,7 @@ class ProducerTest(_system: ActorSystem)

   def recordAndMetadata(seed: Int) = {
     new ProducerRecord("test", seed.toString, seed.toString) ->
-      new RecordMetadata(new TopicPartition("test", seed), seed.toLong, seed.toLong)
+      new RecordMetadata(new TopicPartition("test", seed), seed.toLong, seed.toLong, Record.NO_TIMESTAMP, -1, -1, -1)
   }

   def toMessage(tuple: (Record, RecordMetadata)) = Message(tuple._1, NotUsed)
kciesielski commented 8 years ago

Another problem is that Kafka Broker 0.9.0 does not support client 0.10, so we want probably to hold these changes until the first release.

patriknw commented 8 years ago

@xelax @kciesielski Do I understand it correctly that we can make it work with 0.10 broker and still use 0.9 client library? Are there any changes that we need to do to make that work? I have heard some interest of using 0.10 broker.

kciesielski commented 8 years ago

@patriknw That's how I understand it https://groups.google.com/forum/#!msg/confluent-platform/jdv10HahRd8/l-5twegkIAAJ Probably some features won't be available (like timestamps).

iravid commented 8 years ago

I'd just like to chime in and mention that a 0.10 client version would be very appreciated; the 0.9 of the Kafka client had some annoying bugs that wee fixed in 0.10 (spurious IllegalStateExceptions being thrown from the producer if retries are enabled, for example)

On 27 Jun 2016, 19:15 +0300, Krzysiek Ciesielskinotifications@github.com, wrote:

@patriknw(https://github.com/patriknw)That's how I understand ithttps://groups.google.com/forum/#!msg/confluent-platform/jdv10HahRd8/l-5twegkIAAJ Probably some features won't be available (like timestamps).

— You are receiving this because you are subscribed to this thread. Reply to this email directly,view it on GitHub(https://github.com/akka/reactive-kafka/issues/160#issuecomment-228794963), ormute the thread(https://github.com/notifications/unsubscribe/AARSxKzv0veArVtSxbXKpJIh7OmU53kqks5qP_czgaJpZM4I7EJQ).

13h3r commented 8 years ago

I see two ways right now:

Sure it is possible only if APIs is compatible

iravid commented 8 years ago

I assumed that since the 0.11 version of reactive-Kafka is still in flux and being developed, there are no compatibility guarantuees in effect and you'll be able to switch dependency versions at will :-)

Am I assuming wrong?

RodrigoLimasss commented 8 years ago

Hi guys, I did a downgrade to Kafka 0.9.0 and reactivekafka client 0.10.0, it works fine.

I will wait the final version 0.11.0 to upgrade my Kafka to 0.10.0

Thanks

xelax commented 8 years ago

I fear that a fork might be needed to support kafka 0.10.0, since it would be nice to take advantage of the new timestamp API, but probably a lot of folks will be running kafka 0.9.0 for a while as well.

13h3r commented 8 years ago

I think we should move to 0.10 at least in master branch. We could easy provide a backport for 0.9 if someone requires it. WDYT @kciesielski @patriknw @drewhk?

patriknw commented 8 years ago

I agree that we should use the latest in master.

xelax commented 8 years ago

I also just noticed that https://github.com/manub/scalatest-embedded-kafka got updated to kakfa 0.10