coolseven / notes

笔记与心得
MIT License
508 stars 187 forks source link

某个队列的消费者不足时,再给这个队列添加 work进程 #12

Open MrKLL opened 6 years ago

MrKLL commented 6 years ago

你好,我现在使用tp5队列来做一个风控,买卖股票的订单会随着股票价钱波动产生盈亏,达到某个亏损值的时候我需要进行平仓,可以队列每一秒只能处理15个订单,我想问一下怎么可以加进程,让系统处理更多的订单呢

coolseven commented 6 years ago

每执行一次 php think queue:work --queue xxxx 命令, 就可以增加该队列的一个消费者
如果一个消费者不够, 就多执行几次, 达到消息的推送与消费的平衡

可以使用 supervisor 等运维工具自动化地管理消费者进程数量

MrKLL commented 6 years ago

因为我这个风控需要每一秒去查看所有订单是否需要进行平仓,所以我这个队列使用了--daemon 参数让它一直执行,这样的话还能使用你的这个方法,多开进程吗,可以的话,会不会出现重复消费的情况呢

coolseven commented 6 years ago

从你的描述来看, 你说的需求实际上是定时任务, 而不是消费消息

消息队列的特点是有消息就消费, 没有消息就等着

你说的 每一秒去查看所有订单是否需要进行平仓 ,这个属于定时任务的范畴

MrKLL commented 6 years ago

对的,因为crontab最小只能支持每分钟执行一次,而我的需求是每秒要执行一次,而且为了减轻数据库压力,我每次订单需要存在缓存中,刚好用这个队列满足可以每秒执行一次以及把对应的数据给我,就这么做了,但是后面发现一秒钟执行的任务有限,所以一直在找解决办法,你能给点建议吗,谢谢

coolseven commented 6 years ago

你可以尝试一下当订单生成之后主动向队列推送一条消息, 然后由队列的消费者去处理这个订单

而不是定时去扫描现有的所有订单

MrKLL commented 6 years ago

我现在的做法貌似就是这样,我是订单任务都推到队列中,然后在消费者方法中取任务进行判断,没有达到某个点就用release(1)隔一秒继续执行,达到了就去平仓,删除任务,但是这样我看了一下,每秒就只能处理15个订单,而且我换成release(),每秒处理的也就是15单

coolseven commented 6 years ago

你可以把需求拆分成两个部分: 第一部分负责从找出达到平仓条件的订单, 并将达到平仓条件的订单推送到队列中 第二个部分负责处理达到平仓条件的订单

第一部分可以不用队列来做, 而是用常驻脚本来做, 该常驻脚本的伪代码如下:

while(true){
     // 从数据库或缓存中按照时间或者其他维度取出一定数量的订单
     $ordersToCheck = $this->queryOrders();

     // 从这些订单中过滤出达到平仓条件的订单
     $orders = $this->filterSuitableOrders($ordersToCheck) ;   

     foreach($orders as $order){
           // 将达到平仓条件的订单推送到 'close-position' 队列中, 由消费进程去进行平仓操作
           Queue::push($order, 'close-position');
     }
}

第二部分负责消费 'close-position' 队列中的订单

你可以找一下每秒处理15单的瓶颈在哪里,

MrKLL commented 6 years ago

好的,十分感谢

im55cc commented 4 years ago

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

coolseven commented 4 years ago

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

获取一个待消费的任务的本质是调用 redis 的 pop() 方法得到一个 job payload
由于 redis 服务是单进程的, 所以即使多个消费进程同时调用 pop() 方法, 也不会得到重复的 job payload

im55cc commented 4 years ago

嗯嗯,好的,非常谢谢你的讲解。

---原始邮件--- 发件人: "HuZhongyuan"<notifications@github.com> 发送时间: 2020年5月10日(周日) 下午2:37 收件人: "coolseven/notes"<notes@noreply.github.com>; 抄送: "Comment"<comment@noreply.github.com>;"55"<hi@im55.cc>; 主题: Re: [coolseven/notes] 某个队列的消费者不足时,再给这个队列添加 work进程 (#12)

你好,我想请教一下,开启多个消费进程,任务不会重复消费的原理是什么?是因为redis的原子性么。

获取一个待消费的任务的本质是调用 redis 的 pop() 方法得到一个 job payload 由于 redis 服务是单进程的, 所以即使多个消费进程同时调用 pop() 方法, 也不会得到重复的 job payload

— You are receiving this because you commented. Reply to this email directly, view it on GitHub, or unsubscribe.

Caesar0220 commented 4 years ago

你好我想请问一下。我创建multiTask的时候,taskA跟taskB都进入队列了,然后运行php think queue:work --queue multiTaskJobQueue的时候,第一次消费了TaskA,第二次消费了TaskB。但是命令行中并无输出。只有Processed: application\index\job\MultiTask@taskB这种提示,请问是什么原因?