redpanda-data / connect

Fancy stream processing made operationally mundane
https://docs.redpanda.com/redpanda-connect/about/
8.14k stars 839 forks source link

Unable to start benthos with kafka output in 4.23.0 #2199

Closed fredrikwangberg closed 1 year ago

fredrikwangberg commented 1 year ago

Hi,

I have an issue where I fail to start a 4.23.0 benthos that uses the kafka output, see error message below.

{"@service":"benthos","level":"error","msg":"Service closing due to: failed to init output 'my_output' path root.output: kafka: client has run out of available brokers to talk to: 3 errors occurred:\n\t* unexpected EOF\n\t* unexpected EOF\n\t* unexpected EOF\n"}

The same configuration works in version 4.22.0. For reference, the kafka cluster version is 2.8.1.

This is output part of the benthos configuration:

output:
  label: "my_output"
  kafka:
    addresses:
      - b-1.redacted.eu-west-1.amazonaws.com:9094,b-2.redacted.eu-west-1.amazonaws.com:9094,b-3.redacted.amazonaws.com:9094
    topic: my_topic
    client_id: "my_client"
    tls:
      enabled: true
      root_cas_file: /path/cacert.crt
      skip_cert_verify: true
      client_certs:
        - cert_file: /path/client.crt
          key_file: /path/client.key

I have tried splitting the addresses myself into a itemized list (a - and newline for every broker), but with the same result. I do not know how to investigate this further, but I am keen to provide more details about the environment/deployment or to try out other ideas that potentially could generate more insights on why the error occurs.

Fredrik

Jeffail commented 1 year ago

Hey @fredrikwangberg, this is likely due to a downgrade we had to perform on the underlying kafka client library: https://github.com/benthosdev/benthos/issues/2176, the default kafka protocol version would have reduced after the rollback we did. However, you should be able to explicitly force a newer kafka protocol version with: https://www.benthos.dev/docs/components/outputs/kafka#target_version

Alternatively, if you switch to the kafka_franz output you might find it'll work better as it does version negotiation automatically: https://www.benthos.dev/docs/components/outputs/kafka_franz

Let me know if either of those solutions works.

fredrikwangberg commented 1 year ago

Thank you for your quick reply and insights @Jeffail. That sounds reasonable.

I tried specifying kafka.target_version to 2.8.1 and even 2.7.0 but it did not work; got same error. Not sure if I should have forced a version above 2.8.1... ?

But kafka_franz worked out, so I am happy with this now. Again, thank you for your quick assistance 🙏

mfamador commented 1 year ago

I can also reproduce the same problem with EventHubs or with a docker container running kafka 1.0.0. The input works fine, but we can't connect now to the output though, despite setting the proper target version. It connects if we're running the latest kafka docker image, though. Will investigate to find if we can solve it on benthos or if we need to do something on sarama.