apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0
1.28k stars 409 forks source link

Using WithConsumeGoroutineNums in pushconsumer may not work as expected. #1135

Closed bogerv closed 3 months ago

bogerv commented 3 months ago

The issue tracker is ONLY used for the go client (feature request of RocketMQ need to follow RIP process). Keep in mind, please check whether there is an existing same report before your raise a new one.

Alternately (especially if your communication is not a bug report), you can send mail to our mailing lists. We welcome any friendly suggestions, bug fixes, collaboration, and other improvements.

Please ensure that your bug report is clear and that it is complete. Otherwise, we may be unable to understand it or to reproduce it, either of which would prevent us from fixing the bug. We strongly recommend the report(bug report or feature request) could include some hints as to the following:

BUG REPORT

  1. Please describe the issue you observed:

    • What did you do (The steps to reproduce)? Use consumer.WithConsumeGoroutineNums(1) to create push consumer, but not work c, err = rocketmq.NewPushConsumer( consumer.WithNameServer(config.Config.NameServers), consumer.WithGroupName(fmt.Sprintf("%s-%s%s", group, aa, bb)), consumer.WithConsumeMessageBatchMaxSize(config.Config.ConsumeMaxSize), consumer.WithConsumeGoroutineNums(1), )
    • What did you expect to see? Only start one goroutine to proceed
    • What did you see instead? Started 8 goroutine one time
  2. Please tell us about your environment:

    • What is your OS? CentOS 7
    • What is your client version? V4_5_2
    • What is your RocketMQ version? V5_1_4
  3. Other information (e.g. detailed explanation, logs, related issues, suggestions on how to fix, etc): Using poll consumer will not guarantee this problem
cserwen commented 3 months ago

How to determine that 8 coroutines for consumption are generated @bogerv

bogerv commented 3 months ago

func OrderPersistListDealing(_accountType string, _b string, _q string) { err := rocketmq.RMQ().Consumer(rocketmq.ConsumerGroupPersist).Subscribe( rocketmq.GetTopicPersist(_b, _q), consumer.MessageSelector{Type: consumer.TAG, Expression: util.FormatSymbol(_b, _q)}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { wg := &sync.WaitGroup{} bulkInsertESOrder(orders, wg, &esErr) }) } Inside the bulkInsertESOrder function, print logs after sleep 5 seconds. The result is that every 5 seconds, 8 logs will be printed at the same time.

The scenario is an application process that starts multiple goroutine(OrderPersistListDealing is started multiple goroutine with different _b and _q), with each goroutine using the same consumer to subscribe multiple topics, and use MessageSelector to distinguish different data.

cserwen commented 3 months ago

func OrderPersistListDealing(_accountType string, _b string, _q string) { err := rocketmq.RMQ().Consumer(rocketmq.ConsumerGroupPersist).Subscribe( rocketmq.GetTopicPersist(_b, _q), consumer.MessageSelector{Type: consumer.TAG, Expression: util.FormatSymbol(_b, _q)}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { wg := &sync.WaitGroup{} bulkInsertESOrder(orders, wg, &esErr) }) } Inside the bulkInsertESOrder function, print logs after sleep 5 seconds. The result is that every 5 seconds, 8 logs will be printed at the same time.

The scenario is an application process that starts multiple goroutine(OrderPersistListDealing is started multiple goroutine with different _b and _q), with each goroutine using the same consumer to subscribe multiple topics, and use MessageSelector to distinguish different data.

Cosume orderly ? Can you show the init code of rmq-consumer?

bogerv commented 3 months ago

func OrderPersistListDealing(_accountType string, _b string, _q string) { err := rocketmq.RMQ().Consumer(rocketmq.ConsumerGroupPersist).Subscribe( rocketmq.GetTopicPersist(_b, _q), consumer.MessageSelector{Type: consumer.TAG, Expression: util.FormatSymbol(_b, _q)}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) { wg := &sync.WaitGroup{} bulkInsertESOrder(orders, wg, &esErr) }) } Inside the bulkInsertESOrder function, print logs after sleep 5 seconds. The result is that every 5 seconds, 8 logs will be printed at the same time. The scenario is an application process that starts multiple goroutine(OrderPersistListDealing is started multiple goroutine with different _b and _q), with each goroutine using the same consumer to subscribe multiple topics, and use MessageSelector to distinguish different data.

Cosume orderly ? Can you show the init code of rmq-consumer?

Yes, Cosume orderly c, err = rocketmq.NewPushConsumer( consumer.WithNameServer(config.Config.NameServers), consumer.WithGroupName(group), consumer.WithConsumerOrder(true), consumer.WithConsumerModel(consumer.Clustering), consumer.WithConsumeMessageBatchMaxSize(config.ConsumeMaxSize), )

cserwen commented 3 months ago

This param only takes effect for concurrent consumption. For order-consumption, each queue has one and only one coroutine to consume. @bogerv

bogerv commented 3 months ago

This param only takes effect for concurrent consumption. For order-consumption, each queue has one and only one coroutine to consume.

So does this mean that if we need to consume bulk data in sequence, each consumer will need to subscribe to a separate topic?