KonstantinCodes / messenger-kafka

Simple Kafka transport for Symfony Messenger.
MIT License

Feature request: allow to pass RdKafka\Conf instance into transport factory #78

Open Arkemlar opened 1 year ago

Arkemlar commented 1 year ago

With an optional constructor argument or setter injection in KafkaTransportFactory for an RdKafka\Conf instance (imported as KafkaConf in that file), we could set any callbacks we want, and no further PRs for callback support would be needed.

Example/change proposal

Make these changes

    // nullable type is required for the null default
    private ?KafkaConf $conf = null;

    public function setConfInstance(KafkaConf $conf): void
    {
        $this->conf = $conf;
    }

    public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
    {
        // start from the injected instance if there is one, otherwise from a fresh Conf
        $conf = null !== $this->conf ? clone $this->conf : new KafkaConf();
        ...
    }

Then we can pass a configured RdKafka\Conf instance.

How client code might look

Passing an RdKafka\Conf instance would take only two steps:

1) First, create a configurator, a class that builds our RdKafka\Conf instance

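namespace App\DI;

use Koco\Kafka\Messenger\KafkaTransportFactory;
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
use RdKafka\Conf as KafkaConf;

class KafkaTransportConfigurator
{
    private LoggerInterface $logger;

    public function __construct(?LoggerInterface $logger = null)
    {
        // fall back to a no-op logger when none is injected
        $this->logger = $logger ?? new NullLogger();
    }

    public function __invoke(KafkaTransportFactory $transportFactory): void
    {
        $conf = new KafkaConf();

        // define callbacks like this, via a method of this class
        $conf->setStatsCb([$this, 'handleStats']);
        // or like this, via an inline closure
        $conf->setErrorCb(fn ($kafka, $err, $reason) => $this->logger->error($reason));

        // inject the configured instance into the transport factory
        $transportFactory->setConfInstance($conf);
    }

    // illustrative stats handler (the name is hypothetical); $json holds the statistics as a JSON string
    public function handleStats($kafka, $json, $jsonLen): void
    {
        $this->logger->debug($json);
    }
}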

2) Then register the configurator for the KafkaTransportFactory service

    Koco\Kafka\Messenger\KafkaTransportFactory:
        configurator: '@App\DI\KafkaTransportConfigurator'

That should work: Symfony calls KafkaTransportConfigurator::__invoke() right after the transport factory instance is created and before it is used to create a transport.

This is the Symfony container's service configurator feature, which is described in the Symfony documentation.
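
To make the ordering concrete, here is a minimal plain-PHP sketch of roughly what the container does for a service that has a configurator. All class names (FakeConf, FakeTransportFactory, FakeConfigurator) and the buildFactoryService() function are stand-ins invented for illustration, so that the snippet runs without ext-rdkafka or Symfony; the real compiled container code looks different.

```php
<?php
// Stand-in for RdKafka\Conf (hypothetical, only records callbacks).
final class FakeConf
{
    public array $callbacks = [];
}

// Stand-in for KafkaTransportFactory with the proposed setter.
final class FakeTransportFactory
{
    public ?FakeConf $conf = null;

    public function setConfInstance(FakeConf $conf): void
    {
        $this->conf = $conf;
    }
}

// Stand-in for the configurator service.
final class FakeConfigurator
{
    public function __invoke(FakeTransportFactory $factory): void
    {
        $conf = new FakeConf();
        $conf->callbacks['error'] = fn (string $reason): string => "error: $reason";
        $factory->setConfInstance($conf);
    }
}

// Simplified view of how a service with a configurator gets built.
function buildFactoryService(): FakeTransportFactory
{
    $factory = new FakeTransportFactory();   // 1. instantiate the service
    $configurator = new FakeConfigurator();  // 2. resolve the configurator service
    $configurator($factory);                 // 3. invoke it with the new instance
    return $factory;                         // 4. only now hand the service out
}

$factory = buildFactoryService();
```

By the time anything receives the factory, the conf instance is already set, which is why createTransport() can rely on it.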

Arkemlar commented 1 year ago

An alternative way to set the conf instance is via setter injection:

     Koco\Kafka\Messenger\KafkaTransportFactory:
         ... # config from https://github.com/KonstantinCodes/messenger-kafka/blob/master/src/Resources/config/services.xml
         calls:
             - setConfInstance: ["@=service('App\\\\DI\\\\KafkaTransportConfigurator').getConf()"]

Symfony docs about this feature.

But the configurator approach looks better: it does not override the original service.
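
For the setter-injection variant, the configurator service needs a getConf() method, which the snippets above do not define. A minimal sketch, assuming the proposed setConfInstance() setter exists on the factory and that a PSR-3 logger is injected; the method body is illustrative only, and calling it requires ext-rdkafka:

```php
<?php
namespace App\DI;

use Psr\Log\LoggerInterface;
use RdKafka\Conf as KafkaConf;

// Hypothetical variant of the configurator that only builds the Conf;
// the container passes the result to setConfInstance() via the "calls" entry.
class KafkaTransportConfigurator
{
    public function __construct(private LoggerInterface $logger)
    {
    }

    // getConf() is the method name assumed by the expression in the "calls" entry
    public function getConf(): KafkaConf
    {
        $conf = new KafkaConf();
        $conf->setErrorCb(fn ($kafka, $err, $reason) => $this->logger->error($reason));

        return $conf;
    }
}
```

In this variant the class no longer needs __invoke(), since the container calls the setter on the factory itself.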