Open TEH30P opened 3 years ago
I got the same bug in `await AIOKafkaConsumer.close()`.
I did some code review and found the point where the error message is emitted:
except (OSError, asyncio.TimeoutError, KafkaError) as err:
log.error('Unable connect to node with id %s: %s', node_id, err)
if group == ConnectionGroup.DEFAULT:
# Connection failures imply that our metadata is stale, so
# let's refresh
self.force_metadata_update() # <-- doing nothing
return None
else:
So I think the problem is in the line `self.force_metadata_update()`: the function `AIOKafkaClient.force_metadata_update` is not asynchronous, but it returns a `Future` object (strange behaviour, in my opinion):
def force_metadata_update(self):
# . . .
return asyncio.shield(self._md_update_fut)
... which means that simply calling this function does nothing.
Moreover, I looked for calls of `AIOKafkaClient.force_metadata_update` and found `await self.force_metadata_update()` at aiokafka/client.py:630, so my guess was confirmed.
Also, aiokafka/client.py:402 should not work:
def _on_connection_closed(self, conn, reason):
""" Callback called when connection is closed
"""
# Connection failures imply that our metadata is stale, so let's
# refresh
if reason == CloseReason.CONNECTION_BROKEN or \
reason == CloseReason.CONNECTION_TIMEOUT:
self.force_metadata_update() # <-- doing nothing
OK! Here is my workaround...
I wrapped the `AIOKafkaConsumer` method calls (i.e. the awaits) with `asyncio.wait_for`:
import asyncio as m_aio
import io as m_io
import logging as m_log
import logging.handlers as m_logh
import os as m_os
import signal as m_sig
import sys as m_sys

import aiokafka as m_ak
gv_log: m_log.Logger = m_log.getLogger(__name__)
class MyConsumerCommitter:
    """Track the last consumed offset per partition and persist it to a file.

    The cache file is plain text with one entry per line in the form
    ``<offset> <partition> <topic>``.  ``l_partition`` maps a
    ``TopicPartition`` to the last offset recorded for it.
    """

    def __init__(self, iv_consumer: m_ak.AIOKafkaConsumer, i_cache_file: str):
        self.l_partition = dict()
        self._consumer = iv_consumer
        self._cache_file = i_cache_file
        # Set on every assign/revoke; read-and-cleared by is_changed().
        self._changed = False

    def _read_cache(self) -> dict:
        """Parse the cache file into a ``{TopicPartition: offset}`` dict.

        A missing file is treated as an empty cache.  Blank lines are
        skipped.  A line that does not consist of exactly three fields, or
        whose offset/partition is not an integer, raises ``ValueError``.
        """
        l_cached: dict = {}
        try:
            with m_io.open(self._cache_file, mode='r', encoding='utf-8', newline='\n') as v_rd:
                for v_line in v_rd:
                    # Strip before splitting: with ``maxsplit`` the last
                    # field would otherwise keep the trailing newline and
                    # the topic name would never compare equal.
                    l_line_parsed = v_line.strip().split(maxsplit=2)
                    if not l_line_parsed:
                        continue
                    v_offset, v_partition, v_topic = l_line_parsed
                    l_cached[m_ak.TopicPartition(v_topic, int(v_partition))] = int(v_offset)
        except FileNotFoundError:
            # Nothing has been saved yet.
            pass
        return l_cached

    def _save(self) -> None:
        """Rewrite the cache file with the current in-memory offsets.

        Entries already in the file keep their position; entries for
        partitions not tracked in ``l_partition`` are preserved unchanged;
        partitions not yet in the file are appended.
        """
        l_offset: dict = self._read_cache()
        # dict.update keeps the position of existing keys and appends new ones.
        l_offset.update(self.l_partition)
        with m_io.open(self._cache_file, mode='w', encoding='utf-8', newline='\n') as v_wr:
            v_wr.writelines(
                f'{v_offset} {v_tp_committed.partition} {v_tp_committed.topic}\n'
                for v_tp_committed, v_offset in l_offset.items())

    def _load(self) -> None:
        """Refresh offsets of the tracked partitions from the cache file."""
        for v_tp_committed, v_offset in self._read_cache().items():
            if v_tp_committed in self.l_partition:
                self.l_partition[v_tp_committed] = v_offset

    def partition_assign(self, il_partition) -> None:
        """Start tracking the assigned partitions and seek past the cached offsets."""
        gv_log.info(f'Rebalancer: assigned {len(il_partition)}, {il_partition}')
        self._changed = True
        for v_tp in il_partition:
            # NOTE(review): a default of 0 combined with the ``+ 1`` below
            # means a never-consumed partition starts at offset 1 - confirm
            # that skipping offset 0 is intended.
            self.l_partition.setdefault(v_tp, 0)
        self._load()
        for v_tp in il_partition:
            self._consumer.seek(v_tp, self.l_partition[v_tp] + 1)

    def partition_revoke(self, il_partition) -> None:
        """Stop tracking the revoked partitions."""
        gv_log.info(f'Rebalancer: revoked {len(il_partition)}, {il_partition}')
        self._changed = True
        for v_tp in il_partition:
            # Tolerate a revoke for a partition that was never tracked.
            self.l_partition.pop(v_tp, None)

    def is_changed(self) -> bool:
        """Return True once after an assign/revoke, then False until the next one."""
        v_changed = self._changed
        self._changed = False
        return v_changed

    def offset_set(self, iv_topic, iv_partition, iv_offset) -> None:
        """Record ``iv_offset`` for the partition and persist it immediately."""
        self.l_partition[m_ak.TopicPartition(iv_topic, iv_partition)] = iv_offset
        self._save()

    def offset_get(self, iv_topic, iv_partition) -> int:
        """Return the recorded offset; raises ``KeyError`` for an untracked partition."""
        return self.l_partition[m_ak.TopicPartition(iv_topic, iv_partition)]
# Main-loop flag; cleared by on_signal_exit() to request shutdown.
gv_running: bool = True


def on_signal_exit():
    """Signal handler: log the request and ask the consume loop to stop."""
    global gv_running
    gv_log.info("on signal to exit")
    gv_running = False
async def consume():
    """Consume up to 10 messages from topic 'simple-01', caching offsets on disk.

    The ``getone()`` call in the loop and the final ``stop()`` call are
    wrapped in ``asyncio.wait_for`` so that a hung call surfaces as a
    timeout instead of blocking forever.  SIGTERM/SIGINT clear
    ``gv_running`` so the loop ends after the current iteration.

    Each consumed message is dumped to ``<script>.<offset % 10>.msg`` and its
    offset is stored through ``MyConsumerCommitter``.  Any exception is
    logged and re-raised; the consumer is always stopped.
    """
    global gv_running
    v_loop = m_aio.get_running_loop()
    v_loop.add_signal_handler(m_sig.SIGTERM, on_signal_exit)
    v_loop.add_signal_handler(m_sig.SIGINT, on_signal_exit)
    gv_log.info(m_sys.argv)
    # NOTE: broker address and credentials are hard-coded for this repro script.
    v_kc = m_ak.AIOKafkaConsumer(
        bootstrap_servers=['kafka1.tz.local:9092'],
        group_id='user1',
        security_protocol='SASL_PLAINTEXT',
        sasl_mechanism='PLAIN',
        sasl_plain_username='user1',
        sasl_plain_password='mUATUysl9BxpbdI',
        auto_offset_reset='earliest',
        enable_auto_commit=False)
    try:
        gv_log.info(f'{__file__} Begin')
        v_co_cache = MyConsumerCommitter(v_kc, m_os.path.splitext(__file__)[0] + '.commit.txt')
        gv_log.debug('Kafka: +start')
        await v_kc.start()
        gv_log.debug('Kafka: -start')
        # Anonymous listener class that forwards rebalance callbacks to the
        # offset cache.
        v_kc.subscribe(
            topics=['simple-01'],
            listener=type('', (m_ak.ConsumerRebalanceListener,), {
                'on_partitions_assigned': lambda self, assigned: v_co_cache.partition_assign(assigned),
                'on_partitions_revoked': lambda self, revoked: v_co_cache.partition_revoke(revoked)
            })()
        )
        gv_log.info('Init done')
        if v_co_cache.is_changed():
            gv_log.info('Kafka: on rebalance [early]')
            for v_tp, v_offs in v_co_cache.l_partition.items():
                v_kc.seek(v_tp, v_offs + 1)
        v_cnt: int = 10
        while gv_running and v_cnt:
            gv_log.debug('+loop')
            v_cnt -= 1
            gv_log.debug('Kafka: +getone')
            v_msg = await m_aio.wait_for(v_kc.getone(), timeout=60)
            gv_log.debug('Kafka: -getone')
            if v_co_cache.is_changed():
                # A rebalance happened while waiting: re-seek every tracked
                # partition and drop the message that was just fetched.
                gv_log.info('Kafka: on rebalance')
                for v_tp, v_offs in v_co_cache.l_partition.items():
                    gv_log.debug('Kafka: seek')
                    v_kc.seek(v_tp, v_offs + 1)
                continue
            gv_log.info(f': consumed {v_msg.partition}:{v_msg.offset}')
            # Key-less records and tombstones carry None; dump them as empty
            # payloads instead of failing on len(None).
            v_key = b'' if v_msg.key is None else v_msg.key
            v_value = b'' if v_msg.value is None else v_msg.value
            with m_io.open(m_os.path.splitext(__file__)[0] + f'.{v_msg.offset % 10}.msg', mode='w+b') as v_wr:
                v_wr.write(bytes(f'{len(v_key)}\n', encoding='utf-8') + v_key)
                v_wr.write(bytes(f'\n{len(v_value)}\n', encoding='utf-8') + v_value)
            v_co_cache.offset_set('simple-01', v_msg.partition, v_msg.offset)
            await m_aio.sleep(1)
            gv_log.debug(f': -loop [{gv_running}]')
    except BaseException as x:
        # Log everything (including cancellation and timeouts), then propagate.
        gv_log.exception(repr(x))
        raise
    finally:
        try:
            gv_log.info('Kafka: +stop')
            await m_aio.wait_for(v_kc.stop(), timeout=60)
        finally:
            gv_log.info('Kafka: -stop')
            gv_log.info('End')
# Log everything from DEBUG up to "<script>.log" next to this script.
# WatchedFileHandler lives in the logging.handlers submodule (imported as
# m_logh); the log path uses the same splitext idiom as the rest of the script.
m_log.basicConfig(
    level=m_log.DEBUG,
    handlers=(m_logh.WatchedFileHandler(m_os.path.splitext(__file__)[0] + '.log', mode='w'),))
m_aio.run(consume())
I updated source code at aiokafka/client.py:458:
except (OSError, asyncio.TimeoutError, KafkaError) as err:
log.error('Unable connect to node with id %s: %s', node_id, err)
if group == ConnectionGroup.DEFAULT:
# Connection failures imply that our metadata is stale, so
# let's refresh
self.force_metadata_update() # <--
return None
to:
except (OSError, asyncio.TimeoutError, KafkaError) as err:
log.error('Unable connect to node with id %s: %s', node_id, err)
if group == ConnectionGroup.DEFAULT:
# Connection failures imply that our metadata is stale, so
# let's refresh
await self.force_metadata_update()
return None
If I don't, I get an infinite loop in `AIOKafkaConsumer.stop()` even if I wrap it with `asyncio.wait_for`.
If I'm not mistaken, there is a similar unawaited function call here 😞
@TEH30P hi. I am researching zombie consumers that appear after network problems, and your issue may be related.
I tried to reproduce it with your snippet, but everything works fine. After 60 seconds I got:
v_msg = await m_aio.wait_for(v_kc.getone(), timeout=30)
File "/usr/local/Cellar/python@3.9/3.9.2_1/Frameworks/Python.framework/Versions/3.9/lib/python3.9/asyncio/tasks.py", line 494, in wait_for
raise exceptions.TimeoutError() from exc
asyncio.exceptions.TimeoutError
Could you explain how you simulate "network was down during"?
About "which means that simple call of this function doing nothing": that's not actually true. In `force_metadata_update` the client sets a new future on `self._md_update_fut` and resolves the `self._md_update_waiter` future.
def force_metadata_update(self):
if self._md_update_fut is None:
if not self._md_update_waiter.done():
self._md_update_waiter.set_result(None)
self._md_update_fut = self._loop.create_future()
These attributes are used in the `_md_synchronizer` function. This function runs as a background task that is started when the consumer is created:
if self._sync_task is None:
# starting metadata synchronizer task
self._sync_task = create_task(self._md_synchronizer())
async def _md_synchronizer(self):
while True:
await asyncio.wait(
[self._md_update_waiter],
timeout=self._metadata_max_age_ms / 1000)
ret = await self._metadata_update(self.cluster, topics)
So this function runs `self._metadata_update` every `self._metadata_max_age_ms / 1000` seconds, or whenever you resolve the future `self._md_update_waiter`. Calling `force_metadata_update` without `await` does update the metadata, but in the background.
@Arfey @TEH30P
def force_metadata_update(self):
"""Update cluster metadata
Returns:
True/False - metadata updated or not
"""
if self._md_update_fut is None:
# Wake up the `_md_synchronizer` task
if not self._md_update_waiter.done():
self._md_update_waiter.set_result(None) # <----------- this is not nothing, it wakes a task to update metadata
self._md_update_fut = self._loop.create_future()
# Metadata will be updated in the background by syncronizer
return asyncio.shield(self._md_update_fut)
Not awaiting `force_metadata_update()` et al. (including `add_topic()` and the like) does not necessarily appear to be a bug: it wakes the `_md_synchronizer` task, and not awaiting the future just means not waiting for it to finish. It's not like failing to `await` a coroutine, which means the coroutine is never scheduled to run.
See the following code:
async def _md_synchronizer(self):
"""routine (async task) for synchronize cluster metadata every
`metadata_max_age_ms` milliseconds"""
while True:
await asyncio.wait(
[self._md_update_waiter],
timeout=self._metadata_max_age_ms / 1000) # <---- this should wake periodically
topics = self._topics
if self._md_update_fut is None:
self._md_update_fut = create_future() # <--- this should ensure next call to force_metadata_update() wakes us, whether it is awaited or not
ret = await self._metadata_update(self.cluster, topics) # <---- this should update metadata
# If list of topics changed during metadata update we must update
# it again right away.
if topics != self._topics:
continue
# Earlier this waiter was set before sending metadata_request,
# but that was to avoid topic list changes being unnoticed, which
# is handled explicitly now.
self._md_update_waiter = create_future()
self._md_update_fut.set_result(ret)
self._md_update_fut = None
Hosts should eventually get shuffled, as i understand it.
Can you still repro this?
If you ctrl-C when getone()
is stuck, can you get a backtrace and see where it is stuck?
Not sure if I am reproducing this issue (could it be the cause of #625 ?), but I will try the following approach to wrap consumer.getone()
and see if this is happening, and why
import asyncio
import logging
logging.basicConfig(level=logging.DEBUG)
_log = logging.getLogger()


async def task() -> None:
    """Simulate a call that hangs by awaiting a future nobody resolves."""
    loop = asyncio.get_running_loop()
    forever = loop.create_future()
    await forever  # simulating a hang


async def main() -> None:
    """Run task() with a timeout and log where it was stuck when it timed out."""
    t = asyncio.create_task(task())  # imagine this is getone()
    try:
        await asyncio.wait_for(t, timeout=2)  # set your timeout
    except asyncio.TimeoutError:
        # Catch asyncio.TimeoutError explicitly: it only became an alias of
        # the builtin TimeoutError in Python 3.11, so a bare
        # ``except TimeoutError`` misses it on older interpreters.
        t.cancel()
        try:
            await t
        except BaseException:
            # CancelledError derives from BaseException; the traceback
            # logged here is the one raised out of the awaited task.
            _log.exception('backtrace for getone() when it timed out')


asyncio.run(main())
I'm having similar error, Kafka cluster set in GCP by Confluent Cloud, we're noticing occasional "Unable connect to node with id X" error but today we run into the infinite loop of those until our process was restarted. Here are logs from the process:
2024-02-22 02:23:36: [ERROR] Unable connect to node with id 3:
2024-02-22 02:23:36: [WARN] Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 3).
2024-02-22 02:23:36: [ERROR] Unable connect to node with id 2: KafkaConnectionError: Connection at b2-XXX.europe-north1.gcp.confluent.cloud:9092 closed
2024-02-22 02:23:36: [WARN] Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 2).
2024-02-22 02:23:36: [ERROR] Unable connect to node with id 0: KafkaConnectionError: Connection at b0-XXX.europe-north1.gcp.confluent.cloud:9092 closed
2024-02-22 02:23:36: [WARN] Got error produce response: NodeNotReadyError: Attempt to send a request to node which is not ready (node id 0).
2024-02-22 02:23:36: [ERR] Unable connect to node with id 4: KafkaConnectionError: Connection at b4-XXX.europe-north1.gcp.confluent.cloud:9092 closed
2024-02-22 02:23:36: [ERR] Unable connect to node with id 3: KafkaConnectionError: Connection at b3-XXX.europe-north1.gcp.confluent.cloud:9092 closed
2024-02-22 02:23:36: [ERR] Unable connect to node with id 5: KafkaConnectionError: Connection at b5-XXX.europe-north1.gcp.confluent.cloud:9092 closed
2024-02-22 02:23:36: [ERR] Unable to update metadata from [4, 3, 5, 2, 1, 0]
Notice that the first error log is missing the stringified `err` (so it's probably an instance of `OSError`), and the next logs probably come from: https://github.com/aio-libs/aiokafka/blob/7544add039d608a94dfcc80ee150c3beb450baed/aiokafka/conn.py#L487-L489
It lasted 10 hours and produced 915k errors. We're using `.getmany(1000)` with settings:
compression_type="lz4",
connections_max_idle_ms=60 * 1000,
acks="all",
enable_idempotence=True,
Notice that first error log is missing stringified err (so it's probably an instance of OSError) and next logs comes probably from:
It's certainly a bug, and it's easy to fix it
Looks like this is still an issue
Describe the bug
If the network goes down during `await AIOKafkaConsumer.getone`, the `await` will never complete. Even wrapping it in `asyncio.wait_for` doesn't help.
Expected behaviour
Some kind of exception should happen.
Environment (please complete the following information):
- kafka-python version (`python -c "import kafka; print(kafka.__version__)"`):
- Kafka broker version (`kafka-topics.sh --version`):
Reproducible example
Part of the log with messages which were created at start of the infinite loop: