swoole / phpkafka

A PHP Kafka client for PHP-FPM and Swoole. It supports 50 APIs, which may make it the PHP client that supports the most message types.
https://longlang.org/
Apache License 2.0

Support for RedPanda? #61

Open slushpuppy opened 2 years ago

slushpuppy commented 2 years ago

PHP Fatal error: Uncaught longlang\phpkafka\Exception\KafkaErrorException: [35] The version of API is not supported. in /src/vendor/longlang/phpkafka/src/Protocol/ErrorCode.php:385
Stack trace:
#0 /src/vendor/longlang/phpkafka/src/Util/KafkaUtil.php(69): longlang\phpkafka\Protocol\ErrorCode::check()
#1 /src/vendor/longlang/phpkafka/src/Group/GroupManager.php(111): longlang\phpkafka\Util\KafkaUtil::retry()
#2 /src/vendor/longlang/phpkafka/src/Consumer/Consumer.php(162): longlang\phpkafka\Group\GroupManager->syncGroup()
#3 /src/vendor/longlang/phpkafka/src/Consumer/Consumer.php(128): longlang\phpkafka\Consumer\Consumer->rejoin()
#4 /src/Lib/AbstractServiceEventWorker.php(53): longlang\phpkafka\Consumer\Consumer->__construct()
#5 /src/App/cli/serviceEventWorker.php(19): Lib\AbstractServiceEventWorker->start()
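For reference, error code 35 in the Kafka protocol is UNSUPPORTED_VERSION: the broker rejected the version of a request the client sent, which per the trace above happened during syncGroup. Below is a minimal sketch for pulling the numeric code out of a message shaped like the one above. It assumes the exception message starts with "[<code>]". The helper name `parseKafkaErrorCode` and the small lookup table are illustrative only and are not part of phpkafka.

```php
<?php
declare(strict_types=1);

// Hypothetical helper (not part of longlang/phpkafka): extracts the numeric
// Kafka error code from a message that starts with "[<code>] ...".
// Returns null when the message does not follow that shape.
function parseKafkaErrorCode(string $message): ?int
{
    if (preg_match('/^\[(-?\d+)\]/', $message, $m) === 1) {
        return (int) $m[1];
    }
    return null;
}

// Partial lookup table; only the codes relevant here, taken from the
// Kafka protocol error code list.
const KAFKA_ERROR_NAMES = [
    0  => 'NONE',
    35 => 'UNSUPPORTED_VERSION',
];

$code = parseKafkaErrorCode('[35] The version of API is not supported.');
$name = $code !== null ? (KAFKA_ERROR_NAMES[$code] ?? 'UNKNOWN') : 'UNPARSED';

echo $code, ' => ', $name, PHP_EOL;
```

This only names the error. It does not show which request version the broker refused, so a broker-side log or a packet capture would still be needed to find that.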

apt installation

openswoole

Open Swoole => enabled
Author => Open Swoole Group & Contributors hello@swoole.co.uk
Version => 4.8.1
Built => Dec 6 2021 17:59:05
coroutine => enabled with boost asm context
epoll => enabled
eventfd => enabled
signalfd => enabled
cpu_affinity => enabled
spinlock => enabled
rwlock => enabled
sockets => enabled
openssl => OpenSSL 1.1.1 11 Sep 2018
dtls => enabled
http2 => enabled
json => enabled
curl-native => enabled
pcre => enabled
zlib => 1.2.11
mutex_timedlock => enabled
pthread_barrier => enabled
futex => enabled
mysqlnd => enabled
async_redis => enabled

Directive => Local Value => Master Value
swoole.enable_coroutine => On => On
swoole.enable_library => On => On
swoole.enable_preemptive_scheduler => Off => Off
swoole.display_errors => On => On
swoole.use_shortname => On => On
swoole.unixsock_buffer_size => 8388608 => 8388608

"longlang/phpkafka": "^1.2",

      $config = new ConsumerConfig();
      $addr = Config::getBroker();
      $config->setBroker($addr->toIPPortFormat());
      $config->setTopic($this->getTopic()); // topic
      $config->setClientId($this->sanitizeClassName(static::class)); // client ID. Use
      $config->setInterval(0.1);
      $config->setSsl(Config::getSSLConfig());
      $config->setGroupId($this->sanitizeClassName($this->getGroupId()));
      $config->setGroupInstanceId($this->sanitizeClassName(static::class));
      $this->config = $config;

......

        $this->consumer = new Consumer($this->config, [$this,'consume']);

        $this->consumer->start();

I am able to connect and consume from topics using other PHP Kafka libraries, such as simple-kafka-client:

        $builder = KafkaConsumerBuilder::create();

        $consumer = $builder->withAdditionalConfig(
            [
                // start at the very beginning of the topic when reading for the first time
                'auto.offset.reset' => 'earliest',

                // will be visible in broker logs
                'client.id' => 'php-kafka-lib-high-level-consumer',

                // SSL settings
                'security.protocol' => 'ssl',
                'ssl.ca.location' => Config::CA_CERT,
                'ssl.certificate.location' => Config::CLIENT_CERT,
                'ssl.key.location' => Config::CLIENT_KEY,

                // SASL settings
                //'sasl.mechanisms' => '',
                //'ssl.endpoint.identification.algorithm' => 'https',
                //'sasl.username' => '',
                //'sasl.password' => '',

                // Add additional output if you need to debug a problem
                // 'log_level' => (string) LOG_DEBUG,
                // 'debug' => 'all'
            ]
        )
            ->withAdditionalBroker(Config::getBroker()->toIPPortFormat())
            ->withConsumerGroup('php-kafka-lib-high-level-consumer')
            ->withSubscription('php-kafka-lib-test-topic')
            ->build();

        $consumer->subscribe();

        while (true) {
            try {
                $message = $consumer->consume(10000);
            } catch (KafkaConsumerTimeoutException|KafkaConsumerEndOfPartitionException $e) {
                echo 'Didn\'t receive any messages, waiting for more...' . PHP_EOL;
                continue;
            } catch (KafkaConsumerConsumeException $e) {
                echo $e->getMessage() . PHP_EOL;
                continue;
            }

            echo sprintf(
                    'Read message with key:%s payload:%s topic:%s partition:%d offset:%d headers:%s',
                    $message->getKey(),
                    $message->getBody(),
                    $message->getTopicName(),
                    $message->getPartition(),
                    $message->getOffset(),
                    implode(',', $message->getHeaders())
                ) . PHP_EOL;

            $consumer->commit($message);
        }