manub / scalatest-embedded-kafka

A library that provides an in-memory Kafka instance to run your tests against.
MIT License

KTable producing empty Streams with runStreams #89

Open jacobBaumbach opened 6 years ago

jacobBaumbach commented 6 years ago

Hello, I am using runStreams in my tests to check whether my KTable is accurately tracking a user session. Printing the KTable gives the expected output, but the consumer returns an empty stream. Are KTables currently unsupported, am I setting up my tests incorrectly, or is something else going on? Thanks a lot for your help!

jiminhsieh commented 6 years ago

@jacobBaumbach I noticed you didn't materialize the data stream in your code. If you add data.to(stringSerdes, stringSerdes, outputTopic), you should get a non-empty stream.
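
For illustration, here is a minimal sketch of that kind of fix using the plain Kafka Streams 1.0 Java API from Scala. The poster's actual topology is not shown in the issue, so the topic names and the count() aggregation below are stand-ins, not their code:

import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.streams.{Consumed, StreamsBuilder}
import org.apache.kafka.streams.kstream.{KTable, Produced, Serialized}

object SessionTopologySketch {

  // Hypothetical topic names standing in for the poster's setup.
  val (inputTopic, outputTopic) = ("user-events", "user-sessions")

  val stringSerde: Serde[String] = Serdes.String()
  val longSerde: Serde[java.lang.Long] = Serdes.Long()

  val builder = new StreamsBuilder()
  val events = builder.stream[String, String](inputTopic, Consumed.`with`(stringSerde, stringSerde))

  // The KTable lives in a local state store; printing it shows updates, but nothing
  // reaches outputTopic until its changelog is explicitly written out.
  val sessionCounts: KTable[String, java.lang.Long] =
    events.groupByKey(Serialized.`with`(stringSerde, stringSerde)).count()

  // The missing step: forward the table's changelog to a topic so an ordinary
  // consumer (e.g. consumeFirstMessageFrom) can actually read it.
  sessionCounts.toStream.to(outputTopic, Produced.`with`(stringSerde, longSerde))
}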

tiboun commented 6 years ago

@jiminhsieh @manub

I have the same issue with the following dependencies:

val embeddedKafka = "net.manub" %% "scalatest-embedded-kafka" % "1.0.0" % Test
val embeddedKafkaStreams = "net.manub" %% "scalatest-embedded-kafka-streams" % "1.0.0" % Test

with Kafka version 1.0.0.

I reproduced this issue with the following test:

package net.manub.embeddedkafka.streams

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.{EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, KafkaStreams, Topology}
import org.apache.kafka.streams.kstream.{Materialized, Produced}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {

  implicit val config =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  val (inTopic, outTopic) = ("in", "out")

  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {
    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new StreamsBuilderS
      val stream: KStreamS[String, String] =
        streamBuilder.stream[String, String](inTopic)(Consumed.`with`(stringSerde, stringSerde))

      stream.to(outTopic)(Produced.`with`(stringSerde, stringSerde))
      val materializer =
        Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
      streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

      myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kstream =>
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")
        val value = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
        value.get("hello") should be("world")
      }
    }

    /**
      * Copy-pasted from runStreams in order to expose the underlying KafkaStreams instance.
      */
    def myRunStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)(
        block: KafkaStreams => Any)(implicit config: EmbeddedKafkaConfig): Any =
      withRunningKafka {
        topicsToCreate.foreach(topic => createCustomTopic(topic))
        val streamId = UUIDs.newUuid().toString
        val streams = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
        streams.start()
        try {
          block(streams)
        } finally {
          streams.close()
        }
      }(config)
  }
}

By the way, I have two questions:

tiboun commented 6 years ago

If I add consumeFirstMessageFrom[String](outTopic) before reading the value of "hello", it works, presumably because the blocking consume gives the streams application time to process the input records before the store is queried.

package net.manub.embeddedkafka.streams

import com.lightbend.kafka.scala.streams.{KStreamS, StreamsBuilderS}
import net.manub.embeddedkafka.Codecs._
import net.manub.embeddedkafka.{EmbeddedKafkaConfig, UUIDs}
import org.apache.kafka.common.serialization.{Serde, Serdes}
import org.apache.kafka.common.utils.Bytes
import org.apache.kafka.streams.{Consumed, KafkaStreams, Topology}
import org.apache.kafka.streams.kstream.{Materialized, Produced}
import org.apache.kafka.streams.state.{KeyValueStore, QueryableStoreTypes}
import org.scalatest.{Matchers, WordSpec}

class ExampleKafkaStreamsSpec extends WordSpec with Matchers with EmbeddedKafkaStreamsAllInOne {

  implicit val config =
    EmbeddedKafkaConfig(kafkaPort = 7000, zooKeeperPort = 7001)

  val (inTopic, outTopic) = ("in", "out")

  val stringSerde: Serde[String] = Serdes.String()

  "A Kafka streams test" should {
    "be easy to run with streams and consumer lifecycle management" in {
      val streamBuilder = new StreamsBuilderS
      val stream: KStreamS[String, String] =
        streamBuilder.stream[String, String](inTopic)(Consumed.`with`(stringSerde, stringSerde))

      stream.to(outTopic)(Produced.`with`(stringSerde, stringSerde))
      val materializer =
        Materialized.as[String, String, KeyValueStore[Bytes, Array[Byte]]]("my-store")
      streamBuilder.globalTable(outTopic, materializer)(Consumed.`with`(stringSerde, stringSerde))

      myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kstream =>
        val value = kstream.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
        publishToKafka(inTopic, "hello", "world")
        publishToKafka(inTopic, "foo", "bar")
        publishToKafka(inTopic, "baz", "yaz")
        println(consumeFirstMessageFrom[String](outTopic))
        value.get("hello") should be("world")
        import scala.collection.JavaConverters._
        value.all().asScala.foreach(println)
      }
    }

    /**
      * Copy-pasted from runStreams in order to expose the underlying KafkaStreams instance.
      */
    def myRunStreams(topicsToCreate: Seq[String], topology: Topology, extraConfig: Map[String, AnyRef] = Map.empty)(
        block: KafkaStreams => Any)(implicit config: EmbeddedKafkaConfig): Any =
      withRunningKafka {
        topicsToCreate.foreach(topic => createCustomTopic(topic))
        val streamId = UUIDs.newUuid().toString
        val streams = new KafkaStreams(topology, streamConfig(streamId, extraConfig))
        streams.start()
        try {
          block(streams)
        } finally {
          streams.close()
        }
      }(config)
  }
}

tiboun commented 6 years ago

We finally decided to use ScalaTest's Eventually.eventually. This resolves the empty-stream issue. Maybe you have an alternative to that.
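
For reference, a minimal sketch of that approach, assuming the snippet sits inside the spec posted above (which already mixes in Matchers and imports QueryableStoreTypes); the timeout and interval values are arbitrary:

import org.scalatest.concurrent.Eventually._
import org.scalatest.time.{Millis, Seconds, Span}

myRunStreams(Seq(inTopic, outTopic), streamBuilder.build()) { kafkaStreams =>
  publishToKafka(inTopic, "hello", "world")

  // Retry the store lookup until the streams thread has actually processed the
  // record (or the timeout expires), instead of asserting immediately.
  eventually(timeout(Span(10, Seconds)), interval(Span(200, Millis))) {
    val store = kafkaStreams.store("my-store", QueryableStoreTypes.keyValueStore[String, String]())
    store.get("hello") should be("world")
  }
}

Since eventually retries the block whenever it throws, this should also absorb a transient InvalidStateStoreException thrown by store() while the state store is still initialising.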