abstracta / jmeter-java-dsl

Simple JMeter performance tests API
https://abstracta.github.io/jmeter-java-dsl/
Apache License 2.0

Kafka base realisation #83

Open kirillyu opened 2 years ago

kirillyu commented 2 years ago

I reviewed the plugins that implement this and concluded that they are far more complex and feature-heavy than the basic need. In my view there are only two essential needs: configure Kafka, and produce messages to Kafka. I wrote the following Kotlin code to cover them. Maybe it will be helpful to somebody, or maybe it can be implemented in the DSL:

```kotlin
import java.util.Properties
import kotlin.collections.HashMap
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import us.abstracta.jmeter.javadsl.JmeterDsl
import us.abstracta.jmeter.javadsl.java.DslJsr223Sampler

object Kafka {

    // one producer per named configuration, shared by all produce samplers
    private val kafkaSets = HashMap<String, KafkaProducer<String, String>>()

    var propsTemplate = Properties()

    init {
        // propsTemplate["bootstrap.servers"] = "host1:9092,host2:9092,host3:9092" - example
        propsTemplate["key.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        propsTemplate["value.serializer"] = "org.apache.kafka.common.serialization.StringSerializer"
        propsTemplate["compression.type"] = "none"
        propsTemplate["batch.size"] = "16384"
        propsTemplate["linger.ms"] = "0"
        propsTemplate["buffer.memory"] = "33554432"
        propsTemplate["acks"] = "all"
        propsTemplate["send.buffer.bytes"] = "131072"
        propsTemplate["receive.buffer.bytes"] = "32768"
        propsTemplate["security.protocol"] = "PLAINTEXT"
        propsTemplate["sasl.kerberos.service.name"] = "kafka"
        propsTemplate["sasl.mechanism"] = "GSSAPI"
        propsTemplate["message.key.placeholder"] = "KEY"
        propsTemplate["message.value.placeholder"] = "MESSAGE"
        propsTemplate["kerberos.auth.enabled"] = "NO"
        propsTemplate["java.security.auth.login.config"] = "null"
        propsTemplate["java.security.krb5.conf"] = "null"
        propsTemplate["ssl.enabled"] = "NO"
        propsTemplate["ssl.key.password"] = "null"
        propsTemplate["ssl.keystore.location"] = "null"
        propsTemplate["ssl.keystore.password"] = "null"
        propsTemplate["ssl.keystore.type"] = "JKS"
        propsTemplate["ssl.truststore.location"] = "null"
        propsTemplate["ssl.truststore.password"] = "null"
        propsTemplate["ssl.truststore.type"] = "JKS"
    }

    fun kafkaConfig(configName: String, propSet: Properties): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaConfig") {
            if (!kafkaSets.containsKey(configName)) {
                // fill any property missing from the given set with the template default
                for (prop in propsTemplate) {
                    if (!propSet.containsKey(prop.key)) {
                        propSet.setProperty(prop.key as String, prop.value as String)
                    }
                }
                if (!propSet.containsKey("bootstrap.servers"))
                    throw Exception("Couldn't create config without property \"bootstrap.servers\"")
                val kp = KafkaProducer<String, String>(propSet)
                kafkaSets[configName] = kp
            }
        }
    }

    fun kafkaConfig(configName: String, bootstraps: String): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaConfig") {
            if (!kafkaSets.containsKey(configName)) {
                // copy the template so the shared defaults are not mutated
                val propSet = Properties()
                propSet.putAll(propsTemplate)
                propSet.setProperty("bootstrap.servers", bootstraps)
                val kp = KafkaProducer<String, String>(propSet)
                kafkaSets[configName] = kp
            }
        }
    }

    fun kafkaProduce(configName: String, topic: String, bodyMessage: String): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            if (!kafkaSets.containsKey(configName)) {
                throw Exception("Need to create kafkaConfig before sending message")
            } else {
                kafkaSets[configName]?.send(ProducerRecord(topic, "key", bodyMessage)) // key needed?
            }
        }
    }

    fun kafkaProduce(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            if (!kafkaSets.containsKey(configName)) {
                throw Exception("Need to create kafkaConfig before sending message")
            } else {
                kafkaSets[configName]?.send(ProducerRecord(topic, partition, "key", bodyMessage))
            }
        }
    }

    fun kafkaProduceWithTimestamp(configName: String, topic: String, partition: Int, bodyMessage: String): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            if (!kafkaSets.containsKey(configName)) {
                throw Exception("Need to create kafkaConfig before sending message")
            } else {
                kafkaSets[configName]?.send(ProducerRecord(topic, partition, System.currentTimeMillis(), "key", bodyMessage))
            }
        }
    }

    fun kafkaProduce(configName: String, topic: String, createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            if (!kafkaSets.containsKey(configName)) {
                throw Exception("Need to create kafkaConfig before sending message")
            } else {
                kafkaSets[configName]?.send(ProducerRecord(topic, "key", createBodyMessage.invoke(it))) // key needed?
            }
        }
    }

    fun kafkaProduce(
        configName: String,
        topic: String,
        partition: Int,
        createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
    ): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            if (!kafkaSets.containsKey(configName)) {
                throw Exception("Need to create kafkaConfig before sending message")
            } else {
                kafkaSets[configName]?.send(
                    ProducerRecord(
                        topic, partition, "key",
                        createBodyMessage.invoke(it)
                    )
                )
            }
        }
    }

    fun kafkaProduceWithTimestamp(
        configName: String,
        topic: String,
        partition: Int,
        createBodyMessage: (input: DslJsr223Sampler.SamplerVars) -> String
    ): DslJsr223Sampler? {
        return JmeterDsl.jsr223Sampler("KafkaProducer") {
            if (!kafkaSets.containsKey(configName)) {
                throw Exception("Need to create kafkaConfig before sending message")
            } else {
                kafkaSets[configName]?.send(
                    ProducerRecord(
                        topic, partition, System.currentTimeMillis(),
                        "key",
                        createBodyMessage.invoke(it)
                    )
                )
            }
        }
    }
}
```
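
For context, here is a minimal sketch of how these helpers might be wired into a test plan from a JUnit test. The cluster name, broker addresses, topic and message body are placeholders, and it assumes the `Kafka` object above is on the test classpath:

```kotlin
import org.junit.jupiter.api.Test
import us.abstracta.jmeter.javadsl.JmeterDsl.testPlan
import us.abstracta.jmeter.javadsl.JmeterDsl.threadGroup

class KafkaLoadTest {

    @Test
    fun `should produce messages to kafka`() {
        testPlan(
            threadGroup(1, 10,
                // creates the producer once for the named config ("myCluster" is a placeholder)
                Kafka.kafkaConfig("myCluster", "broker1:9092,broker2:9092"),
                // sends one record per iteration to a placeholder topic
                Kafka.kafkaProduce("myCluster", "my-topic", """{"orderId": 1}""")
            )
        ).run()
    }
}
```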

Sonal94539 commented 1 year ago

We also need Kafka support in the JMeter Java DSL for async API load testing. We are currently using the Kafka Support and Kafka Backend Listener plugins from the JMeter Plugins Manager and hope to have similar features in the DSL so that we can start using it. Thanks!

kirillyu commented 1 year ago

Can you tell me what is missing in the example I provided?

Sonal94539 commented 1 year ago

I am still learning about the DSL and how we can use it to replace a conventional JMeter-based performance test. My requirement is to create new performance/load tests for our Kafka-based async APIs. I read this thread, which talks about Kafka configuration and producing messages, but from the description of this issue it seems this is not yet part of the DSL and is therefore considered a future enhancement?

kirillyu commented 1 year ago

This is based on the JSR223 sampler, which is part of the DSL. The DSL gives you the ability to run any Java code inside it. So that's it.
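
To illustrate the point, here is a stripped-down sketch of the same idea without the helper object: the lambda passed to `jsr223Sampler` is plain Kotlin, so the Kafka client (or any other JVM library) can be called from it directly. The broker address and topic are placeholders:

```kotlin
import java.util.Properties
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerRecord
import us.abstracta.jmeter.javadsl.JmeterDsl.jsr223Sampler
import us.abstracta.jmeter.javadsl.JmeterDsl.testPlan
import us.abstracta.jmeter.javadsl.JmeterDsl.threadGroup

fun main() {
    // plain Kafka producer, created once and reused by every sample
    val producer = KafkaProducer<String, String>(Properties().apply {
        put("bootstrap.servers", "localhost:9092") // placeholder broker
        put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
        put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    })
    try {
        testPlan(
            threadGroup(1, 10,
                jsr223Sampler("KafkaProducer") {
                    // any JVM code can run here; this sends one record per iteration
                    // and waits for the ack so the sample time is meaningful
                    producer.send(ProducerRecord("test-topic", "key", "hello")).get()
                }
            )
        ).run()
    } finally {
        producer.close()
    }
}
```

One caveat worth keeping in mind: as far as I know, lambda-based JSR223 scripts only run with the embedded JMeter engine, so a plan built this way cannot be saved to JMX or executed on remote engines as-is.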

Sonal94539 commented 1 year ago

Thank you for your response. I am new to both the DSL and JMeter, so I am trying to understand how I could use these methods to build a performance test with the DSL. Would it be possible for you to provide similar methods for creating a Kafka consumer?