celery / kombu

Messaging library for Python.
http://kombu.readthedocs.org/
BSD 3-Clause "New" or "Revised" License

confluent_kafka #1647

Open VDneprovskij opened 1 year ago

VDneprovskij commented 1 year ago

How do I use confluent_kafka? I didn't see a solution in the examples or tests. Currently I use `with Connection('confluentkafka://localhost:9092') as conn`, and it works. But how can I create a Consumer and pass parameters (group.id, sasl.mechanism, and others)? I'm trying to follow the analogy below, but it doesn't work.

from kombu import Connection, Exchange, Queue
from kombu.transport.confluentkafka import Channel

ex = Exchange('test', type='topic')
queue = Queue('test', exchange=ex, routing_key='t.*')
cfg = {
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # 'value.deserializer': lambda x, y: x.decode('utf-8'),
    'sasl.mechanism': "SASL_PLAINTEXT",
    'sasl.username': None,
    'sasl.password': None
}
def process_media(body, message):
    print(body)
    message.ack()

with Connection('confluentkafka://localhost:9092') as conn:
  ch = Channel(conn.connection, kafka_consumer_config=cfg)
  with conn.Consumer(queue, callbacks=[process_media], channel=ch) as consumer:
      # Process messages and handle events on all channels
      while True:
          conn.drain_events()

VDneprovskij commented 1 year ago

In this case:

cfg = {
    'group.id': 'my-group',
    'auto.offset.reset': 'earliest',
    # 'value.deserializer': lambda x, y: x.decode('utf-8'),
    'sasl.mechanism': "SASL_PLAINTEXT",
    'sasl.username': None,
    'sasl.password': None
}
def process_media(body, message):
    print(body)
    message.ack()
with Connection('confluentkafka://localhost:9092', transport_options=cfg) as conn:
  with conn.Consumer(Queue('kontext-events_dict', channel=conn.channel()), conn.channel(),
                     callbacks=[process_media],
                     accept=['json', 'pickle', 'msgpack', 'yaml', 'application/x-python-serialize']) as consumer:
      print(consumer.channel.client, consumer.channel.common_config)
      while True:
          print(conn.info())
          conn.drain_events()

Connecting and receiving the first message works, but then an error occurs because the payload lacks the separate sections Kombu expects, such as properties, content-encoding, headers, and others. [two images]

Test message in Kafka: {'first': '1', 'second': '2'}
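
The error is consistent with how Kombu's virtual transports appear to decode messages: as far as I can tell, they expect a JSON envelope with `body`, `content-type`, `content-encoding`, `headers`, and `properties` keys, which a message written by a non-Kombu producer will not have. A minimal sketch of that check follows. The list of envelope keys is an assumption, and the helper name `missing_envelope_keys` is hypothetical and not part of Kombu.

```python
import json

# Keys assumed to make up the envelope that Kombu's virtual transports
# write around every message. This list is an assumption, not Kombu's API.
ENVELOPE_KEYS = ("body", "content-type", "content-encoding", "headers", "properties")


def missing_envelope_keys(raw):
    """Return the assumed envelope keys that are absent from a raw JSON message."""
    payload = json.loads(raw)
    if not isinstance(payload, dict):
        # A JSON list or scalar cannot carry any envelope keys.
        return list(ENVELOPE_KEYS)
    return [key for key in ENVELOPE_KEYS if key not in payload]


# The test message from this issue, as a plain Kafka producer would send it.
raw_message = json.dumps({"first": "1", "second": "2"}).encode("utf-8")
print(missing_envelope_keys(raw_message))
```

If this is the cause, messages consumed through Kombu would probably need to be published through Kombu as well, so that both sides agree on the envelope.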

thuibr commented 3 months ago

Hi @VDneprovskij if you're still using Kafka with Kombu, I am adding documentation in https://github.com/celery/celery/pull/8935. Would you be willing to a.) help fill in the blanks in terms of how to authenticate using SASL, and b.) help verify the docs? Thanks!

thuibr commented 3 months ago

@VDneprovskij upon closer inspection of the options in the transport file, I think the username and password may need to be in the broker string. I'll update if I can get that to work.

thuibr commented 3 months ago

It actually looks like it needs to be in the kafka_admin_config:

kafka_admin_config = {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
}

thuibr commented 3 months ago

This works for me, though it can probably be trimmed down:

import os

task_serializer = 'json'
broker_transport_options = {
    # "allow_create_topics": True,
}
broker_connection_retry_on_startup = True

# For using SQLAlchemy as the backend
# result_backend = 'db+postgresql://postgres:example@localhost/postgres'

broker_transport_options.update({
    "security_protocol": "SASL_SSL",
    "sasl_mechanism": "SCRAM-SHA-512",
})
sasl_username = os.environ["SASL_USERNAME"]
sasl_password = os.environ["SASL_PASSWORD"]
broker_url = f"confluentkafka://{sasl_username}:{sasl_password}@broker:9094"
kafka_admin_config = {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
}
kafka_common_config = {
    "sasl.username": sasl_username,
    "sasl.password": sasl_password,
    "security.protocol": "SASL_SSL",
    "sasl.mechanism": "SCRAM-SHA-512",
    "bootstrap_servers": "broker:9094",
}
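
One way the configuration above might be trimmed down is sketched here. It assumes that the transport reads `kafka_common_config` from the broker transport options rather than from module-level variables, and that librdkafka's dotted key `bootstrap.servers` is what was intended; neither is confirmed in this thread. The helper `build_kafka_settings` and the fallback credentials are illustrative only.

```python
import os
from urllib.parse import quote


def build_kafka_settings(username, password, server="broker:9094"):
    """Build a broker URL and transport options for SASL_SSL with SCRAM-SHA-512."""
    # Settings shared by every Kafka client the transport creates.
    common = {
        "sasl.username": username,
        "sasl.password": password,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "SCRAM-SHA-512",
        "bootstrap.servers": server,  # librdkafka spelling, with a dot
    }
    transport_options = {
        "security_protocol": "SASL_SSL",
        "sasl_mechanism": "SCRAM-SHA-512",
        # Assumption: the transport looks this key up in the transport options.
        "kafka_common_config": common,
    }
    # Percent-encode the credentials so special characters survive URL parsing.
    broker_url = (
        f"confluentkafka://{quote(username, safe='')}:"
        f"{quote(password, safe='')}@{server}"
    )
    return broker_url, transport_options


# In a Celery config module, the two results would be assigned to the
# broker_url and broker_transport_options settings.
broker_url, broker_transport_options = build_kafka_settings(
    os.environ.get("SASL_USERNAME", "user"),
    os.environ.get("SASL_PASSWORD", "secret"),
)
```

Keeping the credentials in one dictionary avoids repeating them in separate admin and common configs, but whether the admin client picks them up this way would need to be verified against the transport.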