nomisRev / kotlin-kafka

Kafka bindings for Kotlin `suspend`, and Kafka streaming operators for KotlinX Flow.
https://nomisRev.github.io/kotlin-kafka/
Apache License 2.0

[DOCS] Using KafkaReceiver with Ktor Server (cancellation, and terminal events) #76

Open ageorgousakis opened 1 year ago

ageorgousakis commented 1 year ago

Hi,

I'm trying to use KafkaReceiver inside Ktor 2. I'm not quite sure what the best way is to start KafkaReceiver with Ktor. What I did was create an application plugin like the following and install it in the Ktor engine.

import io.github.nomisRev.kafka.map
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.ktor.server.application.*
import io.ktor.server.application.hooks.*
import io.ktor.server.config.*
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.CommonClientConfigs
import org.apache.kafka.common.config.SaslConfigs
import java.util.*

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.ByteArrayDeserializer

@JvmInline
value class Key(val key: String)

@JvmInline
value class Message(val content: String)

val EntityUpdateEventPlugin = createApplicationPlugin(name = "EntityUpdateEventPlugin") {
    on(MonitoringEvent(ApplicationStarted)) { application ->
        val topicName = "odm-entity-update-event"
        val environment = application.environment
        val bootstrapServers = environment.config.property("kafka.bootstrapServers").getString()
        val kafkaPropertiesConfig = environment.config.config("kafka.properties")
        val receiverProperties = Properties().apply {
            kafkaPropertiesConfig.propertyOrNull(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG)?.let {
                this[CommonClientConfigs.SECURITY_PROTOCOL_CONFIG] = it.getString()
            }
            kafkaPropertiesConfig.propertyOrNull(SaslConfigs.SASL_JAAS_CONFIG)?.let {
                this[SaslConfigs.SASL_JAAS_CONFIG] = it.getString()
            }
            kafkaPropertiesConfig.propertyOrNull(SaslConfigs.SASL_MECHANISM)?.let {
                this[SaslConfigs.SASL_MECHANISM] = it.getString()
            }
        }

        runBlocking(Dispatchers.Default) {
            coroutineScope {
                launch(Dispatchers.IO) {
                    val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
                        bootstrapServers,
                        StringDeserializer().map(::Key),
                        ByteArrayDeserializer().map {
                            Message(String(it))
                        },
                        groupId = "my-event-group",
                        autoOffsetReset = AutoOffsetReset.Earliest,
                        properties = receiverProperties
                    )
                    KafkaReceiver(settings)
                        .receive(topicName)
                        .map { "${it.key()} -> ${it.value()}" }
                        .collect {
                            application.log.info(it)
                        }
                }
            }
        }
    }
}

It seems the receiver starts and collects the messages. The question is how we can stop the receiver when Ktor shuts down. When I stop the Ktor application it seems stuck and I have to kill the process.

ageorgousakis commented 1 year ago

I found a way to make it work. It seems runBlocking blocks the event loop of the Ktor engine. The receiver and the Ktor route handlers work fine if the receiver is started in the following way:

on(MonitoringEvent(ApplicationStarted)) { application ->
  // .. code to initialise the receiver
  CoroutineScope(Dispatchers.IO).launch {
    receiver
        .receive("my-topic")
        .collect {
            application.log.info("${it.key()} -> ${it.value()}")
            it.offset.acknowledge()
        }
  }
}
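To illustrate why the first version hung, here is a minimal sketch that uses only kotlinx.coroutines, with no Ktor or Kafka involved. The functions `blockingHook` and `launchingHook` are hypothetical stand-ins for a startup hook, and `delay` stands in for the never-ending `collect`. It measures how long each style holds on to the calling thread.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.system.measureTimeMillis

// Hypothetical stand-in for a hook that uses runBlocking.
// runBlocking does not return until all of its child coroutines finish,
// so the caller is held for the whole duration of the work.
fun blockingHook(workMillis: Long): Long = measureTimeMillis {
    runBlocking {
        launch(Dispatchers.IO) { delay(workMillis) }
    }
}

// Hypothetical stand-in for a hook that launches into a separate scope.
// launch only schedules the coroutine, so the caller gets control back at once.
fun launchingHook(scope: CoroutineScope, workMillis: Long): Long = measureTimeMillis {
    scope.launch { delay(workMillis) }
}
```

With a receiver that never completes, the first style would hold the calling thread indefinitely, which matches the behaviour described above. The second style returns immediately, but the scope it launches into still has to be cancelled by someone at shutdown.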

I don't know which is the best way to start a receiver in Ktor.

@nomisRev Can you recommend an approach or give us an example of how to use it with Ktor?

Regards

nomisRev commented 1 year ago

Hey @ageorgousakis,

Thanks for your interest in the library, and for opening a ticket! 🙏 As you've shown there already, the returned Flow is coupled to its surrounding CoroutineScope. The simplest way to run a Flow for as long as the server runs is to use launchIn with Application.

So to provide a small snippet:

val application: Application = TODO("This is the Application from Ktor")

val flow: Flow<Unit> = receiver
        .receive("my-topic")
        .map { // <-- changed collect to map, so the result is still a Flow
            application.log.info("${it.key()} -> ${it.value()}")
            it.offset.acknowledge()
        }

// Application is a CoroutineScope, so the Flow runs until the Application is cancelled
flow.launchIn(application)

The Ktor Application implements CoroutineScope, and it cancels that CoroutineScope when the server is stopped, so it will also cancel your Flow.
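The coupling between a Flow and the scope it is launched in can be checked without Ktor or Kafka. The sketch below uses only kotlinx.coroutines and makes two assumptions: `fakeRecords` is a hypothetical stand-in for `receiver.receive("my-topic")`, and a plain `CoroutineScope` plays the role of the Ktor Application.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for receiver.receive("my-topic"):
// emits forever and only stops when its collector is cancelled.
fun fakeRecords(): Flow<Int> = flow {
    var i = 0
    while (true) {
        emit(i++)
        delay(10)
    }
}

// Launches the flow in a scope, cancels the scope, and reports
// whether the collecting job ended up cancelled.
// runBlocking is used here only so the demo can be called from plain code.
fun demoScopeCancellation(): Boolean = runBlocking {
    val scope = CoroutineScope(Job() + Dispatchers.Default) // stand-in for Application
    val job = fakeRecords()
        .onEach { /* process the record here */ }
        .launchIn(scope)
    delay(100)      // let a few records flow through
    scope.cancel()  // roughly what happens to Application when the server stops
    job.join()      // wait until the collector has actually terminated
    job.isCancelled
}
```

Calling `demoScopeCancellation()` returns `true`: although the flow itself never completes, cancelling the scope terminates the collector.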

NOTE: I've run into some issues where Ktor doesn't properly cancel on SIGINT from K8S, for example, and I've therefore built SuspendApp. See "SuspendApp with Ktor and K8S" and "SuspendApp with Kafka".

nomisRev commented 1 year ago

Alternatively, you could also use ApplicationEngine#addShutdownHook and use it to manually control a CoroutineScope.
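A manually controlled scope could look roughly like the sketch below. `ConsumerRunner` is a hypothetical helper, not part of kotlin-kafka or Ktor, and the sketch uses only kotlinx.coroutines. The assumption is that `stop()` would be the call made from the hook registered via ApplicationEngine#addShutdownHook; the hook's exact signature should be checked against the Ktor version in use.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

// Hypothetical helper that owns a scope for background consumers.
class ConsumerRunner {
    // SupervisorJob: one failing consumer does not cancel its siblings.
    private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)

    // Starts collecting `records` in the owned scope and returns immediately.
    fun start(records: Flow<String>, handle: (String) -> Unit): Job =
        records.onEach { handle(it) }.launchIn(scope)

    // Cancels every consumer and blocks until all of them have terminated.
    // Blocking is acceptable here because this runs while the process shuts down.
    fun stop() = runBlocking {
        scope.coroutineContext[Job]?.cancelAndJoin()
    }
}
```

Joining inside `stop()` gives in-flight handlers a chance to finish before the process exits, rather than only signalling cancellation.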

The take-away is that the Flow is cancelled together with the CoroutineScope it is launched in.

Hope that helps @ageorgousakis! Let's leave this issue open, so it can serve as a reminder to include this information in the documentation 👍