argoproj / argo-events

Event-driven Automation Framework for Kubernetes
https://argoproj.github.io/argo-events/
Apache License 2.0

Allow SSL connections to Kafka, even when not using TLS authentication #1277

Open signaleleven opened 3 years ago

signaleleven commented 3 years ago

Is your feature request related to a problem? Please describe.

The Kafka library used by argo-events supports SASL_SCRAM authentication over an SSL connection. See, for instance, https://github.com/Shopify/sarama/blob/master/examples/sasl_scram_client/main.go

However, when configuring an EventSource for Kafka as in the snippet below, the EventSource pod attempts to connect to Kafka in plaintext.

  kafka:
    cdm:
      url: blahblah-msk-endpoint.eu-central-1.amazonaws.com:9096
      topic: test
      jsonBody: true
      # partition id
      partition: "1"
      # optional backoff time for connection retries.
      # if not provided, default connection backoff time will be used.
      connectionBackoff:
        # duration in nanoseconds, or strings like "3s", "2m". following value is 1 second (1000000000 ns)
        duration: 1000000000 # "10s" is reported invalid here, but that's another issue
        # how many backoffs
        steps: 5
        factor: 2
        jitter: 0.2
      # tls: TLS IS NOT the answer. TLS enables an SSL connection, but it's for client authentication using SSL. Something that MSK supports only with a very expensive private CA. AWS, amirite? 🤷
      sasl:
        mechanism: SCRAM-SHA-512
        passwordSecret:
          key: password
          name: kafkauser
        userSecret:
          key: user
          name: kafkauser
namespace=orekit-fetcher, eventSourceName=kafka, eventSourceType=kafka, eventName=cdm, level=info, time=2021-07-14T16:14:23Z, msg=start kafka event source...
namespace=orekit-fetcher, eventSourceName=kafka, eventSourceType=kafka, eventName=cdm, level=info, time=2021-07-14T16:14:23Z, msg=connecting to Kafka cluster...
namespace=orekit-fetcher, eventSourceName=kafka, eventSourceType=kafka, eventName=cdm, level=error, time=2021-07-14T16:14:23Z, msg=failed to start listening eventsource
namespace=orekit-fetcher, eventSourceName=kafka, level=error, time=2021-07-14T16:14:23Z, msg=Erroring out, no active event server running
namespace=orekit-fetcher, eventSourceName=kafka, level=info, time=2021-07-14T16:14:23Z, msg=exiting eventbus connection daemon...
namespace=orekit-fetcher, eventSourceName=kafka, level=error, time=2021-07-14T16:14:23Z, msg=failed to start
namespace=orekit-fetcher, eventSourceName=kafka, level=error, time=2021-07-14T16:14:23Z, msg=NATS connection lost

AWS MSK does not allow SASL_SCRAM to be set up while also allowing plaintext connections, so I cannot connect to my MSK cluster with an EventSource.

Describe the solution you'd like

I believe Sarama allows configuring a TLS socket without client authentication (but with optional server certificate validation). However, I could not find any way to configure an EventSource to connect to Kafka with TLS when not using TLS authentication.
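For illustration, here is a minimal Go sketch of what "TLS without client authentication" could look like at the socket level. It uses only the standard library; `serverOnlyTLS` is a hypothetical helper, not code from argo-events. With Sarama, a config like this would typically be assigned to `Config.Net.TLS.Config` with `Config.Net.TLS.Enable` set to true, alongside the SASL settings.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"errors"
	"fmt"
)

// serverOnlyTLS (hypothetical helper) builds a client-side TLS config that
// encrypts the connection and verifies the server, but never presents a
// client certificate. caPEM is optional: when empty, the system root CAs
// are used. skipVerify disables server certificate verification entirely.
func serverOnlyTLS(caPEM []byte, skipVerify bool) (*tls.Config, error) {
	cfg := &tls.Config{
		MinVersion:         tls.VersionTLS12,
		InsecureSkipVerify: skipVerify,
	}
	if len(caPEM) > 0 {
		pool := x509.NewCertPool()
		if !pool.AppendCertsFromPEM(caPEM) {
			return nil, errors.New("no valid CA certificate found in PEM data")
		}
		cfg.RootCAs = pool
	}
	// cfg.Certificates is left empty on purpose: no client authentication.
	return cfg, nil
}

func main() {
	cfg, err := serverOnlyTLS(nil, false)
	if err != nil {
		panic(err)
	}
	fmt.Println("client certificates:", len(cfg.Certificates))
}
```

The point of the sketch is that encryption and server verification do not depend on a client certificate or key; authentication can then be left to SASL.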

Describe alternatives you've considered

I'm considering temporarily dropping TLS and Authentication in MSK to continue integrating argo-events with our systems, but that's not how I'd like to run in production.


Message from the maintainers:

If you wish to see this enhancement implemented please add a 👍 reaction to this issue! We often sort issues this way to know what to prioritize.

whynowy commented 3 years ago

@signaleleven - would you like to file a PR for it?

signaleleven commented 3 years ago

I've never written Go code, and I don't know if I'd be able to. If I do try, I'll mention here that I started the effort. Meanwhile, if somebody with a ready dev setup for argo-events wants to try, it's up for grabs.

whynowy commented 3 years ago

@signaleleven - I assume allowing both sasl and tls to be configured together will solve the issue, correct?

whynowy commented 3 years ago

Just checked the code: we already allow sasl and tls to be configured together. Could you try that?

signaleleven commented 3 years ago

AFAICT, TLS here implies using a client certificate for authentication. I do not want that. I want to make an SSL connection to the Kafka cluster, but without providing a client certificate and key. Only the server has an SSL certificate.

And this seems to require a client secret. https://github.com/argoproj/argo-events/blob/d19cb22c4b67dcd993c362b9c6310f29b035037a/common/util.go#L213

I didn't try to provide an empty one...
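To make the question concrete, here is a small Go sketch of one plausible validation rule under which a client secret would not be mandatory. Everything in it (`tlsInputs`, `validate`) is hypothetical and is not taken from the linked `common/util.go`; it only illustrates a rule that rejects a half-configured client pair while accepting a CA-only or empty configuration.

```go
package main

import (
	"errors"
	"fmt"
)

// tlsInputs is a hypothetical summary of which TLS secrets a user supplied.
type tlsInputs struct {
	HasCACert     bool
	HasClientCert bool
	HasClientKey  bool
}

// validate accepts a CA certificate alone, a full client certificate/key
// pair, both, or nothing at all (system roots). It rejects only a client
// certificate without its key, or a key without its certificate.
func (in tlsInputs) validate() error {
	if in.HasClientCert != in.HasClientKey {
		return errors.New("client certificate and client key must be set together")
	}
	return nil
}

func main() {
	fmt.Println("CA only:", tlsInputs{HasCACert: true}.validate())
	fmt.Println("cert without key:", tlsInputs{HasClientCert: true}.validate())
}
```

Under a rule like this, a server-only TLS connection combined with SASL would pass validation without any client secret.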

signaleleven commented 3 years ago

Oh maybe I see where I got confused. I will try.

whynowy commented 3 years ago

@signaleleven - I'm also confused 😐. I don't have a Kafka server with settings like the ones you mentioned handy, so please let me know if you find anything.

usamaB commented 2 years ago

I have the same issue. We have MSK with SASL/SSL, and I'm unable to connect to it. Is there any fix for this? https://github.com/Shopify/sarama/issues/1944 The sarama library doesn't have an option to set SSL, I suppose.

Here are my logs with consumerGroup. If I use partition instead of consumerGroup, the "client has run out of available brokers" error isn't there, but the connection still times out.

{"level":"info","ts":1641920166.4959314,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"error","ts":1641920167.3069851,"logger":"argo-events.eventsource","caller":"kafka/start.go:119","msg":"Error creating consumer group client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","eventSourceName":"kafka","ev
{"level":"info","ts":1641920167.3070822,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"error","ts":1641920168.1086795,"logger":"argo-events.eventsource","caller":"kafka/start.go:119","msg":"Error creating consumer group client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","eventSourceName":"kafka","ev
{"level":"info","ts":1641920168.1088238,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"error","ts":1641920168.9209,"logger":"argo-events.eventsource","caller":"kafka/start.go:119","msg":"Error creating consumer group client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","eventSourceName":"kafka","event
{"level":"info","ts":1641920168.9210088,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"error","ts":1641920169.7508693,"logger":"argo-events.eventsource","caller":"kafka/start.go:119","msg":"Error creating consumer group client: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","eventSourceName":"kafka","ev
{"level":"error","ts":1641920169.7509682,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:417","msg":"failed to start listening eventsource","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example","error":"timed out waiting for t
{"level":"error","ts":1641920169.7510505,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:440","msg":"Erroring out, no active event server running","eventSourceName":"kafka","stacktrace":"github.com/argoproj/argo-events/eventsources.(*EventSourceA
{"level":"info","ts":1641920169.7510972,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:338","msg":"exiting eventbus connection daemon...","eventSourceName":"kafka"}
{"level":"error","ts":1641920169.769013,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:296","msg":"failed to start","eventSourceName":"kafka","error":"no active event server running","errorVerbose":"no active event server running\ngithub.com/arg
{"level":"error","ts":1641920169.7691605,"logger":"argo-events.eventsource","caller":"driver/nats.go:84","msg":"NATS connection lost","eventSourceName":"kafka","clientID":"client-kafka-eventsource-glsrl-56f6f5857b-zfmc5-443","stacktrace":"github.com/argoproj/argo-even

usamaB commented 2 years ago

spec:
  kafka:
    example:
      url: "blahblah-msk-endpoint.eu-central-1.amazonaws.com:9096"
      topic: test
      jsonBody: true
      partition: 1
      connectionBackoff:
        duration: 10s
        steps: 5
        factor: 2
        jitter: 0.2
      # Enable SASL authentication (not to be used with TLS)
      sasl:
        mechanism: SCRAM-SHA-512
        passwordSecret:
          key: kafka-akhq
          name: team-secrets
        userSecret:
          key: user
          name: kafka-user-secret

usamaB commented 2 years ago

@whynowy could you explain what you mean by trying both TLS and SASL? TLS requires certificates.

MaorDavidzon commented 2 years ago

We are facing the same issue with MSK and Confluent

vijaykumarmcp commented 2 years ago

Any update on this issue? We are experiencing a similar issue with MSK and Argo Events.

Eventsource Pod Logs:

{"level":"info","ts":1646827808.553952,"logger":"argo-events.eventsource","caller":"cmd/main.go:63","msg":"starting eventsource server","eventSourceName":"kafka","version":"vv1.3.1+f47cb23.dirty"}
{"level":"info","ts":1646827808.55421,"logger":"argo-events.eventsource","caller":"metrics/metrics.go:172","msg":"starting metrics server","eventSourceName":"kafka"}
{"level":"info","ts":1646827808.5563707,"logger":"argo-events.eventsource","caller":"leaderelection/leaderelection.go:122","msg":"Not the LEADER, stand by ...","eventSourceName":"kafka"}
{"level":"info","ts":1646827809.3139284,"logger":"argo-events.eventsource","caller":"leaderelection/leaderelection.go:131","msg":"Becoming a Candidate, stand by ...","eventSourceName":"kafka"}
{"level":"info","ts":1646827809.3141239,"logger":"argo-events.eventsource","caller":"leaderelection/leaderelection.go:128","msg":"I'm the LEADER, starting ...","eventSourceName":"kafka"}
{"level":"info","ts":1646827809.3141649,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:309","msg":"Starting event source server...","eventSourceName":"kafka"}
{"level":"info","ts":1646827809.3144376,"logger":"argo-events.eventsource","caller":"driver/nats.go:93","msg":"NATS auth strategy: Token","eventSourceName":"kafka","clientID":"client-kafka-eventsource-tjj7j-75c876678f-b28vj-738"}
{"level":"info","ts":1646827809.315837,"logger":"argo-events.eventsource","caller":"driver/nats.go:105","msg":"Connected to NATS server.","eventSourceName":"kafka","clientID":"client-kafka-eventsource-tjj7j-75c876678f-b28vj-738"}
{"level":"info","ts":1646827809.3175008,"logger":"argo-events.eventsource","caller":"driver/nats.go:118","msg":"Connected to NATS streaming server.","eventSourceName":"kafka","clientID":"client-kafka-eventsource-tjj7j-75c876678f-b28vj-738"}
{"level":"info","ts":1646827809.3175228,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:423","msg":"Eventing server started.","eventSourceName":"kafka"}
{"level":"info","ts":1646827809.317542,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:332","msg":"starting eventbus connection daemon...","eventSourceName":"kafka"}
{"level":"info","ts":1646827809.317596,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827809.317614,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827809.3176177,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827813.1599393,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827813.1599627,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827813.1599667,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827817.0189812,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827817.0190055,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827817.019011,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827820.86587,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827820.8658972,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827820.8659015,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827824.7309763,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827824.7309978,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827824.731002,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827828.671456,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827828.6714823,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827828.671487,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827832.5178928,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827832.5179172,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827832.5179212,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827836.356999,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827836.3570304,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827836.3570344,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827840.2145286,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827840.2145538,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827840.2145584,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827844.055794,"logger":"argo-events.eventsource","caller":"kafka/start.go:78","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827844.055817,"logger":"argo-events.eventsource","caller":"kafka/start.go:161","msg":"start kafka event source...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"info","ts":1646827844.0558214,"logger":"argo-events.eventsource","caller":"kafka/start.go:165","msg":"connecting to Kafka cluster...","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example"}
{"level":"error","ts":1646827847.912883,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:417","msg":"failed to start listening eventsource","eventSourceName":"kafka","eventSourceType":"kafka","eventName":"example","error":"timed out waiting for the condition: failed to connect to Kafka broker for event source example: timed out waiting for the condition: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)","stacktrace":"github.com/argoproj/argo-events/eventsources.(EventSourceAdaptor).run.func3\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:417"}
{"level":"error","ts":1646827847.9129646,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:440","msg":"Erroring out, no active event server running","eventSourceName":"kafka","stacktrace":"github.com/argoproj/argo-events/eventsources.(EventSourceAdaptor).run\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:440\ngithub.com/argoproj/argo-events/eventsources.(EventSourceAdaptor).Start.func1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:295"}
{"level":"info","ts":1646827847.9129875,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:338","msg":"exiting eventbus connection daemon...","eventSourceName":"kafka"}
{"level":"error","ts":1646827847.9143217,"logger":"argo-events.eventsource","caller":"driver/nats.go:84","msg":"NATS connection lost","eventSourceName":"kafka","clientID":"client-kafka-eventsource-tjj7j-75c876678f-b28vj-738","stacktrace":"github.com/argoproj/argo-events/eventbus/driver.(natsStreaming).Connect.func1\n\t/home/runner/work/argo-events/argo-events/eventbus/driver/nats.go:84\ngithub.com/nats-io/nats%2ego.(Conn).close.func1\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.10.0/nats.go:3802\ngithub.com/nats-io/nats%2ego.(asyncCallbacksHandler).asyncCBDispatcher\n\t/home/runner/go/pkg/mod/github.com/nats-io/nats.go@v1.10.0/nats.go:2082"}
{"level":"error","ts":1646827847.9143271,"logger":"argo-events.eventsource","caller":"eventsources/eventing.go:296","msg":"failed to start","eventSourceName":"kafka","error":"no active event server running","errorVerbose":"no active event server running\ngithub.com/argoproj/argo-events/eventsources.(EventSourceAdaptor).run\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:443\ngithub.com/argoproj/argo-events/eventsources.(EventSourceAdaptor).Start.func1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:295\nruntime.goexit\n\t/opt/hostedtoolcache/go/1.14.15/x64/src/runtime/asm_amd64.s:1373","stacktrace":"github.com/argoproj/argo-events/eventsources.(*EventSourceAdaptor).Start.func1\n\t/home/runner/work/argo-events/argo-events/eventsources/eventing.go:296"}

VincSch commented 2 years ago

Hey guys, don't know if I exactly hit your problem, but I still wanted to leave a comment.

I looked into the Kafka event source implementation and saw that a new property, insecureSkipVerify, was added that allows the tls config to be used in addition to the sasl config. See: https://github.com/argoproj/argo-events/pull/1528/files#.

This enables the following config, which led to a successful connection to Confluent Cloud (I guess AWS has the same TLS requirement):

spec:
  kafka:
    sample:
      tls:
        insecureSkipVerify: true
      sasl:
        mechanism: PLAIN
        passwordSecret:
          key: kafkaSecret
          name: kafka-creds
        userSecret:
          key: kafkaClient
          name: kafka-creds

Hope it helps you solve your problems.

PS: It's not properly reflected in the docs and samples!
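As a note on what `insecureSkipVerify: true` trades away, here is a hedged Go sketch. It assumes the setting maps to Go's `tls.Config.InsecureSkipVerify`, which the thread does not confirm, and it uses a local `httptest` TLS server with a self-signed certificate as a stand-in for a broker. The names `handshake` and `demo` are hypothetical.

```go
package main

import (
	"crypto/tls"
	"crypto/x509"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// handshake dials addr over TLS with cfg and returns the handshake error, if any.
func handshake(addr string, cfg *tls.Config) error {
	conn, err := tls.Dial("tcp", addr, cfg)
	if err != nil {
		return err
	}
	conn.Close()
	return nil
}

// demo runs three handshakes against a local server whose certificate is
// not signed by a public CA, none of them using a client certificate.
func demo() (strict, skip, pinned error) {
	srv := httptest.NewTLSServer(http.NotFoundHandler())
	defer srv.Close()
	addr := srv.Listener.Addr().String()

	// Default settings: the server certificate is checked against system roots.
	strict = handshake(addr, &tls.Config{})

	// Verification disabled: traffic is encrypted, but any server is accepted.
	skip = handshake(addr, &tls.Config{InsecureSkipVerify: true})

	// Verification kept, trusting only the server's own certificate.
	pool := x509.NewCertPool()
	pool.AddCert(srv.Certificate())
	pinned = handshake(addr, &tls.Config{RootCAs: pool})
	return strict, skip, pinned
}

func main() {
	strict, skip, pinned := demo()
	fmt.Println("default verification failed:", strict != nil)
	fmt.Println("skip-verify succeeded:", skip == nil)
	fmt.Println("trusted-CA verification succeeded:", pinned == nil)
}
```

Skipping verification gets the connection encrypted but leaves it open to impersonation of the server. For brokers with publicly trusted certificates, keeping verification on should generally be possible; whether the EventSource accepts a tls block with neither insecureSkipVerify nor a CA secret is not established in this thread.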

adam-gooten commented 1 year ago

Apologies for the necro but the above solution works with Confluent which uses SASL Username and Password with SSL!

whynowy commented 1 year ago

@VincSch - Could you help fix the docs/examples?