arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License

How to count successfully sent messages, producer message order #368

Closed: seth-shi closed this issue 4 years ago

seth-shi commented 4 years ago

My PHP code:


        $max = 1000;

        $configBrokers = config('kafka.brokers');
        $configTopic = config('kafka.topic');

        // Write to Kafka
        $producer = new \RdKafka\Producer();
        $producer->addBrokers($configBrokers);

        $configObj = new \RdKafka\TopicConf();
        $topic = $producer->newTopic($configTopic, $configObj);

        for ($i = 0; $i < $max; ++ $i) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "php send " . $i);
        }

        while (($len = $producer->getOutQLen()) > 0) {

            $diff = $max - $len;
            echo "\r {$diff}/$max";
            $producer->poll(1);
        }

        $diff = $max - $len;
        echo "\r {$diff}/$max";

But when I consume the messages, the count never matches 1000.

Steveb-p commented 4 years ago
  • librdkafka version: 0.9.1

This version is old enough to suggest that the issue lies in it. Support for it was dropped in newer versions of php-rdkafka. Update the library and see if the problem persists.

seth-shi commented 4 years ago

@Steveb-p But when I tested it on Windows with the same version, everything was normal. I first downloaded the extension from https://windows.php.net/downloads/pecl/releases/rdkafka/3.0.5/; the rdkafka 3.1.2 build there is built against librdkafka 0.9.1. When I deployed to Linux, I kept the versions the same.

If the two versions are not tied to each other, can I just download the latest librdkafka? Thanks.

nick-zh commented 4 years ago

First of all, I totally agree with @Steveb-p about updating librdkafka 0.9.1; it is way too old. But I should point out that you cannot count like this: poll just triggers events, and you don't know how many. If you want to count deliveries, the best way is probably to pass a count variable by reference to the closure you register with setDrMsgCb:

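$count = 0;
$conf = new \RdKafka\Conf();
$conf->setDrMsgCb(function (\RdKafka\Producer $kafka, \RdKafka\Message $message) use (&$count) {
    if ($message->err) {
        echo 'Oh no, error: ' . rd_kafka_err2str($message->err) . PHP_EOL;
    } else {
        ++$count;
        echo sprintf('Successfully sent a message, %d in total', $count) . PHP_EOL;
    }
});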
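
A minimal sketch of that counting approach, assuming php-rdkafka with librdkafka 1.x and a reachable broker. DeliveryCounter and produceAndCount are hypothetical names, as are the KAFKA_BROKERS environment variable and the topic name test; the Kafka part only runs when the extension is loaded and that variable is set. The count comes only from delivery reports (dispatched by poll()), not from getOutQLen().

```php
<?php
// Hypothetical helper: counts the delivery reports handed to setDrMsgCb.
final class DeliveryCounter
{
    public $delivered = 0;
    public $failed = 0;

    // Same arguments as a delivery-report callback: ($kafka, $message).
    public function __invoke($kafka, $message): void
    {
        if ($message->err) {
            ++$this->failed;    // non-zero err: the message was not delivered
        } else {
            ++$this->delivered; // acknowledged by the broker
        }
    }

    // Number of messages that have received a report so far.
    public function reported(): int
    {
        return $this->delivered + $this->failed;
    }
}

// Hypothetical producer run: produce $max messages, then poll until all are reported.
function produceAndCount(string $brokers, string $topicName, int $max): DeliveryCounter
{
    $counter = new DeliveryCounter();
    $conf = new \RdKafka\Conf();
    $conf->setDrMsgCb($counter); // dispatched when poll() is called

    $producer = new \RdKafka\Producer($conf);
    $producer->addBrokers($brokers);
    $topic = $producer->newTopic($topicName);

    for ($i = 0; $i < $max; ++$i) {
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, "php send " . $i);
        $producer->poll(0); // serve reports that are already available, without blocking
    }
    while ($producer->getOutQLen() > 0) {
        $producer->poll(100);
    }
    return $counter;
}

// Hypothetical entry point: runs only with the extension and KAFKA_BROKERS set.
if (extension_loaded('rdkafka') && getenv('KAFKA_BROKERS')) {
    $max = 1000;
    $counter = produceAndCount(getenv('KAFKA_BROKERS'), 'test', $max);
    printf("%d/%d delivered, %d failed\n", $counter->delivered, $max, $counter->failed);
}
```

Each accepted message gets exactly one delivery report (success or error), so delivered + failed should reach $max once the drain loop ends. If the broker is unreachable, the loop keeps running until librdkafka's message.timeout.ms expires and the messages are reported as failed.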
nick-zh commented 4 years ago

I also forgot to mention: php-rdkafka is not bound to a fixed librdkafka version, but the newer the librdkafka, the more stable it is and the more features of the extension you can use.

seth-shi commented 4 years ago

@nick-zh Thank you, my solution in the end is to upgrade librdkafka. I tried to find the Message object in the README but couldn't find anything about it. Can you give a more detailed example of Message $message?

nick-zh commented 4 years ago

You can find a lot in the official docs. For Message it could perhaps be a bit clearer (https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/class.rdkafka-message.html), but basically you can just access all its public properties, like:

$message->payload
$message->key
etc.

Hope that helps :v:

seth-shi commented 4 years ago

Got it! On the consumer side I see $msg = $topic->consume(0, 1000);

But where does the Message come from on the producer side? Producer::produce returns null.

nick-zh commented 4 years ago

I am not sure I understand the question, but I think you mean ProducerTopic::produce or ProducerTopic::producev. Basically, if you don't register a setDrMsgCb callback, you can't see whether a message got delivered. This callback is how you check whether the message was really sent, similar to the example I provided here: https://github.com/arnaud-lb/php-rdkafka/issues/368#issuecomment-632752372. So you might want to react when the callback reports an error, which means the message was not sent. Hope that helps
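
To react to failures as suggested: the delivery-report callback receives an RdKafka\Message whose partition and offset are filled in on success, or whose err is non-zero on failure. The sketch below is a hypothetical DeliveryLog class plus a hypothetical retryFailed helper. Keying by payload assumes payloads are unique, and re-producing a failed message places it after messages that were already sent, so it does not preserve order.

```php
<?php
// Hypothetical helper: remembers what each delivery report said.
final class DeliveryLog
{
    public $acked = [];  // payload => [partition, offset] assigned on delivery
    public $failed = []; // payload => error code, for logging or re-producing

    // Register with: $conf->setDrMsgCb($log);
    public function __invoke($kafka, $message): void
    {
        if ($message->err) {
            $this->failed[$message->payload] = $message->err;
        } else {
            $this->acked[$message->payload] = [$message->partition, $message->offset];
        }
    }
}

// Hypothetical helper: re-produce failed payloads once. The caller must poll()
// again afterwards to receive their new delivery reports.
function retryFailed(DeliveryLog $log, $topic): int
{
    $toRetry = array_keys($log->failed);
    $log->failed = [];
    foreach ($toRetry as $payload) {
        // array_keys() turns numeric-string payloads into ints, so cast back.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, (string) $payload);
    }
    return count($toRetry);
}
```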

seth-shi commented 4 years ago

Sorry, I hadn't seen setDrMsgCb. So I register a setDrMsgCb callback to see whether each message was sent successfully. Could I then do this on the web side, writing to Kafka on every request instead of bulk writing through the CLI?

nick-zh commented 4 years ago

@seth-shi I'm not sure I understood correctly, but you can produce from FPM, Swoole, the CLI, whatever you like, and yes, you can write something during a request, no problem at all. Just be sure to call flush at the end of the request and you'll be fine.

seth-shi commented 4 years ago

As far as I can see, flush is a 4.0+ feature. With 3.0, do I only need the while/poll loop?

Steveb-p commented 4 years ago

What I see is that "flush" is a 4.0+ feature. Does 3.0 only need while poll?

@seth-shi In 3.x, the background thread responsible for sending messages blocks the process from terminating while there are pending messages, but it's better to poll manually until the queue is empty.

Also, in 4.x, poll is still required for your callbacks to be called (just as in 3.x). It lets php-rdkafka/librdkafka pick up the events from the aforementioned background thread and call those callbacks. Otherwise, you won't see them executed.
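
A sketch of a drain helper that covers both cases, assuming the producer object comes from php-rdkafka: it calls flush() when the method exists (4.0+) and otherwise falls back to the poll loop that 3.x needs. drainProducer and its timeout parameter are hypothetical; both paths still run delivery-report callbacks, since librdkafka's flush polls internally.

```php
<?php
// Hypothetical helper: wait until every queued message has been delivered or
// has failed, for at most $timeoutMs. Uses flush() where it exists
// (php-rdkafka 4.0+), otherwise the poll loop that 3.x needs.
function drainProducer($producer, int $timeoutMs): bool
{
    if (method_exists($producer, 'flush')) {
        // flush() polls internally (so callbacks still run) and returns an
        // RD_KAFKA_RESP_ERR_* code, e.g. a timeout code if messages remain.
        return $producer->flush($timeoutMs) === RD_KAFKA_RESP_ERR_NO_ERROR;
    }

    $deadline = microtime(true) + $timeoutMs / 1000;
    while ($producer->getOutQLen() > 0) {
        if (microtime(true) >= $deadline) {
            return false; // messages still queued; the caller decides what to do
        }
        $producer->poll(50); // serves delivery-report and error callbacks
    }
    return true;
}
```

In FPM, one option is to call it at the end of the request, for example from register_shutdown_function, with a timeout that fits your latency budget. A false return means messages were still queued when the timeout hit.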

seth-shi commented 4 years ago

@Steveb-p thanks

seth-shi commented 4 years ago

My problem is still there, so I need to reopen this issue.

################# php code
## producer.php
        $max = 10;

        $configBrokers = config('kafka.brokers');
        $configTopic = config('kafka.topic');

        $conf = new \RdKafka\Conf();
        $conf->setDrMsgCb(function ($kafka, $message) {

            var_dump('msg:', $message);
        });

        $conf->setErrorCb(function ($kafka, $err, $reason) {

            var_dump('error', $err, $reason);
        });
        // Write to Kafka
        $producer = new \RdKafka\Producer($conf);
        $producer->addBrokers($configBrokers);

        $configObj = new \RdKafka\TopicConf();
        $topic = $producer->newTopic($configTopic, $configObj);

        for ($i = 0; $i < $max; ++ $i) {
            // RD_KAFKA_PARTITION_UA lets librdkafka's partitioner choose the partition
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "php send " . $i);
        }

        // Block until the Kafka messages have been sent
        while (($len = $producer->getOutQLen()) > 0) {
            $producer->poll(1);
        }

##########################################
## consumer.php
        $configTopic = config('kafka.topic');
        $configBrokers = config('kafka.brokers');

        $consumer = new \RdKafka\Consumer();
        $consumer->addBrokers($configBrokers);

        $topic = $consumer->newTopic($configTopic);
        $topic->consumeStart(0, RD_KAFKA_OFFSET_END);

        $count = 0;
        while (true) {
            $message = $topic->consume(0, 120*10000);
            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    ++ $count;
                    var_dump($message, $count);
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    var_dump("No more messages; will wait for more");
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    var_dump("Timed out");
                    break;
                default:
                    var_dump('error', $message);
                    break;
            }
        }

##################################
# Windows version
C:\>ver
Microsoft Windows [Version 10.0.18362.836]

# PHP version
C:\>php -v
PHP 7.1.9 (cli) (built: Aug 30 2017 18:33:57) ( NTS MSVC14 (Visual C++ 2015) x64 )
Copyright (c) 1997-2017 The PHP Group
Zend Engine v3.1.0, Copyright (c) 1998-2017 Zend Technologies

# Kafka version
C:\>php --ri rdkafka

rdkafka

rdkafka support => enabled
version => 3.1.2
build date => Jul 10 2019 05:19:52
librdkafka version (runtime) => 0.9.1
librdkafka version (build) => 0.9.1.255
## On Windows, librdkafka 0.9.1 (the version I use) also works normally
### run in windows
## producer output
string(36) "No more messages; will wait for more"
object(RdKafka\Message)#1333 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 0"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1926)
}
int(1)
object(RdKafka\Message)#1332 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 1"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1927)
}
int(2)
object(RdKafka\Message)#1333 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 2"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1928)
}
int(3)
object(RdKafka\Message)#1332 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 3"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1929)
}
int(4)
object(RdKafka\Message)#1333 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 4"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1930)
}
int(5)
object(RdKafka\Message)#1332 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 5"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1931)
}
int(6)
object(RdKafka\Message)#1333 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 6"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1932)
}
int(7)
object(RdKafka\Message)#1332 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 7"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1933)
}
int(8)
object(RdKafka\Message)#1333 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 8"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1934)
}
int(9)
object(RdKafka\Message)#1332 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 9"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1935)
}
int(10)
string(36) "No more messages; will wait for more"

## consumer output
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 0"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 1"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 2"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 3"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 4"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 5"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 6"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 7"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 8"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(0)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (7) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(4) "test"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 9"
  ["len"]=>
  int(10)
  ["key"]=>
  NULL
  ["offset"]=>
  int(1926)
}

###################################################
## Next up is the Linux environment
# Linux version
[root@Server-01]cat /proc/version
Linux version 4.19.67-16.al7.x86_64 (mockbuild@e69b16569.et15sqa) (gcc version 4.8.5 20150623 (Red Hat 4.8.5-39) (GCC)) #1 SMP Mon Nov 18 14:15:04 CST 2019

# PHP version
[root@Server-01 ]# php -v
PHP 7.1.33 (cli) (built: Dec 25 2019 11:06:35) ( NTS )
Copyright (c) 1997-2018 The PHP Group
Zend Engine v3.1.0, Copyright (c) 1998-2018 Zend Technologies

# Kafka version
[root@Server-01 ]# php --ri rdkafka

rdkafka

rdkafka support => enabled
version => 3.0.4
build date => May 25 2020 10:48:20
librdkafka version (runtime) => 1.2.1
librdkafka version (build) => 1.2.1.0
##### I used to use the same librdkafka version as on Windows, but you said it was outdated, so I switched to 1.2.1.

### run in linux
## producer output
object(RdKafka\Message)#1332 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 0"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22407)
}
int(1)
object(RdKafka\Message)#1333 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 1"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22408)
}
int(2)
object(RdKafka\Message)#1332 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 2"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22409)
}
int(3)
object(RdKafka\Message)#1333 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 5"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22410)
}
int(4)
object(RdKafka\Message)#1332 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 8"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22411)
}
int(5)
object(RdKafka\Message)#1333 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 9"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22412)
}
int(6)

## consumer output
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 0"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22407)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 1"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22408)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 2"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22409)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 5"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22410)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 8"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22411)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(10) "php send 9"
  ["key"]=>
  NULL
  ["offset"]=>
  int(22412)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(1)
  ["payload"]=>
  string(10) "php send 3"
  ["key"]=>
  NULL
  ["offset"]=>
  int(8449)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(1)
  ["payload"]=>
  string(10) "php send 6"
  ["key"]=>
  NULL
  ["offset"]=>
  int(8450)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(2)
  ["payload"]=>
  string(10) "php send 4"
  ["key"]=>
  NULL
  ["offset"]=>
  int(8674)
}
string(4) "msg:"
object(RdKafka\Message)#1336 (6) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["partition"]=>
  int(2)
  ["payload"]=>
  string(10) "php send 7"
  ["key"]=>
  NULL
  ["offset"]=>
  int(8675)
}

On Linux, the number of missing messages varies. I think I may have found the problem: it seems to be related to the sending order. Or is it the consumer's order?

nick-zh commented 4 years ago

@seth-shi Hello again. This is not a bug; it happens because you produce asynchronously, i.e. you don't poll like this: $producer->poll(-1);. What you need to understand is that messages to be sent go into an internal queue. If a problem occurs, such as a network interruption, sending a message is retried, and the order in which the producer sent them can be lost. If order is important to you, you need to poll synchronously (see above), meaning you wait for an event (the delivery report) before sending the next message. You need to do this right after produce. Another option is the idempotent producer, if you also need exactly-once semantics: https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples-producer.html

Hope this helps :v:
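
The synchronous pattern described above (wait for the delivery report right after produce, before sending the next message) could look like the following sketch. SyncSender, onDelivery and send are hypothetical names, and it assumes nothing else is queued on the same producer, so the next delivery report belongs to the message just produced. Unlike watching getOutQLen(), it checks the report's err field, so a failed delivery is not mistaken for a successful one.

```php
<?php
// Hypothetical helper: produce one message and block until its delivery
// report has been served by poll(), so the next message is only sent afterwards.
final class SyncSender
{
    private $lastErr = null; // err of the latest delivery report; null = no report yet

    // Register with: $conf->setDrMsgCb([$sender, 'onDelivery']);
    public function onDelivery($kafka, $message): void
    {
        $this->lastErr = $message->err;
    }

    // True if the broker acknowledged the message, false on error or timeout.
    public function send($producer, $topic, string $payload, ?string $key, int $timeoutMs): bool
    {
        $this->lastErr = null;
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $payload, $key);

        $deadline = microtime(true) + $timeoutMs / 1000;
        while ($this->lastErr === null && microtime(true) < $deadline) {
            $producer->poll(100); // delivery reports are dispatched from poll()
        }
        return $this->lastErr === RD_KAFKA_RESP_ERR_NO_ERROR;
    }
}
```

Register the callback before creating the producer. Each send() waits for a full broker round trip, so throughput drops sharply. The idempotent producer mentioned above avoids that: with librdkafka 1.0+, $conf->set('enable.idempotence', 'true') keeps per-partition order across retries without blocking on every message.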

seth-shi commented 4 years ago
#1
        for ($i = 0; $i < 10; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
            $producer->poll(0);
        }

#2
        for ($i = 0; $i < 10; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");
            $producer->poll(-1);
        }

#3
        for ($i = 0; $i < 10; $i++) {
            $topic->produce(RD_KAFKA_PARTITION_UA, 0, "Message $i");

            while (($len = $producer->getOutQLen()) > 0) {
                $producer->poll(1);
            }
        }

I tried these three approaches, and none of them worked. My native language is not English; reading https://arnaud.le-blanc.net/php-rdkafka-doc/phpdoc/rdkafka.examples-producer.html, I see that the example passes 0 to poll, whereas you suggest -1.

I went with the third one, polling right after every send, but it still doesn't work. The poll documentation says:

Specifies the maximum amount of time (in milliseconds) that the call will block waiting for events. For non-blocking calls, provide 0 as timeout_ms. To wait indefinitely for an event, provide -1.

I don't think 0 is appropriate, because I'm sending synchronously, which means I have to block.

nick-zh commented 4 years ago

@seth-shi No worries about the language; I only pointed to that page for the idempotence option. Examples are hard to write for the docs, because which settings are appropriate really depends on the user's use case. Maybe I will add a hint about this to the docs.

Back to your problem: #3 works if you have one partition. I see that you have multiple partitions; please bear in mind that order can only be guaranteed within a partition, not across the whole topic. If you want order within partitions, start using producev, where you can set a key that determines the partition (e.g. the id of the entity, but this really depends on your use case). Also bear in mind that if you use something like FPM or Swoole, with multiple PHP processes handling your HTTP requests, you will never have order by default. Some sort of order can be achieved (per partition, if you have sticky sessions etc.), but it really depends on what exactly you need and what your service does. Hope this helps :v:
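
A minimal sketch of keyed producing, assuming events that carry an entity id (the produceKeyed helper, the entity_id/payload fields, the topic name test and the KAFKA_BROKERS environment variable are hypothetical). The key is the fourth argument of produce(), which also works on php-rdkafka 3.0.x where producev is missing. With librdkafka's default partitioner (consistent_random), messages with the same key hash to the same partition, while messages with a null key are spread randomly; that fits the unkeyed messages in the Linux run above ending up in partitions 0, 1 and 2. The key-to-partition mapping holds only while the topic's partition count stays the same, and the retry caveat above still applies within a partition.

```php
<?php
// Hypothetical helper: produce each event with its entity id as the message key,
// so all events of one entity go to the same partition.
function produceKeyed($topic, array $events): int
{
    foreach ($events as $event) {
        // produce(partition, msgflags, payload, key): the key is the 4th argument.
        $topic->produce(RD_KAFKA_PARTITION_UA, 0, $event['payload'], (string) $event['entity_id']);
    }
    return count($events);
}

// Hypothetical entry point: runs only with the extension and KAFKA_BROKERS set.
if (extension_loaded('rdkafka') && getenv('KAFKA_BROKERS')) {
    $producer = new \RdKafka\Producer();
    $producer->addBrokers(getenv('KAFKA_BROKERS'));
    $topic = $producer->newTopic('test');
    produceKeyed($topic, [
        ['entity_id' => 42, 'payload' => 'order 42 created'],
        ['entity_id' => 42, 'payload' => 'order 42 paid'],   // same key as above: same partition
        ['entity_id' => 7,  'payload' => 'order 7 created'], // may land in another partition
    ]);
    while ($producer->getOutQLen() > 0) {
        $producer->poll(100);
    }
}
```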

seth-shi commented 4 years ago

If I only have one consumer, multiple partitions won't matter, right? I now run only one CLI process to consume messages, but the number of messages consumed does not equal the number produced.

My actual scenario is to produce a message during an HTTP request and then consume it from a CLI script.

But before that, I need to test whether any messages go missing. As you can see, I produced 10 messages in a batch from a CLI process, but my CLI consumer process doesn't receive all 10.

nick-zh commented 4 years ago

@seth-shi Multiple partitions do matter for message order, but not for the number of messages. I produced with this:

for ($i = 0; $i < 10; $i++) {
    $topic->producev(RD_KAFKA_PARTITION_UA, 0, "Message $i", "key $i");

    while (($len = $producer->getOutQLen()) > 0) {
        $producer->poll(-1);
    }
}

and I had no problem getting 100 messages in and out. The while here is probably not necessary, but I wanted to be on the safe side.

seth-shi commented 4 years ago

Still no luck. I don't think there's anything wrong with my code, because it runs without problems on Windows; the problem is with Linux. I also wrote a simple Go program that produced 100 messages, and my PHP CLI consumer could consume them.

for ($i = 0; $i < 10; $i++) {
    $topic->producev(RD_KAFKA_PARTITION_UA, 0, "Message $i");

    while (($len = $producer->getOutQLen()) > 0) {
        $producer->poll(1);
    }
}

## consumer output
object(RdKafka\Message)#1332 (9) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["timestamp"]=>
  int(1590399478179)
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(9) "Message 0"
  ["len"]=>
  int(9)
  ["key"]=>
  string(5) "key 0"
  ["offset"]=>
  int(22475)
  ["headers"]=>
  NULL
}
int(1)
object(RdKafka\Message)#1333 (9) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["timestamp"]=>
  int(1590399478182)
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(9) "Message 1"
  ["len"]=>
  int(9)
  ["key"]=>
  string(5) "key 1"
  ["offset"]=>
  int(22476)
  ["headers"]=>
  NULL
}
int(2)
object(RdKafka\Message)#1332 (9) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["timestamp"]=>
  int(1590399478185)
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(9) "Message 3"
  ["len"]=>
  int(9)
  ["key"]=>
  string(5) "key 3"
  ["offset"]=>
  int(22477)
  ["headers"]=>
  NULL
}
int(3)
object(RdKafka\Message)#1333 (9) {
  ["err"]=>
  int(0)
  ["topic_name"]=>
  string(15) "yingtao_datalog"
  ["timestamp"]=>
  int(1590399478187)
  ["partition"]=>
  int(0)
  ["payload"]=>
  string(9) "Message 4"
  ["len"]=>
  int(9)
  ["key"]=>
  string(5) "key 4"
  ["offset"]=>
  int(22478)
  ["headers"]=>
  NULL
}
int(4)
nick-zh commented 4 years ago

I'm not saying no, but Linux is almost certainly not the problem :wink: I use Linux as well and have never lost any messages in years of using Kafka and this library.

As for your problem, sorry that I missed this earlier: you are using the low-level consumer. I advise you to use the high-level consumer RdKafka\KafkaConsumer, which handles partition balancing etc. itself and means less overhead for you.

Warning: some implementations and frameworks out there don't handle this properly for the low-level consumer, so with multiple partitions you might need to wait for a consumer timeout (which might be high; your example uses 120*10000 ms, i.e. 20 minutes) before the framework/library times out and switches partitions etc.
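
A sketch of the high-level consumer, assuming librdkafka 1.x (so topic settings such as auto.offset.reset can be set on the global Conf). handleMessage and consumeAll, the group id php-count-test, the topic name test and the KAFKA_BROKERS environment variable are hypothetical. subscribe() lets the group coordinator assign the topic's partitions; a single consumer in its group receives all of them, so no partition is silently skipped.

```php
<?php
// Hypothetical handler: counts real records, ignores EOF/timeout events, throws on errors.
function handleMessage($message, int &$count): bool
{
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            ++$count;
            return true;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF: // end of one partition; others may still have data
        case RD_KAFKA_RESP_ERR__TIMED_OUT:     // nothing arrived within the consume() timeout
            return false;
        default:
            throw new \RuntimeException($message->errstr(), $message->err);
    }
}

// Hypothetical high-level consumer loop (RdKafka\KafkaConsumer).
function consumeAll(string $brokers, string $topicName, string $groupId): void
{
    $conf = new \RdKafka\Conf();
    $conf->set('metadata.broker.list', $brokers);
    $conf->set('group.id', $groupId);            // consumers with the same group id share the partitions
    $conf->set('auto.offset.reset', 'earliest'); // start position when the group has no committed offset
    $conf->set('enable.partition.eof', 'true');  // emit PARTITION_EOF events (off by default since librdkafka 1.0)

    $consumer = new \RdKafka\KafkaConsumer($conf);
    $consumer->subscribe([$topicName]);

    $count = 0;
    while (true) {
        $message = $consumer->consume(10 * 1000);
        if (handleMessage($message, $count)) {
            printf("%d: partition %d offset %d %s\n", $count, $message->partition, $message->offset, $message->payload);
        }
    }
}

// Hypothetical entry point: runs only with the extension and KAFKA_BROKERS set.
if (extension_loaded('rdkafka') && getenv('KAFKA_BROKERS')) {
    consumeAll(getenv('KAFKA_BROKERS'), 'test', 'php-count-test');
}
```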

seth-shi commented 4 years ago

I used a program written in Go to produce 100 messages, and low-level consumption in the PHP CLI process works normally.

seth-shi commented 4 years ago

I tried the high-level consumer, and you were right: I can consume all the messages.

seth-shi commented 4 years ago

The high-level consumer gets all the data, so what should I do so that the low-level consumer gets everything too?

nick-zh commented 4 years ago

OK, first of all, I need to point out some inconsistencies, which make it hard for me to properly support your case:

I also need to point out that php-rdkafka 3.0.4 doesn't have producev yet, and sending messages without keys can have a lot of implications in Kafka.

Also, as stated before, with an example producer/consumer on this setup:
  • OS: alpine3.10
  • PHP: 7.1.33
  • ext-rdkafka: 3.0.4 / 3.1.2
  • librdkafka: 1.2.1

I was not able to "lose" any messages, sticking close to your example. I used the high-level consumer, but the low-level one works as well if handled correctly.

At this point, I think I can only help debug your case if you provide a reproducible example in a Docker environment. This is the setup I use to debug issues and to reproduce an error for a specific setup. You can fork it and add your PHP example in your fork. If the error still happens, point me to your fork so I can debug it better. Adjust these arguments in your fork to match your setup.

nick-zh commented 4 years ago

@seth-shi OK, sorry, while I was writing I didn't see your comments; good to hear. My question would be: why do you want to use the low-level consumer? If you really insist on sticking with it, you need to consume from every partition, so for 3 partitions:

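$topic->consumeStart(0, RD_KAFKA_OFFSET_END); // start each partition once, before consuming
$topic->consumeStart(1, RD_KAFKA_OFFSET_END);
$topic->consumeStart(2, RD_KAFKA_OFFSET_END);
$topic->consume(0, 1000);
$topic->consume(1, 1000);
$topic->consume(2, 1000);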

This can get really messy. Either you implement logic to rotate through your partitions yourself, or you use a library that does this for you. Again, I advise using the high-level consumer: it is really flexible, and if at some point you need multiple tasks consuming, it will still work without changing the code.
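
If you do stick with the low-level consumer, one alternative to rotating through partitions by hand is a queue: start every partition on the same RdKafka\Queue and consume from the queue. The sketch below assumes php-rdkafka with librdkafka 1.x; partitionIds, consumeAllPartitionsLowLevel, the topic name test and the KAFKA_BROKERS environment variable are hypothetical. It looks up the partitions with getMetadata() instead of hard-coding 0, 1 and 2. The Linux output above shows the messages spread over partitions 0, 1 and 2, while the original consumer script only starts partition 0. RD_KAFKA_OFFSET_END mirrors that script and only returns messages produced after the consumer starts.

```php
<?php
// Hypothetical helper: sorted partition ids of $topicName from a metadata response.
function partitionIds($metadata, string $topicName): array
{
    $ids = [];
    foreach ($metadata->getTopics() as $topic) {
        if ($topic->getTopic() !== $topicName) {
            continue;
        }
        foreach ($topic->getPartitions() as $partition) {
            $ids[] = $partition->getId();
        }
    }
    sort($ids);
    return $ids;
}

// Hypothetical low-level consumer: every partition feeds the same queue,
// so one consume() call returns messages from whichever partition has data.
function consumeAllPartitionsLowLevel(string $brokers, string $topicName): void
{
    $consumer = new \RdKafka\Consumer();
    $consumer->addBrokers($brokers);
    $topic = $consumer->newTopic($topicName);

    // Look up the topic's partitions (10 s timeout) instead of hard-coding them.
    $metadata = $consumer->getMetadata(false, $topic, 10 * 1000);

    $queue = $consumer->newQueue();
    foreach (partitionIds($metadata, $topicName) as $id) {
        $topic->consumeQueueStart($id, RD_KAFKA_OFFSET_END, $queue);
    }

    $count = 0;
    while (true) {
        $message = $queue->consume(1000);
        if ($message === null) {
            continue; // no message within 1 s
        }
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                ++$count;
                printf("%d: partition %d offset %d %s\n", $count, $message->partition, $message->offset, $message->payload);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                break; // one partition is drained; the others keep feeding the queue
            default:
                fwrite(STDERR, $message->errstr() . PHP_EOL);
        }
    }
}

// Hypothetical entry point: runs only with the extension and KAFKA_BROKERS set.
if (extension_loaded('rdkafka') && getenv('KAFKA_BROKERS')) {
    consumeAllPartitionsLowLevel(getenv('KAFKA_BROKERS'), 'test');
}
```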

seth-shi commented 4 years ago

I'm very new to Kafka, so now I understand. Thank you very much.

nick-zh commented 4 years ago

@seth-shi Ah, no worries :+1:, I will close this now. Feel free to open a new issue if you need help with anything else :v: