hbjorgo / ATLib

ATLib is a C# library that makes it easy to communicate with modems.
MIT License

Wrong way to use Task #27

Closed tqk2811 closed 1 year ago

tqk2811 commented 1 year ago

https://github.com/hbjorgo/ATLib/blob/f50942650a74b25f26d4d6ba989668371a946171/src/HeboTech.ATLib/Parsers/AtReader.cs#L32
https://github.com/hbjorgo/ATLib/blob/f50942650a74b25f26d4d6ba989668371a946171/src/HeboTech.ATLib/Parsers/AtChannel.cs#L62

Task.Factory.StartNew with the LongRunning flag creates a new Task and runs it on a new, dedicated thread (not on the ThreadPool).
But because the delegate uses async/await without a SynchronizationContext, ReadPipeAsync resumes on the ThreadPool after any await whose awaited task completes on a different thread.

Currently it works (maybe) because PipeReader.ReadAsync does not hit this case, although that depends on the input stream.
It may stop working in the future if the library code changes or a different input stream is used, so please keep this in mind when you modify the code or update the library. The following stream, whose ReadAsync completes asynchronously, reproduces the problem:

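using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using HeboTech.ATLib.Parsers; // namespace assumed from the file path linked above

// A stream whose ReadAsync always completes asynchronously, on another thread.
class CustomStream : MemoryStream
{
    public override async ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
    {
        await Task.Delay(10, cancellationToken);
        return await base.ReadAsync(buffer, cancellationToken);
    }
}
class Program
{
    static void Main(string[] args)
    {
        Stream _stream = new CustomStream();
        AtReader atReader = new AtReader(_stream);
        atReader.Open(); // starts ReadPipeAsync via Task.Factory.StartNew
        Console.ReadLine(); // keep the process alive while the reader runs
    }
}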
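
The thread switch described above can also be observed without ATLib. The following is a minimal sketch, not code from the library: LongRunningDemo and Observe are hypothetical names, and it assumes a plain console app with no SynchronizationContext installed. It records whether the code runs on a thread-pool thread before and after the first await inside a LongRunning task.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo class, not part of ATLib.
public static class LongRunningDemo
{
    // Returns whether the async delegate was on a thread-pool thread
    // before and after its first await.
    public static (bool Before, bool After) Observe()
    {
        var outer = Task.Factory.StartNew(
            async () =>
            {
                // Still on the dedicated thread created for LongRunning.
                bool before = Thread.CurrentThread.IsThreadPoolThread;

                // Completes later on another thread; with no
                // SynchronizationContext the continuation is not
                // marshalled back to the dedicated thread.
                await Task.Delay(50);

                bool after = Thread.CurrentThread.IsThreadPoolThread;
                return (before, after);
            },
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);

        // StartNew returns Task<Task<...>> for an async delegate; unwrap it.
        return outer.Unwrap().GetAwaiter().GetResult();
    }
}
```

Calling LongRunningDemo.Observe() from a console app's Main is expected to report Before as false and After as true: the dedicated thread is only used up to the first await that does not complete synchronously.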


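Suggested fix: install Nito.AsyncEx.Context and run ReadPipeAsync inside AsyncContext.Run, so that continuations resume on the dedicated thread: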

reader = Task.Factory.StartNew(
    () => AsyncContext.Run(() => ReadPipeAsync(pipe, cancellationTokenSource.Token)), 
    TaskCreationOptions.LongRunning
);
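
To illustrate why a single-threaded context keeps continuations on the dedicated thread, here is a minimal sketch of the idea. It is not Nito.AsyncEx's implementation: SingleThreadContext and PumpDemo are hypothetical names, and the pump is deliberately simplified (no async void tracking, no error handling for late posts).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical, simplified stand-in for a single-threaded async context.
public sealed class SingleThreadContext : SynchronizationContext
{
    private readonly BlockingCollection<(SendOrPostCallback, object)> queue =
        new BlockingCollection<(SendOrPostCallback, object)>();

    // Continuations captured by await are queued here instead of
    // being sent to the thread pool.
    public override void Post(SendOrPostCallback d, object state) => queue.Add((d, state));

    // Runs func on the calling thread and pumps its continuations
    // on that same thread until the returned task completes.
    public static T Run<T>(Func<Task<T>> func)
    {
        var previous = Current;
        var context = new SingleThreadContext();
        SetSynchronizationContext(context);
        try
        {
            Task<T> task = func();
            // Stop the pump once the async method has finished.
            task.ContinueWith(_ => context.queue.CompleteAdding(), TaskScheduler.Default);
            foreach (var (callback, state) in context.queue.GetConsumingEnumerable())
                callback(state);
            return task.GetAwaiter().GetResult();
        }
        finally
        {
            SetSynchronizationContext(previous);
        }
    }
}

public static class PumpDemo
{
    // Returns the managed thread id before and after an await.
    public static (int Before, int After) Observe() =>
        SingleThreadContext.Run(async () =>
        {
            int before = Environment.CurrentManagedThreadId;
            await Task.Delay(50); // continuation is posted back to the pump
            int after = Environment.CurrentManagedThreadId;
            return (before, after);
        });
}
```

With the context installed, the code after the await resumes on the thread that called Run, so both thread ids returned by PumpDemo.Observe() should match.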
tqk2811 commented 1 year ago

Each AT device currently uses two threads, which does not scale well to many devices (e.g. 100 devices).
Instead, we can share 8-16 threads, each with its own SynchronizationContext, across the 100 devices:

public void Open(SynchronizationContext synchronizationContext = null)
{
    if (synchronizationContext != null)
    {
        reader = synchronizationContext.PostAsync(() => ReadPipeAsync(pipe, cancellationTokenSource.Token));
    }
    else
    {
        reader = Task.Factory.StartNew(() => AsyncContext.Run(() => ReadPipeAsync(pipe, cancellationTokenSource.Token)), TaskCreationOptions.LongRunning);
    }
}
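// Pool of single-threaded contexts, one per worker thread.
List<SynchronizationContext> _SynchronizationContextPool = new List<SynchronizationContext>();
// Runs inside AsyncContext.Run: registers this thread's context, then keeps it alive.
async void MainLoopSynchronizationContextPool()
{
    lock (_SynchronizationContextPool) _SynchronizationContextPool.Add(SynchronizationContext.Current);
    while (true) await Task.Delay(10);
}
// Start one worker thread, with its own context, per processor.
for (int i = 0; i < Environment.ProcessorCount; i++)
{
    Task.Factory.StartNew(() => AsyncContext.Run(MainLoopSynchronizationContextPool), CancellationToken.None, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
// Give the workers time to register their contexts.
await Task.Delay(1000);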

for (int i = 0; i < devices.Count; i++)
{
    //..........
    atChannel.Open(_SynchronizationContextPool[ i % _SynchronizationContextPool.Count ]);
}
hbjorgo commented 1 year ago

Thank you for reporting this and contributing to making this library better! I've set it to use the thread pool: PR
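
For readers following along, a minimal sketch of what starting the read loop on the thread pool can look like. This is not the code from the PR: PoolReader, ReadLoopAsync, and Iterations are hypothetical names, and Task.Delay stands in for the real pipe read.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical reader, not ATLib's AtReader.
public sealed class PoolReader
{
    private readonly CancellationTokenSource cts = new CancellationTokenSource();
    private Task reader = Task.CompletedTask;
    private int iterations;

    public int Iterations => Volatile.Read(ref iterations);

    // Task.Run queues the async loop to the thread pool and returns
    // the already-unwrapped task, so no dedicated thread is created.
    public void Open() => reader = Task.Run(() => ReadLoopAsync(cts.Token));

    public async Task CloseAsync()
    {
        cts.Cancel();
        try { await reader; }
        catch (OperationCanceledException) { /* expected on cancellation */ }
    }

    private async Task ReadLoopAsync(CancellationToken token)
    {
        while (true)
        {
            // Placeholder for an asynchronous read; throws when cancelled.
            await Task.Delay(10, token);
            Interlocked.Increment(ref iterations);
        }
    }
}
```

Because the loop spends most of its time awaiting, it only occupies a pool thread while it is actually processing data, which also addresses the thread-per-device concern raised above.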