rob3000 / nestjs-kafka

NestJS integration with KafkaJS
The Unlicense
130 stars 44 forks source link

Invalid topic on Producer #11

Open tr11 opened 3 years ago

tr11 commented 3 years ago

I'm trying to send a batch of messages with the following code:

    await this.client.send(
      {
        topic: 'topic-name',
        messages: [
          { key: '1', value: 'string1' },
          { key: '2', value: 'string2' },
        ],
      });

It seems that the serializer is trying to serialize the whole object including the topic, which results in kafkajs being called with

[ { topic: undefined, messages: undefined } ]
rob3000 commented 3 years ago

@tr11 was this with the avro serializer?

tr11 commented 3 years ago

@rob3000, no just the regular one. Using it with kafkajs@1.15.0 and @nestjs/core@7.6.12, btw.

Alpha018 commented 3 years ago

@rob3000 de you have any solution for this? i have the same issue 😢

tr11 commented 3 years ago

What I'm doing is the following:

import { Serializer } from '@nestjs/microservices';

export class RawSerializer implements Serializer {
  serialize(value: any): any {
    return value;
  }
}

and then initializing the module with

   KafkaModule.register([
      {
        ...
        options: {
          ...
          serializer: new RawSerializer(),
        },
      },
    ]),

This works well as my messages are structured as:

       await kafka.send({
          topic: ...,
          messages: messages.map((x) => ({
            key: x.id,
            value: JSON.stringify(x),
          })),
        });
Alpha018 commented 3 years ago

@tr11 thanks ❤️ this working fine!!

rivneglee commented 3 years ago

The problem is at this line https://github.com/rob3000/nestjs-kafka/blob/master/src/serializer/kafka-request.serializer.ts#L21. When call client.send the argument KafkaMessageSend is expected but in serializer it checks if field value and key appear in argument and wrap the argument.