Open harmathy opened 3 years ago
The original implementation was removed by fd1040a04c41e88324b9cfbec811d868e40fbc80 with reference to threading problems. The problem is that the registered callback function is called from a different thread than the one that initializes the library, which is the thread where the Perl interpreter runs.
There were, however, indications that such callbacks are in fact possible: there is a forum discussion (in German) about this problem, and an actual implementation can be found at MaxPerl/MPV-Simple. It boils down to making a copy of the interpreter context
+PerlInterpreter *parent_perl = NULL;
+extern PerlInterpreter *parent_perl;
+PerlInterpreter *current_perl = NULL;
and not defining the macro PERL_NO_GET_CONTEXT.
-#define PERL_NO_GET_CONTEXT
I haven't grasped the reason for this, but it seems to have the desired effect.
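To make the threading problem described above concrete, here is a minimal sketch in plain C with POSIX threads. It is not taken from this PR: the callback type, the function names and the worker thread are all hypothetical stand-ins for a library that emits log lines from its own internal thread. It only shows that a callback registered on one thread can end up running on another, which is why thread-bound state such as an interpreter context cannot be assumed inside it.

```c
#include <pthread.h>
#include <stdio.h>

/* Hypothetical callback type (an assumption for this sketch);
 * a real librdkafka log callback also receives a handle and a facility. */
typedef void (*log_cb_t)(int level, const char *msg);

static log_cb_t registered_cb;        /* set by the registering thread */
static pthread_t registering_thread;  /* thread that registered the callback */
static int cb_ran_on_other_thread;    /* recorded by the callback itself */

/* The callback compares its own thread with the registering thread. */
static void demo_log_cb(int level, const char *msg)
{
    cb_ran_on_other_thread =
        !pthread_equal(pthread_self(), registering_thread);
    printf("level=%d msg=%s other_thread=%d\n",
           level, msg, cb_ran_on_other_thread);
}

/* Stands in for a library-internal thread that emits one log line. */
static void *library_thread(void *unused)
{
    (void)unused;
    if (registered_cb)
        registered_cb(6, "broker connected");
    return NULL;
}

/* Registers the callback, lets the worker invoke it, and returns 1 if the
 * callback ran on a different thread, 0 if not, -1 if no thread was created. */
int run_demo(void)
{
    pthread_t worker;

    registering_thread = pthread_self();
    registered_cb = demo_log_cb;
    cb_ran_on_other_thread = 0;

    if (pthread_create(&worker, NULL, library_thread, NULL) != 0)
        return -1;
    pthread_join(worker, NULL);
    return cb_ran_on_other_thread;
}
```

Calling run_demo() from the main thread registers the callback there and lets the worker thread invoke it. Depending on the toolchain, the sketch may need to be built with -pthread.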
The actual callback function is stored in a variable in the namespace of Kafka::Librd.
I would have loved to store the function as a reference on the object itself, but even after digging deep into Perl's guts, I haven't found a solution.
I noticed that the compilation fails in CI, but I couldn't figure out the reason. I tried with an Ubuntu container, and it built just fine. Even including proto.h in rdkafkaxs.c didn't fix it.
I suppose it would be a good idea to also implement the log level constants:
#define LOG_EMERG 0
#define LOG_ALERT 1
#define LOG_CRIT 2
#define LOG_ERR 3
#define LOG_WARNING 4
#define LOG_NOTICE 5
#define LOG_INFO 6
#define LOG_DEBUG 7
https://github.com/edenhill/librdkafka/blob/40e40da3c13db0d1933c51d24fc7e204d264512f/src/rd.h#L81
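As a rough illustration of how these levels could be used once they are exposed, the sketch below maps a numeric level to a name and applies a simple threshold filter. The krd_ prefix, the enum and both helper functions are hypothetical and not part of Kafka::Librd or librdkafka; only the numeric values come from the list above.

```c
#include <stddef.h>

/* Syslog-style severities with the values listed above. The KRD_ prefix is
 * an assumption, chosen to avoid clashing with <syslog.h>. */
enum krd_log_level {
    KRD_LOG_EMERG   = 0,
    KRD_LOG_ALERT   = 1,
    KRD_LOG_CRIT    = 2,
    KRD_LOG_ERR     = 3,
    KRD_LOG_WARNING = 4,
    KRD_LOG_NOTICE  = 5,
    KRD_LOG_INFO    = 6,
    KRD_LOG_DEBUG   = 7
};

/* Returns a printable name for a level, or "UNKNOWN" if out of range. */
const char *krd_log_level_name(int level)
{
    static const char *const names[] = {
        "EMERG", "ALERT", "CRIT", "ERR",
        "WARNING", "NOTICE", "INFO", "DEBUG"
    };

    if (level < KRD_LOG_EMERG || level > KRD_LOG_DEBUG)
        return "UNKNOWN";
    return names[level];
}

/* Lower numbers are more severe, so a message passes the filter when its
 * level is numerically less than or equal to the threshold. */
int krd_log_level_enabled(int level, int threshold)
{
    return level <= threshold;
}
```

With a threshold of KRD_LOG_INFO, a KRD_LOG_ERR message passes the filter while a KRD_LOG_DEBUG message does not.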
Sorry, I lost the thread on this after saying I would review a PR on this subject. There seems to be an error reported in CI related to perl_clone that needs to be fixed. Also, you will need to rebase onto the current main branch to get GH Actions to run the CI instead of travis-ci. I've also assigned myself as reviewer for this PR so that I can come back and review it in more detail in the near future.
cc -c -fwrapv -fno-strict-aliasing -pipe -fstack-protector-strong -I/usr/local/include -D_LARGEFILE_SOURCE -D_FILE_OFFSET_BITS=64 -D_FORTIFY_SOURCE=2 -O2 -DVERSION=\"0.19\" -DXS_VERSION=\"0.19\" -fPIC "-I/opt/perl/5.32.0/lib/5.32.0/x86_64-linux/CORE" rdkafkaxs.c
rdkafkaxs.c: In function 'krd_parse_config':
rdkafkaxs.c:124:24: warning: implicit declaration of function 'perl_clone'; did you mean 'PerlIO_clone'? [-Wimplicit-function-declaration]
current_perl = perl_clone(parent_perl, CLONEf_KEEP_PTR_TABLE);
^~~~~~~~~~
PerlIO_clone
rdkafkaxs.c:124:22: warning: assignment to 'PerlInterpreter *' {aka 'struct interpreter *'} from 'int' makes pointer from integer without a cast [-Wint-conversion]
current_perl = perl_clone(parent_perl, CLONEf_KEEP_PTR_TABLE);
^
rm -f blib/arch/auto/Kafka/Librd/Librd.so
LD_RUN_PATH="/usr/lib/x86_64-linux-gnu" cc -shared -O2 -L/usr/local/lib -fstack-protector-strong Rdkafka.o rdkafkaxs.o -o blib/arch/auto/Kafka/Librd/Librd.so \
-lrdkafka \
chmod 755 blib/arch/auto/Kafka/Librd/Librd.so
+ make test TEST_VERBOSE=1
"/opt/perl/5.32.0/bin/perl" -MExtUtils::Command::MM -e 'cp_nonempty' -- Librd.bs blib/arch/auto/Kafka/Librd/Librd.bs 644
PERL_DL_NONLAZY=1 "/opt/perl/5.32.0/bin/perl" "-MExtUtils::Command::MM" "-MTest::Harness" "-e" "undef *Test::Harness::Switches; test_harness(1, 'blib/lib', 'blib/arch')" t/*.t
# Failed test 'use Kafka::Librd;'
# at t/00-load.t line 8.
Bailout called. Further testing stopped:
# Tried to use 'Kafka::Librd'.
# Error: Can't load '/work/blib/arch/auto/Kafka/Librd/Librd.so' for module Kafka::Librd: /work/blib/arch/auto/Kafka/Librd/Librd.so: undefined symbol: perl_clone at /opt/perl/5.32.0/lib/5.32.0/x86_64-linux/DynaLoader.pm line 193.
# at t/00-load.t line 8.
# Compilation failed in require at t/00-load.t line 8.
# BEGIN failed--compilation aborted at t/00-load.t line 8.
# Testing Kafka::Librd 0.19, Perl 5.032000, /opt/perl/5.32.0/bin/perl
# Looks like you failed 1 test of 1.
FAILED--Further testing stopped.
make: *** [Makefile:1071: test_dynamic] Error 1
The command "cip script" exited with 2.
I suppose it would be a good idea to also implement the log level constants:
#define LOG_EMERG 0
#define LOG_ALERT 1
#define LOG_CRIT 2
#define LOG_ERR 3
#define LOG_WARNING 4
#define LOG_NOTICE 5
#define LOG_INFO 6
#define LOG_DEBUG 7
https://github.com/edenhill/librdkafka/blob/40e40da3c13db0d1933c51d24fc7e204d264512f/src/rd.h#L81
Constants can be added to utils/generate_const.pl to expose them into Perl space. They should also be added to @EXPORT_OK.
Thanks for looking into this! I will take some time to dive into this again.
I should also mention that, as far as I remember, I ran a memory test with Test::Valgrind and there was a small memory leak somewhere in this implementation.
There is another problem with these changes: the consumer crashes with a segmentation fault if one of the brokers goes down during runtime.
Thanks to a second pair of eyes from my colleague, here is a version that keeps PERL_NO_GET_CONTEXT.
The main issue, which caused the hassle with the Perl interpreter context, was trying to get the rd_kafka_t from the logging callback function into Perl context. Leaving it alone and passing only the actual logging information to the Perl function not only solved that issue but also makes the code a little cleaner.
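The idea of dropping the handle and forwarding only the log data can be sketched in plain C as follows. This is an illustration, not the code in this PR: demo_handle_t is an opaque stand-in for rd_kafka_t, and set_user_handler, forward_log and record_log are hypothetical names. The forwarding function takes the same kind of arguments as a librdkafka log callback (handle, level, facility, message) but passes on only the last three.

```c
#include <stdio.h>

/* Opaque stand-in for librdkafka's rd_kafka_t (assumption for this sketch). */
typedef struct demo_handle demo_handle_t;

/* User-level handler: receives only the log data, never the handle. */
typedef void (*user_log_fn)(int level, const char *fac, const char *msg);

static user_log_fn user_handler;

void set_user_handler(user_log_fn fn)
{
    user_handler = fn;
}

/* Callback with the handle/level/facility/message parameter shape.
 * The handle is deliberately ignored and not passed on. */
void forward_log(const demo_handle_t *handle, int level,
                 const char *fac, const char *buf)
{
    (void)handle;
    if (user_handler)
        user_handler(level, fac, buf);
}

/* Example handler that records the last message it received. */
static char last_line[256];

void record_log(int level, const char *fac, const char *msg)
{
    snprintf(last_line, sizeof last_line, "%d|%s|%s", level, fac, msg);
}

const char *last_recorded(void)
{
    return last_line;
}
```

After set_user_handler(record_log), a call such as forward_log(NULL, 3, "FAIL", "broker down") reaches the handler with just the level, facility and message, so nothing tied to the handle has to cross into the handler's context.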
I re-introduced the functionality of the config option log_cb, which basically implements rd_kafka_conf_set_log_cb from librdkafka.