apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0
1.3k stars 419 forks source link

feat: support consumer consume tps option #1101

Closed twz915 closed 11 months ago

twz915 commented 12 months ago

At present, there are two ways to control the rate of consumption, one is to limit consumer goroutine number (or the increased load on the machine slowed it down), and the other is to set pullInterval and batchSize.

Because the second way is easier to estimate the maximum consumption tps, we generally use the second one. but the inconvenient thing is that when the backlog is relatively large, if we want to accelerate the consumption speed, we have to change the code and publish, the expansion of the machine can not solve the problem.

The java version can dynamically set the pull interval, but the go version cannot be dynamically modified. For convenience, the maximum consumption tps setting of a single machine is supported.

Principle of flow control: 
pull TPS = 1000ms/PullInterval * BatchSize * len(allocateResult)

Based on the number of allocated queues and the number of pulls in each batch, dynamic calculation is implemented to realize how often to pull, in order to control the consumption TPS of a single machine, and achieve consumption speed control more conveniently. This function is not enabled by default. If not enabled, it will be compatible with the original version.

目前,有两种控制消耗速度的方法,一种是限制Go消费协程的数量(或者压力使机器负载上升最终达到最大消费能力,被压式),另一种是设置 pullIntervalbatchSize

由于第二种方法更容易估计最大消耗tps,所以我们通常使用第二种方法。但不方便的是,当积压量比较大的时候,如果我们想要加速的消费能力,就得改代码,扩容机器解决不了问题。

Java版本支持动态设置 pullInterval 但目前 Go版本并没有暴露相应的方式来动态修改此选项。因此,这里增加一种设置单机TPS能力,这会使使用起来更方便。

原理是根据分配的队列数,和每批次拉取数量,来实现动态计算多久拉取一次,以实现控制单机消费TPS,可以更方便地实现消费控速。 这个功能默认是不开启的,不启用的话和原来是兼容的。