pipelinedb / pipeline_kafka

PipelineDB extension for Kafka support

Unable to read data from kafka #59

Closed jayesh4192 closed 7 years ago

jayesh4192 commented 7 years ago

I have set up PipelineDB with the Kafka extension, and it works with the provided example when I use localhost. When I use a different host, for some reason it doesn't work. Here is the error I get when I call consume_begin: ERROR: [pipeline_kafka] kafka_perf_stream <- fact_performance_event (PID 25571): failed to acquire metadata: Local: Broker transport failure. To confirm that things are working correctly on the box where PipelineDB is installed, I used the librdkafka library to read data from my Kafka cluster directly, and that works. So there is something about the pipeline_kafka extension that is causing this issue. Is there some configuration I am missing?

derekjn commented 7 years ago

Hi @jayesh4192, that's a librdkafka error you're seeing. Is there any more information in the Kafka broker logs around the time the error occurs? Also, have you verified that all of your brokers have been added using pipeline_kafka.add_broker()?

derekjn commented 7 years ago

This seems like a broker connectivity issue, closing unless/until more information becomes available.

jayesh4192 commented 7 years ago

Hi,

Sorry, I was out so I couldn't respond. I don't see any logs on the broker. I tested two setups, one with a single broker and another with multiple brokers, and I see the same issue in both. As I mentioned, I am able to read the data through librdkafka using their example client, for example:

-bash-4.1$ bin/kafka-console-producer.sh --broker-list broker_hostname:9092 --topic test 
hello...

-bash-4.1$ ./rdkafka_consumer_example -b pipeline_hostname:9092  test
% Subscribing to 1 topics
% Consumer group rebalanced: assigned:
 test [0] offset -1001
% Consumer reached end of test [0] message queue at offset 64
% Message (topic test [0], offset 64, 9 bytes):
Message Payload hexdump (9 bytes):
00000000: 68 65 6c 6c 6f 2e 2e 2e 20                       hello...        
% Consumer reached end of test [0] message queue at offset 65

Using PipelineDB with the following steps, I don't see any data coming through:

CREATE STREAM logs_stream (payload json);
CREATE CONTINUOUS VIEW message_count AS SELECT COUNT(*) FROM logs_stream;
SELECT pipeline_kafka.add_broker('broker_hostname:9092');
SELECT pipeline_kafka.consume_begin('test', 'logs_stream', format := 'json');

... and I see the error immediately in the PipelineDB logs:

ERROR:  [pipeline_kafka] logs_stream <- test (PID 15528): failed to acquire metadata: Local: Broker transport failure

... At this point I send data using the Kafka producer from another host:

pipeline=# select * from message_count;
 count 
-------
(0 rows)

Also, if there were a broker connectivity issue, then I would not be able to read the data using the Kafka example. Let me know what other details you need. Also, can you reopen this issue from your end?

derekjn commented 7 years ago

@jayesh4192 thanks for the follow up, we'll take a look!

jayesh4192 commented 7 years ago

Thanks Derek, do let me know if you need any information. I am trying to prototype this and show its potential.

jayesh4192 commented 7 years ago

I debugged the issue from my end. First, I don't see any connection opening from the PipelineDB box to Kafka: I took a tcpdump and could not see any connections being opened. I was able to get metadata from a remote box using librdkafka, and I copied working metadata-fetching code from the librdkafka examples, but I was still not able to make it work through PipelineDB. I suspect it has to do with the PipelineDB setup/configuration. Is there a way I can print log messages from librdkafka? Is there a way I can attach the PipelineDB process to gdb? I tried without success. Do you know of any customer who has this kind of setup working?
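A partial answer to the two questions above, as a sketch: librdkafka's verbose logging is controlled by its `debug` configuration property, which the example consumer exposes as a `-d` flag (assuming a reasonably recent librdkafka build), and the pipeline_kafka error line already contains the PID of the consumer worker to attach gdb to:

```shell
# librdkafka debug logging via the example consumer (assumed -d flag,
# comma-separated categories such as broker,metadata):
#   ./rdkafka_consumer_example -b broker_hostname:9092 -d broker,metadata test

# The pipeline_kafka error line includes the consumer worker's PID;
# extract it so gdb can attach to the right backend process
# (log line copied from this thread):
line='ERROR:  [pipeline_kafka] logs_stream <- test (PID 15528): failed to acquire metadata: Local: Broker transport failure'
pid=$(echo "$line" | sed -n 's/.*(PID \([0-9]*\)).*/\1/p')
echo "$pid"   # 15528
# then: sudo gdb -p "$pid"
```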

This is the suspicious code that appears to be where it fails (librdkafka's broker lookup loop):

    while (1) {
            rd_kafka_broker_t *rkb;
            int remains;
            int version = rd_kafka_brokers_get_state_version(rk);

            /* Try non-blocking (e.g., non-fetching) brokers first. */
            if (do_lock)
                    rd_kafka_rdlock(rk);
            rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
                                      rd_kafka_broker_filter_non_blocking,
                                      NULL);
            if (!rkb)
                    rkb = rd_kafka_broker_any(rk, RD_KAFKA_BROKER_STATE_UP,
                                              NULL, NULL);
            if (do_lock)
                    rd_kafka_rdunlock(rk);

            if (rkb)
                    return rkb;

            remains = rd_timeout_remains(ts_end);
            if (rd_timeout_expired(remains))
                    return NULL;

            rd_kafka_brokers_wait_state_change(rk, version, remains);
    }

derekjn commented 7 years ago

Hi @jayesh4192, which version of Kafka are you running?

jayesh4192 commented 7 years ago

I tried with these versions - kafka_2.11-0.10.1.0 and kafka-0.10.0.1.4

jayesh4192 commented 7 years ago

We are building this pipeline for Yahoo, and another person on a different team has hit the same issue. Is there a way we can resolve this quickly?

derekjn commented 7 years ago

Hi @jayesh4192, so far everything is pointing to a problem with the environment/configuration. We have a lot of users using pipeline_kafka with the versions you noted, and the error you're seeing is coming from the librdkafka library so unfortunately there isn't much we can do with the information provided. We're happy to help you get to the bottom of this if we can though...

What version of librdkafka did you build?

derekjn commented 7 years ago

You also might want to try setting pipeline_kafka.broker_version in pipelinedb.conf to your specific broker version. For example,

pipeline_kafka.broker_version = '0.10.1.0'

derekjn commented 7 years ago

@jayesh4192 did setting pipeline_kafka.broker_version help at all?

jayesh4192 commented 7 years ago

This issue can be closed now: there was a conflict between the system's glibc version and the PipelineDB build. After fixing that, I was able to resolve it.

derekjn commented 7 years ago

Yikes, thanks @jayesh4192, nice find! If you think there's an issue with our build, please let us know at https://github.com/pipelinedb/pipelinedb/issues.