StephenCleary / AsyncEx

A helper library for async/await.
MIT License
3.46k stars 356 forks source link

Feature request: Easier-to-use AsyncThrottler #88

Open StephenCleary opened 7 years ago

StephenCleary commented 7 years ago

From @StephenCleary on October 10, 2016 16:11

Can be based on SemaphoreSlim or AsyncSemaphore.

Something with disposable-style usage:

using (await throttler.WaitAsync())
{
  ...
}

Copied from original issue: StephenClearyArchive/AsyncEx.Coordination#8

bboyle1234 commented 5 years ago

I wrote this when I needed to throttle orders sent to a stock exchange API according to its throttling rules, which were a combination of limits such as "max 4 orders per second" and "max 10 orders per 15 seconds".

    /// <summary>
    /// Applies several throttling rules at once by delegating to one <see cref="Throttler"/> per rule.
    /// </summary>
    internal sealed class ThrottlerSet {

        readonly Throttler[] Throttlers;

        /// <summary>
        /// Creates one <see cref="Throttler"/> for each configured rule.
        /// </summary>
        /// <param name="configuration">
        /// One or more rules; each pair is passed to the <see cref="Throttler"/> constructor as
        /// (number of events, number of seconds).
        /// </param>
        /// <exception cref="ArgumentNullException">Thrown when <paramref name="configuration"/> is null.</exception>
        /// <exception cref="ArgumentException">Thrown when <paramref name="configuration"/> is empty.</exception>
        public ThrottlerSet(params (int numOrders, double numSeconds)[] configuration) {
            // ArgumentNullException derives from ArgumentException, so callers catching the latter still work.
            if (configuration == null) {
                throw new ArgumentNullException(nameof(configuration));
            }
            if (configuration.Length == 0) {
                // Use the (message, paramName) overload; the single-string overload treats its argument as the message.
                throw new ArgumentException("At least one throttling rule must be supplied.", nameof(configuration));
            }
            Throttlers = configuration.Select(c => new Throttler(c.numOrders, c.numSeconds)).ToArray();
        }

        /// <summary>
        /// Starts a wait on every throttler and returns a task that completes once all of those waits have completed.
        /// </summary>
        /// <param name="cancellationToken">Token passed to each throttler's wait.</param>
        public Task WaitAsync(CancellationToken cancellationToken) {
            return Task.WhenAll(Throttlers.Select(t => t.WaitAsync(cancellationToken)));
        }
    }

    /// <summary>
    /// Limits how often callers may proceed: once <see cref="NumEvents"/> callers have been let through,
    /// the next caller is delayed until the oldest of those events is at least <see cref="NumSeconds"/> seconds old.
    /// </summary>
    internal sealed class Throttler {

        /// <summary>
        /// The maximum number of events that are allowed to happen in the given time window.
        /// </summary>
        public readonly int NumEvents;

        /// <summary>
        /// The given time window in which up to the maximum number of events are allowed to happen.
        /// </summary>
        public readonly double NumSeconds;

        readonly AsyncLock Lock;
        readonly CircularBuffer<TimeStamp> Events;

        /// <summary>
        /// Creates a throttler that allows up to <paramref name="numEvents"/> events per <paramref name="numSeconds"/> seconds.
        /// </summary>
        /// <param name="numEvents">Maximum number of events per window; must be greater than zero.</param>
        /// <param name="numSeconds">Length of the window, in seconds.</param>
        /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="numEvents"/> is zero or negative.</exception>
        public Throttler(int numEvents, double numSeconds) {
            // A zero-sized history would make WaitAsync fail with "Buffer is empty." on its first call,
            // and a negative size would fail inside the array allocation, so reject both up front.
            if (numEvents <= 0) {
                throw new ArgumentOutOfRangeException(nameof(numEvents), numEvents, "The number of events must be greater than zero.");
            }
            NumEvents = numEvents;
            NumSeconds = numSeconds;
            Lock = new AsyncLock();
            Events = new CircularBuffer<TimeStamp>(NumEvents);
        }

        /// <summary>
        /// Waits, and then allows execution to proceed after the throttling rule has been obeyed.
        /// </summary>
        /// <param name="cancellationToken">Token used to cancel both the wait for the lock and the throttling delay.</param>
        /// <exception cref="OperationCanceledException">Thrown when cancellationToken is cancelled.</exception>
        public async Task WaitAsync(CancellationToken cancellationToken) {
            // Allow execution to pass through here just one at a time.
            using (await Lock.LockAsync(cancellationToken).ConfigureAwait(false)) {
                // Get the time at the other end of the throttling window, calling it the "boundaryTime".
                var boundaryTime = TimeStamp.Now.AddSeconds(-NumSeconds);
                // Only need to wait if we have accumulated a sufficient number of events already.
                if (Events.Count == NumEvents) {
                    // Only need to wait if the oldest recorded event happened after the "boundaryTime".
                    var waitTime = Events.PeekFirst().Subtract(boundaryTime);
                    if (waitTime.Ticks > 0) {
                        // Actually performs the wait; the lock stays held so later callers queue behind this one.
                        await Task.Delay(waitTime, cancellationToken).ConfigureAwait(false);
                    }
                }
                // Record the time at which the current event was allowed to proceed.
                // Once the buffer is full this overwrites the oldest recorded event.
                Events.PushLast(TimeStamp.Now);
            }
        }

        /// <summary>
        /// A fixed-size collection of values. NOT thread-safe.
        /// </summary>
        sealed class CircularBuffer<T> : IEnumerable<T> {

            readonly int _capacity;
            readonly T[] _items;

            // Slot of the last value in the collection (meaningful only while Count > 0).
            int _head;

            /// <summary>
            /// Creates an empty buffer able to hold <paramref name="size"/> values.
            /// </summary>
            /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="size"/> is zero or negative.</exception>
            public CircularBuffer(int size) {
                if (size <= 0) {
                    throw new ArgumentOutOfRangeException(nameof(size), size, "The buffer size must be greater than zero.");
                }
                _capacity = size;
                _items = new T[_capacity];
            }

            /// <summary>
            /// The number of values currently stored.
            /// </summary>
            public int Count { get; private set; }

            /// <summary>
            /// Gets or sets the value at the given position, where 0 is the beginning of the collection.
            /// </summary>
            /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="index"/> is negative or not less than <see cref="Count"/>.</exception>
            public T this[int index] {
                get { return _items[SlotOf(index)]; }
                set { _items[SlotOf(index)] = value; }
            }

            /// <summary>
            /// Adds a value to the end of the collection, overwriting the first value if the collection's size is exceeded.
            /// </summary>
            public void PushLast(T value) {
                if (++_head == _capacity) {
                    _head = 0;
                }
                _items[_head] = value;
                if (Count < _capacity) {
                    Count++;
                }
            }

            /// <summary>
            /// Removes the value from the end of the collection.
            /// </summary>
            /// <exception cref="InvalidOperationException">Thrown when the buffer is empty.</exception>
            public T PopLast() {
                ThrowIfEmpty();
                var value = _items[_head];
                // Clear the slot so the buffer does not keep a popped reference alive.
                _items[_head] = default(T);
                if (--_head == -1) {
                    _head = _capacity - 1;
                }
                Count--;
                return value;
            }

            /// <summary>
            /// Adds a value to the beginning of the collection, overwriting the last value if the collection's size is exceeded.
            /// </summary>
            public void PushFirst(T value) {
                if (Count == _capacity) {
                    // Overwriting the last item: its slot becomes the new first slot once the head steps back.
                    _items[_head] = value;
                    if (--_head == -1) {
                        _head = _capacity - 1;
                    }
                } else {
                    var newTail = _head - Count;
                    if (newTail < 0) {
                        newTail += _capacity;
                    }
                    _items[newTail] = value;
                    Count++;
                }
            }

            /// <summary>
            /// Removes the value at the beginning of the collection.
            /// </summary>
            /// <exception cref="InvalidOperationException">Thrown when the buffer is empty.</exception>
            public T PopFirst() {
                ThrowIfEmpty();
                var tail = FirstSlot();
                var value = _items[tail];
                // Clear the slot so the buffer does not keep a popped reference alive.
                _items[tail] = default(T);
                Count--;
                return value;
            }

            /// <summary>
            /// Returns the value stored at the end of the collection.
            /// </summary>
            /// <exception cref="InvalidOperationException">Thrown when the buffer is empty.</exception>
            public T PeekLast() {
                ThrowIfEmpty();
                return _items[_head];
            }

            /// <summary>
            /// Returns the value stored at the beginning of the collection.
            /// </summary>
            /// <exception cref="InvalidOperationException">Thrown when the buffer is empty.</exception>
            public T PeekFirst() {
                ThrowIfEmpty();
                return _items[FirstSlot()];
            }

            /// <summary>
            /// Enumerates the stored values from the beginning of the collection to the end.
            /// </summary>
            public IEnumerator<T> GetEnumerator() {
                for (var index = 0; index < Count; index++) {
                    yield return this[index];
                }
            }

            IEnumerator IEnumerable.GetEnumerator()
                => this.GetEnumerator();

            // Throws the shared "empty" error used by the Pop*/Peek* members.
            void ThrowIfEmpty() {
                if (Count == 0) {
                    throw new InvalidOperationException("Buffer is empty.");
                }
            }

            // Array slot of the first value; callers must ensure Count > 0.
            int FirstSlot() {
                var tail = _head - Count + 1;
                if (tail < 0) {
                    tail += _capacity;
                }
                return tail;
            }

            // Validates a logical index and maps it to its array slot.
            int SlotOf(int index) {
                if (index < 0 || index >= Count) {
                    throw new ArgumentOutOfRangeException(nameof(index));
                }
                var slot = _head - Count + index + 1;
                if (slot < 0) {
                    slot += _capacity;
                }
                return slot;
            }
        }
    }