KonstantinCodes / messenger-kafka

Simple Kafka transport for Symfony Messenger.

Add a way to log statistics #67

Open jry25 opened 2 years ago

jry25 commented 2 years ago

Hi, I couldn't find a way to log statistics.

I tried setting statistics.interval.ms and stats_cb, but both attempts failed with errors.

First attempt, passing the callback as a service reference:

        statistics.interval.ms: '100'
        stats_cb: '@App\Messenger\StatsCallBack'

This results in:

In PhpDumper.php line 1561:

  [Symfony\Component\DependencyInjection\Exception\InvalidArgumentException]                                                                                          
  You cannot dump a container with parameters that contain references to other services (reference to service "App\Messenger\StatsCallBack" found in "/0/stats_cb").  

Exception trace:
  at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1561
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->exportParameters() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1553
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->exportParameters() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:1439
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->addDefaultParametersMethod() at /srv/app/vendor/symfony/dependency-injection/Dumper/PhpDumper.php:221
 Symfony\Component\DependencyInjection\Dumper\PhpDumper->dump() at /srv/app/vendor/symfony/http-kernel/Kernel.php:780
 Symfony\Component\HttpKernel\Kernel->dumpContainer() at /srv/app/vendor/symfony/http-kernel/Kernel.php:606
 Symfony\Component\HttpKernel\Kernel->initializeContainer() at /srv/app/vendor/symfony/http-kernel/Kernel.php:136
 Symfony\Component\HttpKernel\Kernel->boot() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:169
 Symfony\Bundle\FrameworkBundle\Console\Application->registerCommands() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:75
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /srv/app/vendor/symfony/console/Application.php:149
 Symfony\Component\Console\Application->run() at /srv/app/bin/console:39

Second attempt, passing the class name as a plain string:

        statistics.interval.ms: '100'
        stats_cb: 'App\Messenger\StatsCallBack'

This results in:

In KafkaTransportFactory.php line 68:

  [RdKafka\Exception (-1)]                                                
  Property "stats_cb" must be set through dedicated .._set_..() function  

Exception trace:
  at /srv/app/vendor/koco/messenger-kafka/src/Messenger/KafkaTransportFactory.php:68
 RdKafka\Conf->set() at /srv/app/vendor/koco/messenger-kafka/src/Messenger/KafkaTransportFactory.php:68
 Koco\Kafka\Messenger\KafkaTransportFactory->createTransport() at /srv/app/vendor/symfony/messenger/Transport/TransportFactory.php:36
 Symfony\Component\Messenger\Transport\TransportFactory->createTransport() at /srv/app/var/cache/test/ContainerT9GqRDx/getMessenger_Transport_ProductupdateService.php:13
 require() at /srv/app/var/cache/test/ContainerT9GqRDx/srcApp_KernelTestDebugContainer.php:787
 ContainerT9GqRDx\srcApp_KernelTestDebugContainer->load() at /srv/app/vendor/symfony/dependency-injection/Container.php:448
 Symfony\Component\DependencyInjection\Container->getService() at /srv/app/vendor/symfony/dependency-injection/Argument/ServiceLocator.php:42
 Symfony\Component\DependencyInjection\Argument\ServiceLocator->get() at /srv/app/vendor/symfony/messenger/Command/ConsumeMessagesCommand.php:153
 Symfony\Component\Messenger\Command\ConsumeMessagesCommand->execute() at /srv/app/vendor/symfony/console/Command/Command.php:255
 Symfony\Component\Console\Command\Command->run() at /srv/app/vendor/symfony/console/Application.php:1027
 Symfony\Component\Console\Application->doRunCommand() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:97
 Symfony\Bundle\FrameworkBundle\Console\Application->doRunCommand() at /srv/app/vendor/symfony/console/Application.php:273
 Symfony\Component\Console\Application->doRun() at /srv/app/vendor/symfony/framework-bundle/Console/Application.php:83
 Symfony\Bundle\FrameworkBundle\Console\Application->doRun() at /srv/app/vendor/symfony/console/Application.php:149
 Symfony\Component\Console\Application->run() at /srv/app/bin/console:39
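
The second error says that stats_cb cannot be passed to RdKafka\Conf::set() and must go through a dedicated setter. In php-rdkafka that setter is RdKafka\Conf::setStatsCb(). Below is a minimal sketch of how options could be routed accordingly. The helper name applyKafkaOptions and the way options are passed in are hypothetical and are not part of koco/messenger-kafka.

```php
<?php

/**
 * Hypothetical helper (not part of koco/messenger-kafka): applies options to
 * an RdKafka\Conf instance, routing stats_cb to its dedicated setter.
 *
 * @param \RdKafka\Conf $conf    configuration object from the rdkafka extension
 * @param array         $options option name => value; stats_cb must be a callable
 */
function applyKafkaOptions($conf, array $options): void
{
    foreach ($options as $name => $value) {
        if ($name === 'stats_cb') {
            if (!is_callable($value)) {
                throw new InvalidArgumentException('stats_cb must be a callable.');
            }
            // Callbacks cannot go through Conf::set(); use the dedicated setter.
            $conf->setStatsCb($value);
            continue;
        }
        // Plain properties such as statistics.interval.ms are set as strings.
        $conf->set($name, (string) $value);
    }
}
```

Because the first error shows that container parameters cannot hold service references, a callback with dependencies (such as a logger) would probably have to be injected into the transport factory as a service instead of being named in the transport options.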

Here is my logger:

<?php

namespace App\Messenger;

use Psr\Log\LoggerInterface;

class StatsCallBack
{
    private LoggerInterface $logger;

    public function __construct(LoggerInterface $logger)
    {
        $this->logger = $logger;
    }

    /**
     * Logs the librdkafka statistics once per partition assigned to the consumer.
     *
     * @see https://github.com/edenhill/librdkafka/blob/master/STATISTICS.md#partitions
     * @see https://github.com/edenhill/librdkafka/wiki/Consumer-lag-monitoring
     */
    public function __invoke($kafka, $json, $json_len): void
    {
        foreach ($kafka->getAssignment() as $partition) {
            $this->logger->info(
                'Kafka: Stats on {topic}, {partition}',
                [
                    'json_len' => $json_len,
                    'json' => $json,
                    'topic' => $partition->getTopic(),
                    'partition' => $partition->getPartition(),
                ]
            );
        }
    }
}

I need this to monitor my consumer lag so that I can auto-scale when the lag increases.
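
For the lag monitoring itself, the statistics JSON documented in STATISTICS.md reports a consumer_lag value per partition under topics, so the lag can be read directly from the payload. Below is a minimal sketch of that extraction. The function name extractConsumerLag and the "topic[partition]" key format are assumptions made for illustration.

```php
<?php

/**
 * Extracts per-partition consumer lag from a librdkafka statistics JSON string.
 * The function name and the "topic[partition]" key format are illustrative.
 *
 * @return array<string, int> lag keyed by "topic[partition]"
 */
function extractConsumerLag(string $json): array
{
    $stats = json_decode($json, true);
    if (!is_array($stats)) {
        return []; // Not valid JSON, nothing to report.
    }

    $lags = [];
    foreach ($stats['topics'] ?? [] as $topicName => $topic) {
        foreach ($topic['partitions'] ?? [] as $partitionId => $partition) {
            $lag = $partition['consumer_lag'] ?? -1;
            // Skip librdkafka's internal "-1" partition and unknown (negative) lag.
            if ((int) $partitionId < 0 || $lag < 0) {
                continue;
            }
            $lags[sprintf('%s[%d]', $topicName, $partitionId)] = (int) $lag;
        }
    }

    return $lags;
}
```

A stats callback could call this with its $json argument and log or export the resulting values, which would avoid logging the full JSON once per assigned partition.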

Versions

rdkafka

rdkafka support => enabled
version => 4.0.3
build date => May 3 2022 14:54:39
librdkafka version (runtime) => 1.5.0
librdkafka version (build) => 1.5.0.255

koco/messenger-kafka: v0.17.0

jry25 commented 2 years ago

I just opened a PR. Please let me know if I'm heading in the right direction. I'm not sure how to test it, but I'd be happy to add tests if you can tell me what the testing strategy is for this.