uperl / Kafka-Librd

perl bindings to librdkafka
11 stars 8 forks source link

Producer not working #9

Closed veqryn closed 6 years ago

veqryn commented 6 years ago

I have a perl producer that looks like the following:

my $kafka_producer = Kafka::Librd->new( Kafka::Librd::RD_KAFKA_PRODUCER, {'debug' => 'all'} );
$kafka_producer->brokers_add("$ENV{KAFKA_HOST}:9092");

my $kafka_topic = $kafka_producer->topic( "$ENV{KAFKA_TOPIC}", {} );

# <stuff>

    eval {
        my $response = $kafka_topic->produce(
            $partition,        # partition (zero based)
            0,                 # msgflags
            $json,             # single email + metadata
            "$msg_hash"        # key
        );
    };
    if ( $@ ) {
        die "can't send to kafka: $@";
    }

The process exits successfully, and says that it produced the messages. However, the messages never show up in kafka (I've tested both with my own consumer, and a kafkacat consumer).

The output with the debug flag is:

%7|1510013279.519|BRKMAIN|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Enter main broker thread
%7|1510013279.519|STATE|rdkafka#producer-1| [thrd::0/internal]: :0/internal: Broker changed state INIT -> UP
%7|1510013279.519|BROADCAST|rdkafka#producer-1| [thrd::0/internal]: Broadcasting state change
%7|1510013279.519|WAKEUPFD|rdkafka#producer-1| [thrd:app]: kafka:9092/bootstrap: Enabled low-latency ops queue wake-ups
%7|1510013279.519|BROKER|rdkafka#producer-1| [thrd:app]: kafka:9092/bootstrap: Added new broker with NodeId -1
%7|1510013279.519|TOPIC|rdkafka#producer-1| [thrd:app]: New local topic: inbox-monitor-ref-emails_dev
%7|1510013279.519|BRKMAIN|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Enter main broker thread
%7|1510013279.519|CONNECT|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: broker in state INIT connecting
%7|1510013279.519|TOPPARNEW|rdkafka#producer-1| [thrd:app]: NEW inbox-monitor-ref-emails_dev [-1] 0x18d8c10 (at rd_kafka_topic_new0:282)
%7|1510013279.519|METADATA|rdkafka#producer-1| [thrd:app]: Skipping metadata refresh of 1 topic(s): no usable brokers
%7|1510013279.522|CONNECT|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Connecting to ipv4#172.18.0.2:9092 (plaintext) with socket 7
%7|1510013279.523|STATE|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Broker changed state INIT -> CONNECT
%7|1510013279.523|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: Broadcasting state change
Creating message for: xxxx
%7|1510013279.533|CONNECT|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Connected to ipv4#172.18.0.2:9092
%7|1510013279.533|CONNECTED|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Connected (#1)
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Using (configuration fallback) 0.9.0 protocol features
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature MsgVer1: Produce (2..2) NOT supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature MsgVer1: Fetch (2..2) NOT supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Disabling feature MsgVer1
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature ApiVersion: ApiVersion (0..0) NOT supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Disabling feature ApiVersion
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerGroupCoordinator: GroupCoordinator (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Enabling feature BrokerGroupCoordinator
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: GroupCoordinator (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: OffsetCommit (1..2) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: OffsetFetch (1..1) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: JoinGroup (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: SyncGroup (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: Heartbeat (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature BrokerBalancedConsumer: LeaveGroup (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Enabling feature BrokerBalancedConsumer
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature ThrottleTime: Produce (1..2) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature ThrottleTime: Fetch (1..2) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Enabling feature ThrottleTime
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature Sasl: JoinGroup (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Enabling feature Sasl
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature SaslHandshake: SaslHandshake (0..0) NOT supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Disabling feature SaslHandshake
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature LZ4: GroupCoordinator (0..0) supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Enabling feature LZ4
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap:  Feature : Offset (1..1) NOT supported by broker
%7|1510013279.533|APIVERSION|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Disabling feature
%7|1510013279.533|FEATURE|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Updated enabled protocol features to BrokerBalancedConsumer,ThrottleTime,Sasl,BrokerGroupCoordinator,LZ4
%7|1510013279.533|STATE|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Broker changed state CONNECT -> UP
%7|1510013279.533|BROADCAST|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: Broadcasting state change
%7|1510013279.533|METADATA|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: Hinted cache of 1/1 topic(s) being queried
%7|1510013279.533|METADATA|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: Requesting metadata for 1/1 topics: connected
%7|1510013279.533|METADATA|rdkafka#producer-1| [thrd:kafka:9092/bootstrap]: kafka:9092/bootstrap: Request metadata for 1 topic(s): connected

Your docs have good examples for how to create a consumer (which worked for me, thanks!), but there aren't any examples of producers. Is there something I'm missing? thank you...

trinitum commented 6 years ago

there's an example producer in bin/kafka_producer.pl. I'm not sure if that's the reason in your case, but before exiting script must wait till all messages will be actually sent to kafka, note that sleep 1 while $kafka->outq_len; in example.

veqryn commented 6 years ago

Thanks. Adding that made it work.

    # Sleep 10 milliseconds while waiting to send to kafka
    usleep(10000) while $kafka_producer->outq_len;