zendesk / ruby-kafka

A Ruby client library for Apache Kafka
http://www.rubydoc.info/gems/ruby-kafka
Apache License 2.0
1.28k stars 337 forks source link

Confusing error message with nil topic #802

Closed nemupm closed 4 years ago

nemupm commented 4 years ago

Specifying nil for topic results in showing "Connection error EOFError" which is not good error message. This situation may occur sometimes because, for example, fluent-plugin-kafka's default_topic value is nil.

Steps to reproduce
require 'kafka'
kafka = Kafka.new(["kafka.default.svc.cluster.local:9092"], client_id: "my-application")
producer = kafka.producer
producer.produce("hello", topic: nil)
producer.deliver_messages
Actual outcome
irb(main):001:0> require 'kafka'
=> true
irb(main):002:0> kafka = Kafka.new(["kafka.default.svc.cluster.local:9092"], client_id: "my-application")
=> #<Kafka::Client:0x00007f1664ccecd8 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>, @seed_brokers=[#<URI::Generic kafka://kafka.default.svc.cluster.local:9092>], @connection_builder=#<Kafka::ConnectionBuilder:0x00007f1664cccf28 @client_id="my-application", @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007f1664ccdb08 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007f1664ccd7c0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007f1664ccd5b8 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007f1664ccd428 @semaphore=#<Thread::Mutex:0x00007f1664ccd360>, @username=nil, @password=nil, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007f1664ccd248 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @cluster=#<Kafka::Cluster:0x00007f1664ccc8c0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @seed_brokers=[#<URI::Generic kafka://kafka.default.svc.cluster.local:9092>], @broker_pool=#<Kafka::BrokerPool:0x00007f1664cccc58 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @connection_builder=#<Kafka::ConnectionBuilder:0x00007f1664cccf28 @client_id="my-application", @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007f1664ccdb08 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007f1664ccd7c0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007f1664ccd5b8 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007f1664ccd428 @semaphore=#<Thread::Mutex:0x00007f1664ccd360>, @username=nil, @password=nil, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007f1664ccd248 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @brokers={}>, @cluster_info=nil, @stale=true, @target_topics=#<Set: {}>>>
irb(main):003:0> producer = kafka.producer
=> #<Kafka::Producer:0x00007f1664d07b50 @cluster=#<Kafka::Cluster:0x00007f1664d07ec0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @seed_brokers=[#<URI::Generic kafka://kafka.default.svc.cluster.local:9092>], @broker_pool=#<Kafka::BrokerPool:0x00007f1664d07f60 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @connection_builder=#<Kafka::ConnectionBuilder:0x00007f1664cccf28 @client_id="my-application", @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007f1664ccdb08 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007f1664ccd7c0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007f1664ccd5b8 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007f1664ccd428 @semaphore=#<Thread::Mutex:0x00007f1664ccd360>, @username=nil, @password=nil, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007f1664ccd248 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @brokers={}>, @cluster_info=nil, @stale=true, @target_topics=#<Set: {}>>, @transaction_manager=#<Kafka::TransactionManager:0x00007f1664d07d30 @cluster=#<Kafka::Cluster:0x00007f1664d07ec0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @seed_brokers=[#<URI::Generic kafka://kafka.default.svc.cluster.local:9092>], @broker_pool=#<Kafka::BrokerPool:0x00007f1664d07f60 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @connection_builder=#<Kafka::ConnectionBuilder:0x00007f1664cccf28 @client_id="my-application", @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>, @connect_timeout=nil, @socket_timeout=nil, @ssl_context=nil, @sasl_authenticator=#<Kafka::SaslAuthenticator:0x00007f1664ccdb08 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @plain=#<Kafka::Sasl::Plain:0x00007f1664ccd7c0 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @authzid="", @username=nil, @password=nil>, @gssapi=#<Kafka::Sasl::Gssapi:0x00007f1664ccd5b8 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @principal=nil, @keytab=nil>, @scram=#<Kafka::Sasl::Scram:0x00007f1664ccd428 @semaphore=#<Thread::Mutex:0x00007f1664ccd360>, @username=nil, @password=nil, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @oauth=#<Kafka::Sasl::OAuth:0x00007f1664ccd248 @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @token_provider=nil>, @mechanism=nil>>, @brokers={}>, @cluster_info=nil, @stale=true, @target_topics=#<Set: {}>>, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @transactional=false, @transactional_id=nil, @transactional_timeout=60, @transaction_state=#<Kafka::TransactionStateMachine:0x00007f1664d07c90 @state=:uninitialized, @mutex=#<Thread::Mutex:0x00007f1664d07c18>, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>>, @transaction_partitions={}, @idempotent=false, @producer_id=-1, @producer_epoch=0, @sequences={}>, @logger=#<Logger:0x00007f1664cce9b8 @level=0, @progname=nil, @default_formatter=#<Logger::Formatter:0x00007f1664cce918 @datetime_format=nil>, @formatter=nil, @logdev=nil>, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>, @required_acks=-1, @ack_timeout=5, @max_retries=2, @retry_backoff=1, @max_buffer_size=1000, @max_buffer_bytesize=10000000, @compressor=#<Kafka::Compressor:0x00007f1664d07dd0 @codec=nil, @threshold=1, @instrumenter=#<Kafka::Instrumenter:0x00007f1664cce7d8 @default_payload={:client_id=>"my-application"}, @backend=nil>>, @target_topics=#<Set: {}>, @buffer=#<Kafka::MessageBuffer:0x00007f1664d07a60 @buffer={}, @size=0, @bytesize=0>, @pending_message_queue=#<Kafka::PendingMessageQueue:0x00007f1664d079c0 @messages=[], @size=0, @bytesize=0>>
irb(main):004:0> producer.produce("hello", topic: "greetings")
=> nil
irb(main):005:0> producer.deliver_messages
=> nil
irb(main):006:0> producer.produce("hello", topic: nil)
=> nil
irb(main):007:0> producer.deliver_messages
Traceback (most recent call last):
       12: from /usr/local/bin/irb:23:in `<main>'
       11: from /usr/local/bin/irb:23:in `load'
       10: from /usr/local/lib/ruby/gems/2.6.0/gems/irb-1.0.0/exe/irb:11:in `<top (required)>'
        9: from (irb):7
        8: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:246:in `deliver_messages'
        7: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/instrumenter.rb:23:in `instrument'
        6: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:253:in `block in deliver_messages'
        5: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/producer.rb:377:in `deliver_messages_with_retries'
        4: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/cluster.rb:52:in `add_target_topics'
        3: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/cluster.rb:98:in `refresh_metadata!'
        2: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/cluster.rb:350:in `cluster_info'
        1: from /fluentd/vendor/bundle/ruby/2.6.0/gems/ruby-kafka-0.7.10/lib/kafka/cluster.rb:396:in `fetch_cluster_info'
Kafka::ConnectionError (Could not connect to any of the seed brokers:)
- kafka://kafka.default.svc.cluster.local:9092: Connection error EOFError: EOFError
irb(main):008:0>
dasch commented 4 years ago

Would love to get a PR to fix that.

nemupm commented 4 years ago

Ok, I'll make a PR.