jpzk / mockedstreams

Scala DSL for Unit-Testing Processing Topologies in Kafka Streams
Apache License 2.0

java.lang.ClassCastException in simple kafka stream : convert string to upper case with mapValues #25

Closed: thomas-chauvet closed this issue 5 years ago

thomas-chauvet commented 6 years ago

I am trying to unit test a Kafka Streams topology with this library.

As a simple example, I just want to convert strings to upper case.

I tried the code below:

import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes
import org.scalatest.{Matchers, WordSpec}

class mockedStreamsSpec extends WordSpec with Matchers {

  val input = Seq(("x", "foo"), ("y", "bar"))
  val exp = Seq(("x", "FOO"), ("y", "BAR"))

  "Put in upper case " should {
    "always return value in upper case" in {
      MockedStreams()
        .topology {
          builder =>
            builder.stream[String, String]("topic-in")
              .mapValues[String](_.toUpperCase())
              .to("topic-out")
        }
        .input("topic-in", Serdes.String(), Serdes.String(), input)
        .output("topic-out", Serdes.String(), Serdes.String(), exp.size) shouldEqual exp
    }
  }
}

I get an error about java.lang.String:

[info] mockedStreamsSpec:
[info] Put in upper case
[info] - should always return value in upper case *** FAILED ***
[info]   java.lang.ClassCastException: [B cannot be cast to java.lang.String
[info]   at org.apache.kafka.streams.kstream.internals.KStreamMapValues$KStreamMapProcessor.process(KStreamMapValues.java:40)
[info]   at org.apache.kafka.streams.processor.internals.ProcessorNode$1.run(ProcessorNode.java:46)
[info]   at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208)
[info]   at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:124)
[info]   at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:85)
[info]   at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:80)
[info]   at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:216)
[info]   at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:276)
[info]   at org.apache.kafka.test.ProcessorTopologyTestDriver.process(ProcessorTopologyTestDriver.java:315)
[info]   at com.madewithtea.mockedstreams.MockedStreams$Builder.$anonfun$produce$1(MockedStreams.scala:110)

It seems weird, because Scala String and Java String should be identical.

blakeney commented 6 years ago

I encountered a similar problem, and it turned out to be a serialization issue. The topology was picking up values as byte arrays when I wanted strings. I solved it by passing in a configuration when building MockedStreams. For example:

import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.common.serialization.Serdes

val strings = Serdes.String()
val props = new java.util.Properties()
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, strings.getClass().getName())
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, strings.getClass().getName())

val mockedStreams = MockedStreams()
  .config(props)
  // Set topology, inputs, and outputs here

schmiegelow commented 6 years ago

Indeed, specifying Serdes.String in .input and .output doesn't propagate to the topology properly; it assumes Array[Byte] no matter what.
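
Putting the two snippets from this thread together, the original test with the default-serde configuration applied might look like the sketch below. It only combines calls already shown above (MockedStreams().config, .topology, .input, .output); the class name UpperCaseSpec is made up for illustration, and the result has not been confirmed in this thread or checked against a specific Kafka Streams or mockedstreams version.

```scala
import java.util.Properties

import com.madewithtea.mockedstreams.MockedStreams
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.StreamsConfig
import org.scalatest.{Matchers, WordSpec}

// Hypothetical spec name; the body reuses the test from the original report.
class UpperCaseSpec extends WordSpec with Matchers {

  val input = Seq(("x", "foo"), ("y", "bar"))
  val exp = Seq(("x", "FOO"), ("y", "BAR"))

  // Default serdes for the topology itself. As reported in this thread,
  // without them the topology reads keys and values as byte arrays.
  val strings = Serdes.String()
  val props = new Properties()
  props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, strings.getClass.getName)
  props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, strings.getClass.getName)

  "Put in upper case" should {
    "always return value in upper case" in {
      MockedStreams()
        .config(props) // apply the default serdes before defining the topology
        .topology { builder =>
          builder.stream[String, String]("topic-in")
            .mapValues[String](_.toUpperCase())
            .to("topic-out")
        }
        // These serdes are used for the test records written to and read from the topics.
        .input("topic-in", strings, strings, input)
        .output("topic-out", strings, strings, exp.size) shouldEqual exp
    }
  }
}
```

The serdes passed to .input and .output are kept as in the original test; the only change is the .config(props) call, which is what the earlier comment suggests makes the topology treat values as strings.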

thomas-chauvet commented 6 years ago

Thanks for your answer, @blakeney. I will try it!

jpzk commented 6 years ago

Can we close it? :)