ydf0509 / funboost

pip install funboost,python全功能分布式函数调度框架,funboost的功能是全面性重量级,用户能想得到的功能99%全都有;funboost的使用方式是轻量级,只有@boost一行代码需要写。支持python所有类型的并发模式和一切知名消息队列中间件,支持如 celery dramatiq等框架整体作为funboost中间件,python函数加速器,框架包罗万象,用户能想到的控制功能全都有。一统编程思维,兼容50% python业务场景,适用范围广。只需要一行代码即可分布式执行python一切函数,99%用过funboost的pythoner 感受是 简易 方便 强劲 强大,相见恨晚 。
Apache License 2.0
682 stars 135 forks source link

base_publisher的publish函数好像有点问题[用户没了解python字典是可变对象] #97

Closed LeeC20 closed 10 months ago

LeeC20 commented 10 months ago

当我这样发publish的时候。

`

publisher = get_publisher(queue_name=OutsideTopicEum.request,broker_kind=BrokerEnum.KAFKA_CONFLUENT_SASlPlAIN)
var = {
    "cmd": "requests",
    "task_id": "111111",
    "request": {
        "ply_file": {"fileId": "xxx"},
        "json_file": {"fileId": "xxx"}
    },
    "time_stamp": "",
    "send_times": 1,

}
for i in range(3):
    var['task_id'] = f"{var['task_id']}{2}"
    print(var['task_id'])
    publisher.publish(var, task_id=var['task_id'])
    time.sleep(2)

`

第一条数据是这样,

`

publish, publish {'cmd': 'requests', 'task_id': '1111112', 'request': {'ply_file': {'fileId': 'xxx'}, 'json_file': {'fileId': 'xxx'}}, 
'time_stamp': '', 'send_times': 1} 

` 但是第二条就有问题了

`

-publish-[print]-  publish, publish {'cmd': 'requests', 'task_id': '11111122', 'request': {'ply_file': {'fileId': 'xxx'}, 'json_file': {'fileId': 'xxx'}}, 'time_stamp': '', 'send_times': 1, 'extra': {'task_id': '1111112', 'publish_time': 1701261908.8162, 'publish_time_format': '2023-11-29 20:45:08'}}  

`

直接新增了一个extra。所以要怎么避免这种情况。

源码在base_publisher

    def publish(self, msg: typing.Union[str, dict], task_id=None,
                priority_control_config: PriorityConsumingControlConfig = None):
        """

        :param msg:函数的入参字典或者字典转json。,例如消费函数是 def add(x,y),你就发布 {"x":1,"y":2}
        :param task_id:可以指定task_id,也可以不指定就随机生产uuid
        :param priority_control_config:优先级配置,消息可以携带优先级配置,覆盖boost的配置。
        :return:
        """

        print("publish, publish", msg)
        print("publish, publish", task_id)
        msg, msg_function_kw, extra_params, task_id = self._convert_msg(msg, task_id, priority_control_config)
        print("msg,, msg,", msg)
        print("msg_function_kw,, msg_function_kw,", msg_function_kw)
        print("extra_params,, extra_params,", extra_params)
        print("task_id,, task_id,", task_id)
ydf0509 commented 10 months ago

你这个写代码方式很奇怪啊,为什么要get_publisher呢?不推荐这么写代码。

ydf0509 commented 10 months ago

你可以可以看看,BoosterManager的get_booster方法。不建议调用get_publisher。

非装饰器方式文档也写了,不建议跳过booster类。

LeeC20 commented 10 months ago

好 我试一下

LeeC20 commented 10 months ago

确实没问题。看来确实不能跳过boost类

ydf0509 commented 10 months ago

image 这是很基础的知识啊,字典和列表是python的可变对象,funboost的函数直接修改了字典了,你把var字典初始化放到 for循环里面就可以了.

python的可变对象和不可变对象,你应该听说过吧

ydf0509 commented 10 months ago

学习下什么是python可变对象吧

ydf0509 commented 10 months ago

image 下个版本我改下吧,我自己在publish方法内部copy一下,这样不影响用户自身的传参.

ydf0509 commented 10 months ago

如果你不想用@boost装饰器,可以看看 BoostersManager里面的方法,包括 build_booster 和 get_or_create_booster_by_queue_name