mateusjunges / laravel-kafka

Use Kafka Producers and Consumers in your laravel app with ease!
https://laravelkafka.com/docs/v2.0/introduction
MIT License

Issue consuming records from Avro topic #278

Closed. RyanGuestCF closed this issue 6 months ago.

RyanGuestCF commented 6 months ago

Describe the bug: When consuming records from our Avro topic, I hit an error saying that 'record' is a reserved type name. I would expect to be able to consume these records.

To Reproduce: This is a sample of the code I have:
`
            $cachedRegistry = new CachedRegistry(
                new BlockingRegistry(
                    new PromisingRegistry(
                        new Client(['base_uri' => config('kafka.schema_registry_url')])
                    )
                ),
                new AvroObjectCacheAdapter()
            );

            $registry = new AvroSchemaRegistry($cachedRegistry);
            $recordSerializer = new RecordSerializer($cachedRegistry);

            $registry->addBodySchemaMappingForTopic(
                $this->channel_name,
                new KafkaAvroSchema($this->channel_name . '-value', KafkaAvroSchema::LATEST_VERSION)
            );
            $registry->addKeySchemaMappingForTopic(
                $this->channel_name,
                new KafkaAvroSchema($this->channel_name . '-key', KafkaAvroSchema::LATEST_VERSION)
            );

            $deserialiser = new AvroDeserializer($registry, $recordSerializer);

            $consumer = Kafka::createConsumer()->usingDeserializer($deserialiser);

            $consumer->subscribe([$this->channel_name])
                ->withHandler(
                    function (KafkaConsumerMessage $message) use ($eventClassString) {
                        $this->handler($message->getBody(), $eventClassString);
                    }
                )
                ->build()
                ->consume();

`

Expected behavior: The records are consumed and each message is passed to an event.

Screenshots {"message":"[ERROR] Error to consume message","context":{"message":{"err":0,"topic_name":"silo_buyapowa_pending_referrals","timestamp":1713967944408,"partition":0,"payload":"\u0000\u0000\u0000\u0000�&test5-1708385353357210002325626-1708385353357\u0002Hd37c6df3-88fe-48c2-8f18-ea95dc43b98c\u00142024-02-198Cityfibre Switch and ConnectLcityfibreswitchandconnect@buyapowa.com\u001aJonathan TEST\u001aTEST@TEST.com\u0002<2a04:4a43:4c9f:e011::13e2:17d9\u0002\u000epending\u0000\u001012345678\u001a1708385353357\u001ccityfibre_raf5\u0002\u0012confirmed\u0002&2024-04-18 02:30:04\u0002&2024-04-18 03:00:02","len":329,"key":"\u0000\u0000\u0000\u0000\u0016&test5-1708385353357","offset":288,"headers":[],"opaque":null},"throwable":{"class":"AvroSchemaParseException","message":"Name \"record\" is a reserved type name","code":0,"file":"/var/www/vendor/flix-tech/avro-php/lib/avro/schema.php:1139"}},"level":400,"level_name":"ERROR","channel":"PHP-KAFKA-CONSUMER-ERROR","datetime":"2024-04-24T14:12:45.281260+00:00","extra":{"uid":"e6502d8b7974fc736eaa9c6864f63a32"}}

[2024-04-24 14:12:45] testing.ERROR: Name "record" is a reserved type name {"exception":"[object] (AvroSchemaParseException(code: 0): Name \"record\" is a reserved type name at /var/www/vendor/flix-tech/avro-php/lib/avro/schema.php:1139) [stacktrace]

0 /var/www/vendor/flix-tech/avro-php/lib/avro/schema.php(881): AvroNamedSchemata->clone_with_new_schema(Object(AvroRecordSchema))

1 /var/www/vendor/flix-tech/avro-php/lib/avro/schema.php(1376): AvroNamedSchema->__construct('record', Object(AvroName), NULL, Object(AvroNamedSchemata), NULL, Array)

2 /var/www/vendor/flix-tech/avro-php/lib/avro/schema.php(362): AvroRecordSchema->__construct(Object(AvroName), NULL, Array, Object(AvroNamedSchemata), 'record', NULL, Array)

3 /var/www/vendor/flix-tech/avro-php/lib/avro/schema.php(315): AvroSchema::real_parse(Array, NULL, Object(AvroNamedSchemata))

4 /var/www/vendor/mateusjunges/confluent-schema-registry-api/src/Registry/PromisingRegistry.php(147): AvroSchema::parse('{\"type\":\"record...')

5 /var/www/vendor/guzzlehttp/promises/src/Promise.php(209): FlixTech\SchemaRegistryApi\Registry\PromisingRegistry->FlixTech\SchemaRegistryApi\Registry\{closure}(Object(GuzzleHttp\Psr7\Response))

6 /var/www/vendor/guzzlehttp/promises/src/Promise.php(158): GuzzleHttp\Promise\Promise::callHandler(1, Object(GuzzleHttp\Psr7\Response), NULL)

7 /var/www/vendor/guzzlehttp/promises/src/TaskQueue.php(52): GuzzleHttp\Promise\Promise::GuzzleHttp\Promise\{closure}()

8 /var/www/vendor/guzzlehttp/guzzle/src/Handler/CurlMultiHandler.php(163): GuzzleHttp\Promise\TaskQueue->run()

9 /var/www/vendor/guzzlehttp/guzzle/src/Handler/CurlMultiHandler.php(189): GuzzleHttp\Handler\CurlMultiHandler->tick()

10 /var/www/vendor/guzzlehttp/promises/src/Promise.php(251): GuzzleHttp\Handler\CurlMultiHandler->execute(true)

11 /var/www/vendor/guzzlehttp/promises/src/Promise.php(227): GuzzleHttp\Promise\Promise->invokeWaitFn()

12 /var/www/vendor/guzzlehttp/promises/src/Promise.php(272): GuzzleHttp\Promise\Promise->waitIfPending()

13 /var/www/vendor/guzzlehttp/promises/src/Promise.php(229): GuzzleHttp\Promise\Promise->invokeWaitList()

14 /var/www/vendor/guzzlehttp/promises/src/Promise.php(69): GuzzleHttp\Promise\Promise->waitIfPending()

15 /var/www/vendor/mateusjunges/confluent-schema-registry-api/src/Registry/BlockingRegistry.php(105): GuzzleHttp\Promise\Promise->wait()

16 /var/www/vendor/mateusjunges/confluent-schema-registry-api/src/Registry/CachedRegistry.php(202): FlixTech\SchemaRegistryApi\Registry\BlockingRegistry->latestVersion('silo_buyapowa_p...')

17 /var/www/vendor/mateusjunges/laravel-kafka/src/Message/Registry/AvroSchemaRegistry.php(125): FlixTech\SchemaRegistryApi\Registry\CachedRegistry->latestVersion('silo_buyapowa_p...')

18 /var/www/vendor/mateusjunges/laravel-kafka/src/Message/Registry/AvroSchemaRegistry.php(111): Junges\Kafka\Message\Registry\AvroSchemaRegistry->getSchemaDefinition(Object(Junges\Kafka\Message\KafkaAvroSchema))

19 /var/www/vendor/mateusjunges/laravel-kafka/src/Message/Registry/AvroSchemaRegistry.php(55): Junges\Kafka\Message\Registry\AvroSchemaRegistry->getSchemaForTopicAndType('silo_buyapowa_p...', 'body')

20 /var/www/vendor/mateusjunges/laravel-kafka/src/Message/Deserializers/AvroDeserializer.php(50): Junges\Kafka\Message\Registry\AvroSchemaRegistry->getBodySchemaForTopic('silo_buyapowa_p...')

21 /var/www/vendor/mateusjunges/laravel-kafka/src/Message/Deserializers/AvroDeserializer.php(30): Junges\Kafka\Message\Deserializers\AvroDeserializer->decodeBody(Object(Junges\Kafka\Message\ConsumedMessage))

22 /var/www/vendor/mateusjunges/laravel-kafka/src/Consumers/Consumer.php(213): Junges\Kafka\Message\Deserializers\AvroDeserializer->deserialize(Object(Junges\Kafka\Message\ConsumedMessage))

23 /var/www/vendor/mateusjunges/laravel-kafka/src/Consumers/Consumer.php(394): Junges\Kafka\Consumers\Consumer->executeMessage(Object(RdKafka\Message))

24 /var/www/vendor/mateusjunges/laravel-kafka/src/Consumers/Consumer.php(178): Junges\Kafka\Consumers\Consumer->handleMessage(Object(RdKafka\Message))

25 /var/www/vendor/mateusjunges/laravel-kafka/src/Consumers/Consumer.php(110): Junges\Kafka\Consumers\Consumer->doConsume()

26 /var/www/vendor/mateusjunges/laravel-kafka/src/Retryable.php(31): Junges\Kafka\Consumers\Consumer->Junges\Kafka\Consumers\{closure}()

27 /var/www/vendor/mateusjunges/laravel-kafka/src/Consumers/Consumer.php(110): Junges\Kafka\Retryable->retry(Object(Closure))

28 /var/www/app/Console/Commands/BaseSubscriberCommand.php(102): Junges\Kafka\Consumers\Consumer->consume()

29 /var/www/app/Console/Commands/testSubscriber.php(40): App\Console\Commands\BaseSubscriberCommand->subscribe('App\\Console\\Com...', 'avro')

30 /var/www/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(36): App\Console\Commands\testSubscriber->handle()

31 /var/www/vendor/laravel/framework/src/Illuminate/Container/Util.php(41): Illuminate\Container\BoundMethod::Illuminate\Container\{closure}()

32 /var/www/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(93): Illuminate\Container\Util::unwrapIfClosure(Object(Closure))

33 /var/www/vendor/laravel/framework/src/Illuminate/Container/BoundMethod.php(35): Illuminate\Container\BoundMethod::callBoundMethod(Object(Illuminate\Foundation\Application), Array, Object(Closure))

34 /var/www/vendor/laravel/framework/src/Illuminate/Container/Container.php(662): Illuminate\Container\BoundMethod::call(Object(Illuminate\Foundation\Application), Array, Array, NULL)

35 /var/www/vendor/laravel/framework/src/Illuminate/Console/Command.php(211): Illuminate\Container\Container->call(Array)

36 /var/www/vendor/symfony/console/Command/Command.php(326): Illuminate\Console\Command->execute(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))

37 /var/www/vendor/laravel/framework/src/Illuminate/Console/Command.php(180): Symfony\Component\Console\Command\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Illuminate\Console\OutputStyle))

38 /var/www/vendor/symfony/console/Application.php(1078): Illuminate\Console\Command->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))

39 /var/www/vendor/symfony/console/Application.php(324): Symfony\Component\Console\Application->doRunCommand(Object(App\Console\Commands\testSubscriber), Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))

40 /var/www/vendor/symfony/console/Application.php(175): Symfony\Component\Console\Application->doRun(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))

41 /var/www/vendor/laravel/framework/src/Illuminate/Foundation/Console/Kernel.php(201): Symfony\Component\Console\Application->run(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))

42 /var/www/artisan(35): Illuminate\Foundation\Console\Kernel->handle(Object(Symfony\Component\Console\Input\ArgvInput), Object(Symfony\Component\Console\Output\ConsoleOutput))

43 {main}

"}

mateusjunges commented 6 months ago

It seems like you are using a reserved name in your Avro schema. From the logs you shared:

Name "record" is a reserved type name
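
To illustrate what that message means: the trace shows the exception being raised while the schema fetched from the registry is parsed, which suggests that the schema's full name is literally `record`. The sketch below is a rough, standalone pre-check and not the library's implementation. The helper name `findReservedSchemaName`, the constant `AVRO_RESERVED_TYPE_NAMES`, and the example names `PendingReferral` and `com.example` are all hypothetical, and the reserved list is written out by hand from the Avro primitive and complex type names, so the list used by flix-tech/avro-php may differ.

```php
<?php

declare(strict_types=1);

// Avro primitive and complex type names. Written out by hand for this sketch
// (assumption): the list inside flix-tech/avro-php may contain more entries.
const AVRO_RESERVED_TYPE_NAMES = [
    'null', 'boolean', 'int', 'long', 'float', 'double', 'bytes', 'string',
    'record', 'enum', 'array', 'map', 'union', 'fixed',
];

/**
 * Hypothetical helper: returns the top-level schema's full name when it
 * collides with a reserved Avro type name, or null when there is no collision.
 * Nested named types are not inspected.
 */
function findReservedSchemaName(string $schemaJson): ?string
{
    $schema = json_decode($schemaJson, true, 512, JSON_THROW_ON_ERROR);

    // Primitive schemas ("string") and unions (JSON lists) carry no name.
    if (!is_array($schema) || !isset($schema['name'])) {
        return null;
    }

    $name = (string) $schema['name'];
    $namespace = (string) ($schema['namespace'] ?? '');

    // A dotted name is already a full name; otherwise prefix the namespace, if any.
    $fullName = (strpos($name, '.') !== false || $namespace === '')
        ? $name
        : $namespace . '.' . $name;

    return in_array($fullName, AVRO_RESERVED_TYPE_NAMES, true) ? $fullName : null;
}

// A record schema whose name is the reserved word "record".
$failing = '{"type":"record","name":"record","fields":[]}';

// The same shape with a hypothetical, non-reserved name and namespace.
$passing = '{"type":"record","name":"PendingReferral","namespace":"com.example","fields":[]}';

var_dump(findReservedSchemaName($failing)); // string(6) "record"
var_dump(findReservedSchemaName($passing)); // NULL
```

If a check like this flags the `-value` or `-key` subject, the likely fix is on the schema side: register the schema under a descriptive record name, ideally with a namespace, rather than changing the consumer code.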