apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0
1.29k stars 415 forks source link

run RebalanceImmediately with goroutine #1081

Closed wenxuwan closed 1 year ago

wenxuwan commented 1 year ago

What is the purpose of the change

When the application cluster is published, consumer will receive many notification:

image

RebalanceIfNotPaused will run with lock.

image

So consumer start spend long time.

image

The java sdk implement like this, only send notify:

image

Brief changelog

XX

Verifying this change

XXXX

Follow this checklist to help us incorporate your contribution quickly and easily. Notice, it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.

twz915 commented 1 year ago

================== WARNING: DATA RACE Read at 0x00c000baa5d8 by goroutine 904: github.com/apache/rocketmq-client-go/v2/internal.(SubscriptionData).Clone() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/internal/model.go:70 +0x1d4 github.com/apache/rocketmq-client-go/v2/consumer.(defaultConsumer).SubscriptionDataList.func1() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/consumer/consumer.go:457 +0x6c sync.(Map).Range() /usr/local/go/src/sync/map.go:476 +0x2d8 github.com/apache/rocketmq-client-go/v2/consumer.(defaultConsumer).SubscriptionDataList() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/consumer/consumer.go:456 +0xf4 github.com/apache/rocketmq-client-go/v2/consumer.(pushConsumer).SubscriptionDataList() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/consumer/push_consumer.go:351 +0x4c github.com/apache/rocketmq-client-go/v2/internal.(rmqClient).UpdateTopicRouteInfo.func2() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/internal/client.go:706 +0x90 sync.(Map).Range() /usr/local/go/src/sync/map.go:476 +0x2d8 github.com/apache/rocketmq-client-go/v2/internal.(rmqClient).UpdateTopicRouteInfo() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/internal/client.go:704 +0x3d8 github.com/apache/rocketmq-client-go/v2/internal.(rmqClient).Start.func1.2.1() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/internal/client.go:447 +0x34 github.com/apache/rocketmq-client-go/v2/internal.(rmqClient).Start.func1.2() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/internal/client.go:457 +0x38c github.com/apache/rocketmq-client-go/v2/primitive.WithRecover() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/primitive/base.go:100 +0x50 github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).Start.func1.7() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/internal/client.go:444 +0x3c

Previous write at 0x00c000baa5d8 by goroutine 907: github.com/apache/rocketmq-client-go/v2/consumer.(pushConsumer).messageQueueChanged() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c60/consumer/push_consumer.go:505 +0x30c github.com/apache/rocketmq-client-go/v2/consumer.(pushConsumer).messageQueueChanged-fm()

:1 +0xb8 github.com/apache/rocketmq-client-go/v2/consumer.(*defaultConsumer).doBalance.func1() /Users/tu/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230803074138-7eedaf948c