ilyalatt / Telega

C# Telegram MTProto Client
https://ilyalatt.github.io/Telega/
MIT License
58 stars 15 forks source link

about RPC call queuing and UploadFileMultiThread #66

Closed wuyu8512 closed 3 years ago

wuyu8512 commented 3 years ago

I want to write a multi-threaded upload method like:

public async Task<InputFile> UploadFileMultiThread(string name, long fileId, int fileLength, Stream stream,
    int threadCount = 20, int startRange = 0, int? uploadCount = null)
{
    if (fileLength <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(fileLength));
    }

    var isBigFileUpload = IsBigUpload(fileLength);
    if (!isBigFileUpload) throw new NotSupportedException("fileLength must > 10485760 (10mb)");

    var chunksCount = 1 + (fileLength - 1) / ChunkSize;

    SemaphoreSlim semaphore = new(threadCount < 20 ? threadCount : 20, threadCount);
    SemaphoreSlim streamLock = new(1, 1);
    var tasks = new List<Task>();
    var tg = _tg.Fork();

    foreach (var filePart in Enumerable.Range(startRange, uploadCount ?? chunksCount))
    {
        await semaphore.WaitAsync().ConfigureAwait(false);
        var task = Task.Run(async () =>
        {
            Console.WriteLine("filePart: {0}", filePart);
            var buffer = new byte[ChunkSize];
            var nowReceived = filePart * ChunkSize;
            var chunkSize = Math.Min(ChunkSize, fileLength - nowReceived);

            try
            {
                await streamLock.WaitAsync().ConfigureAwait(false);
                try
                {
                    stream.Seek(nowReceived, SeekOrigin.Begin);
                    await ReadToBuffer(buffer, 0, chunkSize, stream).ConfigureAwait(false);
                }
                finally
                {
                    streamLock.Release();
                }

                var res = await tg.Call(
                    new SaveBigFilePart(
                        fileId: fileId,
                        filePart: filePart,
                        bytes: buffer.Take(chunkSize).ToArray().ToBytesUnsafe(),
                        fileTotalParts: chunksCount
                    )
                ).ConfigureAwait(false);
                Helpers.Assert(res, "chunk send failed");
            }
            finally
            {
                semaphore.Release();
            }
        });
        if (filePart + threadCount > chunksCount) tasks.Add(task);
    }

    await Task.WhenAll(tasks.ToArray()).ConfigureAwait(false);

    return new InputFile.BigTag(
        id: fileId,
        name: name,
        parts: chunksCount
    );
}

But Telega doesn’t seem to be able to make multiple requests at the same time?

wuyu8512 commented 3 years ago

I change TaskQueue class _queue

But uploading will be issue, and need to handle the FILE_PART_Х_MISSING error by myself

I think I need telegram_bot_api or tdlib to solve this problem

ilyalatt commented 3 years ago

Hi. TCP connection does not allow concurrent packets. But you can open multiple connections. You can upgrade TgConnectionPool to perform multiple connections per DC.