arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License
2.08k stars 263 forks source link

Low level api offsetStore failed. #462

Closed studyzhanglei closed 3 years ago

studyzhanglei commented 3 years ago

If possible, add debug logs from librdkafka, which you can obtain by adjusting your configuration:

<?php

$conf = new RdKafka\Conf();

// Set the group id. This is required when storing offsets on the broker
$conf->set('group.id', 'myConsumerGroup');
$conf->set('enable.auto.offset.store', 0);
//$conf->set('version', '2.8.0');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("127.0.0.1");

$topicConf = new RdKafka\TopicConf();
//$topicConf->set('auto.commit.interval.ms', 0);

// Set the offset store method to 'file'
$topicConf->set('offset.store.method', 'broker');

// Alternatively, set the offset store method to 'none'
// $topicConf->set('offset.store.method', 'none');

// Set where to start consuming messages when there is no initial offset in
// offset store or the desired offset is out of range.
// 'earliest': start from the beginning
$topicConf->set('auto.offset.reset', 'earliest');
$topicConf->set('enable.auto.commit', 0);
$topic = $rk->newTopic("test", $topicConf);

// Start consuming partition 0
$topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) {
    $message = $topic->consume(0, 120*10000);
    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            $topic->offsetStore($message->partition, $message->offset);
            var_dump($message->partition);
            var_dump($message->offset);
            var_dump($message);
//            $topic->offsetStore();
            break;
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            echo "No more messages; will wait for more\n";
            break;
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            echo "Timed out\n";
            break;
        default:
            throw new \Exception($message->errstr(), $message->err);
            break;
    }
}

I follow the document but commit offset failed, could you tell me why? Thank you~

studyzhanglei commented 3 years ago

image