peake100 / rogerRabbit-go

wrapper & drop-in replacement for streadway/ampq with automatic reconnects
MIT License
25 stars 3 forks source link

Refactor: Simplify Event Relay Logic #23

Closed peake100 closed 3 years ago

peake100 commented 3 years ago

Right now the logic that advances event relays is very complex, especially the synchronization that occurs between the relay and the transport between disconnects.

This logic can probably be simplified, and such simplifications should be investigated.

peake100 commented 3 years ago

It turns out there is a data race per comments in #37 .

Here is the CI output:

==================
WARNING: DATA RACE
Read at 0x00c00028bf28 by goroutine 41:
  internal/race.Read()
      /opt/hostedtoolcache/go/1.16.0/x64/src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /opt/hostedtoolcache/go/1.16.0/x64/src/sync/waitgroup.go:71 +0x219
  github.com/peake100/rogerRabbit-go/amqp.(*relaySync).RunLegComplete()
      /home/vsts/work/1/s/amqp/channelEventRelaysSync.go:235 +0xd5
  github.com/peake100/rogerRabbit-go/amqp.(*Channel).runEventRelayCycle()
      /home/vsts/work/1/s/amqp/channelEventRelays.go:96 +0x117
  github.com/peake100/rogerRabbit-go/amqp.(*Channel).runEventRelay()
      /home/vsts/work/1/s/amqp/channelEventRelays.go:107 +0x136

Previous write at 0x00c00028bf28 by goroutine 14:
  internal/race.Write()
      /opt/hostedtoolcache/go/1.16.0/x64/src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /opt/hostedtoolcache/go/1.16.0/x64/src/sync/waitgroup.go:128 +0x126
  github.com/peake100/rogerRabbit-go/amqp.(*channelRelaySync).WaitOnSetup()
      /home/vsts/work/1/s/amqp/channelEventRelaysSync.go:105 +0x8f
  github.com/peake100/rogerRabbit-go/amqp.(*Channel).tryReconnect()
      /home/vsts/work/1/s/amqp/channel.go:121 +0x124
  github.com/peake100/rogerRabbit-go/amqp.(*transportManager).reconnectRedialOnce()
      /home/vsts/work/1/s/amqp/transportManagerReconnect.go:12 +0xc7
  github.com/peake100/rogerRabbit-go/amqp.(*transportManager).reconnectRedial()
      /home/vsts/work/1/s/amqp/transportManagerReconnect.go:41 +0xb8
  github.com/peake100/rogerRabbit-go/amqp.(*transportManager).reconnect()
      /home/vsts/work/1/s/amqp/transportManagerReconnect.go:77 +0x124
  github.com/peake100/rogerRabbit-go/amqp.(*transportManager).reconnectListenForClose()
      /home/vsts/work/1/s/amqp/transportManagerReconnect.go:66 +0x128

Goroutine 41 (running) created at:
  github.com/peake100/rogerRabbit-go/amqp.(*Channel).setupAndLaunchEventRelay()
      /home/vsts/work/1/s/amqp/channelEventRelays.go:119 +0x13c
  github.com/peake100/rogerRabbit-go/amqp.channelHandlerBuilder.createNotifyPublish.func1()
      /home/vsts/work/1/s/amqp/channelHandlersBuilder.go:543 +0xab
  github.com/peake100/rogerRabbit-go/amqp/defaultmiddlewares.LoggingMiddlewareChannel.NotifyPublish.func1()
      /home/vsts/work/1/s/amqp/defaultmiddlewares/logging.go:498 +0x1a1
  github.com/peake100/rogerRabbit-go/amqp.(*Channel).NotifyPublish()
      /home/vsts/work/1/s/amqp/channel.go:1155 +0xae
  github.com/peake100/rogerRabbit-go/amqp_test.(*ChannelMethodsSuite).Test0150_NotifyPublish_Reconnections()
      /home/vsts/work/1/s/amqp/channel_test.go:839 +0x2c7
  runtime.call16()
      /opt/hostedtoolcache/go/1.16.0/x64/src/runtime/asm_amd64.s:550 +0x3d
  reflect.Value.Call()
      /opt/hostedtoolcache/go/1.16.0/x64/src/reflect/value.go:337 +0xd8
  github.com/stretchr/testify/suite.Run.func1()
      /home/vsts/go/pkg/mod/github.com/stretchr/testify@v1.7.0/suite/suite.go:158 +0x391
  testing.tRunner()
      /opt/hostedtoolcache/go/1.16.0/x64/src/testing/testing.go:1194 +0x202

Goroutine 14 (running) created at:
  github.com/peake100/rogerRabbit-go/amqp.(*transportManager).reconnect()
      /home/vsts/work/1/s/amqp/transportManagerReconnect.go:90 +0x284
  github.com/peake100/rogerRabbit-go/amqp.(*transportManager).reconnectListenForClose()
      /home/vsts/work/1/s/amqp/transportManagerReconnect.go:66 +0x128
==================
peake100 commented 3 years ago

Closed by #38