arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License
2.08k stars 263 forks source link

consumer hangs on futex #461

Closed rocky114 closed 3 years ago

rocky114 commented 3 years ago

Info

strace -p 7713

[root@online-platform-dl_all-02 7713]# cat stack

    public function consumer(Output $output)
    {
        $conf = new \RdKafka\Conf();

        // Set a rebalance callback to log partition assignments (optional)
        $conf->setRebalanceCb(function (\RdKafka\KafkaConsumer $kafka, $err, array $partitions = null) {
            switch ($err) {
                case RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS:
                    $kafka->assign($partitions);
                    break;

                case RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS:
                    $kafka->assign(NULL);
                    break;

                default:
                    //throw new \Exception($err);
                    YXLog::error($err);
            }
        });

        // Configure the group.id. All consumer with the same group.id will consume
        // different partitions.
        $conf->set('group.id', $this->group_id);

        // Initial list of Kafka brokers
        $conf->set('metadata.broker.list', $this->broker_list);

        $topicConf = new \RdKafka\TopicConf();

        //kafka-0.10.1.X版本之前: auto.offset.reset 的值为smallest,和,largest.(offest保存在zk中)
        //kafka-0.10.1.X版本之后: auto.offset.reset 的值更改为:earliest,latest,和none (offest保存在kafka的一个特殊的topic名为:__consumer_offsets里面)
        $topicConf->set('auto.offset.reset', 'latest');

        // Set the configuration to use for subscribed/assigned topics
        $conf->setDefaultTopicConf($topicConf);

        $consumer = new \RdKafka\KafkaConsumer($conf);

        // Subscribe to topic 'test'
        $consumer->subscribe([$this->topic]);

        // echo "Waiting for partition assignment... (make take some time when\n";
        //echo "quickly re-joining the group after leaving it.)\n";

        $date_min = 0;
        while (true) {
            #发送心跳包

            $script_name = config("app_name") . config("script_name.LearningKafkaConsumerV1");
            sendHeartbeatV2($date_min, $script_name);

            $output->writeln($script_name . date("Y-m-d H:i:s"));

            $message = $consumer->consume(12 * 1000);
            $msg = "kafka_v1原始数据" . json_encode($message);
            YXLog::info($msg);
            // $output->writeln(json_encode($message));
            // continue;

            switch ($message->err) {
                case RD_KAFKA_RESP_ERR_NO_ERROR:
                    if (empty($message->payload)) {
                        //echo "kafka_v2消费payload为空\n";
                        YXLog::error('kafka消费payload为空');
                    } else {
                        $this->formatValidation($message);
                    }
                    break;
                case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                    //echo "No more messages; will wait for more\n";
                    break;
                case RD_KAFKA_RESP_ERR__TIMED_OUT:
                    //echo "No more messages; timed out\n";
                    break;
                default:
                    //throw new \Exception($message->errstr(), $message->err);
                    YXLog::error($message->errstr());
                    break;
            }
        }
    }
uzador commented 1 year ago

Hi @rocky114 could you advice how you solved the issue?

rocky114 commented 1 year ago

a compromise solution was used, roughly 100 consumptions and then re-establishing the connection

洛奇 巴布亚 @.***

 

------------------ 原始邮件 ------------------ 发件人: "arnaud-lb/php-rdkafka" @.>; 发送时间: 2022年10月18日(星期二) 晚上8:28 @.>; 抄送: "洛奇 @.**@.>; 主题: Re: [arnaud-lb/php-rdkafka] consumer hangs on futex (#461)

Hi @rocky114 could you advice how you solved the issue?

— Reply to this email directly, view it on GitHub, or unsubscribe. You are receiving this because you were mentioned.Message ID: @.***>