dotnet / runtime

.NET is a cross-platform runtime for cloud, mobile, desktop, and IoT apps.
https://docs.microsoft.com/dotnet/core/
MIT License

Add a RingBuffer<T> collection type #30650

Open Suchiman opened 5 years ago

Suchiman commented 5 years ago

A RingBuffer<T> is an efficient fixed-size collection type to which elements can be added indefinitely: once capacity is reached, each new element replaces the oldest one.

Often this behavior is emulated inefficiently with a List<T>: once the list has reached a certain size, the oldest element is explicitly removed before the new one is added, and removing from the front of a List<T> is O(n) because every remaining element has to shift down.
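As a rough illustration (not code from the issue), the List<T> emulation described above usually looks like the following sketch; `BoundedListLog` and `AddBounded` are hypothetical names. Because `List<T>.RemoveAt(0)` shifts every remaining element down by one slot, each add costs O(n) once the list is full, whereas a ring buffer only overwrites a single slot.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper showing the List<T>-based emulation of a ring buffer.
public static class BoundedListLog
{
    // Appends item, first evicting the oldest entry if the list already holds maxSize items.
    // RemoveAt(0) copies the remaining Count - 1 elements down by one slot, so this is O(n).
    public static void AddBounded<T>(List<T> list, T item, int maxSize)
    {
        if (maxSize <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxSize));
        }

        if (list.Count >= maxSize)
        {
            list.RemoveAt(0); // drop the oldest element at the front
        }

        list.Add(item);
    }
}
```

Calling `AddBounded(list, i, 3)` for `i` = 1..5 leaves the list holding 3, 4, 5.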

Proposed API Shape

The class follows the basic design of other .NET collection types:

namespace System.Collections.Generic
{
    public class RingBuffer<T> : IList<T>
    {
        public int Capacity { get; }
        public int Count { get; }
        public bool IsReadOnly { get; }

        public RingBuffer(int capacity);
        public void Add(T item);
        public T this[int index] { get; set; }
        public IEnumerator<T> GetEnumerator();
        IEnumerator IEnumerable.GetEnumerator();
        public void Clear();
        public bool Contains(T item);
        public void CopyTo(T[] array, int arrayIndex);
        public int IndexOf(T item);
        void IList<T>.Insert(int index, T item);
        bool ICollection<T>.Remove(T item);
        void IList<T>.RemoveAt(int index);
    }
}

Naive sample implementation

namespace System.Collections.Generic
{
    public class RingBuffer<T> : IList<T>
    {
        private readonly T[] _buffer;
        private uint _position;

        public int Capacity => _buffer.Length;
        public int Count => _position < _buffer.Length ? (int)_position : _buffer.Length;
        public bool IsReadOnly => false;

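        public RingBuffer(int capacity)
        {
            // A zero-length buffer would make every "% _buffer.Length" below divide by zero.
            if (capacity <= 0)
            {
                throw new ArgumentOutOfRangeException(nameof(capacity));
            }

            _buffer = new T[capacity];
        }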

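        public void Add(T item)
        {
            // _position counts every Add; modulo the capacity it is the slot to write next,
            // which, once the buffer is full, is also the slot holding the oldest element.
            uint previousPosition = _position++;
            int nextIndex = (int)(previousPosition % _buffer.Length);
            if (_position < previousPosition) // overflow
            {
                // The counter wrapped to 0: restore a value that is still >= Capacity (buffer stays
                // "full") and congruent to nextIndex + 1 modulo the capacity.
                _position += (uint)_buffer.Length + (uint)nextIndex + 1;
            }

            _buffer[nextIndex] = item;
        }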

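        public T this[int index]
        {
            get => _buffer[ToBufferIndex(index)];
            set => _buffer[ToBufferIndex(index)] = value;
        }

        // Maps a logical index (0 = oldest element) to a slot in _buffer.
        private int ToBufferIndex(int index)
        {
            if ((uint)index >= (uint)Count)
            {
                throw new ArgumentOutOfRangeException(nameof(index));
            }

            // Until the buffer wraps, the oldest element sits in slot 0; afterwards it sits
            // in the slot that the next Add will overwrite.
            int start = _position < _buffer.Length ? 0 : (int)(_position % _buffer.Length);
            int untilEnd = _buffer.Length - start;
            return index < untilEnd ? start + index : index - untilEnd;
        }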

        public IEnumerator<T> GetEnumerator()
        {
            if (_position < _buffer.Length)
            {
                for (int i = 0; i < _position; i++)
                {
                    yield return _buffer[i];
                }
                yield break;
            }

            int currentZero = (int)(_position % _buffer.Length);
            for (int i = currentZero; i < _buffer.Length; i++)
            {
                yield return _buffer[i];
            }
            for (int i = 0; i < currentZero; i++)
            {
                yield return _buffer[i];
            }
        }

        IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();

        public void Clear()
        {
            Array.Clear(_buffer, 0, _buffer.Length);
            _position = 0;
        }

        public bool Contains(T item)
        {
            return IndexOf(item) != -1;
        }

        public void CopyTo(T[] array, int arrayIndex)
        {
            if (_position < _buffer.Length)
            {
                // Not wrapped yet: the elements occupy slots [0, _position) in order.
                Array.Copy(_buffer, 0, array, arrayIndex, (int)_position);
                return;
            }

            // Wrapped: the oldest elements are in [currentZero, Length), the newest in [0, currentZero).
            int currentZero = (int)(_position % _buffer.Length);
            Array.Copy(_buffer, currentZero, array, arrayIndex, _buffer.Length - currentZero);
            Array.Copy(_buffer, 0, array, arrayIndex + (_buffer.Length - currentZero), currentZero);
        }

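        public int IndexOf(T item)
        {
            if (_position < _buffer.Length)
            {
                return Array.IndexOf(_buffer, item, 0, (int)_position);
            }

            // Search in logical order (oldest first) so the lowest logical index is returned:
            // first the slots [currentZero, Length), then the wrapped-around slots [0, currentZero).
            int currentZero = (int)(_position % _buffer.Length);
            int bufferIndex = Array.IndexOf(_buffer, item, currentZero, _buffer.Length - currentZero);
            if (bufferIndex != -1)
            {
                return bufferIndex - currentZero;
            }

            bufferIndex = Array.IndexOf(_buffer, item, 0, currentZero);
            return bufferIndex == -1 ? -1 : bufferIndex + (_buffer.Length - currentZero);
        }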

        void IList<T>.Insert(int index, T item)
        {
            throw new NotSupportedException("RingBuffer does not support Insert");
        }

        bool ICollection<T>.Remove(T item)
        {
            throw new NotSupportedException("RingBuffer does not support Remove");
        }

        void IList<T>.RemoveAt(int index)
        {
            throw new NotSupportedException("RingBuffer does not support RemoveAt");
        }
    }
}

In my sample I've chosen not to implement Insert, Remove and RemoveAt for performance reasons, but if closer compatibility with the IList<T> interface is desired, such implementations could be provided.

Clockwork-Muse commented 5 years ago

... I think I usually see this sort of thing as some sort of queue (although C# doesn't have an IQueue interface). Especially as Add maps so well to Push.

Not having a way to remove a single element (as opposed to clearing) feels strange.

scalablecory commented 5 years ago

Additional considerations from HTTP2's ring buffer:

And one more:

MarcoRossignoli commented 5 years ago

I think it's a good collection type to have built in, maybe with some single/multi-producer and single/multi-consumer features (e.g. fintech scenarios)... but maybe that's too much for a building block.

stakx commented 5 years ago

the ability to operate on a ring buffer as two Memory<T> is useful for I/O and parsing

Makes sense, but could it be that System.IO.Pipelines already provides a better abstraction for that?
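To make the "two Memory<T>" point concrete: the live contents of a ring buffer occupy at most two contiguous runs of the backing array (the part before the wrap point and the part after it), so they can be handed out without copying. A minimal sketch, assuming a hypothetical `GetSegments` helper that receives the backing array, the slot of the oldest element and the element count:

```csharp
using System;

public static class RingSegments
{
    // Hypothetical helper: returns the buffer contents, oldest first, as up to two
    // read-only slices of the backing array. Second is empty when nothing wraps.
    public static (ReadOnlyMemory<T> First, ReadOnlyMemory<T> Second) GetSegments<T>(
        T[] buffer, int start, int count)
    {
        if (start < 0 || start >= buffer.Length)
        {
            throw new ArgumentOutOfRangeException(nameof(start));
        }
        if (count < 0 || count > buffer.Length)
        {
            throw new ArgumentOutOfRangeException(nameof(count));
        }

        // As many elements as fit between the oldest slot and the end of the array...
        int firstLength = Math.Min(count, buffer.Length - start);
        var first = new ReadOnlyMemory<T>(buffer, start, firstLength);

        // ...and the remainder, which wrapped around to slot 0.
        var second = new ReadOnlyMemory<T>(buffer, 0, count - firstLength);
        return (first, second);
    }
}
```

A parser would process `First.Span` and then `Second.Span`; whether this is preferable to System.IO.Pipelines is the open question raised above.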

svick commented 5 years ago

@Suchiman

Often this sort of behavior is emulated inefficiently by using List<T>

Can't you implement it fairly easily and efficiently by using Queue<T> instead?

Clockwork-Muse commented 5 years ago

@MarcoRossignoli - If you're talking about financial-sector programming, you probably want an existing class - ConcurrentQueue. This one is explicitly dropping things in the event of an overrun, which is almost certain to give bankers the willies.

MarcoRossignoli commented 5 years ago

This one is explicitly dropping things in the event of an overrun, which is almost certain to give bankers the willies.

In my experience it depends on the algorithm's goal... sometimes it's OK or even unavoidable; as always there are trade-offs between performance, resources and cost, but that is outside a purely technical discussion. BTW, I think this is a good building block even without advanced features (producer/consumer, synchronization, blocking, etc.); anyone who wants them can extend it or build on it.

khellang commented 5 years ago

Duplicate of https://github.com/dotnet/corefx/issues/3126?

Suchiman commented 5 years ago

@khellang Looks like it; I failed to notice that a RingBuffer is also called a CircularBuffer / CircularQueue.

iSazonov commented 5 years ago

PowerShell uses such type https://github.com/PowerShell/PowerShell/blob/da748549e3cd59d32bd34daf1b69e70fedf6aac8/src/Microsoft.PowerShell.Commands.Utility/commands/utility/MatchString.cs#L308

GSPP commented 5 years ago

Maybe it is better to add a Deque and use that as a ring buffer.

static void RingBufferStyleAdd<T>(this Deque<T> d, T item) {
    if (d.Count == d.Capacity) d.PopEnd();
    d.PushFront(item);
}

I am in favor of adding a deque to the framework (https://github.com/dotnet/corefx/issues/32790).

Update: I now realize that a Queue is sufficient for this idea.

GrabYourPitchforks commented 2 years ago

FYI this code served as the reference implementation for the ring buffer used by SignalR's first releases: https://github.com/GrabYourPitchforks/messagestore

In their scenario, they needed a way to get all messages that were added since a particular timestamp, where it was ok to lose some messages. I suspect anybody needing a proper ring buffer structure also needs the concept of "here's what was added since you last looked."
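One way to provide "here's what was added since you last looked" is to stamp each item with a monotonically increasing sequence number and let readers ask for everything after the last number they saw. The sketch below is a hypothetical illustration of that idea (`SequencedRingBuffer`, `Add` and `GetSince` are invented names); it is not the messagestore or SignalR implementation, which may work differently.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: a fixed-size ring buffer whose items carry sequence numbers,
// so readers can request "everything after sequence N", accepting that items may be lost.
public sealed class SequencedRingBuffer<T>
{
    private readonly T[] _items;
    private long _nextSequence; // sequence number the next Add will assign (starts at 0)

    public SequencedRingBuffer(int capacity)
    {
        if (capacity <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(capacity));
        }
        _items = new T[capacity];
    }

    // Stores item, overwriting the oldest entry once full, and returns its sequence number.
    public long Add(T item)
    {
        long sequence = _nextSequence++;
        _items[sequence % _items.Length] = item;
        return sequence;
    }

    // Returns retained items with a sequence number greater than lastSeen, oldest first.
    // Pass -1 to read everything still retained. missed is true when some items newer
    // than lastSeen were already overwritten before this call.
    public List<T> GetSince(long lastSeen, out bool missed)
    {
        long oldestRetained = Math.Max(0, _nextSequence - _items.Length);
        long from = Math.Max(lastSeen + 1, oldestRetained);
        missed = lastSeen + 1 < oldestRetained;

        var result = new List<T>();
        for (long seq = from; seq < _nextSequence; seq++)
        {
            result.Add(_items[seq % _items.Length]);
        }
        return result;
    }
}
```

With a capacity of 3, after adding "a" through "e", `GetSince(0, out missed)` returns c, d, e with `missed == true`, because "b" was overwritten before the reader caught up.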

Aaronontheweb commented 2 years ago

So a circular buffer should ideally implement:

As a starting point, I'd propose a non-concurrent, non-resizable implementation.

I've been using an implementation I wrote many years ago that does all of this (sans the zeroing out - I need to add that) https://github.com/petabridge/Petabridge.Collections/blob/dev/src/Petabridge.Collections/CircularBuffer.cs

We use this internally in many of our streaming applications - I would be happy to contribute a more polished implementation that meets the standards of the .NET runtime team if there are no objections.

omariom commented 2 years ago

Aren't ring buffers just queues with a policy?

Aaronontheweb commented 2 years ago

They're queues that consume a fixed amount of memory and automatically overwrite the head of the queue when capacity is exceeded - they're commonly used in streaming applications where data is highly perishable.

Come to think of it, they're not 100% queue semantics either - in applications like time-series, for instance, you want to keep the buffer full and let the push operations overwrite older data from the head. You want to enumerate without popping so you can display the full series.

The same is true for applications like media streaming - you want to keep the buffer full so that +/- N seconds of playback are available in memory on the local device, letting the user stay ahead of the live data stream and seek backwards briefly. But eventually you need to push the older frames out of the buffer because memory is constrained.

EgorBo commented 9 months ago

Any particular reason this has never been accepted for an API review? Considering it already has 2 duplicate proposals + 17 upvotes

stephentoub commented 9 months ago

Any particular reason this has never been accepted for an API review? Considering it already has 2 duplicate proposals + 17 upvotes

We're fairly conservative when it comes to adding new collection types to the core libraries. This could easily be shipped as a NuGet package by someone (and has been, in multiple different ways, such as the one linked above), and the requested semantics are what you get with Queue<T> if you just have an Enqueue helper that's basically:

if (queue.Count == queue.Capacity) queue.Dequeue();
queue.Enqueue(value);
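A self-contained version of that helper might look like the sketch below. `EnqueueBounded` is a hypothetical name, and the maximum size is passed in explicitly as `maxCount` because the bound is a policy chosen by the caller, not necessarily the size of the queue's internal storage.

```csharp
using System;
using System.Collections.Generic;

public static class QueueRingBufferExtensions
{
    // Hypothetical helper: enqueues value, first dequeuing the oldest elements so that the
    // queue never holds more than maxCount items. Dequeue is O(1); Enqueue is amortized O(1)
    // (it may grow the internal array until the queue first reaches maxCount items).
    public static void EnqueueBounded<T>(this Queue<T> queue, T value, int maxCount)
    {
        if (maxCount <= 0)
        {
            throw new ArgumentOutOfRangeException(nameof(maxCount));
        }

        while (queue.Count >= maxCount)
        {
            queue.Dequeue(); // drop the oldest element
        }

        queue.Enqueue(value);
    }
}
```

Enumerating the queue (or calling `ToArray()`) yields the retained items oldest first without removing them, which also covers the time-series case of enumerating without popping mentioned earlier.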