TouK / nussknacker

Low-code tool for automating actions on real time data | Stream processing for the users.
https://nussknacker.io
Apache License 2.0
663 stars, 93 forks

Create a sample docker compose file with kafka sasl ssl or sasl_plaintext enabled, to understand the properties needed #7096

Open andsty opened 3 weeks ago

andsty commented 3 weeks ago

Describe the bug
I am not able to fetch data from Kafka with SASL_SSL enabled. The documentation on this topic is not clear.

To Reproduce
Create a Kafka cluster with SASL_SSL enabled and try to connect Nussknacker to it.

Environment (please fill in the following information):

arkadius commented 3 weeks ago

Hi @andsty. Have you checked this documentation page?

myrulezzz commented 3 weeks ago

Hi. Yes, I have checked it, but it does not mention how to set up my docker-compose with this configuration. I was able to connect to my schema registry, but it seems that I cannot connect to my Kafka environment because the SASL definition in my docker compose is wrong. How can I reference my JAAS config in my docker compose?

arkadius commented 3 weeks ago

Ah, I see! In the installation example directory there is an application-customizations.conf file.

You should use it if you want to provide more advanced configuration that is not possible to achieve with only environment variables.

Please try with something like this:

scenarioTypes {
  "streaming" {
    modelConfig {
      components.kafka {
        config {
          kafkaProperties {
            "schema.registry.url": "http://schemaregistry:8081"
            "bootstrap.servers": "broker1:9092,broker2:9092"
            "security.protocol": "SASL_SSL"
            "sasl.mechanism": "PLAIN"
            "sasl.jaas.config": "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"some_user\" password=\"some_user\";"
          }
        }
      }
    }
  }
}

and let me know if it helped you.

andsty commented 3 weeks ago

OK. Trying this configuration in my application yaml file, I get the error below in the Designer logs when I select a topic from the UI:

2024-10-29 09:39:59.558 [kafka-admin-client-thread | adminclient-5] INFO o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-5] Node -3 disconnected.
2024-10-29 09:39:59.558 [kafka-admin-client-thread | adminclient-5] INFO o.apache.kafka.clients.NetworkClient - [AdminClient clientId=adminclient-5] Cancelled in-flight METADATA request with correlation id 45 due to node -3 being disconnected (elapsed time since creation: 301ms, elapsed time since send: 301ms, request timeout: 21134ms)

andsty commented 3 weeks ago

And I see an authorization error on Kafka. Is it possible that there is an issue with the format passed in the JAAS config?

arkadius commented 3 weeks ago

I recommend checking the configuration with a CLI tool (e.g. kafka-console-consumer) executed in the same network. Nussknacker passes kafkaProperties to the Kafka client as they are, without changing anything, so it is unlikely that the problem is on the Nu side.

An example kafka-console-consumer invocation would look like:

./kafka-console-consumer.sh --topic <topic> --bootstrap-server <bootstrap-servers> --consumer.config client.properties

And client.properties would contain:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<username>" password="<password>";

See Confluent documentation for more details.

It is important to ensure that kafka-console-consumer is executed from the same network as the Designer. You might achieve that by adding another container, for example one using the Kafka image but with the entrypoint changed to bash.

andsty commented 13 hours ago

Hi, I managed to set up a proper configuration, but now when I create a new scenario and select Kafka as the source I get this error:

Could not create kafka: /opt/nussknacker/conf/application.conf: 21: security has type OBJECT rather than STRING

What am I missing from the conf file?

andsty commented 13 hours ago

  1 scenarioTypes {
  2   streaming {
  3     deploymentConfig {
  4       type = "flinkStreaming"
  5       engineSetupName = "My Flink Cluster"
  6       restUrl = "http://localhost:8081"
  7     }
  8     modelConfig {
  9       classPath = [
 10         "model/defaultModel.jar",
 11         "model/flinkExecutor.jar",
 12         "components/flink",
 13         "components/common"
 14       ]
 15       components {
 16         kafka {
 17           config {
 18             kafkaProperties {
 19               schema.registry.url = "http://schema-registry.xxxxxxx-as.net:8081"
 20               bootstrap.servers = "kafkabroker1.xxxxx-as.net:9092,kafkabroker2.xxxxxx-as.net:9092,kafkabroker3.xxxxx-as.net:9092"
 21               security.protocol = "SASL_SSL"
 22               sasl.mechanism = "SCRAM-SHA-512"
 23               ssl.truststore.type = "JKS"
 24               ssl.truststore.location = "/opt/nussknacker/conf/ksqldb.truststore.jks"
 25               ssl.truststore.password = "xxxxxx"
 26               sasl.jaas.config = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxxxx\";"
 27             }
 28           }
 29         }
 30       }
 31     }
 32    category = "xxxxxx"
 33   }
 34 }

Here is my config file. Is it correct?
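
A possible explanation for the "security has type OBJECT rather than STRING" error, offered as a sketch rather than a confirmed diagnosis: in HOCON, an unquoted key containing dots is a path expression, so security.protocol = "SASL_SSL" is parsed as a nested object security { protocol = "SASL_SSL" } rather than a single property named security.protocol. The example earlier in this thread quotes every key inside kafkaProperties, while the config above does not. Assuming that is the cause, the kafkaProperties block with quoted keys would look like this (values copied unchanged from the config above):

```hocon
kafkaProperties {
  # Quoted keys keep each dotted name as one flat string key
  # instead of letting HOCON expand it into nested objects.
  "schema.registry.url" = "http://schema-registry.xxxxxxx-as.net:8081"
  "bootstrap.servers" = "kafkabroker1.xxxxx-as.net:9092,kafkabroker2.xxxxxx-as.net:9092,kafkabroker3.xxxxx-as.net:9092"
  "security.protocol" = "SASL_SSL"
  "sasl.mechanism" = "SCRAM-SHA-512"
  "ssl.truststore.type" = "JKS"
  "ssl.truststore.location" = "/opt/nussknacker/conf/ksqldb.truststore.jks"
  "ssl.truststore.password" = "xxxxxx"
  "sasl.jaas.config" = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"xxxx\" password=\"xxxxxx\";"
}
```

The rest of the file (deploymentConfig, classPath, category) would stay as it is; only the keys that are passed through to the Kafka client as flat property names need the quotes.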