elastic / elasticsearch-net

This strongly-typed, client library enables working with Elasticsearch. It is the official client maintained and supported by Elastic.
https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/index.html
Apache License 2.0

Nest *potentially* has problems scaling in Azure #2364

Closed ghost closed 7 years ago

ghost commented 8 years ago

NEST/Elasticsearch.Net version: 2.3.3

Description of the problem including expected versus actual behavior:

We are using NEST hosted in a continuously running Azure WebJob (or simply a console app), which indexes data that is POSTed or PUT to an Azure App Service (or simply a WebApi) after creates or updates are written to an Azure SQL database. We then have features which fully leverage the advanced search capabilities in Elastic without having to roll huge-ass, contention-based queries in SQL.

To add to the mix, we are also using a pub/sub mechanism on Azure Service Bus (topics/queues) to carry the indexing command messages from the WebApi to the WebJob. The message handler there engages the NEST client, which then pushes an upsert to an Elastic cluster hosted in AWS using the Elastic Cloud offering.

The thing is everything works fine, until we scale the system up and out and then start running load tests.

The real problem is the razor-thin PaaS infrastructure in Azure when it comes to the maximum number of sockets you can have open at any given time. Even on the P1 tier we appear to be breaching the 2000-odd socket limit when we stress test the system. Let me be clear: we have engaged Microsoft on a multitude of issues and they have been "kind of" helpful. In the end, we had to stress test each cloud component in Azure in isolation and fix all the problems along the way. The recurring theme we found was the following exception:

System.Net.Sockets.SocketException: An attempt was made to access a socket in a way forbidden by its access permissions.

When we googled this, we found it to be quite nebulous in nature. After repeatedly raising support calls with Microsoft and reverse engineering their code, we found the problem to be eager creation (or even abuse) of HttpClient instances that were not being disposed, combined with unbounded parallelism. The answer there was to convert all socket-bound creation logic to singletons. So far this has unlocked our scalability to the point where the only remaining problem is our writes to Elastic. Luckily, this has only taken months, not years.
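For readers unfamiliar with the pattern, the "singleton" fix described above can be sketched roughly as follows. This is only an illustration under assumptions: `SharedHttp` is a hypothetical helper, the 30 second timeout is arbitrary, and none of it is taken from the Azure SDKs or from NEST.

```csharp
using System;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical helper for illustration; not part of NEST or any Azure SDK.
public static class SharedHttp
{
    // Lazy<T> is thread-safe by default, so only one HttpClient (and therefore
    // one underlying connection pool) is created for the whole process.
    private static readonly Lazy<HttpClient> LazyClient = new Lazy<HttpClient>(() =>
        new HttpClient { Timeout = TimeSpan.FromSeconds(30) }); // arbitrary timeout

    public static HttpClient Client => LazyClient.Value;

    // Callers share the same instance instead of creating a client per request,
    // which is what tends to exhaust sockets under load.
    public static Task<HttpResponseMessage> GetAsync(Uri uri) => Client.GetAsync(uri);
}
```

The point of the design is simply that every code path that needs HTTP goes through `SharedHttp.Client`, so the number of client instances stays at one regardless of load.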

Tonight I had a look through your code and found that you are creating HttpClients based on a hash code that is computed in your Elasticsearch.Net.RequestData object. This is in turn consumed by Elasticsearch.Net.HttpConnection, which caches these HttpClients in a ConcurrentDictionary. If we follow the recommendation of this link:

https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/lifetimes.html

Whereby we make the ElasticClient a singleton, how do we restrict the number of HttpClients created by the HttpConnection object? This in turn would solve our scalability problem and make NEST a first-class citizen in Azure. If you guys can help me out, this would make your project plug and play with Azure and its scalability offerings. In all truth I would have used AWS, but that was not my decision. Sorry for not posting a repro (it is quite complex), but hopefully we can work together to find a solution.

russcam commented 8 years ago

Sounds very similar to architectures I've put together before on Azure and AWS 😄

@fir3pho3nixx Are you running on desktop CLR or .NET Core?

There are two versions of an IConnection that make HTTP calls:

  1. HttpConnection.cs used by Desktop CLR (.NET 4.5+) that uses System.Net.WebRequest to make HTTP calls
  2. HttpConnection-CoreFx.cs used by .NET Core that uses System.Net.Http.HttpClient to make HTTP calls, caching the HttpClient instance based on a hashcode of the RequestData that takes into account the following:
        public override int GetHashCode()
        {
            unchecked
            {
                var hashCode = RequestTimeout.GetHashCode();
                hashCode = (hashCode*397) ^ PingTimeout.GetHashCode();
                hashCode = (hashCode*397) ^ KeepAliveTime;
                hashCode = (hashCode*397) ^ KeepAliveInterval;
                hashCode = (hashCode*397) ^ (RunAs?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ Pipelined.GetHashCode();
                hashCode = (hashCode*397) ^ HttpCompression.GetHashCode();
                hashCode = (hashCode*397) ^ (Headers?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ (ProxyAddress?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ (ProxyUsername?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ (ProxyPassword?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ DisableAutomaticProxyDetection.GetHashCode();
                hashCode = (hashCode*397) ^ (BasicAuthorizationCredentials?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ (ConnectionSettings?.GetHashCode() ?? 0);
                hashCode = (hashCode*397) ^ (MemoryStreamFactory?.GetHashCode() ?? 0);
                return hashCode;
            }
        }

If you are using .NET Core, would the above settings change on a request-by-request basis for the client running in the WebJob? If they don't, then only a single instance will be created.

If you prefer, you can implement your own IConnection and use this with the client:

public class MyConnectionWithClientCertificates : HttpConnection
{
    protected override HttpWebRequest CreateHttpWebRequest(RequestData requestData)
    {
        var request = base.CreateHttpWebRequest(requestData);
        request.ClientCertificates.Add(new X509Certificate());
        return request;
    }
}

var node = new Uri("http://mynode.example.com:8082/apiKey");
var pool = new SingleNodeConnectionPool(node);
var connection = new MyConnectionWithClientCertificates();
var settings = new ConnectionSettings(pool, connection);
var client = new ElasticClient(settings);

The example here adds a certificate to requests, but you could write an HttpConnection that creates a single instance of HttpClient internally and uses that.

ghost commented 8 years ago

Just checked and unfortunately it seems we are using the Desktop CLR version (4.6). When we last looked at making the leap to core we couldn't, because certain Azure dependencies we were using were not compatible at the time (can't remember which ones). They might have caught up by now.

NuGet also very quickly turned into a shit show with upgrades because of assembly binding errors that could not resolve duplicate assemblies between the .\bin folder and what was coming out of the desktop framework directories. We solved this by moving over to Paket (F#'s package manager), whereby we could pin certain NuGet versions.

Thanks for the tip. I might just plagiarise your core version for now and copy it into our solution until we can make the leap to core after our imminent go live.

Will feed back to you on the results after I have given this a go tomorrow. 👍

russcam commented 8 years ago

Just checked and unfortunately it seems we are using the Desktop CLR version(4.6).

That might not be too bad, at least until HttpClient on top of libuv is mature. If I recall correctly, the last stable HttpClientHandler uses HttpWebRequest under the covers.

We solved this by moving over to paket(fsharps package manager) whereby we could pin certain nuget versions.

Awesome! We've been using it for some time 😄

Thanks for the tip. I might just plagiarise your core version for now and copy it into our solution until we can make the leap to core after our imminent go live.

Feel free to do so; it's under Apache 2

If you come across any interesting findings, performance improvements, etc. feel free to open an issue to discuss.

ghost commented 8 years ago

A little feedback on my efforts so far. I copied the .NET Core version of the HttpConnection and adapted it to Desktop CLR (4.6).

The first problem I ran into was a deadlock for the following code segment:

public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData requestData) where TReturn : class
{
    var client = this.GetClient(requestData);

    var builder = new ResponseBuilder<TReturn>(requestData);

    try
    {
        var requestMessage = CreateHttpRequestMessage(requestData);

        var response = client.SendAsync(requestMessage, requestData.CancellationToken)
            .GetAwaiter().GetResult(); // <- Deadlock here in .NET 4.6

        builder.StatusCode = (int)response.StatusCode;

        if (response.Content != null)
            builder.Stream = response.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
    }
    catch (HttpRequestException e)
    {
        builder.Exception = e;
    }

    return builder.ToResponse();
}

I managed to stop this from happening with Stephen Cleary's Nito.AsyncEx NuGet package, changing the line from:

var response = client.SendAsync(requestMessage, requestData.CancellationToken).GetAwaiter().GetResult();

To:

var response = AsyncContext.Run(() => client.SendAsync(requestMessage, requestData.CancellationToken));

I am afraid this prevents me from raising a PR, as introducing this dependency into the library just does not feel right. We are about to commence the load test and will be running it for 48 hours. Will report back on the results and paste the stress-tested, Azure-friendly version of the HttpConnection once I am done. Look forward to your feedback.
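For what it's worth, a dependency-free alternative that is often used for this kind of sync-over-async call is to start the asynchronous work on the thread pool, where there is no SynchronizationContext to capture, and then block on the resulting task. The sketch below only illustrates that general technique; it is not what Nito.AsyncEx does internally and it is not code from NEST. `SyncOverAsync` and `Run` are hypothetical names, and the approach costs an extra thread-pool thread per call.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical helper for illustration only; not part of NEST or Nito.AsyncEx.
public static class SyncOverAsync
{
    public static T Run<T>(Func<Task<T>> func)
    {
        // Task.Run invokes func on a thread-pool thread, so awaits inside it
        // never try to resume on the caller's (possibly blocked) context.
        // The Func<Task<T>> overload also unwraps the inner task.
        return Task.Run<T>(func).GetAwaiter().GetResult();
    }
}
```

Under these assumptions, the changed line would read `var response = SyncOverAsync.Run(() => client.SendAsync(requestMessage, requestData.CancellationToken));`.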

russcam commented 8 years ago

Interesting, we don't see this deadlock issue in the integration tests; it could, however, be because of a different synchronization context. Did the deadlock happen when running in an MVC web application targeting Desktop CLR 4.6? If you could provide as many environment details as possible, I can take a look.
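One environment detail that is cheap to capture is which synchronization context, if any, is installed on the thread that makes the blocking call. A minimal sketch of such a probe, with `ContextProbe` being a hypothetical name:

```csharp
using System;
using System.Threading;

// Hypothetical diagnostic helper; call it on the thread that issues the request.
public static class ContextProbe
{
    public static string Describe()
    {
        var context = SynchronizationContext.Current;

        // A null context means continuations are scheduled on the thread pool,
        // which is the usual situation in a plain console app.
        return context == null
            ? "none"
            : context.GetType().FullName;
    }
}
```

A console application would typically report `none`, whereas classic ASP.NET and Windows Forms install their own context types, which is where blocking on async code usually becomes risky.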

russcam commented 8 years ago

@fir3pho3nixx Can you provide a small reproducible example of the deadlock? I've tried to reproduce a deadlock with

In all, I have used the HttpConnection taken from NEST .NET Core and made a call to the synchronous .Search<T>() method from both synchronous and asynchronous methods, but have been unable to get it to deadlock on the line you've commented above. I can get deadlocks by waiting on, or calling .Result on, an async method that internally awaits an async method that captures the synchronization context, for example in Windows Forms:

public partial class Form1 : Form
{
    private readonly IElasticClient _client;

    public Form1(IElasticClient client)
    {
        _client = client;
        InitializeComponent();
    }

    private async void button1_Click(object sender, EventArgs e)
    {
        var searchResponse = GoAsync().Result; // initiate the deadlock....

        this.textBox1.Clear();
        this.textBox1.Text = JsonConvert.SerializeObject(searchResponse.Documents);
    }

    private async Task<ISearchResponse<dynamic>> GoAsync()
    {
        await Task.Delay(1000); // <- ...DEADLOCK HERE

        var searchResponse = _client.Search<dynamic>(s => s.AllIndices().AllTypes());

        await Task.Delay(1000);

        return searchResponse;
    }
}

but cannot get one to happen as reported. An example here would be super helpful as we're looking to push new releases out shortly.

ghost commented 8 years ago

Will build you a way of reproducing it over the weekend. Also have not had time to dig into Nito to work out why it solves the problem. Hope to get back to you soon.

ghost commented 8 years ago

Hi Rus

Sorry I have not posted a repro yet. It is important to note that I have bastardized a CoreFX version of your HttpConnection class and that I am using it in a desktop CLR scenario. The deadlock could be because of something I have done.

WebRequests in your desktop CLR version are causing socket pressure in our environment, which causes denial of service under load. So I opted for using the HttpClient as a singleton, which yields dividends. The problem was that the awaiter deadlocked when I copied your code and changed it.

I post the bastardized version here:

public class CoreFxHttpConnection : IConnection
    {
        private readonly ILog _log = LogManager.GetLogger(typeof(CoreFxHttpConnection)); 

        private readonly object _lock = new object();

        private HttpClient _clients = null;

        private HttpClient GetClient(RequestData requestData)
        {
            // Fast path: the single shared client already exists.
            if (this._clients != null) return _clients;

            lock (_lock)
            {
                // Re-check inside the lock so that two threads racing past the
                // check above cannot each create a client.
                if (this._clients != null) return _clients;

                var handler = CreateHttpClientHandler(requestData);

                _log.Warn("Creating new http client for elastic search ... if this happens often we have a problem.");

                var client = new HttpClient(handler, false)
                {
                    Timeout = requestData.RequestTimeout
                };

                client.DefaultRequestHeaders.ExpectContinue = false;

                this._clients = client;

                return client;
            }

        }

        public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData requestData) where TReturn : class
        {
            var client = this.GetClient(requestData);

            var builder = new ResponseBuilder<TReturn>(requestData);

            try
            {
                var requestMessage = CreateHttpRequestMessage(requestData);

                var response = AsyncContext.Run(() => client.SendAsync(requestMessage, requestData.CancellationToken));

                builder.StatusCode = (int)response.StatusCode;

                if (response.Content != null)
                    builder.Stream = response.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
            }
            catch (HttpRequestException e)
            {
                builder.Exception = e;
            }

            return builder.ToResponse();
        }

        public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData) where TReturn : class
        {
            var client = this.GetClient(requestData);

            var builder = new ResponseBuilder<TReturn>(requestData);

            try
            {
                var requestMessage = CreateHttpRequestMessage(requestData);

                var response = await client.SendAsync(requestMessage, requestData.CancellationToken).ConfigureAwait(false);

                builder.StatusCode = (int)response.StatusCode;

                if (response.Content != null)
                    builder.Stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
            }
            catch (HttpRequestException e)
            {
                builder.Exception = e;
            }

            return await builder.ToResponseAsync().ConfigureAwait(false);
        }

        protected virtual HttpClientHandler CreateHttpClientHandler(RequestData requestData)
        {
            var handler = new HttpClientHandler { };

            if (!string.IsNullOrEmpty(requestData.ProxyAddress))
            {
                var uri = new Uri(requestData.ProxyAddress);

                var proxy = new CoreFxWebProxy(uri);

                var credentials = new NetworkCredential(requestData.ProxyUsername, requestData.ProxyPassword);

                proxy.Credentials = credentials;

                handler.Proxy = proxy;
            }

            if (requestData.DisableAutomaticProxyDetection)
                handler.Proxy = null;

            return handler;
        }

        protected virtual HttpRequestMessage CreateHttpRequestMessage(RequestData requestData)
        {
            var request = this.CreateRequestMessage(requestData);

            SetBasicAuthenticationIfNeeded(request, requestData);

            return request;
        }

        protected virtual HttpRequestMessage CreateRequestMessage(RequestData requestData)
        {
            var method = ConvertHttpMethod(requestData.Method);

            var requestMessage = new HttpRequestMessage(method, requestData.Uri);

            foreach (string key in requestData.Headers)
            {
                requestMessage.Headers.TryAddWithoutValidation(key, requestData.Headers.GetValues(key));
            }

            requestMessage.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(requestData.ContentType));

            if (! string.IsNullOrEmpty(requestData.RunAs))
                requestMessage.Headers.Add("es-shield-runas-user", requestData.RunAs);

            var data = requestData.PostData;

            if (data != null)
            {
                var stream = requestData.MemoryStreamFactory.Create();

                requestMessage.Content = new StreamContent(stream);

                if (requestData.HttpCompression)
                {
                    requestMessage.Content.Headers.Add("Content-Encoding", "gzip");

                    requestMessage.Headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));

                    requestMessage.Headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));

                    using (var zipStream = new GZipStream(stream, CompressionMode.Compress, true))
                        data.Write(zipStream, requestData.ConnectionSettings);
                }
                else
                    data.Write(stream, requestData.ConnectionSettings);

                stream.Position = 0;
            }

            if (requestMessage.Content != null)
                requestMessage.Content.Headers.ContentType = new MediaTypeHeaderValue(requestData.ContentType);

            return requestMessage;
        }

        protected virtual void SetBasicAuthenticationIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
        {
            string userInfo = null;

            if (!string.IsNullOrEmpty(requestData.Uri.UserInfo))
                userInfo = Uri.UnescapeDataString(requestData.Uri.UserInfo);
            else if (requestData.BasicAuthorizationCredentials != null)
                userInfo = requestData.BasicAuthorizationCredentials.ToString();

            if (!string.IsNullOrEmpty(userInfo))
            {
                var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes(userInfo));
                requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Basic", credentials);
            }
        }

        private static System.Net.Http.HttpMethod ConvertHttpMethod(HttpMethod httpMethod)
        {
            switch (httpMethod)
            {
                case HttpMethod.GET: return System.Net.Http.HttpMethod.Get;
                case HttpMethod.POST: return System.Net.Http.HttpMethod.Post;
                case HttpMethod.PUT: return System.Net.Http.HttpMethod.Put;
                case HttpMethod.DELETE: return System.Net.Http.HttpMethod.Delete;
                case HttpMethod.HEAD: return System.Net.Http.HttpMethod.Head;
                default:
                    throw new ArgumentException("Invalid value for HttpMethod", nameof(httpMethod));
            }
        }

        void IDisposable.Dispose() => this.DisposeManagedResources();

        protected virtual void DisposeManagedResources()
        {
            //_clients.Dispose();
        }
    }

Furthermore, I found that the hash coding baked into this class for the concurrent dictionary was spinning up way more HttpClients than I expected, so I had to rip that out and brute force a singleton. After that things were good again.
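As a general illustration of how a hash-keyed cache can end up with far more entries than expected (this is a sketch of the class of problem, not a diagnosis of the actual NEST code): if the key mixes in the hash code of a reference type that does not override GetHashCode, every new instance tends to produce a different key, even when the settings are logically identical. `FakeRequestSettings` and `CacheKeyDemo` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical stand-in for per-request settings; not NEST's RequestData.
public sealed class FakeRequestSettings
{
    public TimeSpan Timeout { get; set; }

    // A reference type without a GetHashCode override: its hash is tied to
    // the instance, not to its contents.
    public object Headers { get; set; } = new object();
}

public static class CacheKeyDemo
{
    // Key that mixes in an instance-based hash code.
    public static int ReferenceBasedKey(FakeRequestSettings s)
    {
        unchecked { return (s.Timeout.GetHashCode() * 397) ^ s.Headers.GetHashCode(); }
    }

    // Key built only from value-like settings.
    public static int ValueBasedKey(FakeRequestSettings s) => s.Timeout.GetHashCode();

    // Simulates a number of requests with logically identical settings and
    // returns how many cache entries were created for them.
    public static int CountEntries(Func<FakeRequestSettings, int> keyOf, int requests)
    {
        var cache = new ConcurrentDictionary<int, object>();
        for (var i = 0; i < requests; i++)
        {
            var settings = new FakeRequestSettings { Timeout = TimeSpan.FromSeconds(60) };
            cache.GetOrAdd(keyOf(settings), _ => new object());
        }
        return cache.Count;
    }
}
```

With the value-based key, all simulated requests share one entry; with the reference-based key, the entry count grows with the number of requests, which in a real connection class would mean one HttpClient per entry.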

We did run into some more NuGet drama with JSON.NET which we have not figured out yet. Will save this for another issue.

The goal for us right now is backporting the HttpClient implementation into the desktop CLR version and eradicating the dirty, filthy WebRequest implementation, which is vulnerable to socket pressure. If you need to have a call, I will be happy to walk you through it in more detail.

Mpdreamz commented 8 years ago

Hey @fir3pho3nixx

The socket pressure might be due to our default connection limit of 10k per node, which is a tad overzealous; we have dialed this back to 80 per node in the HEADs of all active branches. NEST will reuse the TCP sockets, but given that the desktop CLR HttpConnection limit was so high, that reuse almost never happened in practice. I hope this will alleviate the socket pressure there.

The hashcode on corefx is also something we just fixed on the heads of all the branches (master, 5.x and 2.x) too. This was due to a one line change I introduced in NEST 2.4.4 and up :(

https://github.com/elastic/elasticsearch-net/commit/139ccfebbdbe9778201de5380000c6e4f9f33a8c#diff-4eb6f7a15a069c6324a632b9426e6046L75

This async issue is the last we hope to tackle today or tomorrow before releasing all three fixes:

If we cannot reproduce it today, I would love to schedule a Skype call to get to the bottom of this! You can find me on Skype under first name Martijn and last name Mpdreamz.

Mpdreamz commented 8 years ago

An addendum to the 10k issue on desktop CLR: that problem is a tad more nuanced than initially presented. I grabbed a coffee and remembered more of the backstory to that number.

Even though the max is 10k per ServicePoint, the ServicePointManager is still pretty conservative about opening a new connection as the load increases, especially since we have connection reuse enabled through HTTP/1.1 keep-alive and the TCP KeepAlive settings on the ServicePoint.

When we did a random load test session a looong time ago, we had a hard time pushing it past 100 concurrent connections per ServicePoint. So much so that we played with building our own makeshift ServicePoint pool and setting them on requests explicitly, effectively creating N ServicePoints per hostname instead of the default of 1 per hostname. Although this reliably gave us more concurrent TCP connections, it did not help the overall search/index throughput at all, so we abandoned this train of thought rather quickly!

Since the high number did not seem to hurt in practice we left it, although this might still misbehave on different hardware. (cc @elastic/microsoft )

I am really curious if you have better luck with the desktop clr http connection using:

public class MyHttpConnection : HttpConnection
{
    protected override void AlterServicePoint(ServicePoint requestServicePoint, RequestData requestData)
    {
        requestServicePoint.ConnectionLimit = 80;
    }
}
var settings = new ConnectionSettings(connectionPool, new MyHttpConnection());
var client = new ElasticClient(settings);

That should tame it down on Azure as well.

russcam commented 8 years ago

@fir3pho3nixx Your HttpConnection is basically the same as what I have except:

  1. I've fixed the hashcode issue as per our fix in https://github.com/elastic/elasticsearch-net/pull/2420
  2. I've simply left the synchronous Request call with .GetAwaiter().GetResult()

I've thrown 50 concurrent requests at an ASP.NET application with a total of 100,000 requests but still am not seeing any deadlock in the synchronous path.

Would you be able to try with the following?

public class HttpConnection : IConnection
{
    private readonly object _lock = new object();
    private readonly ConcurrentDictionary<int, HttpClient> _clients = new ConcurrentDictionary<int, HttpClient>();

    private HttpClient GetClient(RequestData requestData)
    {
        var key = GetClientKey(requestData);
        HttpClient client;
        if (!this._clients.TryGetValue(key, out client))
        {
            lock (_lock)
            {
                client = this._clients.GetOrAdd(key, h =>
                {
                    var handler = CreateHttpClientHandler(requestData);
                    var httpClient = new HttpClient(handler, false)
                    {
                        Timeout = requestData.RequestTimeout
                    };

                    httpClient.DefaultRequestHeaders.ExpectContinue = false;
                    return httpClient;
                });
            }
        }

        return client;
    }

    public virtual ElasticsearchResponse<TReturn> Request<TReturn>(RequestData requestData) where TReturn : class
    {
        var client = this.GetClient(requestData);

        var builder = new ResponseBuilder<TReturn>(requestData);

        try
        {
            var requestMessage = CreateHttpRequestMessage(requestData);

            var response = client.SendAsync(requestMessage, requestData.CancellationToken).GetAwaiter().GetResult();
            //var response = RunSynchronously(() => client.SendAsync(requestMessage, requestData.CancellationToken));

            requestData.MadeItToResponse = true;
            builder.StatusCode = (int)response.StatusCode;

            if (response.Content != null)
                builder.Stream = response.Content.ReadAsStreamAsync().GetAwaiter().GetResult();
                //builder.Stream = RunSynchronously(() => response.Content.ReadAsStreamAsync());
        }
        catch (HttpRequestException e)
        {
            builder.Exception = e;
        }

        return builder.ToResponse();
    }

    public virtual async Task<ElasticsearchResponse<TReturn>> RequestAsync<TReturn>(RequestData requestData) where TReturn : class
    {
        var client = this.GetClient(requestData);
        var builder = new ResponseBuilder<TReturn>(requestData);
        try
        {
            var requestMessage = CreateHttpRequestMessage(requestData);
            var response = await client.SendAsync(requestMessage, requestData.CancellationToken).ConfigureAwait(false);
            requestData.MadeItToResponse = true;
            builder.StatusCode = (int)response.StatusCode;

            if (response.Content != null)
                builder.Stream = await response.Content.ReadAsStreamAsync().ConfigureAwait(false);
        }
        catch (HttpRequestException e)
        {
            builder.Exception = e;
        }

        return await builder.ToResponseAsync().ConfigureAwait(false);
    }

    protected virtual HttpClientHandler CreateHttpClientHandler(RequestData requestData)
    {
        var handler = new HttpClientHandler
        {
            AutomaticDecompression = requestData.HttpCompression ? DecompressionMethods.GZip | DecompressionMethods.Deflate : DecompressionMethods.None
        };

        if (!string.IsNullOrEmpty(requestData.ProxyAddress))
        {
            var uri = new Uri(requestData.ProxyAddress);
            var proxy = new WebProxy(uri);
            var credentials = new NetworkCredential(requestData.ProxyUsername, requestData.ProxyPassword);
            proxy.Credentials = credentials;
            handler.Proxy = proxy;
        }

        if (requestData.DisableAutomaticProxyDetection)
            handler.Proxy = null;

        return handler;
    }

    protected virtual HttpRequestMessage CreateHttpRequestMessage(RequestData requestData)
    {
        var request = this.CreateRequestMessage(requestData);
        SetBasicAuthenticationIfNeeded(request, requestData);
        return request;
    }

    protected virtual HttpRequestMessage CreateRequestMessage(RequestData requestData)
    {
        var method = ConvertHttpMethod(requestData.Method);
        var requestMessage = new HttpRequestMessage(method, requestData.Uri);

        foreach (string key in requestData.Headers)
        {
            requestMessage.Headers.TryAddWithoutValidation(key, requestData.Headers.GetValues(key));
        }

        requestMessage.Headers.Accept.Add(new MediaTypeWithQualityHeaderValue(requestData.ContentType));

        if (!string.IsNullOrEmpty(requestData.RunAs))
            requestMessage.Headers.Add("es-shield-runas-user", requestData.RunAs);

        var data = requestData.PostData;

        if (data != null)
        {
            var stream = requestData.MemoryStreamFactory.Create();
            requestMessage.Content = new StreamContent(stream);
            if (requestData.HttpCompression)
            {
                requestMessage.Content.Headers.Add("Content-Encoding", "gzip");
                requestMessage.Headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("gzip"));
                requestMessage.Headers.AcceptEncoding.Add(new StringWithQualityHeaderValue("deflate"));
                using (var zipStream = new GZipStream(stream, CompressionMode.Compress, true))
                    data.Write(zipStream, requestData.ConnectionSettings);
            }
            else
                data.Write(stream, requestData.ConnectionSettings);
            stream.Position = 0;
        }
        else
        {
            // Set content in order to set a Content-Type header.
            // Content gets disposed, so it can't be a shared instance
            requestMessage.Content = new ByteArrayContent(new byte[0]);
        }

        requestMessage.Content.Headers.ContentType = new MediaTypeHeaderValue(requestData.ContentType);

        return requestMessage;
    }

    protected virtual void SetBasicAuthenticationIfNeeded(HttpRequestMessage requestMessage, RequestData requestData)
    {
        string userInfo = null;
        if (!string.IsNullOrEmpty(requestData.Uri.UserInfo))
            userInfo = Uri.UnescapeDataString(requestData.Uri.UserInfo);
        else if (requestData.BasicAuthorizationCredentials != null)
            userInfo = requestData.BasicAuthorizationCredentials.ToString();
        if (!string.IsNullOrEmpty(userInfo))
        {
            var credentials = Convert.ToBase64String(Encoding.UTF8.GetBytes(userInfo));
            requestMessage.Headers.Authorization = new AuthenticationHeaderValue("Basic", credentials);
        }
    }

    private static System.Net.Http.HttpMethod ConvertHttpMethod(HttpMethod httpMethod)
    {
        switch (httpMethod)
        {
            case HttpMethod.GET:
                return System.Net.Http.HttpMethod.Get;
            case HttpMethod.POST:
                return System.Net.Http.HttpMethod.Post;
            case HttpMethod.PUT:
                return System.Net.Http.HttpMethod.Put;
            case HttpMethod.DELETE:
                return System.Net.Http.HttpMethod.Delete;
            case HttpMethod.HEAD:
                return System.Net.Http.HttpMethod.Head;
            default:
                throw new ArgumentException("Invalid value for HttpMethod", nameof(httpMethod));
        }
    }

    private static int GetClientKey(RequestData requestData)
    {
        unchecked
        {
            var hashCode = requestData.RequestTimeout.GetHashCode();
            hashCode = (hashCode * 397) ^ requestData.HttpCompression.GetHashCode();
            hashCode = (hashCode * 397) ^ (requestData.ProxyAddress?.GetHashCode() ?? 0);
            hashCode = (hashCode * 397) ^ (requestData.ProxyUsername?.GetHashCode() ?? 0);
            hashCode = (hashCode * 397) ^ (requestData.ProxyPassword?.GetHashCode() ?? 0);
            hashCode = (hashCode * 397) ^ requestData.DisableAutomaticProxyDetection.GetHashCode();
            return hashCode;
        }
    }

    void IDisposable.Dispose() => this.DisposeManagedResources();

    protected virtual void DisposeManagedResources()
    {
        foreach (var c in _clients)
            c.Value.Dispose();
    }

    internal class WebProxy : IWebProxy
    {
        private readonly Uri _uri;

        public WebProxy(Uri uri)
        {
            _uri = uri;
        }

        public ICredentials Credentials { get; set; }

        public Uri GetProxy(Uri destination) => _uri;

        public bool IsBypassed(Uri host) => host.IsLoopback;
    }

    /// <summary>Runs the specified asynchronous function on the current thread</summary>
    /// <remarks>
    /// Modified version of https://blogs.msdn.microsoft.com/pfxteam/2012/01/20/await-synchronizationcontext-and-console-apps/
    /// </remarks>
    /// <param name="func">The asynchronous function to execute.</param>
    private static T RunSynchronously<T>(Func<Task<T>> func)
    {
        var previousContext = SynchronizationContext.Current;
        try
        {
            var context = new SingleThreadSynchronizationContext();
            SynchronizationContext.SetSynchronizationContext(context);

            var t = func();
            t.ContinueWith(delegate { context.Complete(); }, TaskScheduler.Default);

            context.RunOnCurrentThread();
            return t.GetAwaiter().GetResult();
        }
        finally
        {
            SynchronizationContext.SetSynchronizationContext(previousContext);
        }
    }

    /// <summary>Provides a SynchronizationContext that's single-threaded.</summary>
    private sealed class SingleThreadSynchronizationContext : SynchronizationContext
    {
        private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
            new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

        public override void Post(SendOrPostCallback d, object state)
        {
            if (d == null) throw new ArgumentNullException(nameof(d));
            _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotSupportedException("Synchronously sending is not supported.");
        }

        /// <summary>Runs a loop to process all queued work items.</summary>
        public void RunOnCurrentThread()
        {
            foreach (var workItem in _queue.GetConsumingEnumerable())
                workItem.Key(workItem.Value);
        }

        public void Complete() => _queue.CompleteAdding();
    }
}

Interested to know if:

  1. You see the deadlock with the GetHashCode issue
  2. If you do see it, whether switching to the commented-out lines, which use a SynchronizationContext that runs single-threaded, fixes it (this is essentially what Nito.AsyncEx does)

With RunSynchronously(), the code is ~17% slower than without it.

russcam commented 8 years ago

@fir3pho3nixx NEST 2.5.0 has been released, which includes the RequestData hashcode fix; interested to know if you still see issues with it.

Mpdreamz commented 7 years ago

@fir3pho3nixx did you have a chance to upgrade to NEST 2.5.0?

ghost commented 7 years ago

Hi guys, we went live on our side, so have not had time to check yet and I am currently supporting those activities. I will get back to you next week once things have settled down and I can try the new release. Thanks so much for your help so far.

russcam commented 7 years ago

Hey @fir3pho3nixx, have you had a chance to look at this again?

ghost commented 7 years ago

Hi Rus

This has been in the back of my mind for quite some time. We have been having significant problems with Azure Service Bus and tried to replace it with Amazon SNS/SQS. Our implementation had some threading issues and was causing thread starvation in our API when trying to publish back to the browser using SignalR with scaled out instances. Busy fixing this as we speak.

I also had to pull the source for 2.4.7 and recompile it against a lower version of JSON.NET, because we had 3 competing versions which were causing erratic assembly binding errors in our WebJobs. I think you guys did the right thing by using the latest, but unfortunately the Azure SDK and WebAPI are a little less forgiving.

As soon as I have fixed our messaging my intention is to upgrade to the new version and start running load tests again. I have also raised this on our backlog as a high priority item.

Just need a little time to get there! Hope you don't mind :)

Thanks for your patience.

Gav

ghost commented 7 years ago

Hi Guys, we have a bit of bandwidth in our sprint starting next week and are going to have a look at this again. So we should have feedback for you guys before next week Friday.

ghost commented 7 years ago

Hi,

Good news! Looks like limiting the sockets to 80 in service point has done it! We just passed performance testing with v5.0.1.
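
For anyone landing here later, this is a minimal sketch of what capping connections at the service point level can look like. It assumes the full .NET Framework, where outgoing HTTP requests go through `ServicePointManager`, and it assumes that is where the limit of 80 was applied; the thread does not show the exact change. The class name, method name and endpoint URI are hypothetical.

```csharp
using System;
using System.Net;

public static class ConnectionLimitSketch
{
    // Caps concurrent connections for the whole process and for one endpoint.
    // clusterUri is a placeholder for the real Elasticsearch endpoint.
    public static void Apply(Uri clusterUri, int limit)
    {
        // Default used by service points created after this assignment.
        ServicePointManager.DefaultConnectionLimit = limit;

        // Explicit cap for the service point that serves this scheme/host/port,
        // in case it was already created with a different limit.
        ServicePoint servicePoint = ServicePointManager.FindServicePoint(clusterUri);
        servicePoint.ConnectionLimit = limit;
    }
}
```

Calling `ConnectionLimitSketch.Apply(new Uri("https://example.invalid:9243"), 80)` once at startup, before the first request is issued, would be the intended usage. On newer runtimes where `HttpClient` does not go through `ServicePointManager`, this setting may have no effect.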

Many thanks guys. Closing this out.

Gav

Mpdreamz commented 7 years ago

Awesome :fireworks: :tada: :balloon: :cake: <insert more party emoticons>

Really appreciate you reaching back out to us and confirming the fix! Should we ever meet, we owe you a :beer: (or other beverage of choice) or two :)

If you are interested in some elastic swag (tshirt, stickers) as a small token of our appreciation hit us up at microsoft @ elastic.co and send us your address and t-shirt size!

ghost commented 7 years ago

No probs!