ManageIQ / manageiq-messaging

MIT License

Add wait_for_topic option to Kafka #75

Closed (agrare closed 1 year ago)

agrare commented 1 year ago

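Add a wait_for_topic option to the Kafka subscribe_messages and subscribe_topic methods. When it is enabled, the subscriber waits for the topic to be created, retrying with an increasing backoff, instead of failing right away.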
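For readers unfamiliar with the pattern, here is a minimal sketch of retrying with increasing backoff. It is not the code from this PR: `UnknownTopicError`, `retry_until_topic_exists`, and the `base_delay`, `max_delay`, and `sleeper` parameters are all hypothetical names chosen for illustration, and the doubling schedule is an assumption about what "increasing backoff" could look like.

```ruby
# Hypothetical stand-in for the "unknown topic" error a Kafka client raises.
class UnknownTopicError < StandardError; end

# Runs the block; if the topic does not exist yet, sleeps and tries again,
# doubling the delay after each failed attempt up to max_delay.
# With wait: false the error is re-raised immediately.
# All names and defaults are illustrative assumptions, not the gem's API.
def retry_until_topic_exists(wait: true, base_delay: 1, max_delay: 60,
                             sleeper: Kernel.method(:sleep))
  delay = base_delay
  begin
    yield
  rescue UnknownTopicError
    raise unless wait                      # fail fast when waiting is disabled
    sleeper.call(delay)                    # back off before the next attempt
    delay = [delay * 2, max_delay].min     # increase the delay, capped
    retry
  end
end

# Usage: the block fails three times, then succeeds on the fourth attempt.
# A lambda replaces sleep so the example finishes instantly.
attempts = 0
delays = []
result = retry_until_topic_exists(sleeper: ->(seconds) { delays << seconds }) do
  attempts += 1
  raise UnknownTopicError, "unknown_topic_or_part" if attempts < 4
  :subscribed
end
puts "result=#{result} attempts=#{attempts} delays=#{delays.inspect}"
```

Injecting the `sleeper` is only a convenience for testing the schedule without real waits; a real subscriber would likely just call `sleep`.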

agrare commented 1 year ago

Testing with MIQ, passing :wait_for_topic => false fails immediately:

>> MiqQueue.messaging_client("test").subscribe_messages(:service => "abcd", :wait_for_topic => false) { |msg| puts msg.inspect }
/home/grare/adam/.gem/ruby/3.0.0/gems/rdkafka-0.12.0/lib/rdkafka/consumer.rb:432:in `poll': Broker: Unknown topic or partition (unknown_topic_or_part) (Rdkafka::RdkafkaError)
    from /home/grare/adam/.gem/ruby/3.0.0/gems/rdkafka-0.12.0/lib/rdkafka/consumer.rb:458:in `block in each'

Passing :wait_for_topic => true blocks until a message is published:

>> MiqQueue.messaging_client("test").subscribe_messages(:service => "abcd", :wait_for_topic => true) { |msg| puts msg.inspect }

...

(separate console `> MiqQueue.messaging_client("test").publish_message(:service => "abcd", :message => "hello", :payload => {"hello" => "world"})`)

[#<ManageIQ::Messaging::ReceivedMessage:0x0000563583399478 @sender=nil, @message="hello", @payload={"hello"=>"world"}