citrusframework / citrus

Framework for automated integration tests with focus on messaging integration
https://citrusframework.org
Apache License 2.0

Provide support for SASL protocol for Kafka Server #768

Open apupier opened 3 years ago

apupier commented 3 years ago

User story

As a Kafka user, I want to be able to use the Kafka SASL protocol so that I'm able to check the SASL implementation of my application.

Additional context

The Citrus Embedded Kafka Server is really useful for testing. Having the possibility to configure SASL authentication would be awesome. It will be useful when writing tests for this feature, for instance: https://issues.redhat.com/browse/FUSETOOLS2-1060

apupier commented 3 years ago

I tried something like:

Map<String, String> brokerProperties = new HashMap<String, String>();
Path jaasConf = Files.createTempFile("jaas-kafka-server", ".conf");
byte[] jaasContent = ("KafkaServer {\n"
        + "    org.apache.kafka.common.security.plain.PlainLoginModule required\n"
        + "    username=\"admin\"\n"
        + "    password=\"admin\"\n"
        + "    user_admin=\"admin\"\n"
        + "    user_alice=\"alice\"\n"
        + "    user_bob=\"bob\"\n"
        + "    user_charlie=\"charlie\";\n"
        + "};").getBytes();
Files.write(jaasConf, jaasContent);
System.setProperty("java.security.auth.login.config", jaasConf.toAbsolutePath().toString());
brokerProperties.put("authorizer.class.name", "kafka.security.authorizer.AclAuthorizer");
brokerProperties.put("listeners", "SASL_PLAINTEXT://:9094");
brokerProperties.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
brokerProperties.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
brokerProperties.put("sasl.enabled.mechanisms", "PLAIN");
kafkaServer = new EmbeddedKafkaServerBuilder()
        .kafkaServerPort(9094)
        .topics("aTopic")
        .brokerProperties(brokerProperties)
        .build();
kafkaServer.start();

but it is failing with

org.apache.kafka.common.KafkaException: Tried to check server's port before server was started or checked for port of non-existing protocol
    at kafka.network.SocketServer.boundPort(SocketServer.scala:346)
    at kafka.server.KafkaServer.boundPort(KafkaServer.scala:733)
    at com.consol.citrus.kafka.embedded.EmbeddedKafkaServer.start(EmbeddedKafkaServer.java:130)
    at com.github.cameltooling.lsp.internal.completion.KafkaConnectionTest.testSasl(KafkaConnectionTest.java:69)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.platform.commons.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:688)
    at org.junit.jupiter.engine.execution.MethodInvocation.proceed(MethodInvocation.java:60)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$ValidatingInvocation.proceed(InvocationInterceptorChain.java:131)
    at org.junit.jupiter.engine.extension.TimeoutExtension.intercept(TimeoutExtension.java:149)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestableMethod(TimeoutExtension.java:140)
    at org.junit.jupiter.engine.extension.TimeoutExtension.interceptTestMethod(TimeoutExtension.java:84)
    at org.junit.jupiter.engine.execution.ExecutableInvoker$ReflectiveInterceptorCall.lambda$ofVoidMethod$0(ExecutableInvoker.java:115)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.lambda$invoke$0(ExecutableInvoker.java:105)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain$InterceptedInvocation.proceed(InvocationInterceptorChain.java:106)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.proceed(InvocationInterceptorChain.java:64)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.chainAndInvoke(InvocationInterceptorChain.java:45)
    at org.junit.jupiter.engine.execution.InvocationInterceptorChain.invoke(InvocationInterceptorChain.java:37)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:104)
    at org.junit.jupiter.engine.execution.ExecutableInvoker.invoke(ExecutableInvoker.java:98)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.lambda$invokeTestMethod$6(TestMethodTestDescriptor.java:210)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.invokeTestMethod(TestMethodTestDescriptor.java:206)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:131)
    at org.junit.jupiter.engine.descriptor.TestMethodTestDescriptor.execute(TestMethodTestDescriptor.java:65)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:139)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at java.util.ArrayList.forEach(ArrayList.java:1259)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.invokeAll(SameThreadHierarchicalTestExecutorService.java:38)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$5(NodeTestTask.java:143)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$7(NodeTestTask.java:129)
    at org.junit.platform.engine.support.hierarchical.Node.around(Node.java:137)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.lambda$executeRecursively$8(NodeTestTask.java:127)
    at org.junit.platform.engine.support.hierarchical.ThrowableCollector.execute(ThrowableCollector.java:73)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.executeRecursively(NodeTestTask.java:126)
    at org.junit.platform.engine.support.hierarchical.NodeTestTask.execute(NodeTestTask.java:84)
    at org.junit.platform.engine.support.hierarchical.SameThreadHierarchicalTestExecutorService.submit(SameThreadHierarchicalTestExecutorService.java:32)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestExecutor.execute(HierarchicalTestExecutor.java:57)
    at org.junit.platform.engine.support.hierarchical.HierarchicalTestEngine.execute(HierarchicalTestEngine.java:51)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:108)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:88)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.lambda$execute$0(EngineExecutionOrchestrator.java:54)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.withInterceptedStreams(EngineExecutionOrchestrator.java:67)
    at org.junit.platform.launcher.core.EngineExecutionOrchestrator.execute(EngineExecutionOrchestrator.java:52)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:96)
    at org.junit.platform.launcher.core.DefaultLauncher.execute(DefaultLauncher.java:84)
    at org.eclipse.jdt.internal.junit5.runner.JUnit5TestReference.run(JUnit5TestReference.java:98)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:40)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:541)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:768)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:464)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:210)
Caused by: java.util.NoSuchElementException: key not found: ListenerName(PLAINTEXT)
    at scala.collection.immutable.Map$Map1.apply(Map.scala:239)
    at kafka.network.SocketServer.boundPort(SocketServer.scala:338)
    ... 70 more

bbortt commented 2 years ago

I ran into the same issue lately. The listeners list (in brokerProperties) must contain a PLAINTEXT listener because of the restriction in EmbeddedKafkaServer.java#129. I think that acts as a kind of health check.

@apupier you can have multiple listeners in the properties, e.g. for our application tests in Kubernetes we use:

advertised.listeners=CLUSTER://$SERVICE.$NAMESPACE.svc.cluster.local.:9092,PLAINTEXT://localhost:9093
# whatever you want for CLUSTER.. added yours here
listener.security.protocol.map=CLUSTER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT
listeners=CLUSTER://0.0.0.0:9092,PLAINTEXT://127.0.0.1:9093

That allows a local admin client (e.g. a Spring application or citrus-simulator) as well as an external client to connect to the embedded server.

Still, @christophd, we should add a property that allows customizing the default listener name in use. I think the health check itself is a good thing.

bbortt commented 2 years ago

Oh, and you must change the kafka-port value too when using multiple listeners, because the server also uses that port to create the default topics on the PLAINTEXT/internal listener: EmbeddedKafkaServer.java#235.
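
Putting the two comments above together, here is a minimal sketch of how the broker properties from the original snippet might be adjusted. It only builds the properties map; the class name SaslBrokerProperties, the method name build, and the PLAINTEXT port 9092 are assumptions for illustration. It keeps the SASL listener but adds a PLAINTEXT listener next to it. The authorizer and JAAS setup from the original snippet are left out, and this has not been verified against a running embedded broker.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Citrus: builds broker properties that expose
// a SASL_PLAINTEXT listener plus the PLAINTEXT listener mentioned in the thread.
public class SaslBrokerProperties {

    static Map<String, String> build(int saslPort, int plaintextPort) {
        Map<String, String> props = new LinkedHashMap<>();
        // Two listeners: SASL for the clients under test, PLAINTEXT for the
        // embedded server's own port check and default topic creation.
        props.put("listeners",
                "SASL_PLAINTEXT://:" + saslPort + ",PLAINTEXT://:" + plaintextPort);
        // SASL settings taken unchanged from the original snippet.
        props.put("security.inter.broker.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism.inter.broker.protocol", "PLAIN");
        props.put("sasl.enabled.mechanisms", "PLAIN");
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting properties in key=value form.
        build(9094, 9092).forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

The resulting map would be passed to .brokerProperties(...) as in the original snippet, with .kafkaServerPort(...) set to the PLAINTEXT port (9092 here) rather than the SASL port, following the note about the kafka-port value.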