taskiq-python / taskiq

Distributed task queue with full async support
MIT License

Dependency Context Not Propagating to Broker in Task Execution #377

Open Tsovak opened 1 week ago

Issue: Dependency Context Not Propagating to Broker in Task Execution

Problem Description

I'm using a fastapi_app fixture together with InMemoryBroker and InmemoryResultBackend from taskiq. The setup works for the FastAPI app itself, but tasks executed by taskiq fail with an error about a missing dependency.

Here’s a snippet of my fastapi_app fixture:

@pytest.fixture
def fastapi_app(
    dbsession: AsyncSession,
    test_rmq_pool: Pool[Channel],
    fake_redis_pool: ConnectionPool,
    test_searcher: CustomSearcher,
    memory_file_system: MemoryFileSystem,
) -> FastAPI:
    from backend.web.application import get_app

    application = get_app()
    application.dependency_overrides[get_db_session] = lambda: dbsession
    return application  # noqa

And here’s the initialization of the broker:

from taskiq.brokers.inmemory_broker import InMemoryBroker, InmemoryResultBackend

result_backend = InmemoryResultBackend()
broker = InMemoryBroker().with_result_backend(result_backend)

In conftest.py, I also have the following fixture:

@pytest.fixture(autouse=True)
def init_taskiq_deps(fastapi_app: FastAPI):
    from backend.tkq import broker
    taskiq_fastapi.populate_dependency_context(broker, fastapi_app)

    yield

    broker.custom_dependency_context = {}

Issue Observed

Despite populating the dependency context, tasks still fail with an error about a missing db_session_factory attribute. Here's the task result:

{
  "is_err": true,
  "log": null,
  "return_value": null,
  "execution_time": 0.01,
  "labels": {
    "asana_task_id": "1111"
  },
  "error": {
    "exc_type": "AttributeError",
    "exc_message": [
      "'State' object has no attribute 'db_session_factory'"
    ],
    "exc_module": "builtins",
    "exc_cause": null,
    "exc_context": {
      "exc_type": "KeyError",
      "exc_message": [
        "db_session_factory"
      ],
      "exc_module": "builtins",
      "exc_cause": null,
      "exc_context": null,
      "exc_suppress_context": false
    },
    "exc_suppress_context": false
  }
}
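The shape of this error (an AttributeError whose context is a KeyError for the same name) is what you would expect if the original get_db_session dependency ran instead of the override and read db_session_factory from the application state. The sketch below reproduces that exception chain with a minimal stand-in class. It is an assumption about how the state object behaves, modelled on a dict-backed attribute container, and is not code from this project or from taskiq.

```python
class State:
    """Hypothetical stand-in for a dict-backed application state object."""

    def __init__(self) -> None:
        # Bypass our own __setattr__ so the backing dict is a real attribute.
        super().__setattr__("_state", {})

    def __setattr__(self, key: str, value: object) -> None:
        self._state[key] = value

    def __getattr__(self, key: str) -> object:
        # Only called when normal attribute lookup fails.
        try:
            return self._state[key]
        except KeyError:
            # Raising inside the except block records the KeyError as
            # __context__, which matches "exc_context" in the task result.
            raise AttributeError(
                f"'{self.__class__.__name__}' object has no attribute '{key}'"
            )


state = State()
try:
    state.db_session_factory  # never set, as in the failing task
except AttributeError as exc:
    print(type(exc.__context__).__name__, exc)
```

If this reading is right, the task never saw the lambda registered in application.dependency_overrides and fell through to the real dependency.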

Solution

The issue was resolved by explicitly copying fastapi_app.dependency_overrides into broker.dependency_overrides after calling populate_dependency_context. The updated fixture looks like this:

@pytest.fixture(autouse=True)
def init_taskiq_deps(fastapi_app: FastAPI):
    from backend.tkq import broker
    taskiq_fastapi.populate_dependency_context(broker, fastapi_app)

    # Copy the FastAPI overrides so tasks resolve the same test dependencies.
    for k, v in fastapi_app.dependency_overrides.items():
        broker.dependency_overrides.update({k: v})

    yield

    broker.custom_dependency_context = {}

Suggested Improvement

It would be helpful if populate_dependency_context propagated the overrides from fastapi_app to broker.dependency_overrides automatically. Alternatively, the documentation could be expanded to state that this step is required.
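Until something like that exists, the propagation can be wrapped in a small helper. This is only a sketch: propagated_overrides is a hypothetical name, not part of taskiq or taskiq_fastapi, and it assumes nothing beyond both objects exposing a dict-like dependency_overrides attribute, as in the workaround above. Unlike the fixture above, it also restores the broker's previous overrides on exit, so they do not leak between tests.

```python
from contextlib import contextmanager
from typing import Any, Iterator


@contextmanager
def propagated_overrides(app: Any, broker: Any) -> Iterator[None]:
    """Temporarily copy app.dependency_overrides into the broker.

    Hypothetical helper; `app` and `broker` only need a dict-like
    `dependency_overrides` attribute.
    """
    # Snapshot the broker's overrides so teardown restores them exactly.
    saved = dict(broker.dependency_overrides)
    broker.dependency_overrides.update(app.dependency_overrides)
    try:
        yield
    finally:
        # Mutate in place in case other code holds a reference to the dict.
        broker.dependency_overrides.clear()
        broker.dependency_overrides.update(saved)
```

In the init_taskiq_deps fixture, the loop and the bare yield could then be replaced by a `with propagated_overrides(fastapi_app, broker):` block containing the yield.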