mdevilliers / SignalR.RabbitMq

MessageBus implementation using RabbitMq as the backing store.
MIT License

SignalR client receives same message repeatedly #43

Status: Closed (closed by chazt3n 8 years ago)

chazt3n commented 8 years ago

I'm having real difficulty getting this to work, can someone please glance at this and see if something is wrong in the setup?

Here's my test:

I spin up two self-hosted servers in the same process, both configured with the same RabbitMqScaleoutConfiguration:

        [Test]
        public void BasicBackplaneTest()
        {
            SubsciberTestServerNode nodeA = null;
            SubsciberTestServerNode nodeB = null;

            string messageA = null;
            string messageB = null;
            try
            {
                Log("Given I have a WorkCenter Dispatch Publisher");
                var publisher = new WcDispatchPublisher(ConnectionString);

                Log("And I have multiple server nodes subscribed to the backplane");
                nodeA = new SubsciberTestServerNode("nodeA").Start().Result;
                nodeB = new SubsciberTestServerNode("nodeB").Start().Result;

                Log("And I wait 5 seconds");
                Thread.Sleep(5000);

                Log("When I publish a message: {0}", TestPayload);
                publisher.Publish(TestPayload);

                Log("And I wait 60 seconds");
                Thread.Sleep(TimeSpan.FromSeconds(60));

                messageA = nodeA.Message;
                messageB = nodeB.Message;
            }
            catch (AggregateException exception)
            {
                Log("Exception Occurred: {0}", exception.Flatten().Message);
                Exception = exception;
            }
            catch (Exception exception)
            {
                Log("Exception Occurred: {0}", exception.Message);
                Exception = exception;
            }
            finally
            {
                nodeA?.Dispose();
                nodeB?.Dispose();

                Log("Then no exceptions should have been thrown.");
                Exception.Should().BeNull();

                Log("Then the message should have been added to the Message Queue");
                messageA.Should().NotBeNullOrWhiteSpace();
                messageB.Should().NotBeNullOrWhiteSpace();
            }
        }

Server:

 internal class SubsciberTestServerNode : IDisposable
    {
        private readonly string _nodeName;
        private readonly string _url;
        private WcDispatchSubscriber _subscriber;
        private IDisposable _webApp;

        public SubsciberTestServerNode(string nodeName)
        {
            _nodeName = nodeName;
            _url = $"http://localhost:9999/{nodeName}";
            MessageList = new List<string>();
        }

        public string Message { get; set; }
        public List<string> MessageList { get; set; }

        public void Dispose()
        {
            // Dispose each resource independently so that a partially failed
            // Start() cannot cause a NullReferenceException here.
            _webApp?.Dispose();
            _webApp = null;
            _subscriber?.Dispose();
            _subscriber = null;
        }

        public async Task<SubsciberTestServerNode> Start()
        {
            _webApp = WebApp.Start(_url, app =>
            {
                new Startup(_nodeName).Configuration(app);
                Thread.Sleep(TimeSpan.FromSeconds(5));
                //Place this code into your Application_Start() method.
                var factory = new ConnectionFactory
                {
                    UserName = "guest",
                    Password = "guest",
                    HostName = "localhost"
                };

                var exchangeName = "WC_LeadDispatch_Exchange";

                var configuration = new RabbitMqScaleoutConfiguration(factory, exchangeName);
                GlobalHost.DependencyResolver.UseRabbitMq(configuration);
                GlobalHost.Configuration.TransportConnectTimeout = TimeSpan.FromSeconds(10);

                Thread.Sleep(TimeSpan.FromSeconds(5));
            });

            _subscriber = new WcDispatchSubscriber();
            await _subscriber.Subscribe(_url, msg =>
            {
                string message = $"Message received at Node: {_nodeName}. Message: {msg}.";
                Console.WriteLine(message);
                Message = message;
                MessageList.Add(message);
            });
            return this;
        }
    }

Subscriber:

public class WcDispatchSubscriber : IDisposable
    {
        private const string HubName = "DispatchUpdateHub";
        private const string MessageEventName = "addMessage";
        private readonly int _connectionLimitInt;
        private IDisposable _hubProxySubscription;
        private HubConnection _hubConnection;

        public WcDispatchSubscriber()
        {
            string connectionLimit = ConfigurationManager.AppSettings.Get("SignalRConnectionLimit");
            int.TryParse(connectionLimit, out _connectionLimitInt);
            // Fall back to 100 when the setting is missing, unparsable, or zero.
            _connectionLimitInt = _connectionLimitInt == 0 ? 100 : _connectionLimitInt;
        }

        public void Dispose()
        {
            // Null-conditional calls keep Dispose safe if Subscribe was never called.
            _hubProxySubscription?.Dispose();
            // Close the client connection too; otherwise it stays open after the test.
            _hubConnection?.Dispose();
        }

        public async Task Subscribe(string hubConnectionString, Action<string> messageReceived)
        {
            // Keep a reference to the connection so Dispose can close it.
            _hubConnection = new HubConnection(hubConnectionString);
            IHubProxy dispatchHubProxy = _hubConnection.CreateHubProxy(HubName);
            _hubProxySubscription = dispatchHubProxy.On(MessageEventName, messageReceived);
            ServicePointManager.DefaultConnectionLimit = _connectionLimitInt;
            await _hubConnection.Start();
        }
    }

Publisher:

 public class WcDispatchPublisher
    {
        private const string ExchangeName = "WC_LeadDispatch_Exchange";
        private readonly IHubContext _hubContext;

        public WcDispatchPublisher(string connectionString)
        {
            //actual string will look like this.  we may need to overload the other constructors in the Rabbit/SigR.
            //_rabbitConnectionString =
            //  "host=cprmqsrvt02vn01:5672;publisherConfirms=true;username=unittest;password=Un1t735t;virtualhost=UnitTest-NotificationService";
            var configuration = new RabbitMqScaleoutConfiguration(connectionString, ExchangeName);
            GlobalHost.DependencyResolver.UseRabbitMq(configuration);

            _hubContext = GlobalHost.ConnectionManager.GetHubContext<DispatchUpdateHub>();
        }

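        /// <summary>
        /// Broadcasts the payload to all connected clients via the addMessage client method.
        /// </summary>
        /// <param name="payload">The message body to send.</param>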
        public void Publish(string payload)
        {
            Task.Factory.StartNew(() =>
            {
                _hubContext.Clients.All.addMessage(payload);
            }).Wait();
        }
    }

This works roughly one run in twelve. Normally, here's what I get:

Given I have a WorkCenter Dispatch Publisher
And I have multiple server nodes subscribed to the backplane
And I wait 5 seconds
When I publish a message: test-payload
And I wait 60 seconds
Message received at Node: nodeA. Message: test-payload.
Message received at Node: nodeB. Message: test-payload.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
Message received at Node: nodeB. Message: {"LeadId":37252,"DispatchId":153595,"NotificationTypeId":1,"NotificationTitle":"Leads Dispatch","DispatchDateTime":"2016-06-16T17:16:26.187","AlertMessage":"Lead number 37252 dispatched 6/16/2016 at 7:16 AM CST","DispatchedToFranchise":5576,"FranchiseOpsId":130}.
... CONTINUES LIKE THIS FOREVER

mdevilliers commented 8 years ago

Well, it all looks good to me, and the fact that it does sometimes work is encouraging.

What happens when it doesn't work? Is there any output or exceptions?

The only thing I can think of is that I've never run two connections in the same process. Maybe there is a race? Could you try registering another scaleout provider, e.g. Redis, and see whether the same behaviour occurs?

Mark

chazt3n commented 8 years ago

Great idea, Mark. I'll stand up a SQL backplane and check that out. Thank you for your time.
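
For reference, a minimal sketch of what that comparison run might look like, assuming the Microsoft.AspNet.SignalR.SqlServer package is installed. The class name and connection string are placeholders and do not come from the test above. The only intended change is replacing the UseRabbitMq call with UseSqlServer, so that any remaining duplicate deliveries can be attributed to the test setup rather than to the RabbitMq backplane. The Redis package exposes a similar UseRedis extension.

```csharp
using Microsoft.AspNet.SignalR;
using Owin;

// Hypothetical OWIN startup class for the comparison run; the class name and
// connection string are placeholders, not taken from the original test.
public class SqlBackplaneStartup
{
    // Assumed connection string; point it at a database in which the
    // backplane is allowed to create its own tables.
    private const string SqlConnectionString =
        "Data Source=(local);Initial Catalog=SignalRBackplane;Integrated Security=True";

    public void Configuration(IAppBuilder app)
    {
        // UseSqlServer is provided by Microsoft.AspNet.SignalR.SqlServer.
        // Register it in place of UseRabbitMq, before mapping SignalR.
        GlobalHost.DependencyResolver.UseSqlServer(SqlConnectionString);

        app.MapSignalR();
    }
}
```

If the repeated messages disappear with this backplane, that would point toward the RabbitMq message bus; if they persist, the shared GlobalHost in a single process is the more likely suspect.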

mdevilliers commented 8 years ago

Have you had a chance to try out another backplane or chase down the bug a bit more?