arnaud-lb / php-rdkafka

Production-ready, stable Kafka client for PHP
MIT License

How to ensure that all offsets are committed successfully when committing manually #543

Open hhxsv5 opened 1 year ago

hhxsv5 commented 1 year ago

Description

The following code:

    $conf = new RdKafka\Conf();
    $settings = [
        'socket.keepalive.enable'  => true,
        'log_level'                => LOG_WARNING,
        'enable.auto.offset.store' => 'true',
        'auto.offset.reset'        => 'earliest',
        'enable.partition.eof'     => 'false',
        'enable.auto.commit'       => 'false',
        'max.poll.interval.ms'     => 300000,
        'session.timeout.ms'       => 45000,
        'group.id'                 => 'test-group',
        'group.instance.id'        => uniqid('', true),
        'metadata.broker.list'     => 'kafka1:9092,kafka2:9092,kafka3:9092',
    ];
    foreach ($settings as $key => $value) {
        $conf->set($key, $value);
    }
    $consumer = new RdKafka\KafkaConsumer($conf);
    $consumer->subscribe(['dave-test1']);
    $quit = false; // assumed to be set to true elsewhere, e.g. by a signal handler
    while (!$quit) {
        $message = $consumer->consume(60 * 1000);
        switch ($message->err) {
            case RD_KAFKA_RESP_ERR_NO_ERROR:
                echo "consume: {$message->payload}\n";
                $consumer->commitAsync($message);
                break;
            case RD_KAFKA_RESP_ERR__PARTITION_EOF:
                echo "consume: no more messages; will wait for more\n";
                break;
            case RD_KAFKA_RESP_ERR__TIMED_OUT:
                echo "consume: timed out\n";
                break;
            default:
                throw new \RuntimeException($message->errstr(), $message->err);
        }
    }
    try {
        // Is this the best practice for making sure all offsets get committed, e.g. when an earlier commitAsync failed?
        $consumer->commit(null);
    } catch (Exception $e) {
        if ($e->getCode() !== RD_KAFKA_RESP_ERR__NO_OFFSET) {
            echo 'commit fail: ' . $e->getMessage();
        }
    }
    echo "consumer process end PID " . getmypid() . "\n";

Resulted in this output:

It is unclear whether all offsets were committed.
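
One way to see whether asynchronous commits actually succeeded is an offset commit callback, which php-rdkafka lets you register on the configuration with `setOffsetCommitCb`. The sketch below is only an illustration under assumptions: `CommitTracker` is a hypothetical helper (not part of php-rdkafka), and the callback is assumed to be served from later `consume()` calls, so a failure may be reported some time after the `commitAsync` that caused it.

```php
<?php
/**
 * Hypothetical helper: remembers failed offset commits reported by librdkafka.
 */
final class CommitTracker
{
    /** @var int[] error codes that should not count as failures */
    private array $ignored;

    /** @var int[] error codes of failed commits, oldest first */
    private array $failures = [];

    public function __construct(array $ignored = [])
    {
        $this->ignored = $ignored;
    }

    // Invoked with the consumer, the commit's error code and the partitions involved.
    public function __invoke($consumer, int $err, array $partitions): void
    {
        // 0 is the value of RD_KAFKA_RESP_ERR_NO_ERROR.
        if ($err !== 0 && !in_array($err, $this->ignored, true)) {
            $this->failures[] = $err;
        }
    }

    /** @return int[] */
    public function failures(): array
    {
        return $this->failures;
    }
}

// Wiring, guarded so the file also loads where the extension is missing.
if (extension_loaded('rdkafka')) {
    $conf = new \RdKafka\Conf();
    // "No offset stored" just means there was nothing to commit.
    $tracker = new CommitTracker([RD_KAFKA_RESP_ERR__NO_OFFSET]);
    $conf->setOffsetCommitCb($tracker);
}
```

The callback has to be registered before the consumer is created from `$conf`. After the loop, a non-empty `$tracker->failures()` would indicate that at least one commit was reported as failed.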

But I expected this output instead:

All offsets are committed, even if an earlier commitAsync fails.
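
Because Kafka offset commits are cumulative per partition, a later successful commit supersedes an earlier failed one. A final synchronous commit at shutdown can therefore cover earlier `commitAsync` failures, as long as it succeeds itself. Below is a minimal sketch of retrying that final commit; `commitWithRetry`, its parameters and the default retry values are hypothetical, not part of php-rdkafka.

```php
<?php
/**
 * Hypothetical helper: run a synchronous commit, retrying on failure.
 *
 * $commit is any callable that throws on failure.
 * $ignoredCodes lists exception codes that mean "nothing to commit".
 * Returns true if the commit succeeded or nothing was pending.
 */
function commitWithRetry(
    callable $commit,
    array $ignoredCodes = [],
    int $maxAttempts = 3,
    int $delayMs = 200
): bool {
    for ($attempt = 1; $attempt <= $maxAttempts; $attempt++) {
        try {
            $commit();
            return true;
        } catch (\Exception $e) {
            if (in_array($e->getCode(), $ignoredCodes, true)) {
                return true; // nothing was pending, so nothing was lost
            }
            if ($attempt < $maxAttempts) {
                usleep($delayMs * 1000); // brief pause before the next try
            }
        }
    }
    return false; // every attempt failed; the caller decides what to do
}
```

With the consumer from the issue, the shutdown step could then look like `commitWithRetry(function () use ($consumer) { $consumer->commit(null); }, [RD_KAFKA_RESP_ERR__NO_OFFSET])`, logging or alerting when it returns `false`. Even then, messages may be redelivered after a restart if the final commit never succeeds, so processing should tolerate duplicates.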

php-rdkafka Version

6.0.1

librdkafka Version

1.8.2

PHP Version

7.4.33

Operating System

Amazon Linux 2

Kafka Version

2.1.1