dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

[API Proposal] Channel.CreateUnboundedPrioritized #62761

Closed shaggygi closed 5 months ago

shaggygi commented 2 years ago

EDITED by @stephentoub on 2/17/2024:

namespace System.Threading.Channels;

public class Channel
{
+   public static Channel<T> CreateUnboundedPrioritized<T>();
+   public static Channel<T> CreateUnboundedPrioritized<T>(UnboundedPrioritizedChannelOptions<T> options);
}
+public sealed partial class UnboundedPrioritizedChannelOptions<T> : ChannelOptions
+{
+   public System.Collections.Generic.IComparer<T>? Comparer { get; set; }
+}

Does a priority channel already exist, or could one be added to System.Threading.Channels? I'm thinking of something similar to the Bounded/Unbounded channels, with a Reader/Writer, plus priority capabilities. That is, you could write a large set of items to the channel in any order, and the reader would receive them sorted by priority.

I guess when it boils down to it I could use the PriorityQueue, but I do like the features provided by Channels.

Referencing:
https://docs.microsoft.com/en-us/dotnet/api/system.collections.generic.priorityqueue-2?view=net-6.0
https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/

cc: @stephentoub

gfoidl commented 2 years ago

A potential simple workaround: if you only need two priorities, such as "normal" and "high", you could create and use two channels, one per priority. The reader checks the "high" channel first and, if there is nothing to read there, falls back to the "normal" channel.

Note: the base channel types are abstract, so you could build your own based on PriorityQueue or whatever prioritization solution you'd like to use.
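
The two-channel workaround described above can be sketched roughly as follows. `ReadPrioritizedAsync` is a hypothetical helper written for this illustration, not a library API, and the sketch assumes a single consumer reading from two unbounded channels.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

var high = Channel.CreateUnbounded<string>();
var normal = Channel.CreateUnbounded<string>();

// Written in the "wrong" order on purpose: normal first, then high.
normal.Writer.TryWrite("normal-1");
high.Writer.TryWrite("high-1");

Console.WriteLine(await ReadPrioritizedAsync(high.Reader, normal.Reader)); // high-1
Console.WriteLine(await ReadPrioritizedAsync(high.Reader, normal.Reader)); // normal-1

// Hypothetical helper: returns an item from the high-priority reader when one
// is buffered, otherwise from the normal-priority reader.
static async ValueTask<T> ReadPrioritizedAsync<T>(
    ChannelReader<T> highReader, ChannelReader<T> normalReader, CancellationToken ct = default)
{
    while (true)
    {
        ct.ThrowIfCancellationRequested();

        // Always check the high-priority channel first.
        if (highReader.TryRead(out var item)) return item;
        if (normalReader.TryRead(out item)) return item;

        // Completion finishes once a channel is closed and fully drained.
        bool highDone = highReader.Completion.IsCompleted;
        bool normalDone = normalReader.Completion.IsCompleted;
        if (highDone && normalDone) throw new ChannelClosedException();
        if (highDone) { await normalReader.WaitToReadAsync(ct); continue; }
        if (normalDone) { await highReader.WaitToReadAsync(ct); continue; }

        // Both channels are open and empty: wait until either has data,
        // then cancel the wait that lost the race and re-check in priority order.
        using var cts = CancellationTokenSource.CreateLinkedTokenSource(ct);
        await Task.WhenAny(
            highReader.WaitToReadAsync(cts.Token).AsTask(),
            normalReader.WaitToReadAsync(cts.Token).AsTask());
        cts.Cancel();
    }
}
```

The same idea extends to more priority levels by looping over an ordered list of readers, although the waiting logic grows with each added level.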

shaggygi commented 2 years ago

Thanks @gfoidl

I was thinking of something similar to your scheme, but I have around 6 priority levels. I noticed the abstract types and am thinking that might be the route I'll have to take.

buyaa-n commented 2 years ago

It doesn't look like we have anything like a PriorityChannel, but it sounds useful. We could consider adding one using PriorityQueue, as you mentioned. @shaggygi, feel free to add/update the issue with an API proposal if you have one in mind.

shaggygi commented 2 years ago

@buyaa-n Thanks for the response. I don't actually have anything specific in mind; a semi-combination of both their features would probably help. I'll add more later if something specific comes up. Thanks again.

Lucian1000 commented 9 months ago

Hi, are there any updates on this?

johnib commented 7 months ago

To add to the above, I would really like to have a PriorityQueue-like Channel implementation. Specifically, the solutions mentioned above won't work for me, because my scenario would prioritize items based on some property, such as cache TTL, whose values will be highly randomized.

For example, the class PriorityQueue<TElement,TPriority> would do the job perfectly if only it had the benefits of Channel:

  1. Support multiple writers and readers
  2. Provide async API for Enqueue/Dequeue/Peek
  3. Provide sync API (TryEnqueue/TryDequeue/TryPeek)
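
As a rough illustration of that wish list, the sketch below wraps PriorityQueue<TElement,TPriority> with a lock and a SemaphoreSlim to allow multiple writers and readers, an async dequeue, and a sync TryDequeue. `AsyncPriorityQueue` is a hypothetical type invented for this example; it is not part of .NET, it omits Peek and completion, and it is not the implementation from the branch linked later in the thread.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Usage: the item with the earliest expiry is dequeued first.
var queue = new AsyncPriorityQueue<string, DateTimeOffset>();
var now = DateTimeOffset.UtcNow;
queue.Enqueue("expires-later", now.AddMinutes(10));
queue.Enqueue("expires-soon", now.AddMinutes(1));
Console.WriteLine(await queue.DequeueAsync()); // expires-soon
Console.WriteLine(await queue.DequeueAsync()); // expires-later

// Hypothetical type for illustration only; not part of .NET.
public sealed class AsyncPriorityQueue<TElement, TPriority>
{
    private readonly PriorityQueue<TElement, TPriority> _items = new();
    private readonly SemaphoreSlim _available = new(0); // counts unclaimed items
    private readonly object _gate = new();

    public void Enqueue(TElement element, TPriority priority)
    {
        // Add the item before releasing, so a woken reader always finds one.
        lock (_gate) { _items.Enqueue(element, priority); }
        _available.Release();
    }

    public async ValueTask<TElement> DequeueAsync(CancellationToken ct = default)
    {
        // Waits asynchronously until at least one item is available.
        await _available.WaitAsync(ct).ConfigureAwait(false);
        lock (_gate) { return _items.Dequeue(); }
    }

    public bool TryDequeue(out TElement element)
    {
        // Wait(0) claims an item only if one is available right now.
        if (!_available.Wait(0)) { element = default!; return false; }
        lock (_gate) { element = _items.Dequeue(); }
        return true;
    }
}
```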

@stephentoub

stephentoub commented 7 months ago

This is approximately what I think this would look like: https://github.com/dotnet/runtime/compare/main...stephentoub:runtime:channelpriority with the API shape:

namespace System.Threading.Channels;

public class Channel
{
+   public static Channel<T> CreateUnboundedPrioritized<T>();
+   public static Channel<T> CreateUnboundedPrioritized<T>(UnboundedPrioritizedChannelOptions<T> options);
}
+public sealed partial class UnboundedPrioritizedChannelOptions<T> : ChannelOptions
+{
+   public System.Collections.Generic.IComparer<T>? Comparer { get; set; }
+}

Any feedback?

johnib commented 7 months ago

@stephentoub really happy to see this

Given that the implementation is done, I'd be happy to start using it (now..). At the moment my app is running on dotnet 6, and I guess in a couple of weeks we'll be on dotnet 8. Is it possible to reference this class?

Many thanks !

stephentoub commented 7 months ago

It is not in .NET 8. If it gets added, it would be at the earliest in .NET 9. But you can copy/paste the code from my branch into your project and tweak it to make it compile.

johnib commented 7 months ago

> But you can copy/paste the code from my branch into your project and tweak it to make it compile.

@stephentoub, are you sure I can? It relies on internal classes/APIs which I no longer have access to, such as Deque, AsyncOperation's ctors, ChannelUtilities etc.

Actually, I have a real need for this implementation, so if some hacks need to be made in order to make it work, please tell me.

Thanks Stephen

stephentoub commented 7 months ago

> It relies on internal classes/APIs

Those are all part of its implementation. You would copy those, too.

bartonjs commented 5 months ago

Looks good as proposed.

namespace System.Threading.Channels;

public class Channel
{
+   public static Channel<T> CreateUnboundedPrioritized<T>();
+   public static Channel<T> CreateUnboundedPrioritized<T>(UnboundedPrioritizedChannelOptions<T> options);
}
+public sealed partial class UnboundedPrioritizedChannelOptions<T> : ChannelOptions
+{
+   public System.Collections.Generic.IComparer<T>? Comparer { get; set; }
+}
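
A minimal usage sketch of the approved shape, assuming a runtime that includes it (per the discussion above, .NET 9 at the earliest). It also assumes that, as with PriorityQueue, the item that sorts lowest under the comparer is read first. The tuple element names and sample values are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;

// Default comparer: items are ordered by Comparer<int>.Default.
Channel<int> numbers = Channel.CreateUnboundedPrioritized<int>();
numbers.Writer.TryWrite(3);
numbers.Writer.TryWrite(1);
numbers.Writer.TryWrite(2);
numbers.Writer.Complete();

await foreach (int n in numbers.Reader.ReadAllAsync())
    Console.WriteLine(n);

// Custom comparer: order (Name, Priority) tuples by Priority only.
var options = new UnboundedPrioritizedChannelOptions<(string Name, int Priority)>
{
    Comparer = Comparer<(string Name, int Priority)>.Create(
        (a, b) => a.Priority.CompareTo(b.Priority)),
};
var jobs = Channel.CreateUnboundedPrioritized(options);
jobs.Writer.TryWrite(("cleanup", 5));
jobs.Writer.TryWrite(("urgent-fix", 0));
jobs.Writer.Complete();

await foreach (var job in jobs.Reader.ReadAllAsync())
    Console.WriteLine($"{job.Name} (priority {job.Priority})");
```

Because the priority comes from a comparer over T rather than from a separate priority argument, an item has to carry whatever it is ordered by, for example a TTL or a priority field.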