aerospike / aerospike-client-csharp

Aerospike C# Client Library

Async query doesn't return all records matching the filter #72

Closed: kuskmen closed this issue 1 year ago

kuskmen commented 2 years ago

Hi, we recently upgraded our Aerospike server to version 6.0+ to improve our secondary index queries, and we also upgraded our client (4.2.0 -> 5.2.2). However, shortly after we deployed the new query we started receiving complaints that some objects were not processed. We concluded that the query doesn't return all of them, because if we revert to the old client with the old (blocking) query, everything works as expected.

This is the new implementation of the query:

        using System;
        using System.Collections.Generic;
        using System.Threading.Tasks;
        using Aerospike.Client;
        using Nito.AsyncEx; // AsyncMonitor

        public async Task<IList<long>> GetPurchaseIds(SettlementInput settlementInput)
        {
            var listener = new RecordListener();
            var ids = new List<long>();
            listener.OnPurchase += (key, record) =>
            {
                ids.Add(key.userKey.ToLong());
            };

            // Take the monitor before starting the query; WaitAsync releases it while waiting.
            await listener.WaitHandle.EnterAsync();

            _client.Query(
                new QueryPolicy(_client.QueryPolicyDefault),
                listener,
                new Statement
                {
                    Namespace = _settings.Namespace,
                    SetName = _settings.PurchasesSetName,
                    Filter = Filter.Contains(
                        MarketIdsBin.Name,
                        IndexCollectionType.LIST,
                        settlementInput.MarketId),
                    BinNames = new[] { ProcessedInputsBin.Name, SelectionsBin.Name, IsArchivedBin.Name }
                });

            // Resumes after OnSuccess or OnFailure pulses the monitor.
            await listener.WaitHandle.WaitAsync();

            if (listener.Exception != null)
            {
                throw listener.Exception; // the query will be reprocessed
            }

            return ids;
        }

        // Raised by RecordListener for each record returned by the query.
        private delegate void OnPurchase(Key key, Record record);

        private class RecordListener : RecordSequenceListener
        {
            public event OnPurchase OnPurchase;

            public AsyncMonitor WaitHandle { get; }

            // Needs a private setter: it is assigned in OnFailure, not in the constructor.
            public Exception Exception { get; private set; }

            public RecordListener()
            {
                WaitHandle = new AsyncMonitor();
            }

            public void OnFailure(AerospikeException exception)
            {
                Exception = exception;
                WaitHandle.PulseAll();
            }

            public void OnRecord(Key key, Record record)
            {
                this.OnPurchase?.Invoke(key, record);
            }

            public void OnSuccess()
            {
                WaitHandle.PulseAll();
            }
        }

So my question is: has anyone else observed this behaviour, or are we doing something wrong?

BrianNichols commented 2 years ago

What are the _client.QueryPolicyDefault values (include base class Policy values like socketTimeout and totalTimeout)? How long does the query usually take to run in the old client?

C# client 5.2.1 had a bug where async queries prematurely timed out and OnSuccess()/OnFailure() were never called, but that was fixed in 5.2.2. Ensure that you are using 5.2.2.
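With a bug like the 5.2.1 one, where neither completion callback fires, any code awaiting the listener simply hangs. A defensive sketch, assuming the listener's completion is exposed as a `Task<T>` (for example through a `TaskCompletionSource`), is to race it against a delay. `CompletionGuard.WithTimeout` is a hypothetical helper, not part of the Aerospike client, and the limit would normally be set somewhat above the policy's totalTimeout.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionGuard
{
    // Hypothetical helper: awaits `completion`, but throws TimeoutException
    // if neither OnSuccess nor OnFailure has completed it within `limit`.
    public static async Task<T> WithTimeout<T>(Task<T> completion, TimeSpan limit)
    {
        Task finished = await Task.WhenAny(completion, Task.Delay(limit));
        if (finished != completion)
        {
            throw new TimeoutException(
                $"Listener did not signal OnSuccess/OnFailure within {limit}.");
        }

        // Returns the listener's result, or rethrows the exception set by OnFailure.
        return await completion;
    }
}
```

A timeout here only tells the caller that the listener went silent; it does not cancel the query on the server.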

kuskmen commented 2 years ago

@BrianNichols

_client.QueryPolicyDefault is the client's default policy, so I assume its values are the ones defined here: https://github.com/aerospike/aerospike-client-csharp/blob/master/Framework/AerospikeClient/Policy/QueryPolicy.cs#L114

I am sure that we are on client version 5.2.2. (By the way, ever since we upgraded from 4.2.0 to 5.2.2, we have seen intermittent timeout exceptions for all Aerospike requests from this service. Oddly enough, those timeouts do not appear in the Aerospike metrics when I look for them.)

I haven't measured how long a query took with the old client, but currently queries take between 100 and 300 ms according to our application metrics.

BrianNichols commented 2 years ago

@kuskmen When you downgraded the client to restore the query functionality, did you downgrade the server as well or did you leave the server at 6.X?

Server 6.X does de-duplicate records in queries. The raw count of returned records may be less in 6.X because some of those records might be duplicates.

kuskmen commented 2 years ago

No, we have not downgraded the server. What do you mean by de-duplicating records? Each record has its own PK, or am I missing something? Could you elaborate?

BrianNichols commented 2 years ago

Secondary index range queries on map/list (like Filter.Contains()) can result in the exact same key and record being returned multiple times due to the range traversal algorithm on the secondary index. Server 6.X makes a stronger effort to detect and eliminate duplicate keys/records in the query results.
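A minimal sketch of what de-duplication by key means, assuming each record is identified by a `long` user key and that the same key can arrive more than once from overlapping index ranges. `KeyDeduplicator` is hypothetical and is not part of the Aerospike client; keying on the record digest (`Key.digest`) would be more robust but needs a byte-array comparer.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

// Hypothetical helper: keeps the first occurrence of each key and drops repeats.
public sealed class KeyDeduplicator
{
    // ConcurrentDictionary so TryAccept stays safe even if callbacks overlap.
    private readonly ConcurrentDictionary<long, byte> _seen =
        new ConcurrentDictionary<long, byte>();

    // True the first time a key is offered, false for every later duplicate.
    public bool TryAccept(long userKey) => _seen.TryAdd(userKey, 0);

    public int Count => _seen.Count;

    // Filters a sequence of keys, preserving the order of first occurrences.
    public static List<long> Distinct(IEnumerable<long> keys)
    {
        var dedup = new KeyDeduplicator();
        var result = new List<long>();
        foreach (var key in keys)
        {
            if (dedup.TryAccept(key))
            {
                result.Add(key);
            }
        }
        return result;
    }
}
```

For example, `KeyDeduplicator.Distinct(new long[] { 7, 3, 7, 9, 3 })` returns `[7, 3, 9]`: the raw stream has five entries, but only three distinct records.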

Since you left the server at 6.X and only downgraded the client, this is not the reason for the lower-than-expected number of records returned.

Your symptoms (including intermittent timeouts for all async Aerospike requests) can be reproduced with C# client 5.2.1, but not with 5.2.2. Can you provide a reproducible test case?

kuskmen commented 2 years ago

Unfortunately, we don't have a reproducible test case. The errors occur in a production environment with a very uneven load, and my attempts to load-test this in development were unsuccessful. Regarding the client version, I can assure you we are using 5.2.2.

Can I use the AsyncMonitor implementation from your tests instead of the one from the public library I'm using, to rule out one more possible cause of the unexpected results?

BrianNichols commented 2 years ago

Yes, the client test code, including AsyncMonitor, is open source.

kuskmen commented 1 year ago

Unfortunately, even with your AsyncMonitor we still get different results for the same queries against the Aerospike server. Even in our test environment, where we simply insert 850k records and then query them (without modifying them in the meantime), the results are non-deterministic.

kuskmen commented 1 year ago

Quick question: are the methods of RecordSequenceListener thread-safe? We saw one exception, so we changed the internal collection that stores the records in memory, and now the queries seem to return deterministic results. Can you confirm that they are not thread-safe?

BrianNichols commented 1 year ago

RecordSequenceListener.OnRecord() is not thread-safe. The query is split into multiple node sub-queries. The async callback from these queries can be called from multiple completion port threads simultaneously, so the user's callback code must handle data in a thread-safe manner.
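A sketch of a listener whose OnRecord can be called from several threads at once. To stay self-contained it does not reference the Aerospike assemblies: `IRecordCallbacks` is a hypothetical stand-in that mirrors the three RecordSequenceListener methods, with the key reduced to the `long` that the original code extracts via `key.userKey.ToLong()`. Records are collected in a `ConcurrentQueue<long>` instead of a `List<long>`, and completion is signalled through a `TaskCompletionSource`, which is swapped in here for the AsyncMonitor used in the issue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for RecordSequenceListener, reduced to a long user key.
public interface IRecordCallbacks
{
    void OnRecord(long userKey);
    void OnSuccess();
    void OnFailure(Exception exception);
}

public sealed class ThreadSafeIdCollector : IRecordCallbacks
{
    // ConcurrentQueue tolerates simultaneous Enqueue calls from several
    // completion-port threads; List<T>.Add does not.
    private readonly ConcurrentQueue<long> _ids = new ConcurrentQueue<long>();

    // RunContinuationsAsynchronously keeps awaiting code off the callback thread.
    private readonly TaskCompletionSource<IList<long>> _done =
        new TaskCompletionSource<IList<long>>(TaskCreationOptions.RunContinuationsAsynchronously);

    public Task<IList<long>> Completion => _done.Task;

    public void OnRecord(long userKey) => _ids.Enqueue(userKey);

    // Expected to fire once, after every sub-query has delivered its records.
    public void OnSuccess() => _done.TrySetResult(_ids.ToList());

    public void OnFailure(Exception exception) => _done.TrySetException(exception);
}

// Simulates several node sub-queries delivering records in parallel.
public static class ConcurrencyDemo
{
    public static async Task<IList<long>> RunAsync(int nodes, int recordsPerNode)
    {
        var collector = new ThreadSafeIdCollector();
        var subQueries = Enumerable.Range(0, nodes).Select(node => Task.Run(() =>
        {
            for (int i = 0; i < recordsPerNode; i++)
            {
                collector.OnRecord((long)node * recordsPerNode + i);
            }
        })).ToArray();

        await Task.WhenAll(subQueries);
        collector.OnSuccess();
        return await collector.Completion;
    }
}
```

`ConcurrencyDemo.RunAsync(8, 10000)` yields all 80,000 distinct ids; with a plain `List<long>` in OnRecord, the same simulation can lose entries or throw. A `lock` around `List<long>.Add` would be equally correct; the concurrent queue just avoids writing the lock by hand.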

kuskmen commented 1 year ago

I see, all this makes sense now, thanks.