php-enqueue / enqueue-dev

Message Queue, Job Queue, Broadcasting, WebSockets packages for PHP, Symfony, Laravel, Magento. DEVELOPMENT REPOSITORY - provided by Forma-Pro
https://enqueue.forma-pro.com/
MIT License
2.17k stars 434 forks source link

[rdkafka] RdKafka transport does not support merging for `global` configuration #1002

Open Wirone opened 4 years ago

Wirone commented 4 years ago

Considering:

# config/packages/enqueue.yaml
enqueue:
    default:
        transport:
            dsn: "rdkafka://"
            global:
                bootstrap.servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%'
                security.protocol: ssl
                group.id: '%env(KAFKA_GROUP_ID)%'
                ssl.ca.location: '%env(KAFKA_ROOT_CA)%'
                ssl.certificate.location: '%env(KAFKA_CERT)%'
                ssl.key.location: '%env(KAFKA_KEY)%'
                enable.ssl.certificate.verification: 'false'
                ssl.endpoint.identification.algorithm: 'none'
                enable.auto.commit: 'true'
            topic:
                auto.offset.reset: latest
        client: ~

and

# config/packages/dev/enqueue.yaml
enqueue:
    default:
        transport:
            global:
                debug: all

I would like debug: all to be merged to global configuration from main configuration.

Unfortunately it does not work this way (enqueue-bundle 0.9.13), the final config for global is taken from last occurence.

It works when I add:

->arrayNode('global')
    ->ignoreExtraKeys(false)
->end()

to the Enqueue\Symfony\DependencyInjection\TransportFactory::getConfiguration(). Symfony's DI container has to know that there is global node with array type, which can take any options.

:information_source: Would be great if it could be fixed in 0.9 branch because we're using sroze/messenger-enqueue-transport which currently does not allow using Enqueue 0.10.

stale[bot] commented 4 years ago

This issue has been automatically marked as stale because it has not had recent activity. It will be closed if no further activity occurs. Thank you for your contributions.

Wirone commented 4 years ago

Any news on this?

Steveb-p commented 4 years ago

@Wirone sorry, I'm occupied and need constant reminders about issues such as this :sweat_smile: I'll try to get to it soon™. Unless you'd like to create a PR (your solution might just what will be enough).

Wirone commented 4 years ago

@Steveb-p I thought I can do a PR, but this behaves really weird and I don't feel competent enough to dig into this code. Adding those 3 lines mentioned in the issue has only effect on bin/console debug:config enqueue, where it looks OK:

Current configuration for extension with alias "enqueue"
========================================================

enqueue:
    default:
        transport:
            dsn: 'rdkafka://'
            global:
                bootstrap.servers: '%env(KAFKA_BOOTSTRAP_SERVERS)%'
                security.protocol: ssl
                group.id: '%env(KAFKA_GROUP_ID)%'
                ssl.ca.location: '%env(KAFKA_ROOT_CA)%'
                ssl.certificate.location: '%env(KAFKA_CERT)%'
                ssl.key.location: '%env(KAFKA_KEY)%'
                enable.ssl.certificate.verification: 'false'
                ssl.endpoint.identification.algorithm: none
                enable.auto.commit: 'true'
                debug: all
            topic:
                auto.offset.reset: earliest
        client:
            traceable_producer: true
            prefix: enqueue
            separator: .
            app_name: app
            router_topic: default
            router_queue: default
            router_processor: null
            redelivered_delay_time: 0
            default_queue: default
            driver_options: {  }
        consumption:
            receive_timeout: 10000
        extensions:
            doctrine_ping_connection_extension: false
            doctrine_clear_identity_map_extension: false
            doctrine_odm_clear_identity_map_extension: false
            doctrine_closed_entity_manager_extension: false
            reset_services_extension: false
            signal_extension: false
            reply_extension: true

but actual factory, as far as I think, works in custom way so in RdKafkaConnectionFactory::__construct() receives $config:

^ array:4 [
  "dsn" => "rdkafka://"
  "global" => array:1 [
    "debug" => "all"
  ]
  "topic" => array:1 [
    "auto.offset.reset" => "earliest"
  ]
  "error_cb" => (...)
]

So RdKafka context has, under global key, only values from config/packages/dev/enqueue.yamland NOT from config/packages/enqueue.yaml. Honestly I don't know where to look for it. I searched if there is disallowNewKeysInSubsequentConfigs() used in DI config, but not. Can you point me somewhere?

PS. We're currently on enqueue-bundle in 0.9.15 version and problem is still there.

Wirone commented 2 years ago

@Steveb-p After over 2 years I think it's a good time for a friendly ping 😉

Steveb-p commented 2 years ago

@Wirone I can't promise I'll work on this, as I haven't touched anything Kafka related in a year or so, but I'll try to find some time and squeeze a solution for you.