taskiq-python / taskiq

Distributed task queue with full async support
MIT License
770 stars, 48 forks

log is none #120

Open vvanglro opened 1 year ago

vvanglro commented 1 year ago

About the log here: I think that if found_exception is not None, log should be set to str(exc). And if the task runs normally, is any other log output needed?

https://github.com/taskiq-python/taskiq/blob/f54d1efb8a52220bdbec4e18457f5902e0b6145b/taskiq/receiver/receiver.py#L213-L218
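The proposal above can be sketched in a few lines. This is only an illustration of the suggested behavior, not the code at the linked lines: `build_log` is a hypothetical helper that does not exist in taskiq, and only the names `found_exception` and `log` come from the discussion.

```python
from typing import Optional


def build_log(found_exception: Optional[BaseException]) -> Optional[str]:
    """Return the text to store in the result's log field.

    Hypothetical helper, not part of taskiq: it only illustrates the
    suggestion that log should carry str(exc) when the task failed.
    """
    if found_exception is not None:
        return str(found_exception)
    # Nothing went wrong, so there is nothing to report.
    return None
```

With this shape, a client that sees is_err set could read the error text from log without any extra tooling.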

s3rius commented 1 year ago

We were planning to remove log completely, so that people rely on alerting systems like Sentry. But your comment makes sense. Maybe we can even propagate the error with its stack trace?

vvanglro commented 1 year ago

Yes. When there is no third-party alerting system or other plugin in place, you could get the error log by checking whether is_err is True.

s3rius commented 1 year ago

Hi, @vvanglro. We merged a pull request that adds serializable exceptions, so you can now actually raise the error on the client side.

With the new API, TaskiqResult has an error field and a raise_for_error method.

task = await my_task.kiq(param)
# Now the exception will be thrown if there's an error.
result = (await task.wait_result()).raise_for_error()

Unfortunately, it's almost impossible to serialize a traceback, and even if you could, it might lead to security issues. If you want to send exceptions to other libraries, you can do that in middlewares. Since middlewares execute code on the worker side, you'll be able to show the traceback and everything else there.
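To illustrate the serialization problem, here is a minimal standard-library sketch, not taskiq code. It shows that a raw traceback object cannot be pickled, while its formatted text is an ordinary string that worker-side code such as a middleware could log or forward. `format_with_traceback` and `failing_task` are hypothetical names made up for this example.

```python
import pickle
import traceback


def format_with_traceback(exc: BaseException) -> str:
    """Render an exception, its chained causes and its stack trace as text."""
    return "".join(
        traceback.format_exception(type(exc), exc, exc.__traceback__),
    )


def failing_task() -> None:
    """Stand-in for a task body that fails with a chained exception."""
    try:
        1 / 0
    except ZeroDivisionError as exc:
        raise Exception("Lol") from exc


captured = None
try:
    failing_task()
except Exception as exc:
    captured = exc

# The raw traceback object is not picklable.
try:
    pickle.dumps(captured.__traceback__)
    traceback_is_picklable = True
except TypeError:
    traceback_is_picklable = False

# The formatted report is a plain string, so it serializes without trouble.
report = format_with_traceback(captured)
restored = pickle.loads(pickle.dumps(report))
```

Keep in mind that a formatted traceback can expose file paths and source lines, which is one reason to keep it on the worker side rather than returning it to clients.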

s3rius commented 1 year ago

Also, I've created a test file for Sentry integration. If you ever decide to move forward with Sentry as your error tracking system, here's an integration that helps with tracking errors.

import asyncio
from typing import Any, Coroutine

import sentry_sdk
from sentry_sdk.integrations.logging import LoggingIntegration, ignore_logger
from taskiq_redis import RedisAsyncResultBackend

from taskiq import TaskiqMiddleware, ZeroMQBroker
from taskiq.events import TaskiqEvents
from taskiq.message import TaskiqMessage
from taskiq.result import TaskiqResult
from taskiq.state import TaskiqState

class ExcSender(TaskiqMiddleware):
    def on_error(
        self,
        message: TaskiqMessage,
        result: TaskiqResult[Any],
        exception: BaseException,
    ) -> Coroutine[Any, Any, None] | None:
        sentry_sdk.capture_exception(
            exception,
            tags={
                "taskiq_task_name": message.task_name,
                "taskiq_task_id": message.task_id,
            },
        )
        print("SENDING EXCEPTION TO SENTRY!")

broker = (
    ZeroMQBroker()
    .with_result_backend(
        RedisAsyncResultBackend("redis://localhost"),
    )
    .with_middlewares(ExcSender())
)

# Here we initialize Sentry inside the workers.
# If you have a framework integration, you probably won't need to call it
# explicitly.
@broker.on_event(TaskiqEvents.WORKER_STARTUP)
def sentry_init(_: TaskiqState):
    # We ignore this logger because its events may overwrite the
    # exception we capture in the middleware with new tags.
    ignore_logger("taskiq.receiver.receiver")
    sentry_sdk.init(
        "https://key@your-sentry-instance",
        environment="dev",
        integrations=[LoggingIntegration()],
    )

@broker.task
async def my_task() -> None:
    try:
        print(1 / 0)
    except BaseException as exc:
        raise Exception("Lol") from exc

async def main():
    await broker.startup()
    await asyncio.sleep(2)

    task = await my_task.kiq()
    res = await task.wait_result()
    # Here we just print the result, which contains the error.
    print(res)

    await broker.shutdown()

if __name__ == "__main__":
    asyncio.run(main())

It works and sends all events with the proper task IDs and task names.


vvanglro commented 1 year ago

This is a good example of collecting errors with Sentry. Maybe it should be added to the docs.

People who run into this later will also be able to find this issue by searching.