KonstantinCodes / messenger-kafka

Simple Kafka transport for Symfony Messenger.
MIT License
84 stars 35 forks source link

Custom header option for consumers #29

Open ilyashtrikul opened 3 years ago

ilyashtrikul commented 3 years ago

Messenger Serializer requires type key with class FQN. This PR makes it possible with headers option in transport config.

framework:
    messenger:
        transports:
            producer:
               # ...
            consumer:
                dsn: '%env(KAFKA_URL)%'
                # serializer: App\Infrastructure\Messenger\MySerializer
                options:
                    commitAsync: true
                    receiveTimeout: 10000
                    topic:
                        name: "events"
                    kafka_conf:
                        enable.auto.offset.store: 'false'
                        group.id: 'my-group-id' # should be unique per consumer
                        security.protocol: 'sasl_ssl'
                        ssl.ca.location: '%kernel.project_dir%/config/kafka/ca.pem'
                        sasl.username: '%env(KAFKA_SASL_USERNAME)%'
                        sasl.password: '%env(KAFKA_SASL_PASSWORD)%'
                        sasl.mechanisms: 'SCRAM-SHA-256'
                        max.poll.interval.ms: '45000'
                    topic_conf:
                        auto.offset.reset: 'earliest'
                    # Custom headers for all messages
                    headers:
                      type: Some\Custom\Message\Class
KonstantinCodes commented 3 years ago

Hi @ilyashtrikul, thanks for the PR!

I also see you fixed the test :)

The headers field is definitely useful. But you can achieve the same result if you supply a custom serializer. And supplying a custom serializer is standard for symfony messenger.

So I'm wondering if you have considered that option?

ilyashtrikul commented 3 years ago

You're right, custom serializer is a solution, but it's not just copy-paste with some fixes, it's a bundle with own configuration and etc only for one header field.

ilyashtrikul commented 3 years ago

If you think this changes are not part of this library or will not make work a bit easy - feel free to close this PR ;)

KonstantinCodes commented 3 years ago

@ilyashtrikul I'm not sure what you mean with own bundle with configuration.

You just specify the service ID of your Serializer in the Messenger config

            consumer:
                dsn: '%env(KAFKA_URL)%'
                serializer: App\Infrastructure\Messenger\MySerializer
ilyashtrikul commented 3 years ago

MySerializer don't know which type of message come in (without specific headers), so it should be some hardcode for topic -> class in MySerializer or make serializer with own configuration for topic-class (which is better move to bundle for flexible configuring) and it will be a lot of serializer services. My way is set topic-class config nearby all consumer configuration.

KonstantinCodes commented 3 years ago

@ilyashtrikul Are you using Avro? Avro includes the qualified name, that you can use in your serializer to determine the output class

gayansanjeewa commented 2 years ago

Hi @ilyashtrikul, thanks for the PR!

I also see you fixed the test :)

The headers field is definitely useful. But you can achieve the same result if you supply a custom serializer. And supplying a custom serializer is standard for symfony messenger.

So I'm wondering if you have considered that option?

Hi @KonstantinCodes!

I find it really helpful to have the ability to pass the Event class as a header option.

I'm using your package in one of my projects (Huge thanks for creating this package!!) and I consume about 10+ different topics and for each, I had to introduce 10+ serializers.

If we have the ability to mention, okay for this topic this is the Event class by mentioning it in the header, then I can simplify this by just having a single generic serializer

What I mean is this

    order_delayed:
        dsn: '%env(KAFKA_URL)%'
        serializer: App\KafkaSerializer
        retry_strategy:
            max_retries: 0
        options:
            topic:
                name: order.delayed
            kafka_conf:
                enable.auto.offset.store: 'false'
                group.id: tracking
            headers:
                type: App\OrderDelayedEvent

Then in a KafkaSerializer, I would just have

$message = $this->serializer->deserialize($encodedEnvelope['body'], $encodedEnvelope['headers']['type'], self::FORMAT);

What do you think?


Update: If you have a doubt as to whether the headers is the best place to mention the event class, I'd like to suggest an alternative approach like below

      options:
          topic:
              name: order.delayed
              event: App\OrderDelayedEvent
KonstantinCodes commented 2 years ago

@gayansanjeewa May I ask, how the payload is serialized? it it plain json?

gayansanjeewa commented 2 years ago

Hey @KonstantinCodes

So the decode method of the serializer is like

    /**
     * @param string[] $encodedEnvelope
     */
    public function decode(array $encodedEnvelope): Envelope
    {
        if (empty($encodedEnvelope['body'])) {
            return new Envelope(new EmptyMessageEvent());
        }

        /** @var OrderDelayedEvent $message */
        $message = $this->serializer->deserialize($encodedEnvelope['body'], OrderDelayedEvent::class, 'json');

        return new Envelope($message);
    }

and the payload would be like

{
    "order_id": "3c77d2f1-4b2a-4eac-bef3-a3c4c74fc64a",
    "user_id": 3300317213,
    "user_email": "user@email.com",
    "order_number": "OPL-100001111"
}
gayansanjeewa commented 2 years ago

Hey @KonstantinCodes ping :smile: I'd like to hear your feedback on my previous comment. Thanks!

KonstantinCodes commented 2 years ago

@gayansanjeewa Overriding these properties doesn't really seem like an ideal solution. If you do it this way, you'll need to configure different transports for each topic you consume.

I'd rather just pass the KafkaMessage to the Serializer so you can map the topic name to a class.

gayansanjeewa commented 2 years ago

Okay, thanks for your feedback @KonstantinCodes!