KonstantinCodes / messenger-kafka

Simple Kafka transport for Symfony Messenger.
MIT License

Feature request: Low level consumer: Multiple topics/partitions #76

Open jry25 opened 1 year ago

jry25 commented 1 year ago

As shown here, it would be useful to have a low-level consumer that can handle multiple topics/partitions.

Code extracted from the example:

<?php

// Consuming from multiple topics and/or partitions can be done by telling
// librdkafka to forward all messages from these topics/partitions to an
// internal queue, and then consuming from this queue.

// $rk is assumed to be an already configured RdKafka\Consumer instance.
$queue = $rk->newQueue();

$topicConf = new RdKafka\TopicConf();
$topicConf->set(...);

$topic1 = $rk->newTopic("topic1", $topicConf);
$topic1->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);
$topic1->consumeQueueStart(1, RD_KAFKA_OFFSET_BEGINNING, $queue);

$topic2 = $rk->newTopic("topic2", $topicConf);
$topic2->consumeQueueStart(0, RD_KAFKA_OFFSET_BEGINNING, $queue);

// Now, consume from the queue instead of the topics:

while (true) {
    // Wait up to 120 seconds (the timeout is given in milliseconds).
    $message = $queue->consume(120 * 1000);
    // ...
}

?>
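
For illustration, the elided loop body could dispatch on the message's error code, roughly as sketched below. The helper name `describeQueueMessage` is hypothetical and is not part of messenger-kafka or php-rdkafka. The sketch assumes the message exposes the `err`, `topic_name`, `partition`, `offset` and `payload` properties of `RdKafka\Message`, and that a timeout may surface either as `null` or as a timed-out error code. The fallback constant definitions are there only so the snippet loads without the rdkafka extension.

```php
<?php

// Fallback definitions for when the rdkafka extension is not loaded;
// the values mirror librdkafka's error codes.
defined('RD_KAFKA_RESP_ERR_NO_ERROR') || define('RD_KAFKA_RESP_ERR_NO_ERROR', 0);
defined('RD_KAFKA_RESP_ERR__PARTITION_EOF') || define('RD_KAFKA_RESP_ERR__PARTITION_EOF', -191);
defined('RD_KAFKA_RESP_ERR__TIMED_OUT') || define('RD_KAFKA_RESP_ERR__TIMED_OUT', -185);

/**
 * Hypothetical helper: summarise whatever $queue->consume() returned.
 * Accepts any object shaped like RdKafka\Message, or null.
 */
function describeQueueMessage(?object $message): string
{
    if ($message === null) {
        // Assumption: nothing arrived before the timeout expired.
        return 'timeout';
    }

    switch ($message->err) {
        case RD_KAFKA_RESP_ERR_NO_ERROR:
            // A regular message; the queue mixes topics and partitions,
            // so report where this one came from.
            return sprintf(
                '%s[%d]@%d: %s',
                $message->topic_name,
                $message->partition,
                $message->offset,
                $message->payload
            );
        case RD_KAFKA_RESP_ERR__PARTITION_EOF:
            return 'end of partition';
        case RD_KAFKA_RESP_ERR__TIMED_OUT:
            return 'timeout';
        default:
            return 'error ' . $message->err;
    }
}

// Possible use inside the loop from the example:
//     echo describeQueueMessage($queue->consume(120 * 1000)), PHP_EOL;
```

Because every topic/partition feeds the same queue, reading `topic_name` and `partition` from each message is the only way for the consumer to tell the sources apart. A transport built on this would probably need that information to route messages.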