ydf0509 / funboost

pip install funboost,python全功能分布式函数调度框架,funboost的功能是全面性重量级,用户能想得到的功能99%全都有;funboost的使用方式是轻量级,只有@boost一行代码需要写。支持python所有类型的并发模式和一切知名消息队列中间件,支持如 celery dramatiq等框架整体作为funboost中间件,python函数加速器,框架包罗万象,用户能想到的控制功能全都有。一统编程思维,兼容50% python业务场景,适用范围广。只需要一行代码即可分布式执行python一切函数,99%用过funboost的pythoner 感受是 简易 方便 强劲 强大,相见恨晚 。
Apache License 2.0
682 stars 135 forks source link

fix: 修复在其他文件中使用publish发布任务并指定task_id时获取task_id报错 #110

Closed aniya105 closed 6 months ago

aniya105 commented 6 months ago

在非消费函数的文件中使用publish方法发布任务时,使用的TaskIdLoggger由于无法通过get_current_taskid()获取到task_id报错

ydf0509 commented 6 months ago

看下你的代码demo例子呢,还有你的funboost版本,

image image

发一下你报错截图吧,应该不会报错的吧,都try了,如果没传也获取不到,就返回no_task_id.

ydf0509 commented 6 months ago

还是要发下报错截图呢,因为你在 f'10秒内推送了 {self.count_per_minute} 条消息,累计推送了 {self.publish_msg_num_total} 条消息到 {self._queue_name} 队列中') 这里加taskid是不适合的,这个地方使用过统计,没有和某条任务消息关联.

ydf0509 commented 6 months ago

image 这就是模拟你说的场景,在单独的另外文件里面publish的,不会报错的.no_task_id取代了.

aniya105 commented 6 months ago

funboost版本: 43.0

生产者

async def restart_data_annotation_task(
    task_id: str = Body(),
    app_id: str = Body(),
    begin_time: str = Body(),
    end_time: str = Body(),
    wait_annotation_num: int = Body(),
    startup_mode: str = Body(),
):
    data_annotation_consumer.data_annotation.publish(
        {
            "app_id": app_id,
            "start_time": begin_time,
            "end_time": end_time,
            "wait_annotation_num": wait_annotation_num,
            "startup_mode": "full" if startup_mode == "0" else "increment",
        },
        task_id=task_id,
    )

消费者

@boost(
    BoosterParams(
        queue_name="queue_data_annotation",
        qps=10,
        broker_kind=BrokerEnum.REDIS_STREAM,
        concurrent_mode=ConcurrentModeEnum.ASYNC,
        is_using_distributed_frequency_control=True,
        user_custom_record_process_info_func=save_to_postgresql,
    )
)
async def data_annotation(
    app_id,
    start_time,
    end_time,
    wait_annotation_num,
    startup_mode="full",
):
    task_id = funboost_current_task().task_id

报错信息:

  File "C:\Users\ys\PycharmProjects\power-qa\backend\app\app\api\v1\endpoints\data_annotation.py", line 168, in restart_data_annotation_task
    data_annotation_consumer.data_annotation.publish(
    │                        │               └ <bound method Booster._safe_publish of <funboost.core.booster.Booster object at 0x0000027B7383AAD0>>
    │                        └ <funboost.core.booster.Booster object at 0x0000027B7383AAD0>
    └ <module 'app.core.funboost.data_annotation_consumer' from 'C:\\Users\\ys\\PycharmProjects\\power-qa\\backend\\app\\app\\core\...
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\booster.py", line 123, in _safe_publish
    return consumer.publisher_of_same_queue.publish(msg=msg, task_id=task_id, priority_control_config=priority_control_config)
           │        │                                   │            │                                └ None
           │        │                                   │            └ '018e7f2255bf7f489fc908f65ab8fed0'
           │        │                                   └ {'app_id': '018d10cf328e729f8d7455d26f0f2f57', 'start_time': '2023-12-01', 'end_time': '2024-03-29', 'wait_annotation_num': 7...
           │        └ <property object at 0x0000027B7506BAB0>
           └ <funboost.consumers.redis_stream_consumer.RedisStreamConsumer object at 0x0000027B7383A530>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\publishers\base_publisher.py", line 222, in publish
    self.logger.info(
    │    │      └ <function Logger.info at 0x0000027B58116710>
    │    └ <TaskIdLogger funboost.RedisStreamPublisher--queue_data_annotation (DEBUG)>
    └ <funboost.publishers.redis_stream_publisher.RedisStreamPublisher object at 0x0000027B75D89E40>
  File "C:\soft\Anaconda3\envs\power-qa\lib\logging\__init__.py", line 1477, in info
    self._log(INFO, msg, args, **kwargs)
    │    │    │     │    │       └ {}
    │    │    │     │    └ ()
    │    │    │     └ '10秒内推送了 1 条消息,累计推送了 1 条消息到 queue_data_annotation 队列中'
    │    │    └ 20
    │    └ <function TaskIdLogger._log at 0x0000027B74A28280>
    └ <TaskIdLogger funboost.RedisStreamPublisher--queue_data_annotation (DEBUG)>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\task_id_logger.py", line 11, in _log
    extra['task_id'] = get_current_taskid()
    │                  └ <function get_current_taskid at 0x0000027B74A280D0>
    └ {}
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\current_task.py", line 155, in get_current_taskid
    return fct.task_id  # 不在funboost的消费函数里面就获取不到上下文了
           │   └ <property object at 0x0000027B74A1CA40>
           └ <funboost.core.current_task.AsyncioCurrentTask object at 0x0000027B7B58AAD0>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\current_task.py", line 136, in task_id
    return self.function_result_status.task_id
           │    └ <property object at 0x0000027B74A1CB80>
           └ <funboost.core.current_task.AsyncioCurrentTask object at 0x0000027B7B58AAD0>
  File "C:\soft\Anaconda3\envs\power-qa\lib\site-packages\funboost\core\current_task.py", line 128, in function_result_status
    return self._function_result_status.get()
           │    │                       └ <method 'get' of '_contextvars.ContextVar' objects>
           │    └ <ContextVar name='function_result_status' at 0x0000027B74A1C9A0>
           └ <funboost.core.current_task.AsyncioCurrentTask object at 0x0000027B7B58AAD0>
LookupError: <ContextVar name='function_result_status' at 0x0000027B74A1C9A0>
ydf0509 commented 6 months ago

你把这个 funboost/core/current_task.py 的改成

def get_current_taskid():
    fct = funboost_current_task()
    # return fct.function_result_status.task_id
    try:
        return fct.task_id  # 不在funboost的消费函数里面就获取不到上下文了
    except (AttributeError,LookupError) as e:
        # print(e,type(e))
        return 'no_task_id'

就是这行 except (AttributeError,LookupError) as e: 增加 LookupError 试试.

image

aniya105 commented 6 months ago

你把这个 funboost/core/current_task.py 的改成

def get_current_taskid():
    fct = funboost_current_task()
    # return fct.function_result_status.task_id
    try:
        return fct.task_id  # 不在funboost的消费函数里面就获取不到上下文了
    except (AttributeError,LookupError) as e:
        # print(e,type(e))
        return 'no_task_id'

就是这行 except (AttributeError,LookupError) as e: 增加 LookupError 试试.

image

这样可以,不报错了,感谢指点