ydf0509 / funboost

pip install funboost,python全功能分布式函数调度框架,funboost的功能是全面性重量级,用户能想得到的功能99%全都有;funboost的使用方式是轻量级,只有@boost一行代码需要写。支持python所有类型的并发模式和一切知名消息队列中间件,支持如 celery dramatiq等框架整体作为funboost中间件,python函数加速器,框架包罗万象,用户能想到的控制功能全都有。一统编程思维,兼容50% python业务场景,适用范围广。只需要一行代码即可分布式执行python一切函数,99%用过funboost的pythoner 感受是 简易 方便 强劲 强大,相见恨晚 。
Apache License 2.0
682 stars 135 forks source link

你好,我想请教一下使用Rocketmq作为中间件时自定义消费者类的问题 #115

Closed CangWangu closed 4 months ago

CangWangu commented 4 months ago

作者你好,我想使用您项目下的AbstractConsumer作为父类自定义我自己的消费者与发布者,并将rocketmq作为中间件。 我想请教一下,如果使用rocketmq作为中间件的话,子类消费者的_confirm_consume()方法该怎么写呢? 我注意到您自己定义的RocketmqConsumer中对这个方法采用直接pass,请问这样的话难道不会造成中间件接收不到offset的更新信息吗? 事实上,我自己就是一个rocketmq的新人,之前在这个方向上苦恼了很久。我之前使用rocketmq.client作为我开发使用的包,但由于找不到更新offset的方法(我的代码一旦启动便会一直重复计算之前队列中的信息),所以被我放弃了。 谢谢您的建议,也欢迎各位网友提供帮助与指导,十分感谢

ydf0509 commented 4 months ago

我现在没有rocketmq测试环境了。你可以自己发布10个消息,等消费完成后,再次重启看看能不能再消费就能验证了。当时就是按网上一般方式写的,rocketmq是填写一个callback方法,callback函数直行好了,rocketmq包应该是自动帮你ack。 一般mq包,都会有自动ack和手动ack的方式,如果使用自动ack模式,用户是是不需要去手动亲自执行ack操作的,你完全可以写个精简demo验证猜测。

至于重复消费,比如发布100万消息,你突然在代码运行时候重启消费,是重新消费100万消息,还是重新消费几十条消息,那也是区别,自动ack没手动ack那么精确控制,你非要确保消息执行是onlyonce吗?

ydf0509 commented 4 months ago

一般mq都有自动和手动ack,如果是自动ack不需要人工操作ack。

ydf0509 commented 4 months ago

比如自动每5秒提交一次offsets,那肯定有部分消息消费了,但还没来得及及时提交offsets。

总而言之只要不是重新消费几百万消息就好了,重新消费几百条消息没什么问题,一眨眼功夫就能重复消费完几百条,你自己函数做好幂等。

ydf0509 commented 4 months ago

rocketmq是为java服务的,他不是kafka啊,kafka是知名消息队列,有专门的人愿意维护python客户端,python kafka客户端已经达到java kafka客户端90%的功能了。

rocketmq就是阿里巴巴自己搞得,阿里不用python语言,rocketmq在全球也没人气,所以自然pythob的话rocketmq包没有人力开发,python的rocketmq客户端烂的很,连windows都不支持,而且只有常用的api实现,很多java有的方法,在python中都没有,pytjon rocketmq包只有java版功能的30%。不建议使用python操作rocketmq,必须用java才更好操作rocketmq。

ydf0509 commented 4 months ago

使用python的人,不要用阿里的任何中间件,阿里没心思支持python。

你看他的dubbo rocketmq完全是只为java量身定制,他们没心思理会python,中间件服务端可以用java做,但他们连python的客户端也没心思做,他并不是像美国开发的中间件和框架,做全语言生态的支持。

比如redis是c语言开发,但redis的python java golang 客户端都做得很好,这才是全语言生态。阿里的中间件都不是全语言生态。

CangWangu commented 4 months ago

我现在没有rocketmq测试环境了。你可以自己发布10个消息,等消费完成后,再次重启看看能不能再消费就能验证了。当时就是按网上一般方式写的,rocketmq是填写一个callback方法,callback函数直行好了,rocketmq包应该是自动帮你ack。 一般mq包,都会有自动ack和手动ack的方式,如果使用自动ack模式,用户是是不需要去手动亲自执行ack操作的,你完全可以写个精简demo验证猜测。

至于重复消费,比如发布100万消息,你突然在代码运行时候重启消费,是重新消费100万消息,还是重新消费几十条消息,那也是区别,自动ack没手动ack那么精确控制,你非要确保消息执行是onlyonce吗?

我希望的情况是在某次重新初始化相同groupID的consumer后,不会消费之前已经消费过的信息,但现在我的程序会在初始化后将之前消费过的队列内的部分信息再重新消费,而且也不仅一次,而是会断断续续地重复尝试,让我很苦恼,我原本以为是ACK的问题。 被重复消费的信息中心,有些会引起报错的信息,有些则是正常处理成功的信息,它们之间我找不到关联的地方

CangWangu commented 4 months ago

使用python的人,不要用阿里的任何中间件,阿里没心思支持python。

你看他的dubbo rocketmq完全是只为java量身定制,他们没心思理会python,中间件服务端可以用java做,但他们连python的客户端也没心思做,他并不是像美国开发的中间件和框架,做全语言生态的支持。

比如redis是c语言开发,但redis的python java golang 客户端都做得很好,这才是全语言生态。阿里的中间件都不是全语言生态。

没有办法,计划开始的时候敲定使用rocketmq,等我实际上手才发现它在python上坑好多😭

ydf0509 commented 4 months ago

是谁帮你敲定rocketmq的?python和rocketmq不搭,资料少,api实现不全,建议rabbitmq或者kafka。

ydf0509 commented 4 months ago

重复消费少部分消息是非常正常的,即使你手动提交offsets,如果你在手动提交offsets的前1毫秒突然重启代码呢?你自己做好幂等或者做好去重过滤就好了,这不依赖消息队列中间件。