taskiq-python / taskiq

Distributed task queue with full async support
MIT License
689 stars 44 forks source link

`AsyncTaskiqDecoratedTask().kicker().with_labels()` mutates task labels #301

Open donc310 opened 4 months ago

donc310 commented 4 months ago

Hello and thank you very much for your work on this project.

I currently have an issue where tasks submitted with custom labels mutates the underlying decorated task instance

@pytest.mark.anyio
async def test_with_label() -> None:
    trace_ids = {}

    class Tracer(TaskiqMiddleware):
        def pre_send(self, message: TaskiqMessage) -> TaskiqMessage | Coroutine[Any, Any, TaskiqMessage]:
            nonlocal trace_ids
            trace_ids[tuple(message.args)] = message.labels.get("trace_id")
            return super().pre_send(message)

    broker = InMemoryBroker().with_middlewares(Tracer())

    @broker.task()
    def run_task(a) -> int:
        return a

    task1 = await run_task.kicker().with_labels(trace_id="11111").kiq(1)
    task2 = await run_task.kicker().with_labels(trace_id="22222").kiq(2)
    task3 = await run_task.kiq(3)

    await task1.wait_result(timeout=1)
    await task2.wait_result(timeout=1)
    await task3.wait_result(timeout=1)

    assert trace_ids == {(1,): "11111", (2,): "22222", (3,): None}

For some context on what we were trying to achieve;

We were building a custom Sentry Integration for TaskIq that would add tracing meta-data as labels to queued messages, to achieve this we patched AsyncKicker.kiq and Receiver.run_taskmethods to add tracing info to message labels which would track tasks from when there were submitted to when a worker picks up and processes the message.

The above test case fails because after the second task is submitted all other subsequent tasks will have a trcae_id of 22222.

Our current workaround is to use a custom task class which doesn't mutate the original task labels

class CustomTask(AsyncTaskiqDecoratedTask):

    def kicker(self) -> AsyncKicker[_FuncParams, _ReturnType]:
        """
        This function returns kicker object.

        Kicker is a object that can modify kiq request
        before sending it.

        :return: AsyncKicker instance.
        """
        return AsyncKicker(
            task_name=self.task_name,
            broker=self.broker,
            labels=deepcopy(self.labels),
        )

broker.decorator_class = CustomTask
s3rius commented 3 months ago

You're completely right. If you want to become a contributor, you can create a PR that fixes it. I haven't experienced this problem yet, but I think it's a possible bug.

Thanks for noticing. If you don't want to create a PR, I can create a patch myself.

donc310 commented 3 months ago

Will make PR for the fix.