sroze / messenger-enqueue-transport

Uses Enqueue with Symfony's Messenger component.
MIT License
190 stars 53 forks source link

It's not possible to pass consumer specific options from DSN #88

Open maks-rafalko opened 4 years ago

maks-rafalko commented 4 years ago

We use Kafka with https://github.com/php-enqueue/rdkafka transport and it's impossible to set a Consumer-specific options, in particular group.id.

There is only global group.id setting that adds the group for all consumers rather than on per-consumer basis.

Class QueueInteropTransport creates a consumer here:

https://github.com/sroze/messenger-enqueue-transport/blob/a707be977963a386c45a63e9e9a0d63a9eb4421c/QueueInteropTransport.php#L250-L257

And there is no way to send any options/settings except the Destination class, which is a RdKafkaTopic in our case.

What we would like to be able to do is to set settings that would be passed to the consumer constructor

https://github.com/php-enqueue/rdkafka/blob/2449f06c892656d598b1c8ab8ceb4a4ac848ab6c/RdKafkaContext.php#L114

by specifing transport-specific options in messenger.yaml

batch:
     dsn: enqueue://default?queue[name]=name&group.id=custom-consumer-group

Any hints how to do it with all these tiers layers of abstractions?

Steveb-p commented 4 years ago

Current workaround would be creating another enqueue entry with another Kafka configuration, and then selecting it by dsn, like: enqueue://another.

maks-rafalko commented 4 years ago

Yes, there are definitely many workarounds (we have overridden RdFakfaContext for this case). But let's think about what architecture changes needs to be done to allow passing consumer specific options.

Currently, interop interfaces does not allow to do it. E.g. public function createConsumer(Destination $destination): Consumer; - we can pass only destination which is RdKafkaTopic. No settings for consumer itself.

rdotter commented 4 years ago

@maks-rafalko we use the folowing enqueue config and works for multiple envs and multiple settings within global / topic setting for Kafka:

            dsn: 'kafka://%env(resolve:ENQUEUE_KAFKA_DSN_BROKER)%'
            global:
                metadata.broker.list: '%env(resolve:ENQUEUE_KAFKA_DSN_BROKER)%'
                group.id: app-id
                enable.auto.offset.store: 'false'
            topic:
                auto.offset.reset: beginning
Nyholm commented 4 years ago

Hm... If you want something "transport" specific, the best way to pass variables is to use options.


framework:
    messenger:
        transports:
            foobar: 
                dsn: '....'
                options: { group.id: custom-consumer-group }

These options is passed to the TransportFactory. We probably need some changes to that factory so it all will work.

maks-rafalko commented 4 years ago

the best way to pass variables is to use options.

DSN options are merged with options, so it does not matter what to use here (though options looks better).

https://github.com/sroze/messenger-enqueue-transport/blob/0e4c9348427c815b6e9090808b6281b13543a076/QueueInteropTransportFactory.php#L52-L62

Current issues:

https://github.com/sroze/messenger-enqueue-transport/blob/0e4c9348427c815b6e9090808b6281b13543a076/QueueInteropTransport.php#L170-L197

https://github.com/sroze/messenger-enqueue-transport/blob/0e4c9348427c815b6e9090808b6281b13543a076/QueueInteropTransport.php#L265

Nyholm commented 4 years ago

Could you make a PR for this?

Maybe adding an option key with is called consumer_options?

Im not sure what the best solution is at the moment. But come with a suggestion.