airtai / faststream

FastStream is a powerful and easy-to-use Python framework for building asynchronous services interacting with event streams such as Apache Kafka, RabbitMQ, NATS and Redis.
https://faststream.airt.ai/latest/
Apache License 2.0

Complex bug with rabbitbroker (ChannelInvalidStateError, CancelledError) #1487

Open dearkafka opened 1 month ago

dearkafka commented 1 month ago

Describe the bug
So I have a complex setup: a RabbitBroker, a headers exchange, and a very long-running, ML-related process in the worker (10-20 minutes). When training finishes, the subscriber publishes a new task to the same exchange but with a different header, and that publish fails with these errors:

Traceback (most recent call last):
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 102, in publish_scope
    result = await call_next(
             ^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/rabbit/publisher/producer.py", line 105, in publish
    r = await self._publish(
        ^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/rabbit/publisher/producer.py", line 189, in _publish
    return await exchange_obj.publish(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/aio_pika/exchange.py", line 194, in publish
    return await channel.basic_publish(
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/aiormq/channel.py", line 691, in basic_publish
    await drain_future
asyncio.exceptions.CancelledError

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/install/app/workers/train.py", line 234, in handle
    a = await broker.publish(
        ^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/rabbit/broker/broker.py", line 609, in publish
    return await super().publish(
           ^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/core/usecase.py", line 344, in publish
    return await publish(msg, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 116, in publish_scope
    await self.after_publish(err)
                             ^^^
UnboundLocalError: cannot access local variable 'err' where it is not associated with a value

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/subscriber/usecase.py", line 338, in consume
    await h.call(
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/subscriber/call_item.py", line 164, in call
    result = await call(message)
             ^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 73, in consume_scope
    await self.after_consume(err)
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 54, in after_consume
    raise err
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/middlewares/base.py", line 64, in consume_scope
    result = await call_next(await self.on_consume(msg))
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/wrapper/call.py", line 201, in decode_wrapper
    return await func(msg)
           ^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/fast_depends/use.py", line 146, in injected_wrapper
    r = await real_model.asolve(
        ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/fast_depends/core/model.py", line 525, in asolve
    response = await run_async(call, *final_args, **final_kwargs)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/utils/functions.py", line 53, in to_async_wrapper
    return await call_or_await(func, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/fast_depends/utils.py", line 48, in run_async
    return await cast(Callable[P, Awaitable[T]], func)(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/install/app/workers/train.py", line 246, in handle
    raise NackMessage()
faststream.exceptions.NackMessage: Message was nacked

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/broker/acknowledgement_watcher.py", line 194, in __reject
    await self.message.reject(**self.extra_options)
  File "/miniconda/envs/ml/lib/python3.11/site-packages/faststream/rabbit/message.py", line 54, in reject
    await pika_message.reject(requeue=requeue)
  File "/miniconda/envs/ml/lib/python3.11/site-packages/aio_pika/message.py", line 503, in reject
    await self.channel.basic_reject(
          ^^^^^^^^^^^^
  File "/miniconda/envs/ml/lib/python3.11/site-packages/aio_pika/message.py", line 414, in channel
    raise ChannelInvalidStateError
aiormq.exceptions.ChannelInvalidStateError
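
The `UnboundLocalError` in the middle traceback is a general Python pitfall rather than anything RabbitMQ-specific. Since Python 3.8, `asyncio.CancelledError` derives from `BaseException`, so an `except Exception as err:` clause does not catch it, and a `finally` block that refers to `err` then fails because the name was never bound. The framework-independent sketch below reproduces that pattern next to one safe alternative. It is an assumption that FastStream 0.5.10's `publish_scope` has this shape; it is not a quote of its code, and `fragile_scope`, `safe_scope` and `seen` are hypothetical names.

```python
import asyncio
from typing import List, Optional, Tuple

# Records what a hypothetical "after publish" hook received.
seen: List[Optional[BaseException]] = []


async def cancelled_call() -> None:
    # Stand-in for a publish whose pending I/O is cancelled, like the
    # `await drain_future` frame in the first traceback.
    raise asyncio.CancelledError()


async def fragile_scope() -> None:
    try:
        await cancelled_call()
    except Exception as err:  # never matches: CancelledError is a BaseException
        seen.append(err)
    finally:
        seen.append(err)  # `err` was never bound -> UnboundLocalError


async def safe_scope() -> None:
    err: Optional[BaseException] = None  # bound before `try`, so always defined
    try:
        await cancelled_call()
    except BaseException as exc:
        err = exc  # copy it out: the `as` name is deleted when the clause ends
        raise  # let the cancellation keep propagating
    finally:
        seen.append(err)


async def main() -> Tuple[str, str]:
    fragile = safe = "no error"
    try:
        await fragile_scope()
    except UnboundLocalError as exc:
        fragile = type(exc).__name__
    try:
        await safe_scope()
    except asyncio.CancelledError:
        safe = type(seen[-1]).__name__  # what the hook saw
    return fragile, safe


print(asyncio.run(main()))  # ('UnboundLocalError', 'CancelledError')
```

Note that the fragile version replaces the original `CancelledError` with an `UnboundLocalError`, which matches the "During handling of the above exception, another exception occurred" chain in the report.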

How to reproduce

"""Session worker for training"""

import asyncio
import gc
import os
import sys
import time

from typing import Optional
from uuid import uuid4

import numpy as np
import sentry_sdk
from faststream import BaseMiddleware, Context, ContextRepo, FastStream
from faststream.annotations import Logger
from faststream.exceptions import AckMessage, NackMessage, RejectMessage
from faststream.rabbit import (
    ExchangeType,
    RabbitBroker,
    RabbitExchange,
    RabbitMessage,
    RabbitQueue,
)
from loguru import logger
from PIL import Image
from sentry_sdk.integrations.loguru import LoguruIntegration

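# `config`, `SessionTask` and `PackTask` come from the project's own modules
# (settings and message models); they are not shown in this report.
logger.remove()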
logger.add(sys.stderr, backtrace=True, diagnose=True, colorize=True)

sentry_sdk.init(
    dsn=config.sentry_dsn,
    integrations=[
        LoguruIntegration(),
    ],
    traces_sample_rate=1.0,
    profiles_sample_rate=1.0,
    release="train",
    attach_stacktrace=True,
    request_bodies="always",
    send_default_pii=True,
)

exchange = RabbitExchange("exchange", type=ExchangeType.HEADERS)

class TrainWorker:
    def __init__(self):
        self.broker = RabbitBroker(
            url=config.rabbitmq_url,
            max_consumers=1,
        )

        self.queue = self.define_queue()
        self.handle = self.broker.subscriber(self.queue, exchange=exchange)(self.handle)

    def define_queue(self):
        args = {"x-match": "any"}
        args.update({"train_v1": 1})
        queue = RabbitQueue(
            "train_queue",
            bind_arguments=args,
            auto_delete=False,
        )
        logger.info(f"Queue {queue} defined")
        return queue

    async def handle(
        self,
        logger: Logger,
        task: SessionTask,
        msg: RabbitMessage,
        broker: RabbitBroker = Context(),
    ):
        logger.info(f"Received training task {task}")

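        # ... long-running (10-20 min) training logic elided; it also produces
        # session_id, pack_name, pack_result_id, image_ids and trained_model_id used below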

        logger.info("Training part finished!")

        try:
            logger.info("all_good")
            pack_task = PackTask(
                session_id=session_id,
                pack_name=pack_name,
                pack_result_id=pack_result_id,
                image_ids=image_ids,
                trained_model_id=trained_model_id,
            )
            logger.info(f"Created pack task {pack_task}")
            a = await broker.publish(
                pack_task,
                exchange=exchange,
                headers={"pack_v1": 1},
            )
            logger.info(
                f"Published pack task {pack_task} to inference queue pack {pack_name}, result: {a}"
            )
        except Exception as e:
            logger.error(f"Error in publishing session pack task: {e}")
            raise NackMessage()

        logger.info("Finished")

        raise AckMessage()

worker = TrainWorker()
app = FastStream(worker.broker)

asyncio.run(app.run())
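
For readers less familiar with headers exchanges: the queue in this repro is bound with `{"x-match": "any", "train_v1": 1}`, so only messages carrying a `train_v1` header equal to 1 land in `train_queue`, while the follow-up publish with `{"pack_v1": 1}` goes to whichever queue binds `pack_v1`. The pure-Python sketch below is a simplified model of RabbitMQ's `any`/`all` matching, not RabbitMQ code: it ignores the `any-with-x`/`all-with-x` variants and header value typing, and `headers_match` and the `pack_binding` are hypothetical.

```python
from typing import Any, Dict


def headers_match(binding: Dict[str, Any], headers: Dict[str, Any]) -> bool:
    """Simplified model of headers-exchange matching."""
    mode = binding.get("x-match", "all")  # RabbitMQ defaults to "all"
    # Binding keys starting with "x-" are directives, not match criteria.
    criteria = {k: v for k, v in binding.items() if not k.startswith("x-")}
    hits = [k in headers and headers[k] == v for k, v in criteria.items()]
    return any(hits) if mode == "any" else all(hits)


train_binding = {"x-match": "any", "train_v1": 1}  # as in define_queue()
pack_binding = {"x-match": "any", "pack_v1": 1}    # hypothetical PackTask consumer

print(headers_match(train_binding, {"train_v1": 1}))  # True
print(headers_match(train_binding, {"pack_v1": 1}))   # False
print(headers_match(pack_binding, {"pack_v1": 1}))    # True
```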

The problem is that without this training logic everything works; the repro code runs without failure. However, I can't understand how my training logic could disrupt the connection to RabbitMQ, even in theory, since the channel is suddenly closed. Should I set a very high timeout on publish? This also did not happen in previous versions of FastStream; then I tried middlewares and it started happening.
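
One possible explanation, offered as an assumption rather than a diagnosis confirmed in this thread: if the training step is synchronous code called directly inside `async def handle`, it blocks the event loop for its full 10-20 minutes. aiormq sends AMQP heartbeats from that same loop, so RabbitMQ may close the connection for missed heartbeats; the message's channel is then gone, which would fit both the cancelled publish and the `ChannelInvalidStateError` on reject. The stdlib-only sketch below simulates this with a stand-in heartbeat coroutine (`heartbeat`, `train_blocking` and `run_training` are hypothetical) and shows that offloading the blocking call with `asyncio.to_thread` keeps the loop ticking.

```python
import asyncio
import time
from typing import List


def train_blocking(seconds: float) -> str:
    # Hypothetical stand-in for synchronous training code (no awaits inside).
    time.sleep(seconds)
    return "trained"


async def heartbeat(ticks: List[float], interval: float) -> None:
    # Hypothetical stand-in for the periodic heartbeat an AMQP client
    # has to send from the same event loop.
    while True:
        ticks.append(time.monotonic())
        await asyncio.sleep(interval)


async def run_training(offload: bool) -> int:
    ticks: List[float] = []
    beat = asyncio.create_task(heartbeat(ticks, interval=0.05))
    await asyncio.sleep(0)  # let the heartbeat task take its first step
    if offload:
        # Runs in a worker thread; the event loop keeps serving other tasks.
        await asyncio.to_thread(train_blocking, 0.5)
    else:
        # Runs on the loop itself; nothing else can run until it returns.
        train_blocking(0.5)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass
    return len(ticks)


blocked = asyncio.run(run_training(offload=False))
offloaded = asyncio.run(run_training(offload=True))
print(f"heartbeats while blocked: {blocked}, while offloaded: {offloaded}")
```

In the reported worker, this would correspond to running the elided training call through `await asyncio.to_thread(...)` (or a process pool via `loop.run_in_executor`) instead of calling it directly inside `handle`.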

To be clear, the app does not exit.

Environment
Running FastStream 0.5.10 with CPython 3.11.7 on Linux

dearkafka commented 1 month ago

Also, RabbitMQ is running as a cluster.

dearkafka commented 1 month ago

With the same processing code, and the same time and resource consumption, it works on FastStream 0.4.3 with CPython 3.11.7 on Linux. I have a hunch that versions 0.5 and later don't work for me, but maybe I'm doing something wrong.

Lancetnik commented 4 weeks ago

@dearkafka, can you please check it with 0.5.11? I can't reproduce your exception now.