aio-libs / aiokafka

asyncio client for kafka
http://aiokafka.readthedocs.io/
Apache License 2.0

KafkaError: Unexpected error during batch delivery #907

Open hexesisb opened 1 year ago

hexesisb commented 1 year ago

Describe the bug

After some number of producer.send_and_wait(topic, message) calls (it can be as many as 15k successfully sent messages), I got:

ERROR    Unexpected error in sender routine                                                                                                                                              sender.py:167
                    ╭───────────────────────────────────────────────────────────────────── Traceback (most recent call last) ─────────────────────────────────────────────────────────────────────╮              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/producer/sender.py:155 in _sender_routine                                                          │              
                    │                                                                                                                                                                             │              
                    │   152 │   │   │   │   # done tasks should never produce errors, if they are it's a                                                                                          │              
                    │   153 │   │   │   │   # bug                                                                                                                                                 │              
                    │   154 │   │   │   │   for task in done:                                                                                                                                     │              
                    │ ❱ 155 │   │   │   │   │   task.result()                                                                                                                                     │              
                    │   156 │   │   │   │                                                                                                                                                         │              
                    │   157 │   │   │   │   tasks -= done                                                                                                                                         │              
                    │   158                                                                                                                                                                       │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/producer/sender.py:260 in _send_produce_req                                                        │              
                    │                                                                                                                                                                             │              
                    │   257 │   │   t0 = time.monotonic()                                                                                                                                         │              
                    │   258 │   │                                                                                                                                                                 │              
                    │   259 │   │   handler = SendProduceReqHandler(self, batches)                                                                                                                │              
                    │ ❱ 260 │   │   await handler.do(node_id)                                                                                                                                     │              
                    │   261 │   │                                                                                                                                                                 │              
                    │   262 │   │   # if batches for node is processed in less than a linger seconds                                                                                              │              
                    │   263 │   │   # then waiting for the remaining time                                                                                                                         │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/producer/sender.py:716 in do                                                                       │              
                    │                                                                                                                                                                             │              
                    │   713 │   async def do(self, node_id):                                                                                                                                      │              
                    │   714 │   │   request = self.create_request()                                                                                                                               │              
                    │   715 │   │   try:                                                                                                                                                          │              
                    │ ❱ 716 │   │   │   response = await self._client.send(node_id, request)                                                                                                      │              
                    │   717 │   │   except KafkaError as err:                                                                                                                                     │              
                    │   718 │   │   │   log.warning(                                                                                                                                              │              
                    │   719 │   │   │   │   "Got error produce response: %s", err)                                                                                                                │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/client.py:504 in send                                                                              │              
                    │                                                                                                                                                                             │              
                    │   501 │   │   │   │   request.required_acks == 0:                                                                                                                           │              
                    │   502 │   │   │   expect_response = False                                                                                                                                   │              
                    │   503 │   │                                                                                                                                                                 │              
                    │ ❱ 504 │   │   future = self._conns[(node_id, group)].send(                                                                                                                  │              
                    │   505 │   │   │   request, expect_response=expect_response)                                                                                                                 │              
                    │   506 │   │   try:                                                                                                                                                          │              
                    │   507 │   │   │   result = await future                                                                                                                                     │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/conn.py:444 in send                                                                                │              
                    │                                                                                                                                                                             │              
                    │   441 │   │   │   │   f"Connection at {self._host}:{self._port} broken: {err}"                                                                                              │              
                    │   442 │   │   │   )                                                                                                                                                         │              
                    │   443 │   │                                                                                                                                                                 │              
                    │ ❱ 444 │   │   self.log.debug(                                                                                                                                               │              
                    │   445 │   │   │   '%s Request %d: %s', self, correlation_id, request)                                                                                                       │              
                    │   446 │   │                                                                                                                                                                 │              
                    │   447 │   │   if not expect_response:                                                                                                                                       │              
                    │                                                                                                                                                                             │              
                    │ /usr/lib/python3.10/logging/__init__.py:1465 in debug                                                                                                                       │              
                    │                                                                                                                                                                             │              
                    │   1462 │   │   logger.debug("Houston, we have a %s", "thorny problem", exc_info=1)                                                                                          │              
                    │   1463 │   │   """                                                                                                                                                          │              
                    │   1464 │   │   if self.isEnabledFor(DEBUG):                                                                                                                                 │              
                    │ ❱ 1465 │   │   │   self._log(DEBUG, msg, args, **kwargs)                                                                                                                    │              
                    │   1466 │                                                                                                                                                                    │              
                    │   1467 │   def info(self, msg, *args, **kwargs):                                                                                                                            │              
                    │   1468 │   │   """                                                                                                                                                          │              
                    │                                                                                                                                                                             │              
                    │ /usr/lib/python3.10/logging/__init__.py:1624 in _log                                                                                                                        │              
                    │                                                                                                                                                                             │              
                    │   1621 │   │   │   │   exc_info = sys.exc_info()                                                                                                                            │              
                    │   1622 │   │   record = self.makeRecord(self.name, level, fn, lno, msg, args,                                                                                               │              
                    │   1623 │   │   │   │   │   │   │   │    exc_info, func, extra, sinfo)                                                                                                       │              
                    │ ❱ 1624 │   │   self.handle(record)                                                                                                                                          │              
                    │   1625 │                                                                                                                                                                    │              
                    │   1626 │   def handle(self, record):                                                                                                                                        │              
                    │   1627 │   │   """                                                                                                                                                          │              
                    │                                                                                                                                                                             │              
                    │ /usr/lib/python3.10/logging/__init__.py:1634 in handle                                                                                                                      │              
                    │                                                                                                                                                                             │              
                    │   1631 │   │   well as those created locally. Logger-level filtering is applied.                                                                                            │              
                    │   1632 │   │   """                                                                                                                                                          │              
                    │   1633 │   │   if (not self.disabled) and self.filter(record):                                                                                                              │              
                    │ ❱ 1634 │   │   │   self.callHandlers(record)                                                                                                                                │              
                    │   1635 │                                                                                                                                                                    │              
                    │   1636 │   def addHandler(self, hdlr):                                                                                                                                      │              
                    │   1637 │   │   """                                                                                                                                                          │              
                    │                                                                                                                                                                             │              
                    │ /usr/lib/python3.10/logging/__init__.py:1696 in callHandlers                                                                                                                │              
                    │                                                                                                                                                                             │              
                    │   1693 │   │   │   for hdlr in c.handlers:                                                                                                                                  │              
                    │   1694 │   │   │   │   found = found + 1                                                                                                                                    │              
                    │   1695 │   │   │   │   if record.levelno >= hdlr.level:                                                                                                                     │              
                    │ ❱ 1696 │   │   │   │   │   hdlr.handle(record)                                                                                                                              │              
                    │   1697 │   │   │   if not c.propagate:                                                                                                                                      │              
                    │   1698 │   │   │   │   c = None    #break out                                                                                                                               │              
                    │   1699 │   │   │   else:                                                                                                                                                    │              
                    │                                                                                                                                                                             │              
                    │ /usr/lib/python3.10/logging/__init__.py:968 in handle                                                                                                                       │              
                    │                                                                                                                                                                             │              
                    │    965 │   │   if rv:                                                                                                                                                       │              
                    │    966 │   │   │   self.acquire()                                                                                                                                           │              
                    │    967 │   │   │   try:                                                                                                                                                     │              
                    │ ❱  968 │   │   │   │   self.emit(record)                                                                                                                                    │              
                    │    969 │   │   │   finally:                                                                                                                                                 │              
                    │    970 │   │   │   │   self.release()                                                                                                                                       │              
                    │    971 │   │   return rv                                                                                                                                                    │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/rich/logging.py:159 in emit                                                                                 │              
                    │                                                                                                                                                                             │              
                    │   156 │   │   │   │   │   record.asctime = formatter.formatTime(record, formatter.datefmt)                                                                                  │              
                    │   157 │   │   │   │   message = formatter.formatMessage(record)                                                                                                             │              
                    │   158 │   │                                                                                                                                                                 │              
                    │ ❱ 159 │   │   message_renderable = self.render_message(record, message)                                                                                                     │              
                    │   160 │   │   log_renderable = self.render(                                                                                                                                 │              
                    │   161 │   │   │   record=record, traceback=traceback, message_renderable=message_renderable                                                                                 │              
                    │   162 │   │   )                                                                                                                                                             │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/rich/logging.py:185 in render_message                                                                       │              
                    │                                                                                                                                                                             │              
                    │   182 │   │   │   ConsoleRenderable: Renderable to display log message.                                                                                                     │              
                    │   183 │   │   """                                                                                                                                                           │              
                    │   184 │   │   use_markup = getattr(record, "markup", self.markup)                                                                                                           │              
                    │ ❱ 185 │   │   message_text = Text.from_markup(message) if use_markup else Text(message)                                                                                     │              
                    │   186 │   │                                                                                                                                                                 │              
                    │   187 │   │   highlighter = getattr(record, "highlighter", self.highlighter)                                                                                                │              
                    │   188 │   │   if highlighter:                                                                                                                                               │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/rich/text.py:268 in from_markup                                                                             │              
                    │                                                                                                                                                                             │              
                    │    265 │   │   """                                                                                                                                                          │              
                    │    266 │   │   from .markup import render                                                                                                                                   │              
                    │    267 │   │                                                                                                                                                                │              
                    │ ❱  268 │   │   rendered_text = render(text, style, emoji=emoji, emoji_variant=emoji_variant)                                                                                │              
                    │    269 │   │   rendered_text.justify = justify                                                                                                                              │              
                    │    270 │   │   rendered_text.overflow = overflow                                                                                                                            │              
                    │    271 │   │   rendered_text.end = end                                                                                                                                      │              
                    │                                                                                                                                                                             │              
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/rich/markup.py:161 in render                                                                                │              
                    │                                                                                                                                                                             │              
                    │   158 │   │   │   │   │   try:                                                                                                                                              │              
                    │   159 │   │   │   │   │   │   start, open_tag = pop_style(style_name)                                                                                                       │              
                    │   160 │   │   │   │   │   except KeyError:                                                                                                                                  │              
                    │ ❱ 161 │   │   │   │   │   │   raise MarkupError(                                                                                                                            │              
                    │   162 │   │   │   │   │   │   │   f"closing tag '{tag.markup}' at position {position} doesn't                                                                               │              
                    │       match any open tag"                                                                                                                                                   │              
                    │   163 │   │   │   │   │   │   ) from None                                                                                                                                   │              
                    │   164 │   │   │   │   else:  # implicit close                                                                                                                               │              
                    ╰─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯              
                    MarkupError: closing tag                                                                                                                                                                     
                    '[/e`\x00\x00\x00\x00\x00\x00\x00\x00\x01\x89N\xe3j&\x00\x00\x01\x89N\xe3j&\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x01\x96\x0e\x00\x00\x00\x01\x88              
                    \x0e{"data": {"snapshot": true, "bi...'))]' at position 277 doesn't match any open tag                                                                                                       
[17:54:38] ERROR    Error while sending message to Kafka: KafkaError: Unexpected error during batch delivery
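
Reading the traceback from bottom to top, the MarkupError is raised inside the logging handler (rich/logging.py, in render_message, where use_markup is true) while it renders the debug record logged at conn.py:444, and it then escapes through logging's callHandlers into the sender routine. The sketch below reproduces only that propagation with the standard library. FragileHandler, GuardedHandler and debug_call_survives are hypothetical names, and the ValueError merely stands in for the markup failure; it is not the rich or aiokafka code.

```python
import logging


class FragileHandler(logging.Handler):
    """Hypothetical stand-in for a handler whose emit() can raise on some messages."""

    def emit(self, record: logging.LogRecord) -> None:
        message = record.getMessage()
        # Mimic a markup parser rejecting an unmatched closing tag.
        if "[/" in message:
            raise ValueError("closing tag doesn't match any open tag")


class GuardedHandler(FragileHandler):
    """Same handler, but failures are routed to handleError() instead of the caller."""

    def emit(self, record: logging.LogRecord) -> None:
        try:
            super().emit(record)
        except Exception:
            # handleError() reports the problem on stderr and returns normally.
            self.handleError(record)


def debug_call_survives(handler: logging.Handler, payload: str) -> bool:
    """Return True if logger.debug() returns normally, False if the handler error escapes."""
    logger = logging.getLogger(f"demo.{type(handler).__name__}")
    logger.setLevel(logging.DEBUG)
    logger.propagate = False
    logger.addHandler(handler)
    try:
        logger.debug("Request %d: %s", 1, payload)
        return True
    except ValueError:
        return False
    finally:
        logger.removeHandler(handler)


binary_like_payload = "b'[/e\\x00\\x00{\"data\": ...'"
print(debug_call_survives(FragileHandler(), binary_like_payload))
print(debug_call_survives(GuardedHandler(), binary_like_payload))
```

Under these assumptions, the unguarded handler lets the exception reach whoever called logger.debug(), which matches the shape of the traceback above: the failure surfaces in the code that logs, not in the code that sends.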

Environment (please complete the following information):

Reproducible example


        self.producer = AIOKafkaProducer(
            loop=self.loop,
            bootstrap_servers=config['kafka_arress'],
            client_id='Bot',
            request_timeout_ms=500,
            connections_max_idle_ms=5000,
            value_serializer=lambda m: json.dumps(m).encode('utf-8')
        )

    async def send(self, topic, message):
        try:
            # Produce the message and wait for the broker to acknowledge it
            await self.producer.send_and_wait(topic, message)
        except Exception as e:
            log.error("Error while sending message to Kafka: %s", e)
            await self.producer.start()
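
Since the failing record is a DEBUG message, one possible workaround (a sketch, not a confirmed fix) is to raise the threshold of the library's loggers so that the record is never handed to the handler. This assumes the aiokafka modules create their loggers under the "aiokafka" name, as the module paths in the traceback suggest. silence_aiokafka_debug is a hypothetical helper name.

```python
import logging


def silence_aiokafka_debug(level: int = logging.INFO) -> logging.Logger:
    """Raise the threshold of the package-level logger (name assumed to be "aiokafka")."""
    package_logger = logging.getLogger("aiokafka")
    package_logger.setLevel(level)
    return package_logger


silence_aiokafka_debug()

# Child loggers that have no level of their own inherit the package threshold,
# so DEBUG records from them are dropped before any handler sees them.
conn_logger = logging.getLogger("aiokafka.conn")
print(conn_logger.isEnabledFor(logging.DEBUG))
```

This only hides the symptom for DEBUG records. Any message at INFO or above that contains bracketed binary data would still go through the same rendering path.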

hexesisb commented 1 year ago

                    MarkupError: closing tag                                                                                                                                                                     
                    '[/\x00\x00\x00\x00\x00\x00\x00\x00\x01\x89c\xb6hA\x00\x00\x01\x89c\xb6hA\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x01\xac\x0e\x00\x00\x00\x01\x9e\x              
                    0e{"data": {"snapshot": true, "bi...'))]' at position 2474 doesn't match any open tag                                                                                                        
[18:57:30] ERROR    Unexpected exception in AIOKafkaConnection                                                                                                                                        conn.py:385
                    ╭────────────────────────────────────────────────────────────────────── Traceback (most recent call last) ──────────────────────────────────────────────────────────────────────╮            
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/conn.py:382 in _on_read_task_error                                                                   │            
                    │                                                                                                                                                                               │            
                    │   379 │   │   │   return                                                                                                                                                      │            
                    │   380 │   │                                                                                                                                                                   │            
                    │   381 │   │   try:                                                                                                                                                            │            
                    │ ❱ 382 │   │   │   read_task.result()                                                                                                                                          │            
                    │   383 │   │   except Exception as exc:                                                                                                                                        │            
                    │   384 │   │   │   if not isinstance(exc, (OSError, EOFError, ConnectionError)):                                                                                               │            
                    │   385 │   │   │   │   cls.log.exception("Unexpected exception in AIOKafkaConnection")                                                                                         │            
                    │                                                                                                                                                                               │            
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/conn.py:533 in _read                                                                                 │            
                    │                                                                                                                                                                               │            
                    │   530 │   │   │   self = self_ref()                                                                                                                                           │            
                    │   531 │   │   │   if self is None:                                                                                                                                            │            
                    │   532 │   │   │   │   return                                                                                                                                                  │            
                    │ ❱ 533 │   │   │   self._handle_frame(resp)                                                                                                                                    │            
                    │   534 │   │   │   del self                                                                                                                                                    │            
                    │   535 │                                                                                                                                                                       │            
                    │   536 │   def _handle_frame(self, resp):                                                                                                                                      │            
                    │                                                                                                                                                                               │            
                    │ /home/username/Folder/aiohttp/aiohttp/lib/python3.10/site-packages/aiokafka/conn.py:537 in _handle_frame                                                                         │            
                    │                                                                                                                                                                               │            
                    │   534 │   │   │   del self                                                                                                                                                    │            
                    │   535 │                                                                                                                                                                       │            
                    │   536 │   def _handle_frame(self, resp):                                                                                                                                      │            
                    │ ❱ 537 │   │   correlation_id, resp_type, fut = self._requests[0]                                                                                                              │            
                    │   538 │   │                                                                                                                                                                   │            
                    │   539 │   │   if correlation_id is None:  # Is a SASL packet, just pass it though                                                                                             │            
                    │   540 │   │   │   if not fut.done():                                                                                                                                          │            
                    ╰───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯            
                    IndexError: deque index out of range

Which index? How do I reset it?

ods commented 1 year ago

> Which index?

self._requests is a deque, so self._requests[0] raises IndexError when it's empty.
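
A minimal standalone sketch of that behaviour, using plain `collections.deque` rather than aiokafka's actual code (`pending` and `peek_first` are hypothetical names for illustration only). Judging by the traceback, `_handle_frame` ran while no request was queued, which is the empty-deque case shown here:

```python
from collections import deque

# Stand-in for a queue of in-flight requests (hypothetical, not aiokafka's object).
pending = deque()

# Indexing an empty deque raises IndexError, the same error as in the traceback.
try:
    pending[0]
except IndexError as exc:
    error_message = str(exc)


def peek_first(queue):
    """Return the first queued item, or None when the queue is empty."""
    try:
        return queue[0]
    except IndexError:
        return None


pending.append((1, "resp_type", "future"))
first = peek_first(pending)        # an item is queued, so it is returned
pending.popleft()
after_drain = peek_first(pending)  # the queue is empty again, so None
```

So there is no index to reset: the error only says that the queue was empty at the moment its first element was read.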

hexesisb commented 1 year ago

> Which index?
>
> self._requests is a deque, so self._requests[0] raises IndexError when it's empty.

How does this happen? I send more than 15k messages, and after that I get this error. It happens every time. What should I do to solve this issue?

hexesisb commented 1 year ago

Also, I can't figure out how to reconnect, or how to check whether the connection is still alive.