Azure / azure-relay-node

☁️Node.js library for Azure Relay Hybrid Connections
https://docs.microsoft.com/en-us/azure/service-bus-relay/relay-what-is-it
MIT License
12 stars 15 forks source link

Question About One endpoint with multiple listeners #55

Closed tianhao199703 closed 4 years ago

tianhao199703 commented 4 years ago

Description

Actual Behavior

Starting one listener separately on two different machines named A and B, and then starting the sender service, there will be a cross-connection situation. That is, A's listener and B's sender are connected, and B's listener and A's sender are connected.

Expected Behavior

I want to make sure A's sender connect to A's listener, and B's sender connect to B's listener for each time. What should I do?

dlstucki commented 4 years ago

The behavior you mention is really by design. Hopefully the exact scenario you describe, sender and listener needing to talk to/from same machine is just a simplification for this discussion. Otherwise I'd ask why you need the extra complexity, hops, and latency relay would introduce in the scenario.

There is a feature of HybridConnections (and WCF Relay) called AllowedListeners/DisallowedListeners which could be used to make your scenario possible provided you have some way to get the listener's Guid (aka TrackingId) from the listener to the sender. I don't have any node.js sample code but the listener provides its ID/TrackingId when it connects to the cloud service then senders can specify those same IDs in HTTP Headers named Microsoft-Relay-AllowedListeners and Microsoft-Relay-DisallowedListeners. These headers can contain Guids of listeners separated by commas. Here's a C# example (only one I have):

        static async Task SendToListenerAsync(Uri hybridHttpUri, SecurityToken token, IEnumerable<Guid> allowedListenerIDs, IEnumerable<Guid> disallowedListenerIDs, CancellationToken cancelToken)
        {
            try
            {
                var httpRequest = (HttpWebRequest)WebRequest.Create(hybridHttpUri);
                using (var abortRegistration = cancelToken.Register(() => httpRequest.Abort()))
                {
                    httpRequest.Method = HttpMethod.Get.Method;
                    httpRequest.Headers["ServiceBusAuthorization"] = token.TokenString;

                    if (allowedListenerIDs != null)
                    {
                        // ***** Specify Guids of listeners to attempt when connecting this client/request *****
                        // ***** If more than one is specified order is not guaranteed                     *****
                        httpRequest.Headers["Microsoft-Relay-AllowedListeners"] = string.Join(",", allowedListenerIDs);
                    }

                    if (disallowedListenerIDs != null)
                    {
                        // ***** Specify Guids of listeners to NOT consider when connecting this client/request *****
                        httpRequest.Headers["Microsoft-Relay-DisallowedListeners"] = string.Join(",", disallowedListenerIDs);
                    }

                    using (var httpResponse = (HttpWebResponse)(await httpRequest.GetResponseAsync()))
                    {
                        Assert.AreEqual(HttpStatusCode.OK, httpResponse.StatusCode);
                        await httpResponse.GetResponseStream().CopyToAsync(Stream.Null);
                    }
                }
            }
            catch (WebException webException)
            {
                var httpResponse = webException.Response as HttpWebResponse;
                if (httpResponse != null)
                {
                    LogHttpResponse(httpResponse);
                }

                throw;
            }
        }

        async Task HybridConnectionRequest_DotNetApi_AllowedListeners()
        {
            const int ListenerCount = 3;
            string path = "testhybridconnection";
            Uri serviceBusUri = new Uri($"sb://{host}/{path}");
            Uri httpUri = new Uri($"https://{host}/{path}");
            var token = await tokenProvider.GetTokenAsync(httpUri.AbsoluteUri, TimeSpan.FromMinutes(20));

            var listenerInfos = new List<ListenerInfo>();
            var cts = new CancellationTokenSource(TimeSpan.FromSeconds(60));
            try
            {
                for (int i = 0; i < ListenerCount; i++)
                {
                    var listener = new HybridConnectionListener(serviceBusUri, tokenProvider);
                    var listenerInfo = new ListenerInfo(listener, i);

                    listenerInfos.Add(listenerInfo);
                    listener.RequestHandler = async (context) =>
                    {
                        listenerInfo.IncrementRequestCount();
                        byte[] responseBytes = Encoding.ASCII.GetBytes($"Listener{listenerInfo.Index} response. RequestCount:{listenerInfo.RequestCount}");
                        context.Response.OutputStream.Write(responseBytes, 0, responseBytes.Length);
                        await context.Response.CloseAsync();
                    };
                }

                Log.Info("Opening HybridConnectionListeners");
                await listenerInfos.ParallelForEachAsync(info => info.Listener.OpenAsync(cts.Token));

                // Send to each specific listenerId
                for (int i = 0; i < listenerInfos.Count; i++)
                {
                    var listenerInfo = listenerInfos[i];
                    for (int j = 0; j <= i; j++)
                    {
                        // Send 1 to listener 0, 2 to listener 1, etc.
                        await SendToListenerAsync(httpUri, token, new[] { listenerInfo.Listener.TrackingContext.ActivityId }, null, cts.Token);
                    }
                }

                DisplayListenerInvocationCounts(listenerInfos);
                for (int i = 0; i < listenerInfos.Count; i++)
                {
                    var listener = listenerInfos[i];
                    Assert.AreEqual(i + 1, listener.RequestCount, $"{listener.Listener} Invocation count is not correct");
                }

                Log.Info("Send to a ListenerId which doesn't exist");
                WebException webException = await AssertThrowsAsync<WebException>(
                    () => SendToListenerAsync(httpUri, token, new[] { Guid.NewGuid() }, null, cts.Token));
                HttpWebResponse webResponse = VerifyHttpResponse(webException, HttpStatusCode.NotFound, path, false);
                AssertTrackable(webResponse.StatusDescription, "None of the connected listeners meet the AllowedListeners/DisallowedListeners criteria");

                Log.Info("Send allowing only the first 2 listeners");
                IEnumerable<ListenerInfo> firstTwoListeners = listenerInfos.Take(2);
                int firstTwoCountBefore = firstTwoListeners.Sum(l => l.RequestCount);
                var allowedListeners = firstTwoListeners.Select(l => l.Listener.TrackingContext.ActivityId);
                Log.Info($"AllowedListenerIDs = \"{string.Join(",", allowedListeners)}\"");
                await SendToListenerAsync(httpUri, token, allowedListeners, null, cts.Token);

                DisplayListenerInvocationCounts(listenerInfos);
                int firstTwoCountAfter = firstTwoListeners.Sum(l => l.RequestCount);
                Assert.AreEqual(firstTwoCountBefore + 1, firstTwoCountAfter, "Single sent message should have gone to one of the first 2 listeners");
            }
            finally
            {
                cts.Dispose();
                if (listenerInfos.Count > 0)
                {
                    await listenerInfos.ParallelForEachAsync(info => SafeCloseAsync(info.Listener));
                }
            }
        }

(Here are a few utility classes/methods if anyone wants to get the above code to work.)

        class ListenerInfo
        {
            int requestCount;

            public ListenerInfo(HybridConnectionListener listener, int index)
            {
                this.Listener = listener;
                this.Index = index;
            }

            public HybridConnectionListener Listener { get; }
            public int Index { get; }
            public int RequestCount { get { return this.requestCount; } }

            public void IncrementRequestCount() => Interlocked.Increment(ref this.requestCount);
        }

        static void DisplayListenerInvocationCounts(IEnumerable<ListenerInfo> listeners)
        {
            foreach (ListenerInfo listenerInfo in listeners)
            {
                Log.Info($"{listenerInfo.Listener}, Invocation count: {listenerInfo.RequestCount}");
            }
        }

        static async Task SafeCloseAsync(HybridConnectionListener listener)
        {
            if (listener != null)
            {
                try
                {
                    Log.Info($"Closing {listener}");
                    await listener.CloseAsync(TimeSpan.FromSeconds(10));
                }
                catch (Exception e)
                {
                    Log.Warn($"Error closing {listener} {e}");
                }
            }
        }
tianhao199703 commented 4 years ago

@dlstucki Thanks for your kindly reply! Unfortunately, I'm not familiar with C#. Where can I find README file for nodejs? Or Where can I find description for this feature("AllowedListeners/DisallowedListeners")?

dlstucki commented 4 years ago

At this time we don't have any more info about this feature in a README for nodejs. C# examples are all we have.