aseigneurin / kafka-streams-scala

Thin Scala wrapper for the Kafka Streams API
Apache License 2.0

Support kafka-0.11.0.1 #3

Open frgomes opened 7 years ago

frgomes commented 7 years ago

I tried changing the Kafka version and some compilation errors arose, such as:

[error] /home/rgomes/workspace/kafka-streams-scala/src/main/scala/com/github/aseigneurin/kafka/streams/scala/KGroupedTableS.scala:18: overloaded method value mapValues with alternatives:
[error]   (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: java.lang.Long, _ <: scala.Long],x$2: org.apache.kafka.common.serialization.Serde[scala.Long],x$3: org.apache.kafka.streams.processor.StateStoreSupplier[org.apache.kafka.streams.state.KeyValueStore[_, _]])org.apache.kafka.streams.kstream.KTable[K,scala.Long] <and>
[error]   (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: java.lang.Long, _ <: scala.Long],x$2: org.apache.kafka.common.serialization.Serde[scala.Long],x$3: String)org.apache.kafka.streams.kstream.KTable[K,scala.Long] <and>
[error]   (x$1: org.apache.kafka.streams.kstream.ValueMapper[_ >: java.lang.Long, _ <: scala.Long])org.apache.kafka.streams.kstream.KTable[K,scala.Long]
[error]  cannot be applied to (scala.Long => java.lang.Long)
[error]       .mapValues[Long](javaLong => Long.box(javaLong))

Apparently, we will have to switch to multiple modules if we intend to support multiple Kafka versions. For the time being, I would like to try Kafka 0.11.0.1 first, postponing support for multiple modules. Any idea what needs to be done in order to fix these compilation errors?

frgomes commented 7 years ago

I guess the error is due to API changes between 0.10 and 0.11. In 0.10 we have something like this:

<K1,V1> KStream<K1,V1>  map(KeyValueMapper<K,V,KeyValue<K1,V1>> mapper)

... whereas in 0.11 we have this:

<KR,VR> KStream<KR,VR>  map(KeyValueMapper<? super K,? super V,? extends KeyValue<? extends KR,? extends VR>> mapper)
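
To illustrate what the added wildcards mean on the Scala side, here is a minimal sketch. It does not use Kafka at all: `SimpleMapper`, `mapExact`, and `mapWild` are hypothetical stand-ins that only mimic the shape of the 0.10 and 0.11 signatures, where Java's `? super` and `? extends` appear to Scala as `_ >:` and `_ <:`.

```scala
// Hypothetical stand-in for a Kafka mapper interface (not the real API).
trait SimpleMapper[V, VR] { def apply(value: V): VR }

object WildcardDemo {
  // 0.10-style signature: the mapper's type arguments must match exactly.
  def mapExact[V, VR](value: V, mapper: SimpleMapper[V, VR]): VR =
    mapper.apply(value)

  // 0.11-style signature: the mapper may accept a supertype of V
  // and may return a subtype of VR.
  def mapWild[V, VR](value: V, mapper: SimpleMapper[_ >: V, _ <: VR]): VR =
    mapper.apply(value)

  // A mapper declared on broader input and narrower output types.
  val anyToString: SimpleMapper[Any, String] = new SimpleMapper[Any, String] {
    override def apply(value: Any): String = value.toString
  }

  // Accepted: Any is a supertype of java.lang.Long, String is a CharSequence.
  val viaWildcards: CharSequence =
    mapWild[java.lang.Long, CharSequence](java.lang.Long.valueOf(7L), anyToString)

  // The exact-typed variant with the same type arguments would be rejected:
  // mapExact[java.lang.Long, CharSequence](java.lang.Long.valueOf(7L), anyToString)
}
```

The wildcard form is more flexible for callers, but it also gives the Scala compiler less precise type information to work with when a bare lambda is passed.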

frgomes commented 7 years ago

It's necessary to replace code such as:

  def count(storeName: String): KTableS[K, Long] = {
    inner.count(storeName)
      .mapValues[Long](javalong => Long.box(javalong))
  }

... with something like:

  def count(storeName: String): KTableS[K, Long] = {
    inner.count(storeName)
      .mapValues[Long](new ValueMapper[java.lang.Long, scala.Long] {
        // Unbox the java.lang.Long count into a scala.Long
        override def apply(value: java.lang.Long): scala.Long = Long.unbox(value)
      })
  }
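
If the same explicit `ValueMapper` pattern has to be repeated in many wrapper methods, a small helper could keep the boilerplate in one place. The following is only a sketch: the `ValueMapper` trait is a local stand-in so the snippet compiles without Kafka on the classpath, and `Mappers`, `valueMapper`, and `unboxLong` are hypothetical names, not part of this library or of Kafka Streams.

```scala
// Local stand-in for org.apache.kafka.streams.kstream.ValueMapper,
// declared here only so the sketch is self-contained.
trait ValueMapper[V, VR] { def apply(value: V): VR }

object Mappers {
  // Hypothetical helper: wraps a Scala function in an explicit ValueMapper,
  // so the compiler never has to convert a lambda on its own.
  def valueMapper[V, VR](f: V => VR): ValueMapper[V, VR] =
    new ValueMapper[V, VR] {
      override def apply(value: V): VR = f(value)
    }

  // Converts a boxed java.lang.Long into a scala.Long.
  val unboxLong: ValueMapper[java.lang.Long, Long] =
    valueMapper[java.lang.Long, Long](boxed => Long.unbox(boxed))
}
```

With the real Kafka interface in place of the stand-in, a wrapper method could then presumably pass `Mappers.unboxLong` to `mapValues[Long]` instead of spelling out the anonymous class each time.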

frgomes commented 7 years ago

This is implemented at https://github.com/frgomes/kafka-streams-scala/tree/support-kafka-0.11 However, distinct modules for Kafka 0.10 and Kafka 0.11 are still needed.

aseigneurin commented 7 years ago

Ok, I will review this and get back to you. Thanks.

frgomes commented 7 years ago

@aseigneurin : I've tried Scala 2.12.3, hoping this was an interoperability problem solved by its improved support for Java 8 SAM types, but the issue remains. I suspect the cause is the wildcards introduced between Kafka 0.10 and Kafka 0.11, which prevent the compiler from recognizing that a SAM conversion applies here.

frgomes commented 7 years ago

@aseigneurin : I've added support for cross compilation to Scala 2.11 and Scala 2.12.

Apart from that, I think it makes sense to support multiple versions of kafka-streams. To do that, we would probably have to use sbt-crossproject and create multiple code trees, one for each supported version of kafka-streams, each with its own dependencies.
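
As a rough idea of what "one code tree per Kafka version" might look like, here is a `build.sbt` sketch. It uses plain sbt subprojects rather than sbt-crossproject, assumes sbt 1.1 or later for the slash syntax, and every module name, directory, and version number in it is an illustrative assumption rather than the project's actual layout.

```scala
// build.sbt (sketch only; names, paths, and versions are assumptions)

lazy val commonSettings = Seq(
  // Cross-build every module for both Scala versions.
  crossScalaVersions := Seq("2.11.11", "2.12.3"),
  // Code that compiles against every supported Kafka version lives here.
  Compile / unmanagedSourceDirectories +=
    (ThisBuild / baseDirectory).value / "shared" / "src" / "main" / "scala"
)

// Version-specific wrappers for Kafka 0.10 live under kafka-0.10/src/main/scala.
lazy val kafka010 = (project in file("kafka-0.10"))
  .settings(commonSettings)
  .settings(
    name := "kafka-streams-scala-kafka010",
    libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.10.2.1"
  )

// Version-specific wrappers for Kafka 0.11 live under kafka-0.11/src/main/scala.
lazy val kafka011 = (project in file("kafka-0.11"))
  .settings(commonSettings)
  .settings(
    name := "kafka-streams-scala-kafka011",
    libraryDependencies += "org.apache.kafka" % "kafka-streams" % "0.11.0.1"
  )

// Aggregating root project; nothing is published from it.
lazy val root = (project in file("."))
  .aggregate(kafka010, kafka011)
  .settings(publish / skip := true)
```

In this layout, running `+compile` from the root would build each module for each listed Scala version, and only the files that touch changed Kafka signatures would need to be duplicated per module.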