confluentinc / librdkafka

The Apache Kafka C/C++ library
Other
224 stars 3.15k forks source link

AIX librdkafka (through confluent-kafka-python) does not connect properly #1391

Closed tw-bert closed 7 years ago

tw-bert commented 7 years ago

Description

After building and installing librdkafka on AIX (I had to do some manual workarounds), I could install confluent-kafka-python with pip, from source, succesfully.

When trying to connect to a Kafka broker (running on Ubuntu), I get 1/1 brokers are down but they are not. I can telnet to the server:port.

I then added debug=all to the client config, and got the info below. What could be a possible cause for this behaviour?

ldd

ldd /opt/freeware/python27_64/lib/python2.7/site-packages/confluent_kafka/cimpl.so
/opt/freeware/python27_64/lib/python2.7/site-packages/confluent_kafka/cimpl.so needs:
         /opt/freeware/lib/librdkafka.so
         /usr/lib/libc.a(shr_64.o)
         /opt/freeware/lib64/libgcc_s.a(shr.o)
         /usr/lib/libpthread.a(shr_xpg5_64.o)
         /opt/freeware/lib/libz.a(libz.so.1)
         /usr/lib/libdl.a(shr_64.o)
         /opt/freeware/lib/libcrypto.a(libcrypto.so.1.0.1)
         /opt/freeware/lib/libssl.a(libssl.so.1.0.1)
         /unix
         /usr/lib/libcrypt.a(shr_64.o)
         /usr/lib/libpthreads.a(shr_xpg5_64.o)

Debug log

%7|1503479158.068|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1503479158.068|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1503479158.068|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1503479158.068|WAKEUPFD|rdkafka#producer-1| [thrd::0/internal]: 192.168.4.87:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1503479158.069|BROKER|rdkafka#producer-1| [thrd::0/internal]: 192.168.4.87:9092/bootstrap: Added new broker with NodeId -1
%7|1503479158.069|BRKMAIN|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: Enter main broker thread
%7|1503479158.069|CONNECT|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: broker in state INIT connecting
%7|1503479158.070|CONNECT|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: Connecting to ipv4#192.168.4.87:9092 (plaintext) with socket 8
%7|1503479158.070|CONNECT|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: couldn't connect to ipv4#192.168.4.87:9092: No such file or directory (2)
%7|1503479158.071|BROKERFAIL|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: failed: err: Local: Broker transport failure: (errno: No such file or directory)
%3|1503479158.071|FAIL|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: Failed to connect to broker at wst-tijsw.bertus.lokaal:9092: No such file or directory
%3|1503479158.071|ERROR|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: Failed to connect to broker at wst-tijsw.bertus.lokaal:9092: No such file or directory
%7|1503479158.071|STATE|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 192.168.4.87:9092/bootstrap: Broker changed state INIT -> DOWN
%3|1503479158.071|ERROR|rdkafka#producer-1| [thrd:192.168.4.87:9092/bootstrap]: 1/1 brokers are down

Checklist

Please provide the following information:

edenhill commented 7 years ago

librdkafka first connects to the bootstrap brokers, queries them for the full list of brokers and then connects to those brokers returned in the cluster metadata, and it is only these later connections that can actually be used for producing and consumer (since they are mapped to a broker id and the bootstrap brokers are not).

Looking at your logs it seems like it is unable to connect to the broker hostnames as returned in the cluster metadata: Failed to connect to broker at wst-tijsw.bertus.lokaal:9092: No such file or directory

Can you succesfully telnet to that hostname and port?

tw-bert commented 7 years ago

Yes I can:

[tw@srv-benchmark-aix:/]# telnet wst-tijsw.bertus.lokaal 9092
Trying...
Connected to wst-tijsw.bertus.lokaal.
Escape character is '^]'.
bye
Connection closed.

The server log tells (only from the telnet test):

[2017-08-23 10:45:02,278] WARN Unexpected error from /10.0.2.2; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1652122893 larger than 104857600)
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:91)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:169)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:150)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:355)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:303)
        at kafka.network.Processor.poll(SocketServer.scala:494)
        at kafka.network.Processor.run(SocketServer.scala:432)
        at java.lang.Thread.run(Thread.java:748)

The server log (vanilla settings) says nothing when testing the python Producer.

sidenote: 1652122893 == 6279650D == b y e + carriage return in ascii. So the text bye seems to be recieved by the server, and interpreted as message byte size.

Just a wild guess... could this possibly be related: From python (also a custom build) , I see this:

ldd /opt/freeware/python27_64/lib/libpython2.7.so
/opt/freeware/python27_64/lib/libpython2.7.so needs:
         /opt/freeware/lib64/libssl.so
         /opt/freeware/lib64/libcrypto.so
         /usr/lib/libdl.a(shr_64.o)
         /usr/lib/libpthreads.a(shr_xpg5_64.o)
         /usr/lib/libc.a(shr_64.o)
         /opt/freeware/lib64/libgcc_s.a(shr.o)
         /unix
         /usr/lib/libcrypt.a(shr_64.o)

And from python, we can connect just fine. Python: usr/lib/libpthreads.a(shr_xpg5_64.o) and librdkafka: /usr/lib/libpthread.a(shr_xpg5_64.o) , those are different. Maybe I could try compiling with different -l flags?

EDIT never mind, it's just some symbolic linking, and effectively the same shared object library:

[tw@srv-benchmark-aix:/]# ls -l /usr/lib/libpthread.a
lrwxrwxrwx    1 bin      bin              22 Dec 22 2014  /usr/lib/libpthread.a -> /usr/lib/libpthreads.a
[tw@srv-benchmark-aix:/]# ls -l /usr/lib/libpthreads.a
lrwxrwxrwx    1 bin      bin              26 Dec 22 2014  /usr/lib/libpthreads.a -> /usr/ccs/lib/libpthreads.a
[tw@srv-benchmark-aix:/]# ls -l /usr/lib/libpthread.a
lrwxrwxrwx    1 bin      bin              22 Dec 22 2014  /usr/lib/libpthread.a -> /usr/lib/libpthreads.a
[tw@srv-benchmark-aix:/]# ls -l /usr/lib/libpthreads.a
lrwxrwxrwx    1 bin      bin              26 Dec 22 2014  /usr/lib/libpthreads.a -> /usr/ccs/lib/libpthreads.a
[tw@srv-benchmark-aix:/]# ls -l /usr/ccs/lib/libpthreads.a
-r--r--r--    1 bin      bin         1267544 Sep 19 2012  /usr/ccs/lib/libpthreads.a
edenhill commented 7 years ago

That is indeed very weird, and the error code does not make much sense either (ENOENT). AIX is unfortunately not an officially supported platform but relies on community submissions, so if you find the cause of this problem please update this issue or file a PR.

Thanks

tw-bert commented 7 years ago

Thanks Magnus, the confirmation that this is weird is already helpful. I'll see what I can find.

A couple of quick questions before I start:

edenhill commented 7 years ago
tw-bert commented 7 years ago

Very useful, thank you. I'll close this issue for now, since AIX is not officially supported. If I come up with anything useful, I'll reopen this issue joined by a PR.

Kind regards, TW

tw-bert commented 7 years ago

Abandoned. I did not get a working result, and had to abandon my effort due to priorities. I did notice using O_NDELAY instead of O_NONBLOCK on the socket fd made connect() possible. Which is strange, since O_NONBLOCK is the standardized way of O_NDELAY. Then again, trying to get this to work, after making the necessary changes to errno checking (taking EWOULDBLOCK into account), I still could not transfer data from AIX to the Kafka cluster by librdkafka. Alas, but since we have different routes in our backend at our disposal, we'll just use an intermediate service layer.

edenhill commented 7 years ago

@tw-bert Sorry to hear that. If you are interested in getting official support for AIX I would recommend reaching out to us at Confluent to discuss ways forward.

Nidhin-Sachi commented 4 years ago

Hi tw-bert,

How build the librdkafka library for AIX ?, Configuration script itself is failing for me.

Because of the unavailability of mktemp command and some another issue.

tw-bert commented 4 years ago

@Nidhin-Sachi I abandoned this, see https://github.com/edenhill/librdkafka/issues/1391#issuecomment-325332068 .

The best I can do to help, is give you my notes from that time. Feel free to take it forward from there. Here we go:

Kafka client aix install

2017Q4 TW - trying to get confluence kafka client to build on AIX. \ TL;DR : I did not get this working, aborted. If and when we really need this, I advise to build a JNI bridge and use the Java client Kafka module.

make customization

Had to adjust the Makefile (after ./configure --prefix /opt/freeware). Remove/comment everything except 'mklove-check' and 'libs' in 'all'. Run 'make' (after GccEnvironment). The .so and .a are created in libs, just copy them. Copy the include as well (install the Python module by pip from source, and you'll see which one it needs at which path).

Makefile 0.11:

C source customization

# Poor man's debugging on AIX:
# From windows, open (VS2017): T:\opt\freeware\build_and_install\librdkafka-0.11.0\win32\librdkafka.sln
# putty session A: make
# putty session B: run a fragment

A

slibclean&&make&&cp -p ./src/librdkafka.so  /opt/freeware/lib/

B (run fragment 1)

ipython
from confluent_kafka import Producer
p = Producer({'bootstrap.servers'      : 'srv-docker-dev-01:9094'})
p.produce('twtest', key='testingaix', value='hi_aix')
p.flush()
exit # (or kill from other putty)

B (run fragment 2)

cd /ota/repo/wrkdev/tw/amber_python/src/amber/msroot/mskafka/logic_xu
joe sc_mskafka_redisfollower_produce_contextchange_xu.py # Adjust config. Careful with hg Commit&Pull&Merge -> changes will be overwritten.
clear&&python sc_mskafka_redisfollower_produce_contextchange_xu.py
# Note: I used exit(0) in the librdkafka C code, to stop the session after my analysis
When using librdkafka, it would not connect to the Kafka server (and telnetting to the server:port went fine):
https:_github.com/edenhill/librdkafka/issues/1391

While debugging, I found out that the socketfd (file descriptor) starts misbehaving at connect() after setting O_NONBLOCK in the fd flags.

I tried 0x800 instead, which is on AIX: O_NDELAY

You should be able to get the same functionality with O_NDELAY , but have to check and handle different errno values afterwards (mainly EWOULDBLOCK).
However, after trying, it still would not work.
#ifdef _MSC_VER
                if (unlikely(r == SOCKET_ERROR)) {
                        if (sum > 0 || WSAGetLastError() == WSAEWOULDBLOCK)
                                return sum;
                        else {
                                rd_snprintf(errstr, errstr_size, "%s",
                                            socket_strerror(WSAGetLastError()));
                                return -1;
                        }
                }
#elif _AIX
                if (unlikely(r <= 0)) {
                    if (r == 0 || errno == EAGAIN || errno == EWOULDBLOCK)
                        return 0;
                    rd_snprintf(errstr, errstr_size, "%s",
                        socket_strerror(socket_errno));
                    return -1;
                }
#else
                if (unlikely(r <= 0)) {
                        if (r == -1 && (socket_errno == EAGAIN
#ifdef _AIX
                            || socket_errno == EWOULDBLOCK
#endif
                ))
                        if (res == SOCKET_ERROR &&
                    (socket_errno != EINPROGRESS
#ifdef _AIX
                        && socket_errno != EWOULDBLOCK
#endif
#ifdef _MSC_VER
                       && socket_errno != WSAEWOULDBLOCK
#endif
                            ))
                        r = socket_errno;
                else
                        r = 0;
rd_rkb_dbg(rkb, BROKER, "CONNECT", "Test FOO 2");
_ or, when no rkb object avail:
printf("FOO BAR\n");

                        rd_rkb_dbg(rkb, BROKER, "CONNECT", "Test FOO 2");

                        _START foo
                        int sockfd;
                        _struct hostent *server;
                        _struct sockaddr_in serv_addr;
                        sockfd = socket(AF_INET, SOCK_STREAM, 6);
                        rd_rkb_dbg(rkb, BROKER, "CONNECT", "SeparateTcpTest FOO sockfd=%i, the ip=%s", sockfd, rd_sockaddr2str(sinx, NI_NUMERICHOST));

                        int flagsX = fcntl(sockfd, F_GETFL, 0);
                        fcntl(sockfd, F_SETFL, flagsX | 0x800); _ 0x4 _ 0x800 _ O_NONBLOCK

                        _int opt = 1;
                        _ioctl(sockfd, FIONBIO, &opt);

                        _int s = socket(PF_INET, SOCK_STREAM, 0);

                        _ fcntl(sockfd, F_SETFL, O_NONBLOCK);  _ set to non-blocking

                        _int flagsY = fcntl(sockfd, F_GETFL, 0);
                        _rd_rkb_dbg(rkb, BROKER, "CONNECT", "SeparateTcpTest FOO flagsY=%i", flagsY);
                        _int ret2=fcntl(sockfd, F_SETFL, flagsY | O_NDELAY);
                        _rd_rkb_dbg(rkb, BROKER, "CONNECT", "SeparateTcpTest FOO ret2=%i", ret2);

                        _ -----------

                        /*
                        server = gethostbyname(rd_sockaddr2str(sinx, NI_NUMERICHOST));
                        if (server == NULL) {
                            fprintf(stderr, "ERROR, no such host\n");
                            exit(0);
                        }
                        rd_rkb_dbg(rkb, BROKER, "CONNECT", "SeparateTcpTest FOO server=%s (%i)", server->h_name, server->h_addrtype);

                        memset(&serv_addr, 0, sizeof(serv_addr));
                        serv_addr.sin_family = AF_INET;

                        memcpy(&serv_addr.sin_addr.s_addr, server->h_addr, server->h_length);

                        serv_addr.sin_port = 9092; _ htons(portno);
                        int restest;
                        restest = connect(sockfd, (struct sockaddr *) &serv_addr, sizeof(serv_addr));
                        if (restest < 0)
                            rd_rkb_dbg(rkb, BROKER, "CONNECT", "SeparateTcpTest FOO error connecting.");
                        rd_rkb_dbg(rkb, BROKER, "CONNECT", "SeparateTcpTest FOO after connect: %i.", restest);
                        */

                        _s = sockfd;
                        _END FOO

URLs - later searches

https://www.ibm.com/support/knowledgecenter/en/ssw_aix_72/com.ibm.aix.commtrf2/connect.htm\ http://www.linuxhowtos.org/C_C++/socket.htm\ https://stackoverflow.com/questions/2816534/aix-specific-socket-programming-query\ https://www.ibm.com/developerworks/community/forums/html/topic?id=77777777-0000-0000-0000-000014479532\ http://developerweb.net/viewtopic.php?id=3000 Put socket in non blocking mode\ ftp://public.dhe.ibm.com/aix/freeSoftware/aixtoolbox/docs/apidiffs.html Porting Differences for Linux and AIX APIs\ http://ps-2.kev009.com/rs6000/docs/running_linux_apps_on_aix.pdf IBM - Running Linux Applications on AIX\ https://github.com/confluentinc/confluent-kafka-dotnet/issues/90 Receive failed: Invalid message size 100663755 (0..100000000): increase receive.message.max.bytes\ https://mail.python.org/pipermail/python-list/1999-May/015522.html O_NONBLOCK vs. O_NDELAY for non-blocking sockets\ https://sourceforge.net/p/predef/wiki/OperatingSystems/ Pre-defined Compiler Macros AIX and other OS-es\ https://github.com/confluentinc/confluent-kafka-python\ https://sookocheff.com/post/kafka/kafka-quick-start/\ https://stackoverflow.com/questions/32797476/quit-the-whole-program-early-in-c\ https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

URLs - initial searches

http://kafka.apache.org/documentation/\ https://kafka.apache.org/downloads\ https://www.apache.org/dyn/closer.cgi?path=/kafka/0.11.0.0/kafka-0.11.0.0-src.tgz\ https://kafka.apache.org/\ https://cwiki.apache.org/confluence/display/KAFKA/Clients\ http://docs.confluent.io/current/clients/confluent-kafka-python/index.html#configuration\ https://devops.profitbricks.com/tutorials/install-and-configure-apache-kafka-on-ubuntu-1604-1/\ http://www-01.ibm.com/support/docview.wss?uid=swg21980197 shows that IBM themselves use kafka\ https://www.ibm.com/developerworks/aix/library/au-gnu.html research for 'install' binary on AIX compared to Linux/gnu\ https://github.com/edenhill/librdkafka/blob/master/README.md\ https://github.com/edenhill/librdkafka/tree/master/tests\ https://github.com/edenhill/librdkafka/releases\ https://github.com/edenhill/librdkafka/issues/1391 tw-bert AIX librdkafka (through confluent-kafka-python) does not connect properly\ https://github.com/edenhill/librdkafka/issues/423 Remove old brokers\ https://github.com/edenhill/kafkacat/issues/16 In producer mode, it sends the messages but throws errors while starting up\ https://github.com/confluentinc/confluent-kafka-python\ https://github.com/confluentinc/confluent-kafka-python/tree/master/tests\ ftp://www.oss4aix.org/RPMS/\ https://stackoverflow.com/questions/22180354/why-am-i-getting-undefined-reference-to-pthread-mutexattr-settype\ https://stackoverflow.com/questions/2127797/gcc-significance-of-pthread-flag-when-compiling\ https://gcc.gnu.org/bugzilla/show_bug.cgi?id=17480\ http://www.linuxquestions.org/questions/aix-43/overly-zealous-collect2-on-aix-4175519409/\ https://gcc.gnu.org/bugzilla/show_bug.cgi?id=54791\ https://www.digitalocean.com/community/tutorials/how-to-create-a-sudo-user-on-ubuntu-quickstart\ https://www.digitalocean.com/community/tutorials/how-to-add-and-delete-users-on-ubuntu-16-04\ https://unix.stackexchange.com/questions/4408/how-to-set-visudo-to-use-a-different-editor-than-the-default-on-fedora\ https://forums.docker.com/t/access-docker-container-files/28906/7\ getopt https://groups.google.com/forum/#!topic/git-users/wfpaqErqb8A\ http://srv-docker-dev-01.bertus.lokaal:9000/clusters/Dev_Kafka_Cluster\ http://blog.boreas.ro/2008/03/porting-git-to-hp-ux-pa-risc-and-aix.html\ https://devops.profitbricks.com/tutorials/install-and-configure-apache-kafka-on-ubuntu-1604-1/

Nidhin-Sachi commented 4 years ago

@tw-bert Thank you for the response.. I will try this one.. Actually I need the things upto library compilation only. Anyway I will let you know if I am getting any idea regarding with this issue.

Thanks.

tw-bert commented 4 years ago

@Nidhin-Sachi

I see a reference to GccEnvironment, I'll include that script here. Just some compile flags and such. Let me know if something else is missing.

#!/usr/bin/env bash
#
# GccEnvironment
# Pass statement to be executed to this script.
# Sets all generic customer-domain-specific gcc compiler options
# 64 bit, compiled with references to our standard paths including /opt/freeware/lib64
# TS+TW 2013 Q4
#

echo "Calling slibclean, to prevent file-in-use issues "
sudo -E slibclean
echo "Done with slibclean "

alias make="gmake" # Note: /usr/bin/gmake is symlink to ../../opt/freeware/bin/make
# possibly needed, but not for now: # alias getopt="/opt/freeware/bin/getopt"

if [ "${OS}" == "" ]
then
  export OS=$(uname -s)
fi

export CC=gcc
# Note on LD: the normal 'ld' is AIX native, gcc supports more linking options. Many makefiles use LD env var.
export LD=gcc
export CFLAGS="-maix64 -O2 -D_AIX -D_AIX53 -D_AIX61 -D_AIX71 -I/opt/freeware/include -I/usr/include${CFLAGS_XTRA}"
# Note on CFLAGS: -ffast-math should not be use with -O2 or -O3, see https://gcc.gnu.org/onlinedocs/gcc-4.1.0/gcc/Optimize-Options.html 
export CXX=g++
export CXXFLAGS=$CFLAGS
export LDFLAGS="-maix64 -L/opt/freeware/lib64 -L/opt/freeware/lib -Wl,-blibpath:/opt/freeware/lib64:/opt/freeware/lib/pthread/ppc64:/opt/freeware/lib:/usr/lib:/lib,-bmaxdata:0x80000000"
# Note on LDFLAGS and missing symbols: supply a symbol file like this: -Wl,-bE:<export filename>.exp , or use -Wl,-bexpall (all without starting with dot) , or use -Wl,-bexpfull (really all) 
# Further info: https://www.ibm.com/developerworks/aix/library/au-gnu.html

export AR="ar -X64"
# Note: was previously: export LIBPATH=.:/opt/freeware/lib64:/opt/freeware/lib:/usr/lib:/lib
# But, we got errors because of the '.:' (current dir) causing gcc cc1 to give a runtime error (building lxml).
export LIBPATH=/opt/freeware/lib64:/opt/freeware/lib:/usr/lib:/lib
export OBJECT_MODE=64
echo "Gcc environment variables (CC,CFLAGS,LDFLAGS,LIBPATH,etc) are set. "

if [ "$1" != "--noexec" ]
then
exec "$@"
fi

echo "GccEnvironment has finished. "

#EOF
Nidhin-Sachi commented 4 years ago

@tw-bert Thanks. I will refer this.