fixer-m / snowflake-db-net-client

Snowflake .NET Client
Apache License 2.0

Concurrent ChunksDownloader #26

Closed tungda-ccbji closed 1 year ago

tungda-ccbji commented 2 years ago

In QueryAsync, you check whether Chunks > 0 and then call ChunksDownloader to fetch the data from all the chunks. However, this check is missing in QueryRawResponseAsync.

Another thing that I think could be improved is ChunksDownloader: currently it downloads the chunks one by one. When I ran a query that returned around 14,000 records, it took about 5 seconds to complete. If you download the chunks in parallel on multiple tasks and merge the results once all the tasks have completed, it could reduce the time to under 1 second.

grexican commented 1 year ago

If you do that, you have to merge them in the correct order.

Right now I'm facing an issue with the chunk data being in the incorrect order. Trying to determine if the chunk data is wrong or if the way it's being processed by this library is wrong.

tungda-ccbji commented 1 year ago

Yes, you need to merge the results in the correct order. That is easy to do by tagging each task with a number and then ordering by that number before merging the chunks. Something like this:

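        // Assumes the enclosing method is async; results are keyed by chunk index.
        var result = new ConcurrentDictionary<int, List<List<string>>>();
        var tasks = new List<Task>();
        for (var i = 0; i < chunksDownloadInfo.Chunks.Count; i++)
        {
            var index = i; // per-iteration copy, safe to capture in the lambda
            var chunk = chunksDownloadInfo.Chunks[index];
            var downloadRequest = BuildChunkDownloadRequest(chunk, chunksDownloadInfo.ChunkHeaders, chunksDownloadInfo.Qrmk);
            // Await the download instead of blocking a thread-pool thread on .Result.
            var task = Task.Run(async () => result.TryAdd(index, await GetChunkContentAsync(downloadRequest, ct)));
            tasks.Add(task);
        }

        // Wait for every download, then merge in the original chunk order.
        await Task.WhenAll(tasks);
        var orderedResult = result.OrderBy(x => x.Key).ToList();
        foreach (var chunk in orderedResult)
        {
            rowSet.AddRange(chunk.Value);
        }
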
grexican commented 1 year ago

Yep, agreed.

Also, updating my client library from 0.4.0 to 0.4.3 seems to have solved my issue with data being in the incorrect order.

fixer-m commented 1 year ago

@tungda-ccbji, thanks, and sorry for the late reply.

As for QueryRawResponseAsync() not downloading chunks: that's by design. All "Raw" methods return the response from Snowflake as is. You can easily download the data chunks yourself with the public ChunksDownloader class if you want to. But I'll think about it. Maybe it could take an additional optional parameter and download the chunks when it is set to true.

As for a concurrent ChunksDownloader: that's a good improvement. I've already implemented it and am testing it right now. Thanks!

fixer-m commented 1 year ago

@grexican

> Right now I'm facing an issue with the chunk data being in the incorrect order.

That's weird, because right now it just downloads the chunks one by one and merges them in the same order as they appear in QueryExecResponseData.Chunks. If you have more details or can reproduce it, please report it. Thanks!

grexican commented 1 year ago

@fixer-m Some change between 0.4.0 and 0.4.3 fixed my issue. I was able to reproduce it easily with basically any query that was chunked. The easy check was to use ROW_NUMBER(): my first rows were in the 300s instead of 1, 2, 3, ... It didn't happen every time, but I'd say at least once every 3 or 4 queries.

fixer-m commented 1 year ago

@grexican Oh I see. I think it is because of this issue: https://github.com/fixer-m/snowflake-db-net-client/pull/27 - it was fixed in 0.4.1.

fixer-m commented 1 year ago

@tungda-ccbji I've implemented a concurrent ChunksDownloader in 0.4.4. You can specify the number of threads using the PrefetchThreadsCount option in SnowflakeClientSettings. The default value is 4. Please let me know if it doesn't work as expected.

I've also added a new option, DownloadChunksForQueryRawResponses, so you can get raw Snowflake responses with the chunks already downloaded, and the rowset will contain all rows.

fixer-m commented 1 year ago

@grexican Yeah, I used almost the same approach as you suggested for the concurrent chunks downloader. After downloading, the chunks are sorted into the same order in which they were listed in the initial response. The only thing I added is a concurrency limit using SemaphoreSlim. Please let me know if you find any issues with it. Thanks!
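
For readers following the thread, the idea discussed here (concurrent downloads, a SemaphoreSlim limit, and merging in the original chunk order) could look roughly like the sketch below. It is only an illustration and not the library's actual ChunksDownloader code: the names OrderedChunkDownload, DownloadAllAsync, downloadChunk and maxParallel are hypothetical, and the download itself is passed in as a delegate so the example stays self-contained.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the Snowflake client library.
public static class OrderedChunkDownload
{
    // Downloads every chunk with at most maxParallel downloads in flight
    // and returns the rows merged in the original chunk order.
    public static async Task<List<T>> DownloadAllAsync<TChunk, T>(
        IReadOnlyList<TChunk> chunks,
        Func<TChunk, CancellationToken, Task<List<T>>> downloadChunk,
        int maxParallel,
        CancellationToken ct = default)
    {
        using var throttle = new SemaphoreSlim(maxParallel);

        async Task<List<T>> DownloadOneAsync(TChunk chunk)
        {
            // Wait for a free slot before starting the download.
            await throttle.WaitAsync(ct);
            try
            {
                return await downloadChunk(chunk, ct);
            }
            finally
            {
                throttle.Release();
            }
        }

        // One task per chunk; the list keeps the tasks in chunk order.
        var tasks = chunks.Select(chunk => DownloadOneAsync(chunk)).ToList();

        // Task.WhenAll returns results in task order, not completion order,
        // so no explicit sort is needed here.
        var perChunkRows = await Task.WhenAll(tasks);

        return perChunkRows.SelectMany(rows => rows).ToList();
    }
}
```

Because Task.WhenAll returns its results in the order of the input tasks, this variant does not need the index-keyed dictionary from the earlier comment. Tagging each task with its index and sorting afterwards, as suggested above, gives the same ordering.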