dfdx / RDKafka.jl

Wrapper for librdkafka
Other
41 stars 24 forks source link

produce segfaults when called from a package #9

Closed deyandyankov closed 3 years ago

deyandyankov commented 3 years ago

I have created a simple package with the aim of holding a Kafka handler in module scope and emitting a message using an emit() method that uses the handle from module scope.

$ tree .
.
├── Manifest.toml
├── Project.toml
└── src
    └── PubSub2.jl

1 directory, 3 files
$ cat src/PubSub2.jl 
module PubSub2
using RDKafka
import RDKafka.produce
emitter = RDKafka.KafkaProducer("localhost:9092")
emit(key, payload) = produce(emitter, "sometopic", key, payload)
end # module

I can share my Project.toml and Manifest.toml if necessary.

When I run this, I get a segfault.

$ julia -q
julia> using PubSub2
[ Info: Precompiling PubSub2 [157795d8-f1bf-440a-8720-1ccf8327ffb8]

julia> PubSub2.emit("k", "v")

signal (11): Segmentation fault
in expression starting at REPL[2]:1
pthread_rwlock_wrlock at /lib/x86_64-linux-gnu/libpthread.so.0 (unknown line)
rwlock_wrlock at /home/azo/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
rd_kafka_topic_new0 at /home/azo/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
rd_kafka_topic_new at /home/azo/.julia/artifacts/4bf773d4657d4a20fea405841bd2c744c3faf2a8/lib/librdkafka.so (unknown line)
kafka_topic_new at /home/azo/.julia/packages/RDKafka/GlW4o/src/wrapper.jl:102
KafkaTopic at /home/azo/.julia/packages/RDKafka/GlW4o/src/client.jl:78
produce at /home/azo/.julia/packages/RDKafka/GlW4o/src/producer.jl:34
produce at /home/azo/.julia/packages/RDKafka/GlW4o/src/producer.jl:42 [inlined]
emit at /home/azo/.julia/dev/PubSub2/src/PubSub2.jl:5
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
do_call at /buildworker/worker/package_linux64/build/src/interpreter.c:115
eval_value at /buildworker/worker/package_linux64/build/src/interpreter.c:204
eval_stmt_value at /buildworker/worker/package_linux64/build/src/interpreter.c:155 [inlined]
eval_body at /buildworker/worker/package_linux64/build/src/interpreter.c:561
jl_interpret_toplevel_thunk at /buildworker/worker/package_linux64/build/src/interpreter.c:669
jl_toplevel_eval_flex at /buildworker/worker/package_linux64/build/src/toplevel.c:877
jl_toplevel_eval_flex at /buildworker/worker/package_linux64/build/src/toplevel.c:825
jl_toplevel_eval_flex at /buildworker/worker/package_linux64/build/src/toplevel.c:825
jl_toplevel_eval_flex at /buildworker/worker/package_linux64/build/src/toplevel.c:825
jl_toplevel_eval_in at /buildworker/worker/package_linux64/build/src/toplevel.c:929
eval at ./boot.jl:360 [inlined]
eval_user_input at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/REPL/src/REPL.jl:139
repl_backend_loop at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/REPL/src/REPL.jl:200
start_repl_backend at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/REPL/src/REPL.jl:185
#run_repl#42 at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/REPL/src/REPL.jl:317
run_repl at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.6/REPL/src/REPL.jl:305
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
#874 at ./client.jl:387
jfptr_YY.874_46600.clone_1 at /opt/julia-1.6.0/lib/julia/sys.so (unknown line)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
jl_f__call_latest at /buildworker/worker/package_linux64/build/src/builtins.c:714
#invokelatest#2 at ./essentials.jl:708 [inlined]
invokelatest at ./essentials.jl:706 [inlined]
run_main_repl at ./client.jl:372
exec_options at ./client.jl:302
_start at ./client.jl:485
jfptr__start_41020.clone_1 at /opt/julia-1.6.0/lib/julia/sys.so (unknown line)
_jl_invoke at /buildworker/worker/package_linux64/build/src/gf.c:2237 [inlined]
jl_apply_generic at /buildworker/worker/package_linux64/build/src/gf.c:2419
jl_apply at /buildworker/worker/package_linux64/build/src/julia.h:1703 [inlined]
true_main at /buildworker/worker/package_linux64/build/src/jlapi.c:560
repl_entrypoint at /buildworker/worker/package_linux64/build/src/jlapi.c:702
main at julia (unknown line)
__libc_start_main at /lib/x86_64-linux-gnu/libc.so.6 (unknown line)
unknown function (ip: 0x4007d8)
Allocations: 5068355 (Pool: 5066268; Big: 2087); GC: 7
Segmentation fault (core dumped)
$ 

The interesting bit is, if I define the package within my julia session instead of having it installed for development, it works as expected:

$ julia -q
julia> module PubSub3
       using RDKafka
       import RDKafka.produce
       emitter = RDKafka.KafkaProducer("localhost:9092")
       emit(key, payload) = produce(emitter, "sometopic", key, payload)
       end # module
Main.PubSub3

julia> using .PubSub3

julia> PubSub3.emit("k", "v") # this no longer segfaults

julia> [PubSub3.emit("k$(i)", "v$(i)") for i in 1:1000]
1000-element Vector{Nothing}:
 nothing
...

I am running kafka using docker wurstmeister/kafka. Happy to provide further details. Your help is highly appreciated!

dfdx commented 3 years ago

Hi! Thanks for reporting the issue. Could you please specify version of Julia you are using?

deyandyankov commented 3 years ago

hi @dfdx, thanks for the reply! I am using the following:

julia> versioninfo()
Julia Version 1.6.0
Commit f9720dc2eb (2021-03-24 12:55 UTC)
Platform Info:
  OS: Linux (x86_64-pc-linux-gnu)
  CPU: Intel(R) Core(TM) i5-6500 CPU @ 3.20GHz
  WORD_SIZE: 64
  LIBM: libopenlibm
  LLVM: libLLVM-11.0.1 (ORCJIT, skylake)

julia> 

I also observed this on Julia 1.6.0 on Windows before noticing that the package is not expected to work in Windows.

dfdx commented 3 years ago

I have a number of hypotheses, but one thing that strikes me the most is holding the network object on the module scope:

module PubSub2
using RDKafka
import RDKafka.produce
emitter = RDKafka.KafkaProducer("localhost:9092")                       # <-- this
emit(key, payload) = produce(emitter, "sometopic", key, payload)
end # module

Julia will try to precompile your module, setting emitter to whatever is available at the moment of compilation. Obviously, many things like pointers or open sockets will get invalid after loading the module in another Julia session.

The closest thing that comes to mind is to wrap the initialization code into __init__() method, which is called automatically when a module is loaded, not when it's compiled:

module PubSub2
using RDKafka
import RDKafka.produce

const emitter_ref = Ref{Any}()

function __init__()
    emitter = RDKafka.KafkaProducer("localhost:9092")    
    emitter_ref[] = emitter    
end

emit(key, payload) = produce(emitter_ref[], "sometopic", key, payload)

end # module

I didn't check that the message is actually delivered, but at least this code doesn't segfault on my machine.

deyandyankov commented 3 years ago

Thanks for looking into this on such a short notice. I believe you are right - I didn't think of when the emitter is initialized. I will try this tomorrow morning in a bit more detail. Meanwhile, I managed another workaround which also resides in a module:

function emit(producer, topic, key, message)
    partition = -1
    RDKafka.produce(producer, topic, partition, key, message)
end
function kafkaproduce()
    @info "Loading all data"
    producer = RDKafka.KafkaProducer("localhost:9092")

    for filename in tqdm(readdir(rawdatadir))
        @show filename
        df = loadrawfile(filename)
        for row in Tables.namedtupleiterator(df)
            emit(producer, "quickstart-events3", string(row.time_msc), json(row))
        end
    end
end

This works without issues but ideally I would like to have the producer in module scope so that I can use it from different functions within the module. As I said - will try this tomorrow morning and will update with details.

dfdx commented 3 years ago

This reminds me of a singleton pattern, something like this:


const PRODUCER = Ref{Union{KafkaProducer, Nothing}}(nothing)

function get_or_create_producer()
    if PRODUCER[] === nothing
        PRODUCER[] = KafkaProducer("localhost:9092")
    end
    return PRODUCER[]
end

function emit(topic, key, message)
    producer = get_or_create_producer()
    partition = -1
    RDKafka.produce(producer, topic, partition, key, message)
end

This way you create producer only when it's needed and cache it for further usage. Note that long-living connection to Kafka may break due to network errors, in which case you still will need to re-initialize it, but that's the different story.

deyandyankov commented 3 years ago

I tried this and it worked brilliantly. Thank you very much for your help! Should we close the issue?

dfdx commented 3 years ago

Yeah, I don't think we can do much more about it. Please feel free to open new issues if you have any other questions.