dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License
15.47k stars 4.76k forks source link

[API Proposal]: SemaphoreSlim.WaitAsync(int waitCount) #109343

Open KrzysztofBranicki opened 1 month ago

KrzysztofBranicki commented 1 month ago

Background and motivation

I'm implementing partitioned buffer for messages (retrieved from Kafka). Processing ordering needs to be guaranteed within partition so internally buffer holds dictionary of concurrent queues (one for each partition). I have one thread which is reading messages from Kafka and adding them to the buffer. Second thread is enqueuing message processing on the thread pool. I use Channel<T> for signaling available work on partition and SemaphorSlim for enforcing max buffer size. Method which enqueues message to a buffer uses WaitAsync(ct) and tasks which are processing messages use Release() when they finish processing, dequeue message and commit message offset. This works great, however max buffer size is expressed in terms of number of messages and it would be better if we are able to express it in bytes. I would be able to use same design if WaitAsync method accepted parameter waitCount so I'm able to express how many bytes I'm waiting for. The Release method already supports releaseCount parameter so no issues there.

API Proposal

public class SemaphoreSlim
{
    public Task WaitAsync(int waitCount)
    public Task WaitAsync(int waitCount, CancellationToken cancellationToken)
}

Those are just example signatures, other overloads (non async and accepting timeout should also get waitCount parameter for completenes.

API Usage

private readonly SemaphoreSlim bufferSizeSemaphore = new (initialCount: maxBufferSizeBytes, maxCount: maxBufferSizeBytes);

private readonly Channel<ConcurrentQueue<MessageWrapper>> workSignallingChannel;
private readonly Dictionary<int, ConcurrentQueue<MessageWrapper>> partitions;

public async Task EnqueueMessageForProcessingAsync(Message message, CancellationToken cancellationToken)
{
    await bufferSizeSemaphore.WaitAsync(message.SizeBytes, cancellationToken);

   var partition = partitions[message.Partition]
   partition.Enqueue(new MessageWrapper(message));

   if (!workSignallingChannel.Writer.TryWrite(partition))
      throw new Exception("Failed to write to work signaling channel.");

    ...
}

After partition is signaled we use TryPeek, mark message as being processed and enqueue processing task on thread pool. After message is processed it will be dequeued and bufferSizeSemaphore.Release(message.SizeBytes) will be called.

Alternative Designs

No response

Risks

No response

dotnet-policy-service[bot] commented 1 month ago

Tagging subscribers to this area: @mangod9 See info in area-owners.md if you want to be subscribed.

lilinus commented 1 month ago

I think the suggested methods can be useful.

Although they need to be named differently because there already exists overloads with exactly the same parameters (the int parameter is timeout in milliseconds).

KrzysztofBranicki commented 1 month ago

@lilinus good point, alternative name could be WaitMany and WaitManyAsync.

jilles-sg commented 1 month ago

SemaphoreSlim must work also for the case where multiple threads are calling WaitAsync. Consider a situation where the buffer is almost full (semaphore count very low) and lots of small messages are coming in and occasionally a big one. If messages are let through whenever there is space, the small messages will constantly take the last bit of space and it will take a long time before there will be enough space for a big message. To prevent this problem (known as starvation), the semaphore will need to store that a big message (wait with a large count) is pending and hold small messages (wait for 1 count) until the big message has gone through. This makes the semaphore more complex and slower for the existing use cases.

Making this worse is a behaviour of SemaphoreSlim to avoid lock convoying and improve throughput in general: if a thread is waiting synchronously when Release is called, Release will wake the thread but not hand over the count to it. So if another thread calls Wait or WaitAsync before the awakened thread continues running, that other thread will return immediately and the awakened thread might find a count of zero again. In the happy flow, the other thread will have released the semaphore again when the awakened thread gets its turn at the CPU and the context switch is complete, and the other thread has saved a lot of time without hurting the awakened thread. It is assumed that threads do not "hold" the semaphore for a long time.

On the other hand, if a task is waiting asynchronously, Release will hand over the count to it. Newly arriving Wait and WaitAsync callers must wait until the task completes.

In this specific case with only one flow calling WaitAsync, starvation is not an issue and I suggest hand-rolling something using lock, an integer storing the free buffer space and a TaskCompletionSource reference. Maybe add another integer to avoid waking the Kafka reader when there is no sufficient space yet.

KrzysztofBranicki commented 3 weeks ago

@jilles-sg thanks for your remarks. Regarding starvation issue I assumed that Wait would be satisfied in the FIFO order, and not in the order of "whichever waiter can be satisfied first based on the waitCount and CurrentCount", this approach does not have the starvation issue but if that complicates the implementation to a point where base case (waitCount == 1) would take performance hit then there is no point in doing that as part of this component.