dfdx / RDKafka.jl

Wrapper for librdkafka

Segmentation fault trying to capture SIGINT #12

Open attdona opened 3 years ago

attdona commented 3 years ago

In my consumer I cannot capture SIGINT and exit in a controlled way: instead of an InterruptException I get a segmentation fault.

Julia Version 1.6.0
Commit f9720dc2eb (2021-03-24 12:55 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Xeon(R) CPU E3-1225 v5 @ 3.30GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-11.0.1 (ORCJIT, skylake)

Below is a minimal working example, consume.jl, that reproduces the problem:

using RDKafka

Base.exit_on_sigint(false)

function main()

    try
        c = KafkaConsumer("localhost:9092", "my-consumer-group")
        parlist = [("quickstart-events", 0)]
        subscribe(c, parlist)
        timeout_ms = 1000
        while true
            msg = poll(String, String, c, timeout_ms)
            @show(msg)
        end
    catch e
        @info e
    end
end

main()

result:

$ julia consume.jl
$ Ctrl-C
signal (11): Segmentation fault
in expression starting at /home/adona/dev/juliadev/rdkafka/src/consume.jl:21
pthread_mutex_lock at /lib/x86_64-linux-gnu/libpthread.so.0 (unknown line)
mtx_lock at /home/adona/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
rd_kafka_q_serve at /home/adona/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
rd_kafka_poll at /home/adona/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
kafka_poll at /home/adona/.julia/packages/RDKafka/GlW4o/src/wrapper.jl:114 [inlined]
#3 at /home/adona/.julia/packages/RDKafka/GlW4o/src/client.jl:52 [inlined]
macro expansion at ./asyncevent.jl:252 [inlined]
#583 at ./task.jl:406
unknown function (ip: 0x7f4949c0d1ac)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
start_task at /buildworker/worker/package_linux64/build/src/task.c:839
unknown function (ip: (nil))
Allocations: 2788065 (Pool: 2786867; Big: 1198); GC: 4
Segmentation fault (core dumped)

dfdx commented 3 years ago

Thanks for reporting this! The problem is not 100% reproducible, but I think it comes from our finalizers: before a Julia object is garbage-collected, we destroy the corresponding C objects. But librdkafka also seems to catch SIGINT before us and destroy the resources on its own, leaving us with invalid handles. Unfortunately, I can't see any API for checking whether a handle is still valid, which we could use in the finalizers.

Is there a reason you want to terminate the process with SIGINT instead of, say, checking a terminating flag in the consumption loop?

Relevant docs: https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination
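
The terminating-flag approach suggested above can be sketched as follows. This is a minimal illustration, not code from RDKafka.jl: `fake_poll` and `consume_until_stopped` are hypothetical names, and `fake_poll` only simulates a poll call that times out without returning a message.

```julia
# Hypothetical stand-in for a blocking poll with a timeout. It is NOT part of
# RDKafka.jl; it only simulates "no message arrived within the timeout".
function fake_poll(timeout_ms::Integer)
    sleep(timeout_ms / 1000)
    return nothing
end

# Poll until `running` is cleared, then return the number of polls performed.
function consume_until_stopped(running::Threads.Atomic{Bool}; timeout_ms::Integer = 10)
    polls = 0
    while running[]
        msg = fake_poll(timeout_ms)
        polls += 1
        msg === nothing || @show msg
    end
    return polls
end

running = Threads.Atomic{Bool}(true)

# Some other task (a control message, a watchdog, ...) clears the flag.
stopper = @async begin
    sleep(0.05)
    running[] = false
end

polls = consume_until_stopped(running)
wait(stopper)
```

Because the flag is checked once per iteration, the loop stops within roughly one poll timeout of the flag being cleared, and any cleanup can run afterwards in normal control flow.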

attdona commented 3 years ago

SIGINT handling was my first approach because my application is a background process that is managed by a supervisor, in my case supervisord.

By default, a supervisor stops a process by sending it a signal.

Apart from that, I think my overall approach has to change because, from what I understand, librdkafka manages its own set of threads and an internal I/O event loop that runs in addition to Julia's libuv event loop.

The presence of two event loops, together with Julia's internal housekeeping of threads and async tasks and the librdkafka threads that are outside Julia's control, is probably a source of potential issues.

dfdx commented 3 years ago

According to the documentation:

librdkafka uses multiple threads internally to fully utilize modern hardware. The API is completely thread-safe and the calling application may call any of the API functions from any of its own threads at any time.

I don't think there's an additional (asynchronous) event loop in librdkafka, just plain old synchronous threads: one main thread and one thread per broker. In a typical scenario you start a consumer and run it for a long time without interruptions, so I don't see any source of conflicts between threads or asynchronous tasks here. A segfault instead of a clean SIGINT is an unfortunate detail that I hope we will find a way to deal with, but if it's caused by supervisord killing the service, the result seems to be the same: the program is stopped and the resources are released. Do you see any other reasons to worry about multithreading in librdkafka?

attdona commented 3 years ago

Thanks! Looking at the sources, there is no event loop in librdkafka; I probably got confused by misinterpreting one of the test cases.

Based on this, I don't see any other reason to worry about multithreading in librdkafka.

attdona commented 3 years ago

I have a patch that seems to work correctly for this issue.

I've removed all finalizer calls because, as far as I know, it is hard to say in what order finalizers get called, and this may conflict with the order of API calls that librdkafka expects:

https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#high-level-kafkaconsumer

For the consumer, this implies that the user has to close it explicitly:

Base.exit_on_sigint(false)

consumer = KafkaConsumer("localhost:9092", "my-consumer-group")
try
    parlist = [("my_partition", 0)]
    subscribe(consumer, parlist)
    timeout_ms = 3000
    while true
        msg = poll(String, String, consumer, timeout_ms)
        @show(msg)
    end
catch e
    # just trace InterruptException
    @info e
finally
    close(consumer)
end
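
The same try/finally pattern can be wrapped in a do-block helper so that callers cannot forget the explicit close. The sketch below is only an illustration under stated assumptions: `FakeConsumer` and `with_consumer` are hypothetical names that do not exist in RDKafka.jl, and the native destroy call is omitted.

```julia
# Hypothetical stand-in for a consumer that owns a native handle.
mutable struct FakeConsumer
    open::Bool
end
FakeConsumer() = FakeConsumer(true)

# Idempotent close: a second call is a no-op, so a native destroy call
# (omitted here) could never run twice on the same handle.
function Base.close(c::FakeConsumer)
    c.open || return nothing
    c.open = false
    return nothing
end

# Run `f(consumer)` and close the consumer on every exit path.
function with_consumer(f, make_consumer)
    c = make_consumer()
    try
        return f(c)
    finally
        close(c)
    end
end

seen = Ref{Any}(nothing)
result = try
    with_consumer(FakeConsumer) do c
        seen[] = c
        throw(InterruptException())  # simulate Ctrl-C arriving mid-loop
    end
catch e
    e
end
```

Here the `InterruptException` still reaches the caller, but only after `close` has run, which keeps the order of teardown calls explicit rather than leaving it to the garbage collector.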

Another source of problems was the kafka_poll timer inside the KafkaClient function.
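
The patch itself is not shown in this thread. The stack trace in the original report passes through asyncevent.jl, which is where Julia's Timer callbacks run, so one plausible way to keep a periodic poll from touching a released handle is to close the timer before releasing the handle and to guard the callback as well. The sketch below assumes that approach; `FakeHandle`, `start_polling`, and `shutdown!` are hypothetical names, not RDKafka.jl API.

```julia
# Hypothetical handle; `alive` stands in for "the native object still exists".
mutable struct FakeHandle
    alive::Bool
    polls::Int
end

# Start a periodic callback that "polls" the handle.
function start_polling(h::FakeHandle; interval = 0.01)
    return Timer(0.0; interval = interval) do _timer
        # Guard: a callback that was already queued may still run once
        # after the timer is closed, so never touch a released handle.
        h.alive || return
        h.polls += 1
    end
end

function shutdown!(h::FakeHandle, t::Timer)
    close(t)         # stop the periodic callback first
    h.alive = false  # only then release the handle
    return nothing
end

h = FakeHandle(true, 0)
t = start_polling(h)
sleep(0.05)
shutdown!(h, t)
polls_at_shutdown = h.polls
sleep(0.05)          # give any stray callback a chance to run
```

After `shutdown!` returns, the poll counter no longer changes, which is the property a real teardown would need before destroying the underlying client.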

I'm sending a Pull Request ...

Attilio