dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

Proposal: BoundedConcurrentQueue<T> #23700

Closed · stephentoub closed this 4 years ago

stephentoub commented 7 years ago

ConcurrentQueue<T> is an unbounded, thread-safe queue, where the primary operations are Enqueue and TryDequeue. It's one of the more valuable concurrent collection types. However, unbounded collections aren't always desirable. For example, consider wanting to use a concurrent queue for object pooling. If you want to ensure you don't store more than N objects, this is difficult or impossible to achieve efficiently with ConcurrentQueue<T>, which will automatically grow its storage to store the item being added if there's insufficient room.

ConcurrentQueue<T>, however, is actually implemented as a wrapper around a bounded queue, internally called ConcurrentQueue<T>.Segment. https://github.com/dotnet/corefx/blob/9c468a08151402a68732c784b0502437b808df9f/src/System.Collections.Concurrent/src/System/Collections/Concurrent/ConcurrentQueue.cs#L820 In essence, ConcurrentQueue<T>.Segment provides the bounded queue behavior, and ConcurrentQueue<T> layers unbounded semantics on top of it.

We should clean up the Segment APIs and expose it as

namespace System.Collections.Concurrent
{
    public sealed class BoundedConcurrentQueue<T>
    {
        public BoundedConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2

        public int Capacity { get; }
        public int Count { get; }
        public bool IsEmpty { get; }

        public bool TryEnqueue(T item);
        public bool TryDequeue(out T item);
    }
}
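
For illustration, usage against this surface area might look like the following sketch (the element type and the full/empty handling here are just examples):

var queue = new BoundedConcurrentQueue<byte[]>(8); // capacity must be a power of 2

// Producer: TryEnqueue returns false instead of growing when the queue is full.
if (!queue.TryEnqueue(new byte[4096]))
{
    // full: drop the item, process it inline, or take some other action
}

// Consumer: TryDequeue returns false when the queue is empty.
if (queue.TryDequeue(out byte[] buffer))
{
    // use buffer
}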

The current implementation is fast, and it achieves that speed in part by eschewing functionality that would weigh it down non-trivially, e.g. enumeration. That's why it doesn't implement interfaces like IEnumerable<T> or IReadOnlyCollection<T> that would force us to add such behavior and slow it down. This collection would be very specialized and used purely for its ability to have items enqueued and dequeued quickly.

(Methods like TryPeek, ToArray, CopyTo, GetEnumerator, etc. all require the ability to look at data in the queue without removing it. In the current implementation, that requires marking the segment as "preserving for observation", which means nothing will ever be removed from the segment's storage; dequeues still hand out items but no longer free slots. The effect is that enqueues continue to be allowed until the segment is full, but since dequeues don't free space, at that point nothing further can be enqueued, even if everything is dequeued. ConcurrentQueue<T> deals with this simply by creating a new segment, but that doesn't work for the segment itself.)


EDIT @stephentoub 7/6/2018: See alternate proposal at https://github.com/dotnet/corefx/issues/24365#issuecomment-403074379.

stephentoub commented 7 years ago

cc: @kouvel, @tarekgh, @benaadams

benaadams commented 7 years ago

For performance it would be desirable for boundedCapacity to be a power of 2 (to use & rather than %).
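
For illustration, a minimal sketch of the difference, where slot is the ring-buffer index an operation would touch (the variable names are made up):

// capacity is a power of 2, so (capacity - 1) is an all-ones bit mask:
int slot = sequenceNumber & (capacity - 1);  // a single cheap AND

// an arbitrary capacity forces an integer remainder instead, which
// compiles to a much slower division on most CPUs:
int slotViaMod = sequenceNumber % capacity;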

Would it be acceptable to have the capacity as:

(int minCapacity) // Will be rounded up to nearest power of two

stephentoub commented 7 years ago

Yes, we would want either to require the value be a power of 2 or to be allowed to round it up to one. And I think it's best to require it, as that keeps the door open to supporting other sizes in the future.
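
If rounding up were chosen instead, a standard bit-twiddling sketch would do it (this helper is illustrative, not an existing API):

static int RoundUpToPowerOf2(int minCapacity)
{
    // Smear the highest set bit into all lower bits, then add 1.
    // Assumes 2 <= minCapacity <= 2^30.
    int v = minCapacity - 1;
    v |= v >> 1;
    v |= v >> 2;
    v |= v >> 4;
    v |= v >> 8;
    v |= v >> 16;
    return v + 1; // e.g. 100 -> 128, 64 -> 64
}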

benaadams commented 7 years ago

This is a very desirable type for object pooling (rather than array pooling).

One candidate use is "Improve performance of Immutable/SecureObjectPool/AllocFreeConcurrentStack" https://github.com/dotnet/corefx/issues/24337

However, testing-wise it would be better for this to be a more generic type with its own tests, rather than part of Immutable.

Other candidates:

Also the ObjectPools in aspnet https://github.com/aspnet/Common/tree/dev/src/Microsoft.Extensions.ObjectPool

System.Reflection.Internal.ObjectPool https://github.com/dotnet/corefx/blob/master/src/System.Reflection.Metadata/src/System/Reflection/Internal/Utilities/ObjectPool%601.cs

System.Net.Http.HttpConnectionPool https://github.com/dotnet/corefx/blob/master/src/System.Net.Http/src/System/Net/Http/Managed/HttpConnectionPool.cs

System.Data.ProviderBase.DbConnectionPool https://github.com/dotnet/corefx/blob/master/src/System.Data.SqlClient/src/System/Data/ProviderBase/DbConnectionPool.cs

Also, for items like SocketAsyncEventArgs, the docs suggest using a pool:

Allocate a new SocketAsyncEventArgs context object, or get a free one from an application pool. ... Reuse the context for another operation, put it back in the pool, or discard it.

However, there is no easy way to build a high-performance bounded pool in the framework (hence all the different implementations within the framework itself).

Note: the applications for a bounded non-blocking concurrent queue are much bigger than object pooling; it's just a common pattern, and it's very easy to put an object pool on top (with other semantics like auto-create when empty, clear, etc.), as sketched below.
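
For example, a minimal sketch of such a wrapper, assuming the proposed type existed (the create-on-empty and drop-on-full policies are just one possible choice):

using System;

public sealed class SimplePool<T> where T : class
{
    private readonly BoundedConcurrentQueue<T> _items; // the type proposed in this issue
    private readonly Func<T> _factory;

    public SimplePool(int capacity, Func<T> factory)
    {
        _items = new BoundedConcurrentQueue<T>(capacity); // capacity: a power of 2
        _factory = factory;
    }

    // Auto-create when the pool is empty.
    public T Rent() => _items.TryDequeue(out T item) ? item : _factory();

    // Drop the object when the pool is full, so the bound is never exceeded.
    public void Return(T item) => _items.TryEnqueue(item);
}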

svick commented 7 years ago

What about implementing IProducerConsumerCollection<T>? This type would seemingly fit that interface, except then it would also have to implement IEnumerable<T> and ICollection.

benaadams commented 7 years ago

What would

void ICollection.Add(T item);

Do? Throw when full?

Can an interface be inserted into an inheritance chain and be binary compat?

interface IBoundedCollection<T>
{
    bool TryAdd(T item);
    bool TryTake(out T item);
}

interface IProducerConsumerCollection<T> : IBoundedCollection<T>, IEnumerable<T>, ICollection
{
    void CopyTo(T[] array, int index);
    T[] ToArray();
}

I assume not; else IReadOnlyList and IReadOnlyCollection were put in the wrong place.

svick commented 7 years ago

@benaadams

What would void ICollection.Add(T item); do?

IProducerConsumerCollection<T> implements System.Collections.ICollection, but not System.Collections.Generic.ICollection<T>, so there is no Add.

stephentoub commented 7 years ago

This type would seemingly fit that interface

Yes, but in addition to IEnumerable being problematic, CopyTo and ToArray are problematic for the same reasons.

tarekgh commented 7 years ago

@stephentoub do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?

namespace System.Collections.Concurrent
{
    public sealed class ConcurrentQueue<T>
    {
        public ConcurrentQueue(int capacity); // capacity must be a power of 2 that's >= 2

        public int Capacity { get; }
        ...
    }
}

stephentoub commented 7 years ago

do we need to expose it as a new type? Or can we just add the bounding functionality to ConcurrentQueue?

A few concerns:

  1. ConcurrentQueue already exposes all of the APIs that, as I mentioned, make it difficult to efficiently implement the core TryEnqueue/TryDequeue. For example, what happens if someone enumerates the queue when the sole bounded segment is 1/3 full? Today in the unbounded case, ConcurrentQueue will just create a new segment, such that subsequent enqueues go to that one. But if it did that in the bounded case, how would it efficiently manage the bound across data in two different segments?
  2. ConcurrentQueue exposes Enqueue rather than TryEnqueue... what's the behavior of Enqueue if the queue is full?
  3. ConcurrentQueue would need to support both the bounded and unbounded modes. That would make certain operations more expensive, as they'd need to check which mode they were in and act accordingly.
tarekgh commented 7 years ago

@stephentoub I am not pushing back on the new type but trying to evaluate if it is really needed. Is it possible to have a static method on ConcurrentQueue (e.g. CreateBoundedQueue) that just returns an internal instance of BoundedConcurrentQueue (considering it would be a subclass of ConcurrentQueue)? By doing that we'd provide the same implementation you were planning to have in BoundedConcurrentQueue anyway and would not affect the current ConcurrentQueue implementation, which should address the first and third points you mentioned.

ConcurrentQueue exposes Enqueue rather than TryEnqueue... what's the behavior of Enqueue if the queue is full?

I think we are defining new behavior, so it is ok to throw in that case. This is similar to the pattern we have in other classes anyway.

My only concern with having a new type is that developers have to treat BoundedConcurrentQueue and ConcurrentQueue as 2 different things while they are really the same thing with different settings or constraints.

stephentoub commented 7 years ago

trying to evaluate if it is really needed

That's a good thing to do :smile:

Is it possible to have a static method on ConcurrentQueue (e.g. CreateBoundedQueue) that just returns an internal instance of BoundedConcurrentQueue (considering it would be a subclass of ConcurrentQueue)?

That would require the relevant methods to be made virtual, which can add non-trivial expense.

have to treat BoundedConcurrentQueue and ConcurrentQueue as 2 different things while they are really the same thing with different settings or constraints.

Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things.

tarekgh commented 7 years ago

That would require the relevant methods to be made virtual, which can add non-trivial expense.

Agreed. I heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If this is true, we can write the implementation with a pattern that the JIT optimizes.

Well, if a core method like Enqueue throws on one but not the other, developers have to do two different things.

Usually, devs can avoid that by just using TryEnqueue. So it is up to the dev to decide what works better in their scenario.

stephentoub commented 7 years ago

I heard the JIT is trying to optimize some specific cases or patterns of virtual calls. If this is true, we can write the implementation with a pattern that the JIT optimizes.

If you have a method DoWork(ConcurrentQueue<T> queue) that operates on that queue, unless DoWork gets inlined and the call site was able to determine the concrete type of the queue, there's no way the JIT could help avoid the virtuals here.
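
A sketch of the shape in question (not an actual API change; Enqueue is not virtual today):

// DoWork sees only the static type ConcurrentQueue<int>. If Enqueue were
// made virtual (to allow a bounded subclass to override it), the JIT would
// have to emit an indirect virtual call here, unless DoWork is inlined into
// a call site where the concrete type of the queue is known.
static void DoWork(ConcurrentQueue<int> queue)
{
    queue.Enqueue(42);
}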

stephentoub commented 7 years ago

I'm also not understanding why adding such a factory is better in this case. 90% of the surface area of the type would need to throw for the special case, code would need to know how the type it's handed was constructed / what its constraint is, none of the interface implementations would work, etc. What is the benefit over exposing a new type?

tarekgh commented 7 years ago

What is the benefit over exposing a new type?

The only reason is what I mentioned: devs don't have to use 2 types which really do exactly the same thing. Imagine someone wants to write a method which takes the queue as a parameter; having 2 types means they need to provide 2 methods doing exactly the same thing.

If you think having a factory method on the ConcurrentQueue will complicate the implementation and the behavior, then I am fine with 2 types at that time.

4creators commented 7 years ago

Is it possible to make an educated guess what performance gain could be achieved (or even a simple perf test using the internal ConcurrentQueue.Segment)?

benaadams commented 7 years ago

@4creators the current issues are that you can't create a bounded queue with ConcurrentQueue; calling .Count rather than .IsEmpty causes an allocation; and you'd also have a race condition between .Count and .Enqueue.

To get around that you need to wrap it in another structure that does interlocked operations; but then you are taking a performance hit for the extra overhead - whereas the internal Segment already behaves in the desired way, without the race or the extra overhead.
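
A sketch of the check-then-act race: two threads can both observe a count below the bound before either enqueues, so the bound is exceeded (names are illustrative):

// Racy bounding attempted on top of the existing unbounded ConcurrentQueue<T>:
if (queue.Count < maxCapacity)  // threads A and B both see Count == maxCapacity - 1 ...
{
    queue.Enqueue(item);        // ... and both enqueue, exceeding the intended bound.
}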

kouvel commented 7 years ago

One case for which I had to create a bounded concurrent queue before was when I had multiple producers and a single consumer that was transferring results from producers over the network. Producers had to be throttled such that they don't cause too much memory to be used while the sole consumer is lagging behind. Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?

For that particular case it may still be beneficial to have a different implementation, as it may be possible to have a more efficient Dequeue in some cases since there is only one consumer.

kouvel commented 7 years ago

Another requirement for that particular scenario was to bound the queue based on an estimation of memory usage rather than number of items, throttling on some memory usage bound. Would it be useful to allow some way for the user to determine the bound?

svick commented 7 years ago

@kouvel

Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?

Can't you use BlockingCollection<T> for that already?

stephentoub commented 7 years ago

Would it be beneficial to add Enqueue and Dequeue to the API that block upon queue full / queue empty respectively?

This is what the Channel.CreateBounded<T> API from the channels library on corefxlab (which is moving to corefx) provides, or at least the asynchronous version of it. BlockingCollection<T> already provides that for synchronous.
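
For reference, a sketch of what bounded-channel usage looks like with the System.Threading.Channels surface area as it eventually shipped (the exact API was still being revised at the time of this thread):

using System.Threading.Channels;

var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(capacity: 100)
{
    FullMode = BoundedChannelFullMode.Wait // writers wait asynchronously when full
});

await channel.Writer.WriteAsync(42);         // producer: throttled without blocking a thread
int item = await channel.Reader.ReadAsync(); // consumer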

an estimation of memory usage rather than number of items

Would that really need to be built-in? I'd think such an estimation could just be accomplished by the user and translated by the user into a count. This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing.

kouvel commented 7 years ago

Can't you use BlockingCollection for that already?

I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock

Channel.CreateBounded

I see, I'll take a look

I'd think such an estimation could just be accomplished by the user and translated by the user into a count.

The capacity would have to be guessed up-front. Some items may be much larger than others, so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.

This implementation would likely suffer significantly in perf if it had to do anything more per operation than it's already doing.

Yea probably not worth folding that into this implementation

stephentoub commented 7 years ago

I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock

It does support concurrent enqueues, and it does use a lock, though I think there are ways around that. One of the things I'd like help with in the channels lib is improving the perf of the implementation, including removing some of the locking that's currently being done, e.g. currently the channel implementations all lock on enqueue (most avoid that on dequeue), but we should be able to avoid that in many cases.

The capacity would have to be guessed up-front. Some items may be much larger than others, so if the queue contains a bunch of large items I may want to bound it sooner, and if it contains a bunch of small items I may want to allow more items in the queue before bounding it.

You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast?

svick commented 7 years ago

@kouvel

Can't you use BlockingCollection for that already?

I'm guessing it doesn't allow concurrent enqueues, in which case it would require a lock

From the docs:

The Dispose method is not thread-safe. All other public and protected members of BlockingCollection<T> are thread-safe and may be used concurrently from multiple threads.

So, concurrent enqueues are definitely allowed.

kouvel commented 7 years ago

It does support concurrent enqueues

I see. BlockingCollection then seems to be very similar to BoundedConcurrentQueue in the functionality it provides. I guess it's not possible to have one implementation that does both because of the interfaces that BlockingCollection implements.

You're saying you'd make the bound dynamic? How would you do that while still keeping the queue implementation fast?

The bound could be based on the sum of opaque values that are passed in to Enqueue, which could for instance represent the size of the item being enqueued.
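
A sketch of that idea layered on top of the existing unbounded queue, where a gate on total weight provides the bounding (the type and its names are hypothetical):

using System.Collections.Concurrent;
using System.Threading;

public sealed class WeightBoundedQueue<T>
{
    private readonly ConcurrentQueue<(T Item, long Weight)> _queue = new ConcurrentQueue<(T Item, long Weight)>();
    private readonly long _maxTotalWeight;
    private long _totalWeight;

    public WeightBoundedQueue(long maxTotalWeight) => _maxTotalWeight = maxTotalWeight;

    // The caller passes an opaque weight, e.g. an estimate of the item's size in bytes.
    public bool TryEnqueue(T item, long weight)
    {
        if (Interlocked.Add(ref _totalWeight, weight) > _maxTotalWeight)
        {
            Interlocked.Add(ref _totalWeight, -weight); // over budget: back out
            return false;
        }
        _queue.Enqueue((item, weight));
        return true;
    }

    public bool TryDequeue(out T item)
    {
        if (_queue.TryDequeue(out var entry))
        {
            Interlocked.Add(ref _totalWeight, -entry.Weight);
            item = entry.Item;
            return true;
        }
        item = default(T);
        return false;
    }
}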

BoundedChannel appears to be something close to what I was looking for.

stephentoub commented 7 years ago

BoundedChannel appears to be something close to what I was looking for.

I think there's more flexibility in that API. Note that a bounded channel is created from a BoundedChannelOptions, so if there's something other than a count you think should be provided there, let's chat about it. I currently have https://github.com/dotnet/corefxlab/pull/1805 out to revise the API surface area based on the API review we did.

kouvel commented 7 years ago

Another random thought: if the main use case for BoundedConcurrentQueue is object pooling, where I imagine the order in which items are removed from the queue doesn't matter, why not call it ObjectPool? If we feel that the queue behavior is beneficial, there could also be a BoundedConcurrentQueue, and if we further feel that they could benefit from the same implementation, that could be the case behind the scenes.

kouvel commented 7 years ago

Is it possible to implement an ObjectPool which, having fewer restrictions, is more efficient than a BoundedConcurrentQueue?

stephentoub commented 7 years ago

why not call it ObjectPool?

I think we do need to invest in an ObjectPool<T>, but I don't know that it has the same surface area / implementation as this BoundedConcurrentQueue<T> would. Object pooling often requires varying policies, one of which could be the super simple policy effectively implemented by this queue type, but it's not the only one. I don't think we want to burn the ObjectPool<T> name on this.

That said, it's certainly possible there's a better name than the one I proposed.

stephentoub commented 7 years ago

Is it possible to implement an ObjectPool which has less restrictions to be more efficient than a BoundedConcurrentQueue?

I think it depends on the scenario. For example, if you have a bunch of threads that are all just taking and returning objects, and are doing so at fairly consistent rates, the existing ConcurrentBag<T> can actually be a more efficient pool (after https://github.com/dotnet/corefx/pull/14254).

You might also not want to throw objects away, again depending on your scenario, in which case you might prefer ConcurrentQueue<T> (or ConcurrentBag<T>) over this. In theory ConcurrentStack<T> would also be good, and in fact it's used in coreclr/corefx for some object pooling today, but it suffers in its current implementation from an allocation per push, which means it's really only useful as a pool for super expensive objects.

kouvel commented 7 years ago

If I need the queue behavior and I call TryEnqueue and it fails because the queue is full, what could I do as the user? I would like to block until the queue has space (if I require enqueuing the item), but otherwise, I would have to not care that I cannot enqueue something or just spin until it can be enqueued. Is that a realistic scenario outside of ObjectPool?

kouvel commented 7 years ago

Forgot about ConcurrentBag; when would one use BoundedConcurrentQueue over ConcurrentBag (assuming ConcurrentBag's implementation is ideal)?

kouvel commented 7 years ago

I guess ConcurrentBag is not bounded. But when would this type be more beneficial than a theoretical BoundedConcurrentBag / ObjectPool?

stephentoub commented 7 years ago

I would have to not care that I cannot enqueue something or just spin until it can be enqueued

Or process the item yourself, e.g. if this is a producer/consumer scenario and the queue is being used to hand off data from producer to consumer and the producer has charged ahead so far that the consumers can't keep up, the producer might be able to process the item itself (temporarily switch to being a consumer).

Or dequeue an item to make room for the new item, either dropping it (e.g. imagine you were processing stock ticks and you only cared to have the most recent ones) or processing it itself as in the above case.
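
A sketch of that drop-oldest policy built from the two Try methods of the proposed type; under contention another thread may consume the freed slot first, hence the loop:

// Keep only the most recent items, e.g. stock ticks: if the queue is full,
// evict the oldest item to make room for the new one.
static void EnqueueDropOldest<T>(BoundedConcurrentQueue<T> queue, T item)
{
    while (!queue.TryEnqueue(item))
    {
        queue.TryDequeue(out _); // drop the oldest item to free a slot
    }
}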

Is that a realistic scenario outside of ObjectPool?

Yes. Object pooling is just one example of a scenario where you're ok with some data being lost, but there are others.

when would one use BoundedConcurrentQueue over ConcurrentBag (assuming ConcurrentBag's implementation is ideal)?

Ideal for what? Different situations demand different implementations. If the producers are different threads from the consumers, for example, ConcurrentBag<T> is a bad choice, as every take will be forced to steal, making it very expensive.

I guess ConcurrentBag is not bounded.

Right

stephentoub commented 7 years ago

Is the concern being expressed here about exposing such functionality at all? Or about the name of the type being chosen? We could even choose to expose it as it is now:

namespace System.Collections.Concurrent
{
    public class ConcurrentQueue<T>
    {
        public class Segment { ... }
    }
}

I see the main upside of that being that it highlights the nicheness of the type. The main downsides I see are that a) it can't be released out-of-band, as it's modifying an existing type in corelib, and b) it might result in a strange design if we ever changed ConcurrentQueue<T>'s implementation again in the future.

kouvel commented 7 years ago

Or process the item yourself

Or dequeue an item to make room for the new item

Yea that's a good point, I suppose producers could act as consumers temporarily and that could serve as a decent throttling mechanism in some cases.

Is the concern being expressed here about exposing such functionality at all? Or about the name of the type being chosen?

My main concern was that I didn't understand the use case. Based on above, I guess one scenario is that I need queue behavior, there are multiple producers and consumers, and producers are willing to act as consumers when the queue is full. I don't know how realistic that scenario is, but I agree that would be something this type could provide that doesn't overlap with other existing or potential future types.

svick commented 7 years ago

@stephentoub Regarding naming the type ConcurrentQueue<T>.Segment: I'm fine with hiding niche types and making it a nested type is one way to do that. What I don't like is that the name does not describe what the type does or how people are supposed to use it, it describes how the current implementation of ConcurrentQueue<T> uses it.

benaadams commented 7 years ago

BlockingCollection then seems to be very similar to BoundedConcurrentQueue in the functionality it provides. I guess it's not possible to have one implementation that does both because of the interfaces that BlockingCollection implements.

BlockingCollection is a meta-type on top of IProducerConsumerCollection<T> that uses ConcurrentQueue<T> as its default backing store. It does extra work on top to add bounding (including 2 semaphores), so it increases the complexity to add bounding rather than reducing it (though it provides additional functionality and can work on any IProducerConsumerCollection); and ConcurrentQueue is built on top of the BoundedConcurrentQueue - so that's two extra data structures to get back to the behaviour of the underlying one.

one scenario... there are multiple producers and consumers, and producers are willing to act as consumers when the queue is full

A shared queue is generally a better data structure for pooling than a stack, as the adds and removes don't contend as much. It's more controlled than [ThreadStatic], though that has advantages for contention too, but [ThreadStatic] isn't good when one thread is getting from the pool and a different thread is putting back.

Another scenario is non-blocking queuing, without the backlog getting too big per queue (as an inverse of what the threadpool does).

For example N threads with bounded local queues; global unbounded overflow queue

New task => if global queue is not empty (overflow state) => Enqueue to global queue => else round-robin TryEnqueue to a local queue => if full, round-robin to next and TryEnqueue => if all full => Enqueue to global queue

Thread processing => TryDequeue from local queue => else TryDequeue from global => else round-robin TryDequeue (steal)
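
A rough sketch of that shape, assuming the proposed type existed (round-robin selection and all policy details are simplified; the class and its names are hypothetical):

using System.Collections.Concurrent;
using System.Threading;

public sealed class OverflowScheduler<T>
{
    private readonly BoundedConcurrentQueue<T>[] _local;                     // bounded per-thread queues
    private readonly ConcurrentQueue<T> _global = new ConcurrentQueue<T>(); // unbounded overflow queue
    private int _next;

    public OverflowScheduler(int threads, int localCapacity)
    {
        _local = new BoundedConcurrentQueue<T>[threads];
        for (int i = 0; i < threads; i++)
            _local[i] = new BoundedConcurrentQueue<T>(localCapacity);
    }

    public void Enqueue(T item)
    {
        // Already in overflow state: keep going to the global queue.
        if (!_global.IsEmpty) { _global.Enqueue(item); return; }

        // Round-robin starting point (mask off the sign bit so overflow stays non-negative).
        int start = (Interlocked.Increment(ref _next) & int.MaxValue) % _local.Length;
        for (int i = 0; i < _local.Length; i++)
        {
            if (_local[(start + i) % _local.Length].TryEnqueue(item)) return;
        }
        _global.Enqueue(item); // all local queues full
    }

    public bool TryDequeue(int threadIndex, out T item)
    {
        if (_local[threadIndex].TryDequeue(out item)) return true; // own queue first
        if (_global.TryDequeue(out item)) return true;             // then the overflow queue
        for (int i = 1; i < _local.Length; i++)                    // then steal round-robin
        {
            if (_local[(threadIndex + i) % _local.Length].TryDequeue(out item)) return true;
        }
        return false;
    }
}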

kouvel commented 7 years ago

@benaadams that makes sense, thanks

omariom commented 7 years ago

@stephentoub @KrzysztofCwalina

ConcurrentQueue<T> is a general purpose multiple readers/multiple writers unbounded queue - safe and convenient combination of properties.

BoundedConcurrentQueue<T> is a lower-level thing, exposed publicly for its performance characteristics. If there is a chance that other kinds of queues - like a single producer/single consumer one - get into CoreFx, it may be worth creating a new namespace for them now.

benaadams commented 7 years ago

Would the addition of an Enqueue which overwrites the oldest item and also moves the head on by one be too much of a stretch? Then it would be a CircularBuffer<T> https://github.com/dotnet/corefx/issues/3126 ?

e.g. TryEnqueue doesn't write when full; Enqueue "removes" oldest and adds newest to end when full

stephentoub commented 7 years ago

Would the addition of an Enqueue which overwrites the oldest item and also moves the head on by one be too much of a stretch?

With the current implementation you would have tearing problems.

benaadams commented 7 years ago

With the current implementation you would have tearing problems.

Probably should be two different data types then.

As one likely wouldn't (shouldn't) be using both Enqueue and TryEnqueue on the same instance - and it would add complication (like Stream mixing blocking and async calls that interact, though likely only used in one mode).

Should they be related naming-wise?

safern commented 6 years ago

@stephentoub should we mark this one as ready for review?

karelz commented 6 years ago

FYI: The API review discussion was recorded - see https://youtu.be/BI3iXFT8H7E?t=6317 (5 min duration)

Conclusion: We want to wait on @stephentoub in the API review meeting (Feb).

benaadams commented 6 years ago

This isn't blocked anymore?

karelz commented 6 years ago

Correct, @stephentoub is back, it is unblocked :)

safern commented 6 years ago

@karelz is this approved now?

karelz commented 6 years ago

Nope, it was not critical for 2.1, so we didn't prioritize it.