Azure / azure-cosmos-dotnet-v2

Contains samples and utilities relating to the Azure Cosmos DB .NET SDK
MIT License
Take() trigger full scan even after result found #598

Open jeremy001181 opened 6 years ago

jeremy001181 commented 6 years ago

Hey team

I am using 2.0.0 and noticed that the SDK sends requests to all partitions, one after the other, even though the result has already been found by a previous request.

It works as expected if I take out the .Take(1) call. Is this a bug?

P.S. I am using Https mode for diagnostic purposes, so that I can inspect network requests with a proxy, e.g. Fiddler.

using (var query = client.CreateDocumentQuery<Customer>(collectionUri, new FeedOptions
    {
        MaxItemCount = 10,
        EnableCrossPartitionQuery = true,
    })
    .Where(c => c.Email == email)
    .Take(1) // <- By adding this, it sends requests to all partitions
    .AsDocumentQuery())
{
    while (query.HasMoreResults)
    {
        var response = await query.ExecuteNextAsync<Customer>();
        if (response.Count > 0)
        {
            return response.FirstOrDefault();
        }
    }
    return null;
}

ealsur commented 6 years ago

@jeremy001181 If you want just 1 item, did you try setting the MaxItemCount = 1 to see how it behaves?

jeremy001181 commented 6 years ago

@ealsur Same behaviour after changing it to 1. I put 1 just as an example.

ealsur commented 6 years ago

Got it, saw that in the code sample there was MaxItemCount = 10. I'll investigate.

ealsur commented 6 years ago

@jeremy001181 Are you setting MaxDegreeOfParallelism to any other value? I don't see it in the sample, but just in case.

khdang commented 6 years ago

@jeremy001181, @ealsur I believe the desired behavior can be achieved by setting MaxDegreeOfParallelism to 0. That forces the query to execute serially, so it would stop after the first partition that returns the result.
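
A minimal sketch of that suggestion, assuming the v2 SDK's FeedOptions. The helper name SerialQueryOptions is hypothetical, and the claim that 0 means serial execution is taken from the comments in this thread rather than verified here:

```csharp
using Microsoft.Azure.Documents.Client;

// Hypothetical helper: builds cross-partition options with parallelism switched off.
public static class SerialQueryOptions
{
    public static FeedOptions Create(int pageSize) => new FeedOptions
    {
        MaxItemCount = pageSize,           // page size per request
        EnableCrossPartitionQuery = true,  // required when no partition key is supplied
        MaxDegreeOfParallelism = 0,        // 0 = query partitions one at a time (per this thread)
    };
}
```

The result of SerialQueryOptions.Create(1) could be passed to CreateDocumentQuery<Customer>(collectionUri, ...) in place of the inline FeedOptions from the original sample.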

ealsur commented 6 years ago

That's what I was aiming at, but I wasn't sure if @jeremy001181 was already setting it somewhere. If MaxDegreeOfParallelism is 0 and yet all the partitions are being queried, then that would be wrong.

jeremy001181 commented 6 years ago

@khdang MaxDegreeOfParallelism is another thing I would like to raise in a separate issue, but since you mentioned it, I will put it here.

For the .Take() issue I raised, I tested without setting MaxDegreeOfParallelism, so I believe that has the same effect as setting it to 0, according to the docs?

And if I take out the .Take() and set MaxDegreeOfParallelism to a value x greater than 0, the SDK sends x requests in parallel until it reaches the last partition (which makes sense). But again it doesn't stop after a previous request has found the result, and it does a full scan. I would like to find out whether that is by design or a bug, as it is more expensive and can be avoided.

ealsur commented 6 years ago

@jeremy001181 As a workaround, please set MaxDegreeOfParallelism to 1. I will track this internally so it's addressed and fixed. Like you said, MaxDegreeOfParallelism should default to 0 according to the docs and thus this behavior should not happen.

ealsur commented 6 years ago

@jeremy001181 A fix for this has been already worked on and will ship in the next SDK version

jeremy001181 commented 6 years ago

Hey @ealsur, I just had a chance to try the workaround you suggested (setting MaxDegreeOfParallelism to 1), and it doesn't seem to work. It again triggers a full scan, with or without .Take().

ealsur commented 6 years ago

@jeremy001181 We released 2.1.0 on Saturday, could you check if using the new version you still see this behavior?

jeremy001181 commented 6 years ago

@ealsur I noticed that, actually, but I am struggling to get my app to work with 2.1.0. I am getting this assembly redirect warning after upgrading. Any idea?

ealsur commented 6 years ago

That's odd, I created a new App with 2.1.0 and I have no errors. Could you try removing the package and re-adding it? Do you have another library project being referenced that also uses the Cosmos DB SDK that might have a different version?

ealsur commented 6 years ago

@jeremy001181 do you have any news regarding the test in 2.1.0?

jeremy001181 commented 6 years ago

No, I haven't. I might get some time tomorrow.

ealsur commented 6 years ago

Closing as the issue should be resolved on the latest SDK version, please re-open in case you run into it again.

jeremy001181 commented 6 years ago

Hey @ealsur, I tested both 2.1.0 and 2.1.1, and the issue is still not resolved. Please reopen the ticket. Thanks

ealsur commented 6 years ago

@jeremy001181 Do you have a sample code we can use to troubleshoot? Does your original code repro the issue as-is? Could you share the DocumentClient initialization parameters too so we can have a complete scenario?

jeremy001181 commented 5 years ago

@ealsur Sorry, I didn't describe it clearly in my previous comment.

Issue 1: There is a minor improvement in the later releases: AsDocumentQuery() with Take() no longer continues scanning after it finds a result (which is good). However, the SDK sends out a total of 6 requests (most likely the data is located in the 6th partition) on the first call to ExecuteNextAsync<T>(). Without Take(), ExecuteNextAsync<T>() gets called 6 times, where the first 5 calls return no results and the 6th call returns my data, which seems more like the correct behaviour.

Issue 2: After setting MaxDegreeOfParallelism to a value greater than 1, e.g. 2, the SDK performs a full scan (10 requests, as I have 10 partitions) with or without Take(), even though the 6th request returns the result. Again, all requests are sent out by the first call to ExecuteNextAsync<T>(). This happens with both AsEnumerable() and AsDocumentQuery().
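
To make the distinction between ExecuteNextAsync<T>() calls and network requests concrete, here is a rough sketch that counts the calls and prints what each page reports. QueryDiagnostics and CountCallsAsync are hypothetical names. The loop only sees calls made by the application; the requests on the wire still have to be counted in the proxy:

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;

// Hypothetical helper: drains a query and reports one line per ExecuteNextAsync call.
public static class QueryDiagnostics
{
    public static async Task<int> CountCallsAsync<T>(IDocumentQuery<T> query)
    {
        int calls = 0;
        while (query.HasMoreResults)
        {
            FeedResponse<T> page = await query.ExecuteNextAsync<T>();
            calls++;
            // Count and RequestCharge are reported by the SDK for each returned page.
            Console.WriteLine($"call {calls}: {page.Count} item(s), {page.RequestCharge} RU");
        }
        return calls; // number of SDK calls, not number of HTTP requests
    }
}
```

Comparing the returned count with the number of requests shown in Fiddler for the same run would show whether a single call fans out to several partitions.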

The following is my test code. I suggest you run it behind your favourite proxy; then you will see what I mean.


using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.Documents.Client;
using Microsoft.Azure.Documents.Linq;
using Newtonsoft.Json;

namespace CosmosDb
{
    public class CosmosClient
    {
        private readonly string databaseName;
        private readonly string collectionName;
        private readonly DocumentClient client;
        private readonly Uri collectionUri;
        private readonly FeedOptions crossPartitionFeedOption;

        // endpoint, key, databaseName and collectionName are supplied by the caller
        public CosmosClient(string endpoint, string key, string databaseName, string collectionName)
        {
            this.databaseName = databaseName;
            this.collectionName = collectionName;

            client = new DocumentClient(new Uri(endpoint), key,
                new JsonSerializerSettings
                {
                    NullValueHandling = NullValueHandling.Ignore
                },
                new ConnectionPolicy
                {
                    ConnectionMode = ConnectionMode.Direct,
                    // use Https so that I can inspect network requests in Fiddler
                    ConnectionProtocol = Protocol.Https,
                    EnableEndpointDiscovery = true,
                });

            collectionUri = UriFactory.CreateDocumentCollectionUri(databaseName, collectionName);

            crossPartitionFeedOption =
                new FeedOptions
                {
                    MaxItemCount = 1,
                    //MaxDegreeOfParallelism = 2,
                    EnableCrossPartitionQuery = true,
                };
        }

        public async Task<Customer> GetByEmailAsync(string email)
        {
            using (var query = client.CreateDocumentQuery<Customer>(collectionUri, crossPartitionFeedOption)
                .Where(c => c.Email == email || c.Email == email.ToLower())
                .Take(1) // with Take() it performs a full scan; without it, it doesn't
                .AsDocumentQuery())
            {
                while (query.HasMoreResults)
                {
                    var response = await query.ExecuteNextAsync<Customer>();
                    if (response.Count > 0)
                    {
                        return response.FirstOrDefault();
                    }
                }
                return null;
            }
        }

        private Uri GetDocumentUri(string customerId) =>
            UriFactory.CreateDocumentUri(databaseName, collectionName, customerId);

        private RequestOptions PartitionAwareRequestOptions(string customerId) =>
            new RequestOptions { PartitionKey = new PartitionKey(customerId) };
    }

    public class Customer
    {
        [JsonProperty("email")]
        public string Email { get; internal set; }
    }
}

ealsur commented 5 years ago

It is expected behavior (when you use MaxDegreeOfParallelism > 0), since the SDK is prefetching results from all partitions in parallel.

The reason this happens is because even if the user has TOP 1 in their query the first N - 1 partitions could be empty so the cross partition engine has to do a full fan out in order to optimize latency.

jeremy001181 commented 5 years ago

@ealsur

Sorry, I am confused. I thought it was MaxBufferedItemCount that enables the pre-fetch feature. MaxDegreeOfParallelism is more about the maximum number of partitions that can be queried in parallel,

according to https://docs.microsoft.com/en-us/azure/cosmos-db/performance-tips#sdk-usage, point 4.

ealsur commented 5 years ago

MaxDegreeOfParallelism controls the fan-out across partitions, like you mention. But if the value is greater than 1, it will fan out to try to process as many partitions in parallel as possible, as I explained in the other answer.

MaxBufferedItemCount controls the single buffer where results are aggregated, like you point to in the article.

The point is that the fan-out happens because MaxDegreeOfParallelism is > 0, so the SDK starts working on multiple partitions in parallel. It won't query 1 partition, check whether that fills the TOP, and only then move on to the next one (as happens when the value is 0, the default); instead it parallelizes the query across the rest of the partitions from the start.
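
The difference can be illustrated with a toy model. This is not the SDK's implementation; FanOutModel and its methods are hypothetical names, and the model only encodes the behaviour as described in this thread: serial execution stops at the first partition that yields the result, while parallel execution covers every partition in batches.

```csharp
using System;

// Toy model of request counts for a TOP 1 cross-partition query (not SDK code).
public static class FanOutModel
{
    // Serial mode: visit partitions 1..partitionCount in order and stop at the match.
    // If matchPartition is outside 1..partitionCount, every partition is visited.
    public static int SerialRequests(int partitionCount, int matchPartition)
    {
        int requests = 0;
        for (int p = 1; p <= partitionCount; p++)
        {
            requests++;
            if (p == matchPartition) break;
        }
        return requests;
    }

    // Parallel mode: every partition is queried, `degree` partitions at a time.
    public static (int Requests, int Batches) ParallelRequests(int partitionCount, int degree)
    {
        if (degree < 1) throw new ArgumentOutOfRangeException(nameof(degree));
        int batches = (partitionCount + degree - 1) / degree; // ceiling division
        return (partitionCount, batches);
    }
}
```

With 10 partitions and the match in partition 6, SerialRequests(10, 6) gives 6, while ParallelRequests(10, 2) gives 10 requests in 5 batches, which matches the 6 and 10 requests reported earlier in the thread.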

I didn't see MaxBufferedItemCount being used in your repro code, what's the value you are using?

jeremy001181 commented 5 years ago

In my test, I didn't set MaxBufferedItemCount. I only set MaxDegreeOfParallelism to 2, so I expected the SDK to fan out to 2 partitions at a time per ExecuteNextAsync<T>() call. Instead it fans out to all partitions on the very first ExecuteNextAsync<T>() call, which is expensive and seems wrong to me.

This fan-out behaviour also happens without setting MaxDegreeOfParallelism; see issue 1 in my previous comment.

ealsur commented 5 years ago

I'm sorry @jeremy001181 I'm confused.

Issue 1: There is a minor improvement in the later releases: AsDocumentQuery() with Take() no longer continues scanning after it finds a result (which is good). However, the SDK sends out a total of 6 requests (most likely the data is located in the 6th partition) on the first call to ExecuteNextAsync(). Without Take(), ExecuteNextAsync() gets called 6 times, where the first 5 calls return no results and the 6th call returns my data, which seems more like the correct behaviour.

This is without MaxDegreeOfParallelism, and it is the correct behavior: it will query partitions until it finds 1 result, so the number of requests depends on the partitions and on where it finds the result.

Are you sure that it is sending all network requests in parallel? Internally it should be doing 2 requests at a time, but like I mentioned, it will cover all partitions, that is expected, because it will aggregate the results from all partitions (in your case, querying in batches of 2 partitions).

jeremy001181 commented 5 years ago

Hey @ealsur, I don't mean that it sends all requests in parallel. I mean that ExecuteNextAsync() is only executed once, but Fiddler shows 6 requests (or 10 requests if MaxDegreeOfParallelism is provided).

Is that because, as you said, it is done internally?

Internally it should be doing 2 requests at a time

If so, what is the point of ExecuteNextAsync()? Should it be named something like GetAllMyStuff()?

ealsur commented 5 years ago

That is the intent of MaxDegreeOfParallelism, to control the degree of parallelism that ExecuteNextAsync() uses internally and to control the number of simultaneous network connections that will be done when running the query. See https://docs.microsoft.com/en-us/azure/cosmos-db/sql-api-partition-data#parallel-query-execution

jeremy001181 commented 5 years ago

@ealsur Okay, but why does it also happen when MaxDegreeOfParallelism is not set? See my issue 1 above.

ealsur commented 5 years ago

When not set, it will default to 0. It doesn't mean it won't query all partitions, it means it won't do the calls in parallel. So in your case, it will do requests to partitions (not in parallel) until it finds one partition with 1 result (because you have a Take(1)). If you have 10 partitions and the result is in partition 6, then it might do 5 requests (to the previous partitions) until it reaches partition 6, finds the result and stops.

jeremy001181 commented 5 years ago

@ealsur I think you missed my point in issue 1. I understand that it still requires scanning all partitions, and I also understand that it won't query in parallel when MaxDegreeOfParallelism is not set. I think we are pretty clear about this property now.

What I am trying to explain is the different behaviour I observed with and without Take(n). Without Take(n), ExecuteNextAsync() gets called 6 times (once per request), while with Take(n) it only gets called once, but Fiddler shows 6 requests. Please note that MaxDegreeOfParallelism was NOT set in either scenario.

wpitallo commented 5 years ago

When not set, it will default to 0. It doesn't mean it won't query all partitions, it means it won't do the calls in parallel. So in your case, it will do requests to partitions (not in parallel) until it finds one partition with 1 result (because you have a Take(1)). If you have 10 partitions and the result is in partition 6, then it might do 5 requests (to the previous partitions) until it reaches partition 6, finds the result and stops.

So just to be clear, these requests to the partitions are being made one by one on the client side, so if there are millions of partitions, cross-partition querying will be pointless?