uperl / Kafka-Librd

perl bindings to librdkafka

Add flush method, and explicit topic destroy #19

Closed miketonks closed 4 years ago

miketonks commented 4 years ago

I noticed one of my test scripts sometimes doesn't shut down correctly.

From the librdkafka docs:

The proper termination sequence for Producers is:

 /* 1) Make sure all outstanding requests are transmitted and handled. */
 rd_kafka_flush(rk, 60*1000); /* One minute timeout */

 /* 2) Destroy the topic and handle objects */
 rd_kafka_topic_destroy(rkt);  /* Repeat for all topic objects held */
 rd_kafka_destroy(rk);

https://github.com/edenhill/librdkafka/blob/master/INTRODUCTION.md#termination

So I added a flush method and an explicit destroy for topics, and this seems to help. Here is the test script I used:

#!/usr/bin/env perl
use 5.014;
use strict;
use warnings;

use Kafka::Librd;
my $brokers = "localhost:9092";
my $topic = "test";

my $key = 1; # use a partition key so messages are received in order

my $kafka = Kafka::Librd->new(Kafka::Librd::RD_KAFKA_PRODUCER, {});
say "VERSION: " . Kafka::Librd::rd_kafka_version();

my $added = $kafka->brokers_add($brokers);
say "Added $added brokers";

my $ktopic = $kafka->topic( $topic, {} );

send_message("start >>>>>");
sleep 1;

for my $i (1 .. 10) {
   send_message("message: $i"); # dies on failure, so no extra check is needed here
   say $i;
}

sleep 1;
send_message("end >>>");

#sleep 1 while $kafka->outq_len;

$kafka->flush(1000);
$kafka->destroy();

while (Kafka::Librd::rd_kafka_wait_destroyed(1000) == -1) {
  say "Some kafka resources are still allocated";
}
say "Shutdown complete";

sub send_message {
  my ($message) = @_;
  my $err = $ktopic->produce( -1, 0, $message, $key );
  $err and die "Couldn't produce: ", Kafka::Librd::Error::to_string($err);
}
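
The wait loop in the script above spins forever if some resource is never released. As a rough sketch only, the same termination sequence could be wrapped in a helper with a bounded number of waits. `shutdown_producer` is a hypothetical name and not part of Kafka::Librd. It assumes the producer handle offers `flush` and `destroy`, that each topic handle offers `destroy` (as this PR proposes), and that the wait callback returns -1 while resources are still allocated, as `Kafka::Librd::rd_kafka_wait_destroyed` does in the script.

```perl
use strict;
use warnings;

# Hypothetical helper, not part of Kafka::Librd. It assumes $kafka provides
# flush() and destroy(), each topic provides destroy(), and $wait is a code
# reference such as \&Kafka::Librd::rd_kafka_wait_destroyed that returns -1
# while resources are still allocated.
sub shutdown_producer {
    my ($kafka, $topics, $wait, $timeout_ms, $max_waits) = @_;

    # 1) Make sure outstanding requests are transmitted and handled.
    $kafka->flush($timeout_ms);

    # 2) Destroy the topic handles, then the producer handle.
    $_->destroy for @$topics;
    $kafka->destroy;

    # 3) Poll a bounded number of times instead of looping forever.
    for my $attempt (1 .. $max_waits) {
        return 1 if $wait->($timeout_ms) != -1;
        warn "Some kafka resources are still allocated (attempt $attempt)\n";
    }
    return 0;
}
```

In the script it might be called as `shutdown_producer($kafka, [$ktopic], \&Kafka::Librd::rd_kafka_wait_destroyed, 1000, 10)`, which returns 1 on a clean shutdown and 0 if the waits run out. Passing the wait function in as a code reference keeps the helper easy to exercise without a broker.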

miketonks commented 4 years ago

Thanks for the feedback. Updated now.

plicease commented 4 years ago

merged and released as 0.14. Thank you for contributing!