karafka / rdkafka-ruby

Modern and performant Kafka client library for Ruby based on librdkafka
https://karafka.io
Other
350 stars 120 forks source link

Segfault when producing a lot of messages without waiting #15

Closed thijsc closed 6 years ago

thijsc commented 6 years ago

To reproduce produce more than ~ 1000 messages and don't wait for them. I think the Ruby GC cleans up the delivery handles which causes librdkafka to crash when it tries to update them.

frsyuki commented 6 years ago

We had the same issue. SEGV happens in producer only under high load. I suspect that the thread of rd_kafka_poll loop is trying to fill DeliveryHandle at Rdkafka::Bindings::DeliveryCallback and the memory of the DeliveryHandle is already freed by GC when reference to the DeliveryHandle is lost. Reference is lost when Rdkafka::Producer#produce raises an exception, or DeliveryHandle#wait finishes (successfully or with an error) and then Kafka sends another message.

thijsc commented 6 years ago

Indeed. I haven't found a great way to prevent this yet. We run this under high load, but we add all delivery handles to an array and wait for them. So we don't see this crash.

I think there will need to be an internal array that the handles are added to and cleared when the delivery handle finishes.

thijsc commented 6 years ago

I've reproduced it in this test:

it "should produce message that aren't waited for and not crash" do
  100_000.times do
    producer.produce(
      topic:   "produce_test_topic",
      payload: "payload not waiting",
      key:     "key not waiting"
    )
  end

  # Allow some time for a GC run
  sleep 2
end

crashes like this most of the time:

rspec spec/rdkafka/producer_spec.rb:163(96098,0x70000e32a000) malloc: *** error for object 0x7fe5c98c84f0: incorrect checksum for freed object - object was probably modified after being freed.
*** set a breakpoint in malloc_error_break to debug
[1]    96098 abort      bundle exec rspec spec/rdkafka/producer_spec.rb:163

Adding the handle to an array fixes the crash. I'm looking into a way to add this mechanism to the producer internally.

mensfeld commented 4 years ago

Even with the fix from the commit I still experienced this issue when running several producers in a spec and not closing them fast enough after the specs were done. I fixed that by defining finalizer hooks to close the client before GC kicks in:

ObjectSpace.define_finalizer(self, proc { close })

ref: https://github.com/karafka/waterdrop/pull/135/files

thijsc commented 4 years ago

It seems like ffi's hooks do not always trigger exactly as I expect. We should maybe switch to this approach altogether.

mensfeld commented 4 years ago

I mean for a regular producer that's probably ok as there's usually more than enough time to finish the work before GC kicks in (especially if you close the producer). Rails, Ruby process doing other stuff, Sidekiq etc, but I can imagine this ffi hook not kicking in a problem for thin periodically executable jobs that end with ruby exiting so it might be worth.

Adithya-copart commented 4 years ago

@thijsc I think https://github.com/appsignal/rdkafka-ruby/pull/108/commits/f9732b6f0e78604535b30e1488c47dd4ffcac913 is responsible for introducing this in a way.

The handle is no longer a AutoPointer object that comes with a finalizer by default. Other changes in https://github.com/appsignal/rdkafka-ruby/commit/740d72346777a4c441427176efa81ba91bbee5e1 made it safe to invoke producer#close multiple times without a SegFault. So, it could have added a finalizer to invoke close safely.

I even added https://github.com/appsignal/rdkafka-ruby/pull/108/commits/6d7b47221fb061b0e97a7eca1a2c1fed98a1f8bf to close all the open sockets in the test suite but the fix by @mensfeld in https://github.com/appsignal/rdkafka-ruby/pull/115 did not occur to me at the time.

mensfeld commented 4 years ago

but the fix by @mensfeld in #115 did not occur to me at the time.

@Adithya-copart you mean that after this fix all is good right?

Adithya-copart commented 4 years ago

Yeah. The fix looks good to me.

Any producer that is not closed will be closed by the finalizer after GC. If it is already closed by the user, the finalizer will do nothing.

Edit: In https://github.com/appsignal/rdkafka-ruby/commit/6d7b47221fb061b0e97a7eca1a2c1fed98a1f8bf, I ended up fixing the problem with open sockets in the test suite by calling close explicitly rather than addressing the root cause. The open sockets prevented the process from exit. The root cause is addressed in your PR by calling close.