yazgoo / fuse_kafka

developer repository for https://github.com/fuse-kafka/fuse_kafka
Apache License 2.0

fuse_kafka maintaining more than one connection on kafka restart #3

Closed yazgoo closed 9 years ago

yazgoo commented 9 years ago

Here is the problem:

$ # first, let's start zk and kafka
$ ./build.py zookeeper_start
$ ./build.py kafka_start
$ ./src/fuse_kafka.py start
$ # let's query the number of connections opened by fuse_kafka to kafka
$ netstat -tualpn 2>/dev/null|grep 9092|sed 's/.*ESTABLISHED //'|grep fuse_kafka|sort|uniq -c
      2 13673/fuse_kafka
$ # let's restart kafka
$ pkill -f kafka.Kafka
$ ./build.py kafka_start
$ netstat -tualpn 2>/dev/null|grep 9092|sed 's/.*ESTABLISHED //'|grep fuse_kafka|sort|uniq -c
      3 13673/fuse_kafka
$ # the number of connections increased => it should not have

To me it seems that the issue comes from src/zookeeper.c:

static void watcher(zhandle_t *zh, int type,
        int state, const char *path, void *param)
{
    /*...*/
    if (k->no_brokers || type == ZOO_CHILD_EVENT && strncmp(
                path, BROKER_PATH, sizeof(BROKER_PATH) - 1) == 0)
    {
        brokers[0] = '\0';
        set_brokerlist_from_zookeeper(zh, brokers);
        if (brokers[0] != '\0' && k->rk != NULL)
        {
            rd_kafka_brokers_add(k->rk, brokers);
            /*...*/
        }
    }
}

We should only add brokers to rd_kafka if they were not already added.
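A minimal sketch of that idea, not necessarily the fix that was actually committed: keep a comma-separated list of the brokers already handed to rd_kafka, and on each watcher event pass along only the entries that are not in it yet. The helper names (list_contains, append_entry, collect_new_brokers) and the buffer sizes are hypothetical and do not come from fuse_kafka.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical helper: returns 1 if `entry` is exactly one of the
 * comma-separated items in `list`, 0 otherwise. */
static int list_contains(const char *list, const char *entry)
{
    size_t len = strlen(entry);
    const char *p = list;
    while (*p != '\0')
    {
        const char *end = strchr(p, ',');
        size_t n = end ? (size_t)(end - p) : strlen(p);
        if (n == len && strncmp(p, entry, len) == 0) return 1;
        if (end == NULL) break;
        p = end + 1;
    }
    return 0;
}

/* Hypothetical helper: appends `entry` to the comma-separated `list`
 * if it fits in `size` bytes; returns 1 on success, 0 otherwise. */
static int append_entry(char *list, size_t size, const char *entry)
{
    size_t used = strlen(list);
    size_t need = strlen(entry) + (used ? 1 : 0);
    if (used + need + 1 > size) return 0;
    if (used) list[used++] = ',';
    strcpy(list + used, entry);
    return 1;
}

/* Hypothetical helper: writes into `fresh` the brokers from `candidates`
 * that are not yet in `known`, records them in `known`, and returns how
 * many were new. Entries of 256 bytes or more are skipped. */
static int collect_new_brokers(char *known, size_t known_size,
        const char *candidates, char *fresh, size_t fresh_size)
{
    char broker[256];
    const char *p = candidates;
    int added = 0;

    assert(fresh_size > 0);
    fresh[0] = '\0';
    while (*p != '\0')
    {
        const char *end = strchr(p, ',');
        size_t n = end ? (size_t)(end - p) : strlen(p);
        if (n > 0 && n < sizeof(broker))
        {
            memcpy(broker, p, n);
            broker[n] = '\0';
            if (!list_contains(known, broker)
                    && append_entry(known, known_size, broker)
                    && append_entry(fresh, fresh_size, broker))
                added++;
        }
        if (end == NULL) break;
        p = end + 1;
    }
    return added;
}
```

In the watcher, the idea would be to call collect_new_brokers on the list returned by set_brokerlist_from_zookeeper and to call rd_kafka_brokers_add only when the resulting list is non-empty. With that in place, a kafka restart that re-announces the same host:port should no longer cause the same broker to be added a second time.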

yazgoo commented 9 years ago

Closing the issue: the fix was validated with tests on single-node and multi-node kafka.