nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0
103 stars 10 forks source link

Add e2eTest for continuous receive #193

Closed osoykan closed 3 months ago

osoykan commented 3 months ago

I thought the problem in on KafkaReceiver but it is the default polling timeout was too low (100ms). When I changed it, it worked. So we have extra tests 😄

We are free to discard the MR.


This PR adds testing of Kotlin-kafka library while using an example use case.

The entrance point of all the tests is, ProjectConfig, where Kafka is configured:

class ProjectConfig : AbstractProjectConfig() {
    override suspend fun beforeProject(): Unit = TestSystem()
        .with {
            kafka {
                KafkaSystemOptions(
                    configureExposedConfiguration = { cfg ->
                        listOf(
                            "kafka.servers=${cfg.bootstrapServers}",
                            "kafka.interceptor-classes=${cfg.interceptorClass}",
                            "kafka.receive-method=traditional" // here we can change to: 'kotlin-kafka' or 'traditional'
                        )
                    }
                )
            }
            applicationUnderTest(KafkaApplicationUnderTest())
        }.run()

    override suspend fun afterProject(): Unit = TestSystem.stop()
}

kotlin-kafka and traditional while loop can be switched with the configuration.

  "kafka.receive-method=traditional" // here we can change to: 'kotlin-kafka' or 'traditional'

Tests are located under the test-e2e folder and can be invoked with kotest plugin by pressing the play button on the test.

It will need a docker engine running locally.

The expected behaviour is shouldBeConsumed should be true.

class KafkaSystemTests : FunSpec({
  val randomString = { Random.nextInt(0, Int.MAX_VALUE).toString() }

  test("message should be committed and consumed successfully") {
    validate {
      kafka {
        val productId = randomString() + "[productCreated]"
        publish("product", message = ProductCreated(productId), key = randomString().some())
        shouldBeConsumed<ProductCreated>(10.seconds) {
          actual.productId == productId
        }
      }
    }
  }
})