Open srisudarsan opened 6 months ago
Hello, we have an example using asyncio.
You have to use a different thread to poll (producer/consumer) or consume (consumer), and make sure all the callbacks are wrapped with call_soon_threadsafe to return to the event loop thread.
We'll be working on an easier-to-implement API for asyncio and gevent.
The example is great, but there's no async consumer demonstrated. The thing is, async consuming is much trickier than producing.
I've ended up writing this.
WARNING: This code only handles a subset of the possible states of a Kafka consumer, and has only been used in a simple use case (no consumer groups, no offset commits, no transactions...). There may be weird async edge cases not properly handled. Use at your own risk.
import asyncio
import logging
import time
from typing import Any, AsyncIterable, Callable, TypeVar

import uuid_utils  # assumed to be the third-party uuid-utils package
from confluent_kafka import Consumer, KafkaError, KafkaException

logger = logging.getLogger(__name__)

T = TypeVar("T")
V = TypeVar("V")


class AsyncKafkaConsumer:
    def __init__(
        self,
        bootstrap_servers: str,
        sasl_username: str,
        sasl_password: str,
        topic: str,
        key_deserializer: Callable[[bytes], str],
        value_deserializer: Callable[[bytes], V],
        group_id: str | None = None,
        offset_reset: str = "earliest",
    ):
        self.topic = topic
        # "group.id" must be a string, so convert the generated UUID.
        group_id = group_id or str(uuid_utils.uuid4())
        self._consumer = Consumer(
            {
                # see: https://docs.confluent.io/platform/current/clients/confluent-kafka-python/html/index.html#pythonclient-configuration
                "bootstrap.servers": bootstrap_servers,
                "sasl.mechanism": "PLAIN",
                "security.protocol": "SASL_SSL",
                "sasl.username": sasl_username,
                "sasl.password": sasl_password,
                "error_cb": error_cb,
                "auto.offset.reset": offset_reset,
                # Offsets are stored explicitly with store_offsets() once a
                # message has been handed to the caller (see consume()).
                "enable.auto.offset.store": False,
                "group.id": group_id,
            }
        )
        self._consumer.subscribe(
            [topic], on_assign=lambda c, p: logger.info(f"Assigned partition {p}")
        )
        self._stop = False
        self._key_deserializer = key_deserializer
        self._value_deserializer = value_deserializer
        self._loop = None

    async def _run_in_executor(self, func: Callable[..., T], *args: Any) -> T:
        # Run the blocking client call in the loop's default thread pool.
        return await self._loop.run_in_executor(None, func, *args)

    async def consume(
        self,
        timeout_s: float | None = None,
        loop: asyncio.AbstractEventLoop | None = None,
    ) -> AsyncIterable[tuple[str, V]]:
        self._loop = loop or asyncio.get_running_loop()
        timeout_s = timeout_s or float("inf")
        stop_at = time.monotonic() + timeout_s
        logger.debug("Starting consume")
        # inspired from https://github.com/confluentinc/confluent-kafka-python/blob/master/examples/consumer.py
        # and https://github.com/Aiven-Open/karapace/blob/26e8354e55619f9e7498101ee5a97930a4991b64/karapace/kafka/consumer.py#L200
        try:
            while not self._stop and time.monotonic() < stop_at:
                messages = await self._run_in_executor(self._consumer.consume, 100, 1.0)
                for msg in messages:
                    if msg.error():
                        if msg.error().retriable():
                            logger.warning(f"Got {msg.error()} - Will retry")
                            continue
                        else:
                            logger.error(f"Got {msg.error()}")
                            raise KafkaException(msg.error())
                    else:
                        try:
                            key = self._key_deserializer(msg.key())
                            value = self._value_deserializer(msg.value())
                        except Exception as e:
                            logger.error(
                                f"Cannot deserialize message {msg=}", exc_info=e
                            )
                            continue
                        logger.debug(
                            f"Received record with key={key} from topic {msg.topic()} [{msg.partition()}] at offset {msg.offset()}"
                        )
                        yield key, value
                        # Store the offset only after the caller has processed the record.
                        await self._run_in_executor(self._consumer.store_offsets, msg)
        finally:
            self.stop()

    def stop(self):
        self._stop = True
        self._consumer.close()


def error_cb(err):
    """The error callback is used for generic client errors.

    These errors are generally to be considered informational as the
    client will automatically try to recover from all errors, and no
    extra action is typically required by the application. For this
    example however, we terminate the application if the client is
    unable to connect to any broker (_ALL_BROKERS_DOWN) and on
    authentication errors (_AUTHENTICATION).
    """
    logger.error(f"Client error: {err}")
    if (
        err.code() == KafkaError._ALL_BROKERS_DOWN
        or err.code() == KafkaError._AUTHENTICATION
    ):
        # Any exception raised from this callback will be re-raised from the
        # triggering flush() or poll() call.
        raise KafkaException(err)
Regarding the FastAPI part, we have something like this:
# you can hook this to your app with @app.on_event("startup")
# the UpdateNotifSubscriber below is a wrapper using the AsyncKafkaConsumer above.
# note that FastAPI's Depends() does not work for on_event() hooks
async def consume(sub: UpdateNotifSubscriber):
    async for k, v in sub.consume():
        await handle_update(v)


async def handle_update(update: UpdateMessage):
    ...
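One detail worth spelling out: a startup hook that awaits an endless consume loop never returns, so the loop is usually scheduled as a background task and cancelled on shutdown. The sketch below shows that lifecycle with plain asyncio only. fake_consume is a hypothetical stand-in for sub.consume(), and main() merely plays the role of the startup and shutdown hooks; none of this is the poster's actual wiring.

```python
import asyncio
from collections.abc import AsyncIterator

handled: list[str] = []


async def fake_consume() -> AsyncIterator[tuple[str, str]]:
    """Hypothetical stand-in for sub.consume(): yields (key, value) forever."""
    n = 0
    while True:
        await asyncio.sleep(0.01)
        yield f"key-{n}", f"value-{n}"
        n += 1


async def handle_update(update: str) -> None:
    handled.append(update)


async def consume_forever() -> None:
    async for _key, value in fake_consume():
        await handle_update(value)


async def main() -> None:
    # "startup": schedule the loop as a task instead of awaiting it, so the
    # hook returns immediately and the app can start serving requests.
    task = asyncio.create_task(consume_forever())
    await asyncio.sleep(0.1)  # the application would be serving here
    # "shutdown": cancel the task and wait for it to unwind.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
```

Keeping a reference to the task matters: the shutdown step needs it for cancellation, and the event loop only holds weak references to tasks.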
The thing is, async consuming is much trickier than producing.
Yes, it's infuriating how badly it is supported by Confluent.
The example is great, but there's no async consumer demonstrated. The thing is, async consuming is much trickier than producing.
I've ended up writing this.
Interestingly, we have*) a very similar asyncio wrapper around confluent_kafka.Consumer. But instead of loop.run_in_executor, we use await asyncio.to_thread(consumer.consume, num_messages, timeout).
We use rebalance callbacks in the following way:
assign_queue = asyncio.Queue[Sequence[TopicPartition]]()


def on_assign(_: Consumer, partitions: Sequence[ConfluentTopicPartition]) -> None:
    assign_queue.put_nowait(partitions)


consumer.subscribe(topics, on_assign=on_assign)
Then you can have a separate task that awaits the queue of assigned partitions.
*) I cannot show full code since it's proprietary.
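One thing to watch with this approach: the client invokes rebalance callbacks from the thread that is calling consume(), which with asyncio.to_thread is a worker thread, and asyncio.Queue is not thread-safe. A minimal sketch of a thread-safe handoff follows. FakeConsumer is a hypothetical stand-in that mimics only that callback behaviour, so the snippet runs without a broker; it is not the proprietary wrapper mentioned above.

```python
import asyncio


class FakeConsumer:
    """Hypothetical stand-in for confluent_kafka.Consumer."""

    def __init__(self) -> None:
        self._on_assign = None
        self._assigned = False

    def subscribe(self, topics, on_assign) -> None:
        self._on_assign = on_assign

    def consume(self, num_messages: int, timeout: float) -> list[str]:
        # Like the real client, fire the rebalance callback from whichever
        # thread is calling consume().
        if not self._assigned and self._on_assign is not None:
            self._assigned = True
            self._on_assign(self, ["demo-topic[0]"])
        return ["m1", "m2"][:num_messages]


async def main() -> tuple[list[str], list[str]]:
    loop = asyncio.get_running_loop()
    assign_queue: asyncio.Queue = asyncio.Queue()

    def on_assign(_consumer, partitions) -> None:
        # Called in the worker thread: schedule the put on the loop thread
        # rather than calling put_nowait() directly from here.
        loop.call_soon_threadsafe(assign_queue.put_nowait, list(partitions))

    consumer = FakeConsumer()
    consumer.subscribe(["demo-topic"], on_assign=on_assign)
    messages = await asyncio.to_thread(consumer.consume, 10, 1.0)
    partitions = await assign_queue.get()
    return partitions, messages
```

A separate task awaiting assign_queue.get() would then see each assignment on the event loop thread.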
I created this pull request, https://github.com/confluentinc/confluent-kafka-python/pull/1448, which should make integration with async frameworks like gevent and asyncio possible, especially for the consumer.
By exposing librdkafka's event_io_enable, frameworks like asyncio can truly wait for new messages asynchronously, without threads.
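To illustrate the idea of waiting on a file descriptor instead of a thread, here is a sketch using only the standard library. The Python API added by the pull request is not shown in this thread, so an os.pipe() stands in for the descriptor the client would signal; wait_for_wakeup is a hypothetical name. loop.add_reader is available on Unix selector event loops, not on the default Windows loop.

```python
import asyncio
import os
import threading


async def wait_for_wakeup() -> bytes:
    loop = asyncio.get_running_loop()
    # Stand-in for the fd a client library would write to when messages arrive.
    read_fd, write_fd = os.pipe()
    readable = asyncio.Event()
    # The loop's selector watches the fd; no thread blocks waiting on it.
    loop.add_reader(read_fd, readable.set)
    # Simulate the library signalling "new messages" from its own thread.
    threading.Timer(0.05, os.write, args=(write_fd, b"x")).start()
    try:
        await readable.wait()
        return os.read(read_fd, 1)
    finally:
        loop.remove_reader(read_fd)
        os.close(read_fd)
        os.close(write_fd)
```

After the wakeup, a real integration would drain the available messages with a non-blocking call and go back to waiting on the descriptor.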
Description
NOTE: This is a question and not an issue
I am building an application with FastAPI that will also need to consume messages from Kafka, so we want to run the consumer asynchronously.
I read https://www.confluent.io/blog/kafka-python-asyncio-integration/ and understood that asyncio support can be added on top of confluent-kafka-python. While exploring other libraries, I stumbled on aiokafka. I am trying to understand the differences between the two libraries, and I found an issue with an interesting comment from one of the repo's members: https://github.com/aio-libs/aiokafka/issues/665#issuecomment-713091846
Given that comment, I am unable to tell whether confluent-kafka-python offers complete async support, as mentioned in https://github.com/confluentinc/confluent-kafka-python/issues/185
Can you please help me clarify this and unblock me?