apache / rocketmq-client-go

Apache RocketMQ go client
https://rocketmq.apache.org/
Apache License 2.0
1.29k stars 415 forks source link

一个进程创建一个消费者,用多个goroutine 订阅多个topic会产生data race错误 #1083

Closed ilaziness closed 1 year ago

ilaziness commented 1 year ago

创建一个consumer:

    c, err := rocketmq.NewPushConsumer(
        consumer.WithGroupName(group),
        consumer.WithNsResolver(primitive.NewPassthroughResolver(cfg.NameServer)),
        consumer.WithStrategy(consumer.AllocateByAveragely),
        consumer.WithCredentials(primitive.Credentials{
            AccessKey: cfg.AccessKey,
            SecretKey: cfg.AccessSecret,
        }),
    )

然后是用goroutine 运行多个不同topicSubscribe

go func(){
  //....
  c.Subscribe(....)
  c.Start()
  //....
  //阻塞
}()

运行会报WARNING: DATA RACE,有时报这个错运行不起来,大部分时间是报这个错能运行起来,多个Subscribe也消费正常。

==================
WARNING: DATA RACE
Write at 0x00c00049e510 by goroutine 103:
  runtime.mapassign_faststr()
      /usr/lib/go-1.20/src/runtime/map_faststr.go:203 +0x0
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Subscribe()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/push_consumer.go:305 +0x3dd
  cockgo_boilerplate/pkg/rocketmq.(*MqClient).StartHandleMsg()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/mq.go:149 +0x298
  cockgo_boilerplate/pkg/rocketmq.RunConsumer.func1()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:32 +0x144
  cockgo_boilerplate/pkg/rocketmq.RunConsumer.func2()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:33 +0x58

Previous read at 0x00c00049e510 by goroutine 18:
  runtime.mapiterinit()
      /usr/lib/go-1.20/src/runtime/map.go:815 +0x0
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/push_consumer.go:238 +0x150
  cockgo_boilerplate/pkg/rocketmq.(*MqClient).StartHandleMsg()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/mq.go:180 +0x3ef
  cockgo_boilerplate/pkg/rocketmq.RunConsumer.func1()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:32 +0x144
  cockgo_boilerplate/pkg/rocketmq.RunConsumer.func2()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:33 +0x58

Goroutine 103 (running) created at:
  cockgo_boilerplate/pkg/rocketmq.RunConsumer()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:31 +0x144
  cockgo_boilerplate/internal/cmd.glob..func7()
      /home/test/git_repo/cockgo_boilerplate/internal/cmd/queue.go:19 +0x24
  github.com/gogf/gf/v2/os/gcmd.(*Command).doRun()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:149 +0xd30
  github.com/gogf/gf/v2/os/gcmd.(*Command).RunWithValueError()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:91 +0x104
  github.com/gogf/gf/v2/os/gcmd.(*Command).RunWithValue()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:41 +0x67
  github.com/gogf/gf/v2/os/gcmd.(*Command).Run()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:35 +0x44
  main.main()
      /home/test/git_repo/cockgo_boilerplate/main.go:15 +0x4e

Goroutine 18 (running) created at:
  cockgo_boilerplate/pkg/rocketmq.RunConsumer()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:31 +0x144
  cockgo_boilerplate/internal/cmd.glob..func7()
      /home/test/git_repo/cockgo_boilerplate/internal/cmd/queue.go:19 +0x24
  github.com/gogf/gf/v2/os/gcmd.(*Command).doRun()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:149 +0xd30
  github.com/gogf/gf/v2/os/gcmd.(*Command).RunWithValueError()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:91 +0x104
  github.com/gogf/gf/v2/os/gcmd.(*Command).RunWithValue()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:41 +0x67
  github.com/gogf/gf/v2/os/gcmd.(*Command).Run()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:35 +0x44
  main.main()
      /home/test/git_repo/cockgo_boilerplate/main.go:15 +0x4e
==================
==================
WARNING: DATA RACE
Read at 0x00c000144358 by goroutine 103:
  github.com/apache/rocketmq-client-go/v2/internal.(*SubscriptionData).Clone()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/model.go:70 +0x1ea
  github.com/apache/rocketmq-client-go/v2/consumer.(*defaultConsumer).SubscriptionDataList.func1()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/consumer.go:457 +0x66
  sync.(*Map).Range()
      /usr/lib/go-1.20/src/sync/map.go:476 +0x1db
  github.com/apache/rocketmq-client-go/v2/consumer.(*defaultConsumer).SubscriptionDataList()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/consumer.go:456 +0x75
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).SubscriptionDataList()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/push_consumer.go:351 +0x3d
  github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).UpdateTopicRouteInfo.func2()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/client.go:704 +0x67
  sync.(*Map).Range()
      /usr/lib/go-1.20/src/sync/map.go:476 +0x1db
  github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).UpdateTopicRouteInfo()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/client.go:702 +0x224
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).Start()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/push_consumer.go:237 +0xfd
  cockgo_boilerplate/pkg/rocketmq.(*MqClient).StartHandleMsg()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/mq.go:180 +0x3ef
  cockgo_boilerplate/pkg/rocketmq.RunConsumer.func1()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:32 +0x144
  cockgo_boilerplate/pkg/rocketmq.RunConsumer.func2()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:33 +0x58

Previous write at 0x00c000144358 by goroutine 69:
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).messageQueueChanged()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/push_consumer.go:505 +0x2ce
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).messageQueueChanged-fm()
      <autogenerated>:1 +0xad
  github.com/apache/rocketmq-client-go/v2/consumer.(*defaultConsumer).doBalance.func1()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/consumer.go:419 +0x3d6
  sync.(*Map).Range()
      /usr/lib/go-1.20/src/sync/map.go:476 +0x1db
  github.com/apache/rocketmq-client-go/v2/consumer.(*defaultConsumer).doBalance()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/consumer.go:370 +0x56
  github.com/apache/rocketmq-client-go/v2/consumer.(*defaultConsumer).doBalanceIfNotPaused()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/consumer.go:366 +0x117
  github.com/apache/rocketmq-client-go/v2/consumer.(*pushConsumer).RebalanceIfNotPaused()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/consumer/push_consumer.go:335 +0x3d
  github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).RebalanceIfNotPaused.func1()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/client.go:877 +0x48
  sync.(*Map).Range()
      /usr/lib/go-1.20/src/sync/map.go:476 +0x1db
  github.com/apache/rocketmq-client-go/v2/internal.(*rmqClient).RebalanceIfNotPaused()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/client.go:875 +0x95
  github.com/apache/rocketmq-client-go/v2/internal.GetOrNewRocketMQClient.func1()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/client.go:231 +0x125
  github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func2()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/remote/remote_client.go:244 +0x95
  github.com/apache/rocketmq-client-go/v2/primitive.WithRecover()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/primitive/base.go:100 +0x48
  github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD.func4()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/remote/remote_client.go:243 +0x39

Goroutine 103 (running) created at:
  cockgo_boilerplate/pkg/rocketmq.RunConsumer()
      /home/test/git_repo/cockgo_boilerplate/pkg/rocketmq/consumer.go:31 +0x144
  cockgo_boilerplate/internal/cmd.glob..func7()
      /home/test/git_repo/cockgo_boilerplate/internal/cmd/queue.go:19 +0x24
  github.com/gogf/gf/v2/os/gcmd.(*Command).doRun()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:149 +0xd30
  github.com/gogf/gf/v2/os/gcmd.(*Command).RunWithValueError()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:91 +0x104
  github.com/gogf/gf/v2/os/gcmd.(*Command).RunWithValue()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:41 +0x67
  github.com/gogf/gf/v2/os/gcmd.(*Command).Run()
      /home/test/go/pkg/mod/github.com/gogf/gf/v2@v2.4.4/os/gcmd/gcmd_command_run.go:35 +0x44
  main.main()
      /home/test/git_repo/cockgo_boilerplate/main.go:15 +0x4e

Goroutine 69 (finished) created at:
  github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).processCMD()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/remote/remote_client.go:243 +0x486
  github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).receiveResponse()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/remote/remote_client.go:220 +0x506
  github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).connect.func1()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/remote/remote_client.go:165 +0x3e
  github.com/apache/rocketmq-client-go/v2/primitive.WithRecover()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/primitive/base.go:100 +0x48
  github.com/apache/rocketmq-client-go/v2/internal/remote.(*remotingClient).connect.func3()
      /home/test/go/pkg/mod/github.com/apache/rocketmq-client-go/v2@v2.1.2-0.20230628073434-533de03048e1/internal/remote/remote_client.go:164 +0x39
==================

我这是用法不对吗?通常用法是一个进程就消费一个topic吗?我这用法是一个进程消费多个topic

redlsz commented 1 year ago

... runtime.mapassign_faststr() ... ... runtime.mapiterinit() ...

说明并发读写了同一个map,串行调用 Subscribe 订阅多个 topic,最后调用一次 Start 就好了

ilaziness commented 1 year ago

@redlsz 确实是,我以为每次Subscribe 订阅一个主题,后面都要Start,我就一个Subscribe +Start 这样下去的,也没问题,正常消费,就是偶尔启动的时候会报map读写冲突。