confluentinc / confluent-kafka-python

Confluent's Kafka Python Client
http://docs.confluent.io/current/clients/confluent-kafka-python

Consumer compatible with asyncio #1017

Open gwaramadze opened 3 years ago

gwaramadze commented 3 years ago

Description

A bit of a context question: what is the optimal consumption pattern if I have more than one topic, and possibly multiple partitions per topic, to be handled by a single application instance? I feel like doing this is an anti-pattern:

...
consumer = Consumer(config)
consumer.subscribe(["topic1", "topic2"])

while True:
    msg = consumer.poll(1.0)
    if msg is None:
        continue
    elif msg.error():
        print('error: {}'.format(msg.error()))
    elif msg.topic() == "topic1":
        ...  # topic1 logic
    elif msg.topic() == "topic2":
        ...  # topic2 logic
...

Let's assume that the topic1 logic is a lightweight filter/repartition (discard most of the stream, rehash a new key, and publish to topic2) and the topic2 logic is I/O-bound. This seems counterproductive; it's like serving a grocery store and a pharmacy from a single queue. Now, how can this be optimized?

I have read this mind-blowing blog post https://www.confluent.io/blog/introducing-confluent-parallel-message-processing-client/

I figure that key-level parallelism is the holy grail, but it is not available in the Python world as of now. First things first, though: a good enough step would be to split each topic off to a separate Consumer instance, hopefully with asyncio rather than multiprocessing.
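For what it's worth, the key-level parallelism idea from the parallel-consumer post can be sketched in plain asyncio: one worker task per key preserves per-key ordering while different keys proceed concurrently. This is a minimal illustration only, using (key, value) tuples as stand-ins for Kafka messages; none of these names come from confluent-kafka.

```python
import asyncio

async def key_worker(key, queue, results):
    """Process values for one key in order; None is a shutdown sentinel."""
    while True:
        value = await queue.get()
        if value is None:
            queue.task_done()
            return
        results.append((key, value))  # stand-in for real per-message processing
        queue.task_done()

async def dispatch(messages):
    """Fan (key, value) pairs out to one queue/worker per key."""
    results = []
    queues, workers = {}, []
    for key, value in messages:
        if key not in queues:
            queues[key] = asyncio.Queue()
            workers.append(asyncio.create_task(key_worker(key, queues[key], results)))
        await queues[key].put(value)
    for queue in queues.values():  # tell every worker to shut down
        await queue.put(None)
    await asyncio.gather(*workers)
    return results

results = asyncio.run(dispatch([("a", 1), ("b", 1), ("a", 2)]))
```

Within a key, ordering is preserved (values 1 then 2 for key "a"), while keys "a" and "b" are free to interleave.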

I have read through related issues: https://github.com/confluentinc/confluent-kafka-python/issues/185 and https://github.com/confluentinc/confluent-kafka-python/issues/100, the famous blog post https://www.confluent.io/blog/kafka-python-asyncio-integration/, and several other resources I cannot comprehend now in my gazillion open browser tabs 😆

I have come up with a snippet of code that I would kindly request you review. It generally works in a local environment, and I wonder what you think: does this approach make sense, or is it a disaster waiting for the moment I put serious load on it? Thanks in advance.

import asyncio
import functools
import logging
import signal
import sys

import confluent_kafka

signal.signal(signal.SIGTERM, lambda *args: sys.exit())
logging.basicConfig(level=logging.INFO)
log = logging.getLogger(__name__)

config = {
    "group.id": "consumer-group-name",
    "bootstrap.servers": "localhost:9092",
}

async def consume(config, topic):
    consumer = confluent_kafka.Consumer(config)
    consumer.subscribe([topic])
    loop = asyncio.get_running_loop()
    poll = functools.partial(consumer.poll, 0.1)
    try:
        log.info(f"Starting consumer: {topic}")
        while True:
            message = await loop.run_in_executor(None, poll)
            if message is None:
                continue
            if message.error():
                log.error(f"Consumer error: {message.error()}")
                continue
            # TODO: Inject topic-specific logic here
            log.info(f"Consuming message: {message.value()}")
    finally:
        log.info(f"Closing consumer: {topic}")
        consumer.close()

consume = functools.partial(consume, config)

async def main():
    await asyncio.gather(
        consume("topic1"),
        consume("topic2"),
    )

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except (KeyboardInterrupt, SystemExit):
        log.info("Application shutdown complete")

astubbs commented 3 years ago

hi @gwaramadze ! Thanks for the compliment! I'm the author of the parallel consumer project :) Let me know how I can help, or if you have any ideas for the PC client :)

https://github.com/confluentinc/parallel-consumer

@gwaramadze if we think there's enough demand for this functionality, we may look at porting the algorithms to librdkafka, to be inherited by the wrappers...

mhowlett commented 3 years ago

ideally we would provide a higher level class along the lines of what @astubbs made out of the box, but for now we don't.

i don't see a problem with your approach, though i might be missing something... be aware that any callbacks (statistics, rebalance etc) would happen on an executor thread. ideally you would only use one consumer as it is more efficient, but the logic would also be a lot more difficult (and in practice likely won't matter for most scenarios).
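The single-consumer variant @mhowlett alludes to could look roughly like this: one poll loop fans messages out to per-topic asyncio queues so each topic's handler runs concurrently. This is a hedged sketch, not confluent-kafka API; FakeMessage and the in-memory message list stand in for real confluent_kafka messages and the executor-backed poll call.

```python
import asyncio

class FakeMessage:
    """Stand-in for a confluent_kafka Message (topic() and value() only)."""
    def __init__(self, topic, value):
        self._topic, self._value = topic, value
    def topic(self):
        return self._topic
    def value(self):
        return self._value

async def handler(topic, queue, out):
    """Consume one topic's queue; None is a shutdown sentinel."""
    while True:
        msg = await queue.get()
        if msg is None:
            return
        out.append((topic, msg.value()))  # topic-specific logic goes here

async def run(messages):
    out = []
    queues = {"topic1": asyncio.Queue(), "topic2": asyncio.Queue()}
    tasks = [asyncio.create_task(handler(t, q, out)) for t, q in queues.items()]
    for msg in messages:  # stands in for the real poll loop on one Consumer
        await queues[msg.topic()].put(msg)
    for queue in queues.values():
        await queue.put(None)
    await asyncio.gather(*tasks)
    return out

out = asyncio.run(run([FakeMessage("topic1", b"a"), FakeMessage("topic2", b"b")]))
```

In the real version, the `for msg in messages` loop would be the `run_in_executor(None, consumer.poll, 0.1)` loop from the snippet above, with a single Consumer subscribed to both topics.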

gwaramadze commented 3 years ago

@astubbs @mhowlett Thank you for your answers.

EmanDiab commented 1 year ago

That worked for me using confluent-kafka==1.5.0. Thanks @gwaramadze! I just had to change the last couple of lines to:

try:
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
except (KeyboardInterrupt, SystemExit):
    log.info("Stream shutdown complete")
ericnesschrw commented 1 year ago

This code was very helpful in creating an async consumer. One thing that made it easier for me to unit test the consumer was removing functools.partial. It is only needed to pass keyword arguments, and 0.1 here is a positional argument, which run_in_executor forwards directly.

    loop = asyncio.get_running_loop()
    try:
        log.info(f"Starting consumer: {topic}")
        while True:
            message = await loop.run_in_executor(None, consumer.poll, 0.1)
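To illustrate the testability point above with a hypothetical example (consume_one and fake_poll are names invented here, not from the original snippet): because run_in_executor forwards positional arguments itself, the poll callable can be injected and swapped for a stub in a unit test.

```python
import asyncio

async def consume_one(poll, timeout=0.1):
    """Run one blocking poll call in the default executor and return its result."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, poll, timeout)

def fake_poll(timeout):
    """Test stub standing in for confluent_kafka.Consumer.poll."""
    return f"polled with {timeout}"

result = asyncio.run(consume_one(fake_poll))
```

In production code, `consumer.poll` would be passed where `fake_poll` is here, with no functools.partial wrapper needed.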
IgorMilavec commented 1 year ago

Hello. Is there any progress on, or plans for, this feature? The code the OP posted works, but it is inefficient and blocks/quantizes the program's execution.

erang-simplex commented 9 months ago

Hi, did anyone try to use this code in a real production environment and maybe share some insights? Thanks

Aragonski97 commented 2 months ago

Hi @ericnesschrw ,

async def consume(self) -> Any:
    assert self.consumer_topics is not None
    received = await self._event_loop.run_in_executor(None, self.consumer.poll, 0.1)
    return received

Does not return anything, but

async def consume(self) -> Any:
    assert self.consumer_topics is not None
    received = self.consumer.poll(0.1)
    return received

returns what is expected. I am profoundly confused as to why this might be the case. Debugging async apps has always been notoriously hard for me, but this one just leaves me in the dirt... If you have any suggestions, I'd appreciate it. I'm running Python 3.12.
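One possible cause, offered as an assumption since the surrounding class isn't shown: if `self._event_loop` was captured at `__init__` time (e.g. via `asyncio.get_event_loop()` or `asyncio.new_event_loop()`), it may not be the loop actually running the coroutine, so futures scheduled on it never resolve. Fetching the loop inside the coroutine with `asyncio.get_running_loop()` avoids the mismatch. StubConsumer and Worker below are hypothetical stand-ins, not the poster's code.

```python
import asyncio

class StubConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer."""
    def poll(self, timeout):
        return "message"

class Worker:
    def __init__(self):
        self.consumer = StubConsumer()

    async def consume(self):
        # get_running_loop() always returns the loop executing this coroutine,
        # so the executor future is scheduled on the right loop.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(None, self.consumer.poll, 0.1)

received = asyncio.run(Worker().consume())
```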