confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Getting this package to work with asyncio #185

Closed amitripshtos closed 4 years ago

amitripshtos commented 7 years ago

Hey guys, great job on this package!

I wanted to know if there is a way to use asyncio with this package, since I saw some code that uses the "async await" syntax.

If so, is there an example? Thanks, Amit

edenhill commented 7 years ago

Hi, we're currently looking into making it compatible with asyncio.

Stay tuned!

related issue: https://github.com/confluentinc/confluent-kafka-python/issues/100

ask commented 7 years ago

Are there any updates on this? Is anyone working on it?

vineetgoel commented 7 years ago

+1

edenhill commented 7 years ago

Not yet, community contributions are very much welcome. Some initial research on what needs to be done would be great!

nurikk commented 7 years ago

+1

jeffwidman commented 7 years ago

In the meantime, https://github.com/aio-libs/aiokafka is a python / kafka library that supports asyncio.

marirs commented 6 years ago

Hi, just checking: is it compatible with asyncio yet?

secretmike commented 6 years ago

Hi, I started looking at this a bit.

asyncio has the ability to watch file descriptors for read availability (https://docs.python.org/3/library/asyncio-eventloop.html#watch-file-descriptors)

librdkafka can write to a file descriptor when a queue is readable with rd_kafka_queue_io_event_enable() (https://github.com/edenhill/librdkafka/blob/7478b5ef16aadd6543fe38bc6a2deb895c70da98/src/rdkafka.h#L2362)

Would it be possible to expose rd_kafka_queue_io_event_enable() in the python API? I don't believe the queues are exposed currently, so confluent-kafka-python would have to attach the appropriate internal queue itself.

Ideally you should be able to pass a file descriptor and a payload. When the file descriptor becomes readable, call poll() or consume().

There are still a handful of synchronous operations (like commit(), committed(), etc.), but consuming and producing messages are the most heavily used operations.

asyncio support could probably be integrated more directly in this library, but the above might be a good starting point to allow for an asyncio wrapper.
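A minimal sketch of the mechanism proposed above, using asyncio's add_reader() on a plain os.pipe() as a stand-in for the file descriptor librdkafka would write to via rd_kafka_queue_io_event_enable(). The pipe and one-byte payload here are illustrative only; neither is part of the confluent-kafka-python API today:

```python
import asyncio
import os

async def main():
    # os.pipe() stands in for the fd librdkafka would signal on.
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    loop = asyncio.get_running_loop()
    woken = asyncio.Event()
    payloads = []

    def on_readable():
        # Drain the wakeup byte; this is where poll()/consume()
        # would be called on the now-readable queue.
        payloads.append(os.read(read_fd, 1))
        woken.set()

    loop.add_reader(read_fd, on_readable)
    # Stand-in for librdkafka signalling "queue is readable".
    os.write(write_fd, b"\x01")
    await woken.wait()
    loop.remove_reader(read_fd)
    os.close(read_fd)
    os.close(write_fd)
    return payloads

print(asyncio.run(main()))  # prints [b'\x01']
```

The point is that the event loop never blocks on the consumer: it only wakes the callback when the fd becomes readable.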

tongda commented 6 years ago

Since the Go client benefits from Go's channel mechanism, which makes that wrapper more idiomatic, the async/await support here might take inspiration from it.

vineetgoel commented 6 years ago

Any updates here?

CrazyWisdom commented 6 years ago

+1

StarLightPL commented 5 years ago

+1

andreportela commented 5 years ago

+1

madisvain commented 5 years ago

Leaving this here for reference until this feature gets implemented. It seems it's possible to process the messages with asyncio.

https://stackoverflow.com/a/55498503

I have not tried this out myself yet but might be of help.

h12w commented 5 years ago

I guess some people might just want it working with asyncio, not working with maximum efficiency. So here is an example that works now:

import asyncio

async def run_consumer(consumer):
    while True:
        msg = consumer.poll(timeout=0)
        if msg is None:
            await asyncio.sleep(delay=0)
            continue
        # handle msg
        consumer.commit(msg, asynchronous=True)

asyncio.run(run_consumer(consumer))

matrixise commented 5 years ago

The main issue is support for the Schema Registry, which does not exist in aiokafka.

gjcarneiro commented 5 years ago

> asyncio has the ability to watch file descriptors for read availability (https://docs.python.org/3/library/asyncio-eventloop.html#watch-file-descriptors)

Yes, but keep in mind that this API is not very portable, e.g. it does not work with Windows ProactorEventLoop.

mhowlett commented 5 years ago

Keep an eye on the Confluent blog over the next couple of weeks for an in-depth blog post on this client and asyncio. It walks through the producer only, though. @h12w - your solution will busy-loop and will be very inefficient. To be efficient, you need to make blocking consumer calls, and to do that you'll need another thread.
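A sketch of that thread-based approach, using loop.run_in_executor() to keep a blocking poll off the event loop. blocking_poll here is a stand-in for confluent_kafka.Consumer.poll(timeout), which this sketch does not actually call:

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor

def blocking_poll(timeout):
    # Stand-in for consumer.poll(timeout): blocks the calling
    # thread, then returns a message (or None on timeout).
    time.sleep(timeout)
    return "msg"

async def consume_loop(n_messages):
    loop = asyncio.get_running_loop()
    received = []
    # A dedicated single worker thread runs the blocking calls,
    # so the event loop itself never blocks or busy-loops.
    with ThreadPoolExecutor(max_workers=1) as pool:
        for _ in range(n_messages):
            msg = await loop.run_in_executor(pool, blocking_poll, 0.01)
            if msg is not None:
                received.append(msg)  # handle / commit here
    return received

print(asyncio.run(consume_loop(3)))  # prints ['msg', 'msg', 'msg']
```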

sangmeshcp commented 4 years ago

@mhowlett any update on the detailed blog post? I couldn't find it.

mhowlett commented 4 years ago

https://www.confluent.io/blog/kafka-python-asyncio-integration/
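A hedged sketch of the core pattern that post describes for the producer side: bridge the delivery callback to an asyncio.Future via call_soon_threadsafe, since the callback may fire on the producer's poll thread. FakeProducer below is a stand-in for confluent_kafka.Producer, and the helper name aproduce is made up for this sketch:

```python
import asyncio

class FakeProducer:
    # Stand-in for confluent_kafka.Producer; the real client
    # invokes on_delivery from its poll thread, not inline.
    def produce(self, topic, value, on_delivery):
        on_delivery(None, f"delivered {value!r} to {topic}")

async def aproduce(producer, topic, value):
    loop = asyncio.get_running_loop()
    fut = loop.create_future()

    def on_delivery(err, msg):
        # May run on another thread, so hop back onto the
        # event loop before touching the future.
        if err:
            loop.call_soon_threadsafe(fut.set_exception, Exception(err))
        else:
            loop.call_soon_threadsafe(fut.set_result, msg)

    producer.produce(topic, value, on_delivery=on_delivery)
    return await fut

print(asyncio.run(aproduce(FakeProducer(), "t", "hello")))
```

With this wrapper, awaiting aproduce() resolves only once delivery is confirmed (or fails), which is what makes the producer feel native to asyncio code.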

galen211 commented 4 years ago

Has anyone been able to successfully connect to Confluent Cloud with AIOKafka? I tried a modified version of the ssl_consume_produce.py example from the AIOKafka repo at https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.py and was unsuccessful. I've configured my AIOKafkaConsumer and AIOKafkaProducer with what I believe to be the correct parameters, but I am getting the runtime error below. I've also included my modified script and config further down.

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
[2020-02-06 17:26:57,060] DEBUG [asyncio]: Using selector: KqueueSelector
[2020-02-06 17:26:57,061] DEBUG [aiokafka.producer.producer]: Starting the Kafka producer
[2020-02-06 17:26:57,061] DEBUG [aiokafka]: Attempting to bootstrap via node at pkc-43n10.us-central1.gcp.confluent.cloud:9092
[2020-02-06 17:26:57,223] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 1: ApiVersionRequest_v0()
[2020-02-06 17:26:57,265] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 1: ApiVersionResponse_v0(error_code=0, api_versions=[(api_key=0, min_version=0, max_version=8), (api_key=1, min_version=0, max_version=11), (api_key=2, min_version=0, max_version=5), (api_key=3, min_version=0, max_version=9), (api_key=4, min_version=0, max_version=4), (api_key=5, min_version=0, max_version=2), (api_key=6, min_version=0, max_version=6), (api_key=7, min_version=0, max_version=3), (api_key=8, min_version=0, max_version=8), (api_key=9, min_version=0, max_version=6), (api_key=10, min_version=0, max_version=3), (api_key=11, min_version=0, max_version=6), (api_key=12, min_version=0, max_version=4), (api_key=13, min_version=0, max_version=4), (api_key=14, min_version=0, max_version=4), (api_key=15, min_version=0, max_version=5), (api_key=16, min_version=0, max_version=3), (api_key=17, min_version=0, max_version=1), (api_key=18, min_version=0, max_version=3), (api_key=19, min_version=0, max_version=5), (api_key=20, min_version=0, max_version=4), (api_key=21, min_version=0, max_version=1), (api_key=22, min_version=0, max_version=3), (api_key=23, min_version=0, max_version=3), (api_key=24, min_version=0, max_version=1), (api_key=25, min_version=0, max_version=1), (api_key=26, min_version=0, max_version=1), (api_key=27, min_version=0, max_version=0), (api_key=28, min_version=0, max_version=2), (api_key=29, min_version=0, max_version=1), (api_key=30, min_version=0, max_version=2), (api_key=31, min_version=0, max_version=2), (api_key=32, min_version=0, max_version=2), (api_key=33, min_version=0, max_version=1), (api_key=34, min_version=0, max_version=1), (api_key=35, min_version=0, max_version=1), (api_key=36, min_version=0, max_version=1), (api_key=37, min_version=0, max_version=1), (api_key=38, min_version=0, max_version=2), (api_key=39, min_version=0, max_version=1), (api_key=40, min_version=0, max_version=1), (api_key=41, 
min_version=0, max_version=1), (api_key=42, min_version=0, max_version=2), (api_key=43, min_version=0, max_version=2), (api_key=44, min_version=0, max_version=1), (api_key=45, min_version=0, max_version=0), (api_key=46, min_version=0, max_version=0), (api_key=47, min_version=0, max_version=0), (api_key=10000, min_version=0, max_version=0)])
[2020-02-06 17:26:57,266] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Request 2: SaslHandShakeRequest_v1(mechanism='PLAIN')
[2020-02-06 17:26:57,303] DEBUG [aiokafka.conn]: <AIOKafkaConnection host=pkc-43n10.us-central1.gcp.confluent.cloud port=9092> Response 2: SaslHandShakeResponse_v1(error_code=0, enabled_mechanisms=['PLAIN', 'OAUTHBEARER'])
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 78, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 73, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 34, in produce_and_consume
    start_future = await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
[2020-02-06 17:26:57,315] ERROR [asyncio]: Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7fcee03a84d0>

Process finished with exit code 1

My adapted version of the code is:

import asyncio
import os
import logging

from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from aiokafka.helpers import create_ssl_context
from kafka.common import TopicPartition
from aiokafka.errors import KafkaError

from aiokafka import AIOKafkaClient

import ccloud_lib

conf = ccloud_lib.read_ccloud_config('kafka_config.conf')
ssl_context = create_ssl_context(cafile='cacert.pem')

log_level = logging.DEBUG
log_format = '[%(asctime)s] %(levelname)s [%(name)s]: %(message)s'
logging.basicConfig(level=logging.DEBUG, format=log_format)

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop = loop,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        ssl_context=ssl_context,
        sasl_plain_username=conf['sasl.username'],
        sasl_plain_password=conf['sasl.password'],
        api_version='0.10'
    )
    try:
        start_future = await producer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))

    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol=conf['security.protocol'],
        sasl_mechanism=conf['sasl.mechanism'],
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    try:
        start_future = await consumer.start()
        response = await start_future  # wait until message is produced
    except KafkaError as err:
        print("some kafka error on produce: {}".format(err))

    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

My obfuscated configuration conf looks like this:

bootstrap.servers=*********.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="*********" password\="******************";
sasl.username=*********
sasl.password=*********

Is there something about my configuration which is incorrect or missing? [NB: also posted at StackOverflow, but thought it might have a better audience here]

jeffwidman commented 4 years ago

@galen211 you should post the issue on https://github.com/aio-libs/aiokafka/issues or email confluent support... this repo is unrelated to either that lib or that cloud.

rnpridgeon commented 4 years ago

As @jeffwidman pointed out, you will need to take this up with aiokafka. I'm closing this issue, with @mhowlett's blog post as the resolution.