robinhood / faust

Python Stream Processing
Other
6.72k stars 535 forks source link

faust.exceptions.ProducerSendError: Error while sending: KafkaTimeoutError() #729

Open kamilkozik opened 3 years ago

kamilkozik commented 3 years ago

Checklist

Steps to reproduce

Kafka (bitnami/kafka:2.7.0) config:

MESSAGE_MAX_BYTES=6291456 MAX_REQUEST_SIZE=6291456 MAX_PARTITION_FETCH_BYTES=6291456

Faust app config:

consumer_max_fetch_size=6291456 producer_max_request_size=6291456 producer_request_timeout=300

Prepare Kafka + Zookeeper stack + 2 services:

  1. Service for downloading html page
  2. Service for scraping URLs and passing to 1st service

Let start the stack. Produce input messages (such amount it'll run at least for 30 minutes). Messages max size 6MB (for sure bigger than default 1MB).

Expected behavior

Service is operating without any interruption like PrducerSendError

Actual behavior

After run facing

faust.exceptions.ProducerSendError: Error while sending: KafkaTimeoutError()

Full traceback

[2021-08-02 09:16:52,586] [19] [ERROR] Task exception was never retrieved
future: <Task finished name='Task-62580' coro=<Topic.send() done, defined at /usr/local/lib/python3.9/site-packages/faust/topics.py:155> exception=ProducerSendError('Error while sending: KafkaTimeoutError()')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 1053, in send
    return cast(Awaitable[RecordMetadata], await producer.send(
  File "/usr/local/lib/python3.9/site-packages/aiokafka/producer/producer.py", line 319, in send
    fut = await message_accumulator.add_message(
  File "/usr/local/lib/python3.9/site-packages/aiokafka/producer/message_accumulator.py", line 341, in add_message
    raise KafkaTimeoutError()
kafka.errors.KafkaTimeoutError: KafkaTimeoutError

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.9/site-packages/faust/topics.py", line 184, in send
    return await self._send_now(
  File "/usr/local/lib/python3.9/site-packages/faust/channels.py", line 300, in _send_now
    return await self.publish_message(
  File "/usr/local/lib/python3.9/site-packages/faust/topics.py", line 413, in publish_message
    fut2 = cast(asyncio.Future, await producer.send(
  File "/usr/local/lib/python3.9/site-packages/faust/transport/drivers/aiokafka.py", line 1062, in send
    raise ProducerSendError(f'Error while sending: {exc!r}') from exc
faust.exceptions.ProducerSendError: Error while sending: KafkaTimeoutError()

Versions

docker-compose.txt kafka_service.log zookeeper.log

bobh66 commented 3 years ago

This project appears to have been abandoned.

You might want to check out the fork of this project - https://github.com/faust-streaming/faust

It has a bunch of fixes merged for problems that were in the base project.