aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

[QUESTION] Graceful shutdown #819

Closed moritz89 closed 2 years ago

moritz89 commented 2 years ago

What is the recommended method to perform a graceful shutdown for consumers (and producers)?

Would a simple global variable / Django signal at the end of the loop suffice?

In the field, when K8s pods are replaced, the process is killed with a SIGINT. Handling that signal is possible in Python; the question is how the async consumer iterator can be stopped so that the current message is allowed to finish processing and the consumer then shuts down. The finally block will be called, which allows consumer.stop() to be executed, but the fate of the message being processed at that moment would be undefined.

The code handling the message processing is designed for at-least-once semantics; however, reducing the need to recover from such interruptions would reduce latency and improve throughput, especially under continuous deployment.

ods commented 2 years ago

The best way to guarantee a producer or consumer is properly closed is to use the `async with AIOKafkaProducer(…) as producer:` / `async with AIOKafkaConsumer(…) as consumer:` pattern.

AFAIK, k8s uses SIGTERM, so you have to install a handler for it. And in case you run your code via a shell script, beware that it doesn't propagate signals by default.
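A minimal sketch of wiring those signals into an asyncio program (illustrative only; the consumer itself is elided, and this assumes a POSIX platform where `loop.add_signal_handler` is available):

```python
import asyncio
import signal

async def main() -> None:
    stop_event = asyncio.Event()
    loop = asyncio.get_running_loop()
    # In asyncio code, loop.add_signal_handler is preferable to signal.signal:
    # the callback runs inside the event loop rather than at an arbitrary
    # point in the interpreter.
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, stop_event.set)
    # ... start the consumer here and run the message loop until the
    # stop event is set ...
    await stop_event.wait()
    # ... leaving main() lets async with / finally close the consumer ...

# asyncio.run(main()) would then block until SIGTERM or SIGINT arrives.
```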

ods commented 2 years ago

[…] how can the async consumer iterator be stopped, so that the current message is allowed to be finished processing and the consumer then shutdown?

  1. Create a global `asyncio.Event()` object.
  2. Set it from the signal handler.
  3. Use a `while True` loop instead of `async for`.
  4. Wait for either a new message (`consumer.getone()`) or the stop event being set, whichever comes first, with `asyncio.wait()`.
  5. Break the loop if the stop event is set.

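The steps above can be sketched without a broker by using an `asyncio.Queue` as a stand-in for the consumer (`queue.get()` plays the role of `consumer.getone()`; the cancellation semantics of the real `getone()` may differ, so treat this as a pattern demo, not aiokafka code):

```python
import asyncio

async def consume(queue: asyncio.Queue, stop_event: asyncio.Event) -> list:
    """Steps 3-5: a while-True loop racing getone() against the stop event."""
    processed = []
    while True:
        getone = asyncio.ensure_future(queue.get())       # stand-in for consumer.getone()
        stop = asyncio.ensure_future(stop_event.wait())
        # Step 4: wait for whichever comes first.
        done, pending = await asyncio.wait(
            {getone, stop}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        if getone in done:
            processed.append(getone.result())  # message is fully processed
        if stop_event.is_set():
            break  # step 5: finally / async with would now stop the consumer
    return processed

async def main() -> None:
    stop_event = asyncio.Event()   # step 1 (set from a signal handler in real code)
    queue = asyncio.Queue()        # stands in for the Kafka consumer
    for msg in ("a", "b"):
        queue.put_nowait(msg)
    task = asyncio.create_task(consume(queue, stop_event))
    await asyncio.sleep(0.1)
    stop_event.set()               # step 2: pretend the signal arrived
    print(await task)              # -> ['a', 'b']

asyncio.run(main())
```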
moritz89 commented 2 years ago

Thanks for the pointers! As a reference for others, propagating the signal from a shell script is described by Veithen here.

ods commented 2 years ago

As a reference for others, propagating the signal from a shell script is described by Veithen here.

Nowadays it seems to be much simpler: --init option to docker run or init: true line in docker-compose.yml.
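For docker-compose that would look something like this (service and image names are made up):

```yaml
services:
  worker:
    image: my-consumer:latest   # hypothetical image name
    init: true   # PID 1 is a tiny init that forwards SIGTERM to the process
```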

jainal09 commented 1 year ago

Can anyone share a demo Python code snippet for this? I am really struggling to implement it. Is this POC relevant?

import asyncio
import logging
import signal
from aiokafka import AIOKafkaConsumer
logger = logging.getLogger(__name__)

# Event will be set if the program needs to stop
stop_event = asyncio.Event()

def signal_handler(*_):
    print("Application received stop signal. Stopping...")
    stop_event.set()

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)  # k8s stops pods with SIGTERM

async def consume_messages(stop_event):
    async with AIOKafkaConsumer(
            'my_topic',
            bootstrap_servers='localhost:9092',
            group_id='my_consumer_group',
            auto_offset_reset='earliest',
            enable_auto_commit=False,
    ) as consumer:
        print("Consumer started")
        while not stop_event.is_set():
            # try to fetch a message with a timeout
            try:
                message = await asyncio.wait_for(consumer.getone(), timeout=1.0)
                print(f"Received message: {message.value.decode('utf-8')}")
                await asyncio.sleep(10)  # simulate slow message processing
                await asyncio.wait_for(consumer.commit(), timeout=1.0)
                print("Committed message")

            except asyncio.TimeoutError:
                # no message fetched within 1 second
                continue

if __name__ == '__main__':
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - [%(levelname)s] - [%(name)s] "
               "- %(filename)s:%(lineno)d - %(message)s",
    )
    asyncio.run(consume_messages(stop_event))

ods commented 1 year ago

Yes, your code is fine. Also, when your handling is idempotent, it's safe to raise an exception (usually SystemExit) from the signal handler.
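A POSIX-only sketch of that exception-from-handler approach (plain synchronous code for brevity; the self-kill just simulates an external SIGTERM):

```python
import os
import signal

def raise_exit(signum, frame):
    # With idempotent message handling it is safe to abort mid-message:
    # SystemExit propagates like a normal exception, so finally blocks and
    # async with contexts still get a chance to close the consumer.
    raise SystemExit(0)

signal.signal(signal.SIGTERM, raise_exit)

try:
    os.kill(os.getpid(), signal.SIGTERM)  # simulate k8s stopping the pod
    while True:
        pass  # the message loop would run here
except SystemExit:
    print("clean shutdown")  # cleanup code runs before the process exits
```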

jainal09 commented 1 year ago

The problem I observed with this code: when I spin up 4 containers of it (or even start 4 copies of the script) against a Kafka topic with 4 partitions (4 is arbitrary, it could be anything), each time a new script starts a rebalance occurs, which is expected. But after all 4 scripts are running, 1 or 2 consumers are never assigned any partitions. I don't know why; normally, whenever a consumer joins a consumer group a rebalance occurs and the load should be distributed among all the consumers. Here, some consumers get assigned 2 partitions while others sit idle with 0.