houseofcat / RabbitMQ.Dataflows

A set of libraries for rapidly developing RabbitMQ workflows/pipelines.
MIT License

Achieving Optimal RabbitMQ Pipeline/Dataflow Performance #28

Closed houseofcat closed 1 year ago

houseofcat commented 3 years ago

@tiggerite

Can you post your config? And draw out your flow? Show your setup, with no sensitive data, etc. Cluster, Queue, Consumer, Service

Unless I am mistaken about what you are saying, 6 messages per second seems wrong; that is an incredibly low number.

Last I checked, I can process around 130,000 msgs/s while also hosting the RabbitMQ server on my work laptop. With 4x Dell series i7 from circa 2018, I can roughly hit 800,000 msg/s for pub/sub while hosting the RabbitMQ (single node) on a different box. It's why I scoffed at Google's numbers with their hardware.

A persistence layer will saturate your DB, as RabbitMQ will be faster than most databases outside of C*/Scylla or Redis caching.

houseofcat commented 3 years ago

ConsumerPipelineMicroservice (R9 5950X) global settings, with a simulated IO delay applied to the processing of each message.

        public static Stopwatch Stopwatch;
        public static LogLevel LogLevel = LogLevel.Error;
        public static long GlobalCount = 200_000;
        public static bool EnsureOrdered = false; // use with simulate IO delay to determine if ensuring order is causing delays
        public static bool SimulateIODelay = true;
        public static int MinIODelay = 50;
        public static int MaxIODelay = 100;
        public static bool AwaitShutdown = true;
        public static bool LogOutcome = false;
        public static bool UseStreamPipeline = false;
        public static int MaxDoP = 64;
        public static Random Rand = new Random();

Output

Run a ConsumerPipelineMicroservice demo... press any key to continue!

Running example...

Awaiting full ConsumerPipeline finish...

Example finished...

Statistics!
MaxDoP: 64, Ensure Ordered: False
SimulateIODelay: True, MinIODelay: 50ms, MaxIODelay: 100ms
AwaitShutdown: True, LogOutcome: False
UseStreamPipeline: False
Finished processing 200000 messages (Steps: 600000) in 145898 milliseconds.
Rate: 1370.8207103592922 msg/s.
Rate: 4112.462131077877 functions/s.

Client Finished! Press any key to start the shutdown!

houseofcat commented 3 years ago

Here is the DataProducer

Single Publisher rate.

image

As expected - per RabbitMQ - as the queue fills up, performance decreases. This is where multiple queues (of the same data type) are necessary to keep the 20K msg/s rate consistently high.

houseofcat commented 3 years ago

Consumer Stats on ConsumerPipelineMicroservice (1 Consumer Count) image

Cpu On ConsumerPipelineMicroservice image

AppTestSettings

{
  "HouseofCat": {
    "ConsumerDataflowOptions": {
      "DataflowName": "ConsumerDataflow",
      "ConsumerName": "ConsumerFromConfig",
      "ConsumerCount": 1,
      "MaxDoP": 64,
      "EnsureOrdered": false,
      "Capacity": 1280, // 2*2*MaxDop*5, this number times * max message size = Peak Memory Usage for this Consumer
      "SimulateIODelay": false,
      "MinIODelay": 50,
      "MaxIODelay": 100,
      "LogStepOutcomes": false
    }
  },
  "Logging": {
    "LogLevel": {
      "Default": "Error",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Error"
    }
  },
  "AllowedHosts": "*"
}
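
The comment on "Capacity" carries two pieces of arithmetic: Capacity = 2*2*MaxDoP*5, and Capacity times the maximum message size gives the peak memory held by one Consumer. A minimal sketch of that calculation follows. The class and method names are hypothetical, and the 64 KiB message size is an assumption for illustration only, not a figure from this test.

```csharp
using System;

public static class CapacityEstimate
{
    // Formula from the config comment: 2 * 2 * MaxDoP * 5.
    public static int Capacity(int maxDoP) => 2 * 2 * maxDoP * 5;

    // Capacity * max message size = rough peak buffered bytes for one Consumer.
    public static long PeakBytes(int capacity, long maxMessageBytes) => capacity * maxMessageBytes;

    public static void Main()
    {
        const int maxDoP = 64;                   // from the config above
        const long maxMessageBytes = 64 * 1024;  // hypothetical 64 KiB message

        int capacity = Capacity(maxDoP);
        long peak = PeakBytes(capacity, maxMessageBytes);

        Console.WriteLine($"Capacity: {capacity}");
        Console.WriteLine($"Peak buffered payload: {peak / (1024.0 * 1024.0):F0} MiB");
    }
}
```

With MaxDoP = 64 this reproduces the 1280 in the config; the memory figure scales linearly with whatever the real maximum message size is.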

RabbitConfig Test Settings

{
  "FactoryOptions": {
    "Uri": "amqp://guest:guest@localhost:5672/",
    "MaxChannelsPerConnection": 2000,
    "HeartbeatInterval": 6,
    "AutoRecovery": true,
    "TopologyRecovery": true,
    "NetRecoveryTimeout": 10,
    "ContinuationTimeout": 10,
    "EnableDispatchConsumersAsync": true,
    "SslOptions": {
      "EnableSsl": false,
      "CertServerName": "",
      "LocalCertPath": "",
      "LocalCertPassword": "",
      "ProtocolVersions": 3072
    }
  },
  "PoolOptions": {
    "ServiceName": "HoC.RabbitMQ",
    "MaxConnections": 5,
    "MaxChannels": 25,
    "SleepOnErrorInterval": 1000
  },
  "PublisherOptions": {
    "LetterQueueBufferSize": 100,
    "PriorityLetterQueueBufferSize": 100,
    "BehaviorWhenFull": 0,
    "AutoPublisherSleepInterval": 0,
    "CreatePublishReceipts": true,
    "Compress": false,
    "Encrypt": false
  },
  "GlobalConsumerOptions": {
    "AggressiveSettings": {
      "ErrorSuffix": "Error",
      "BatchSize": 128,
      "BehaviorWhenFull": 0,
      "SleepOnIdleInterval": 0,
      "UseTransientChannels": true,
      "AutoAck": false,
      "NoLocal": false,
      "Exclusive": false,
      "GlobalConsumerPipelineOptions": {
        "WaitForCompletion": false,
        "MaxDegreesOfParallelism": 64,
        "EnsureOrdered": false
      }
    },
    "ModerateSettings": {
      "ErrorSuffix": "Error",
      "BatchSize": 48,
      "BehaviorWhenFull": 0,
      "SleepOnIdleInterval": 100,
      "UseTransientChannels": true,
      "AutoAck": false,
      "NoLocal": false,
      "Exclusive": false,
      "GlobalConsumerPipelineOptions": {
        "WaitForCompletion": true,
        "MaxDegreesOfParallelism": 24,
        "EnsureOrdered": false
      }
    },
    "LightSettings": {
      "ErrorSuffix": "Error",
      "BatchSize": 16,
      "BehaviorWhenFull": 0,
      "SleepOnIdleInterval": 100,
      "UseTransientChannels": true,
      "AutoAck": false,
      "NoLocal": false,
      "Exclusive": false,
      "GlobalConsumerPipelineOptions": {
        "WaitForCompletion": true,
        "MaxDegreesOfParallelism": 8,
        "EnsureOrdered": false
      }
    },
    "SingleThreaded": {
      "ErrorSuffix": "Error",
      "BatchSize": 1,
      "BehaviorWhenFull": 0,
      "SleepOnIdleInterval": 0,
      "UseTransientChannels": true,
      "AutoAck": false,
      "NoLocal": false,
      "Exclusive": false,
      "GlobalConsumerPipelineOptions": {
        "WaitForCompletion": true,
        "MaxDegreesOfParallelism": 1,
        "EnsureOrdered": false
      }
    }
  },
  "ConsumerOptions": {
    "ConsumerFromConfig": {
      "Enabled": true,
      "GlobalSettings": "ModerateSettings",
      "ConsumerName": "ConsumerFromConfig",
      "QueueName": "TestRabbitServiceQueue"
    }
  }
}

houseofcat commented 3 years ago

Now, the above test is about allocating a ton of CPU to the Consumer. The following test demonstrates scaling out internally with more consumers with less CPU resources to achieve about the same effect as a single Consumer. The benefit of doing this is reducing the parallelism of the single consumer and allocating more resources to the core application.

Config changes to add 3 consumers, but reduce their global settings to "LightSettings".

image

  "ConsumerOptions": {
    "ConsumerFromConfig": {
      "Enabled": true,
      "GlobalSettings": "LightSettings",
      "ConsumerName": "ConsumerFromConfig",
      "QueueName": "TestRabbitServiceQueue"
    }
  }

{
  "HouseofCat": {
    "ConsumerDataflowOptions": {
      "DataflowName": "ConsumerDataflow",
      "ConsumerName": "ConsumerFromConfig",
      "ConsumerCount": 3,
      "MaxDoP": 64,
      "EnsureOrdered": false,
      "Capacity": 1280, // 2*2*MaxDop*5, this number times * max message size = Peak Memory Usage for this Consumer
      "SimulateIODelay": false,
      "MinIODelay": 50,
      "MaxIODelay": 100,
      "LogStepOutcomes": false
    }
  },
  "Logging": {
    "LogLevel": {
      "Default": "Error",
      "Microsoft": "Warning",
      "Microsoft.Hosting.Lifetime": "Error"
    }
  },
  "AllowedHosts": "*"
}

houseofcat commented 3 years ago

Exact same test but now with 5 consumers, with single threaded settings. More CPU for the Core app.

image

  "HouseofCat": {
    "ConsumerDataflowOptions": {
      "DataflowName": "ConsumerDataflow",
      "ConsumerName": "ConsumerFromConfig",
      "ConsumerCount": 5,
      "MaxDoP": 64,
      "EnsureOrdered": false,
      "Capacity": 1280, // 2*2*MaxDop*5, this number times * max message size = Peak Memory Usage for this Consumer
      "SimulateIODelay": false,
      "MinIODelay": 50,
      "MaxIODelay": 100,
      "LogStepOutcomes": false
    }
  },
    "SingleThreaded": {
      "ErrorSuffix": "Error",
      "BatchSize": 10, // also changed
      "BehaviorWhenFull": 0,
      "SleepOnIdleInterval": 0,
      "UseTransientChannels": true,
      "AutoAck": false,
      "NoLocal": false,
      "Exclusive": false,
      "GlobalConsumerPipelineOptions": {
        "WaitForCompletion": true,
        "MaxDegreesOfParallelism": 1,
        "EnsureOrdered": false
      }
    }
  },
  "ConsumerOptions": {
    "ConsumerFromConfig": {
      "Enabled": true,
      "GlobalSettings": "SingleThreaded",
      "ConsumerName": "ConsumerFromConfig",
      "QueueName": "TestRabbitServiceQueue"
    }

houseofcat commented 3 years ago

The effect of batch size on a single-threaded consumer, BatchCount = 1 vs BatchCount = 32

The BatchCount = 1/single-consumer scenario is useful when throughput is slow no matter what, but you also don't want throughput to spike (Consumer msg/s is highly predictable). image

BatchCount directly impacts efficiency. You increase/decrease this value until you get 100% consumer efficiency before adding more consumers; after that you have to adjust it all over again.

image

BatchCount can make the most impact out of any setting! The cost here is that you can be prone to burst traffic. In distributed/concurrent programming, it is often difficult for software engineers to understand the impact of their code. That's why all of these settings are exposed. Everyone has to test what works for them.

Multi-Threaded + High Batch/Buffer/Prefetch
Overall higher throughput, which means more core application calls, more function calls, more IO etc. You could overwhelm your logging layer or Database layer periodically with high burst traffic.

Single Threaded + High Batch/Buffer/Prefetch
Has lower throughput overall, but message count spikes lead to unpredictable performance spikes. If the message ingress count is consistent, your performance is consistent.

Single Threaded + Low Batch/Buffer/Prefetch
Lowest throughput, but highly predictable peak amounts of function calls that don't spike because queue message counts spike.
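
The three combinations above can be explored with a small simulation. This is only a sketch under stated assumptions: it does not talk to RabbitMQ or use the library, a bounded channel stands in for the batch/prefetch window, and the names (PrefetchSketch, RunAsync), the message count, and the 10 ms delay are all hypothetical.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class PrefetchSketch
{
    // Pushes messageCount items through a bounded buffer drained by `workers` handlers.
    // Returns how many items were handled and the highest number of handlers running at once.
    public static async Task<(int Processed, int PeakConcurrent)> RunAsync(
        int messageCount, int bufferSize, int workers, int ioDelayMs)
    {
        // The bounded channel stands in for the consumer's batch/prefetch window.
        var buffer = Channel.CreateBounded<int>(bufferSize);
        var gate = new object();
        int processed = 0, inFlight = 0, peak = 0;

        async Task WorkerAsync()
        {
            await foreach (var message in buffer.Reader.ReadAllAsync())
            {
                lock (gate) { inFlight++; peak = Math.Max(peak, inFlight); }
                await Task.Delay(ioDelayMs); // simulated IO per message
                lock (gate) { inFlight--; processed++; }
            }
        }

        var workerTasks = Enumerable.Range(0, workers).Select(_ => WorkerAsync()).ToArray();

        for (int i = 0; i < messageCount; i++)
        {
            await buffer.Writer.WriteAsync(i); // waits while the buffer is full
        }
        buffer.Writer.Complete();

        await Task.WhenAll(workerTasks);
        return (processed, peak);
    }

    public static async Task Main()
    {
        var scenarios = new[]
        {
            (Name: "Multi-threaded + high batch", Workers: 8, Buffer: 32),
            (Name: "Single-threaded + high batch", Workers: 1, Buffer: 32),
            (Name: "Single-threaded + low batch", Workers: 1, Buffer: 1),
        };

        foreach (var s in scenarios)
        {
            var sw = Stopwatch.StartNew();
            var result = await RunAsync(messageCount: 64, bufferSize: s.Buffer, workers: s.Workers, ioDelayMs: 10);
            Console.WriteLine($"{s.Name}: handled {result.Processed}, peak concurrent handlers {result.PeakConcurrent}, {sw.ElapsedMilliseconds} ms");
        }
    }
}
```

In this model the worker count caps how many handler calls run at once, while the buffer size only decides how many messages are pulled off the queue ahead of processing. Timings will vary by machine, so treat the printed durations as relative rather than absolute.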

tiggerite commented 3 years ago

Hey, I'll get back to you on this when I can (maybe not until next week though). Just to note, the 6 messages per second per queue is the peak rate we load test for; it should be able to cope with more for sure. However, we are a bit constrained by memory and CPU; 800 MB would blow the current limits, for example.

As for burst traffic this can happen for sure. We usually scale horizontally if we get warning in advance but this isn't always possible. Currently in production we use a batch size / prefetch of 15 with one consumer per queue per rabbit instance and always hit 100% utilisation; with the new solution I've managed 32 with 4 consumers per queue per instance and 16 with 8 consumers and under load it's still at 100% for both.

houseofcat commented 3 years ago

Okay, but just a note on the system I tested with: I have 64 GB of RAM, and it's not set to Server GC. If memory serves me correctly, Dataflow's memory lifecycles don't aggressively release RAM to the OS... It may be to prevent GC churn. I do think it's a place for improvement in .NET for sure, but I have run microservices on a 1 GB total RAM Docker image too. It will scale accordingly.

While this is not designed specifically to run on Raspberry Pis or lightweight T* instances... the software scales UP to the hardware. It has plenty of good practices that will lead to just slowing down instead of crashing when redlining memory. Of course, there is also whatever you are doing added to the mix. Therefore, I can't make honest guarantees. Anyone who would give that guarantee would be a liar. I can do a better job of documenting the above as time permits. But writing code is much more fun than my documentation chores. Being the only dev also means doing everything with my limited time.

Instead of looking at throttling the core system, it would be better to consider adjusting the right settings. Consider a CPU benchmark application for a second. It's designed to push the general application to use 100% of CPU. In a stable setup, it's not designed to crash the computer despite redlining resources. As I improve and grow, and get more into lower level optimizations, this should start approaching the performance of things like wPrime or Prime95. A lofty target goal, but I think I can get to it.

tiggerite commented 3 years ago

I also don't set it to Server GC, I think it would do more harm than good (especially for the microservice ones - this behemoth one will be obsoleted in the nearish future anyway, albeit it handles the most traffic as well, so I can't completely discard it either). It's basically limited to around 500 MB at present and CPU of 1.5. That seems to be fine for the use case, however: under load it doesn't hit 50%; in fact it's only on startup that it even approaches 75% of that.

I definitely hear you on it scaling up, however, it completely crashes should I push it much further (e.g. by adding more worker threads) - the Kubernetes instance just doesn't come up at all as the resources are used up before it gets a chance to start up properly, and then it cycles itself over and over, and never actually processes any messages. I'm hoping we can have double the memory available as a limit in the near future as well, that would make things so much easier and then we wouldn't see this issue.

As for what I am doing.. that's a tough one as a lot of the actual "message handling" is outside of my control. But in basic terms, after the message has been deserialized, it then has to be converted to another format, transmitted across the wire to an external service and then, once the external service has returned, converted again before we can finally ack the message. There are a few publishes in there as well at various points and of course metrics to emit.

There is a time limit between the message being received and acked, and for the slowest external services this can leave very little leeway in terms of the receiving/acking, conversions, publishes etc. Each message can also only stay on the "receive" queue for a maximum of a few seconds as previously discussed. For these reasons it's imperative to have quite a high batch size / prefetch and several consumers, so that the slowest external services can still be processed in a timely manner. Otherwise it is left waiting for those services to respond, and in the meantime the number of messages builds up and up causing a cascading effect as the messages are left on the queue too long.
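
The link between slow external services and the need for a larger prefetch can be put into numbers with Little's law (average items in the system = arrival rate x average time in the system). The sketch below is a back-of-envelope estimate only: the 6 msg/s figure is the peak rate quoted earlier in this thread, while the 2 second handling time and the names used are hypothetical.

```csharp
using System;

public static class InFlightEstimate
{
    // Little's law: average items in the system = arrival rate * average time in system.
    public static int RequiredInFlight(double messagesPerSecond, double secondsPerMessage) =>
        (int)Math.Ceiling(messagesPerSecond * secondsPerMessage);

    public static void Main()
    {
        const double peakRate = 6.0;          // msg/s per queue, quoted in this thread
        const double slowServiceSeconds = 2.0; // hypothetical receive-to-ack time

        int inFlight = RequiredInFlight(peakRate, slowServiceSeconds);
        Console.WriteLine($"Messages that must be in flight to keep up: {inFlight}");
    }
}
```

If the combined prefetch across all consumers on a queue is below that number, messages wait on the queue even when every consumer is busy, which is the cascading build-up described above.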

I'm absolutely sure you can reach performance levels close to wPrime/Prime95. It's already highly impressive and really I'm just into tweaking things now, as I've already found a pretty good equilibrium from what I can tell. Now I can prevent dead letters on application shutdown thanks to the counter concept and nacking to requeue / shutting down consumers as they complete, aided by my overridden Consumer class, prefetch being large is no longer the issue it was in the past either.

houseofcat commented 3 years ago

> I definitely hear you on it scaling up, however, it completely crashes should I push it much further (e.g. by adding more worker threads) - the Kubernetes instance just doesn't come up at all as the resources are used up before it gets a chance to start up properly, and then it cycles itself over and over, and never actually processes any messages. I'm hoping we can have double the memory available as a limit in the near future as well, that would make things so much easier and then we wouldn't see this issue.

If you could, I would really like details on this. At the very least I can see if there is anything at the library level to safeguard against the crashing. This is the first case of crashing I have heard about - even on a constrained system that's a bit surprising to me. While Tesseract is new, the RabbitMQ parts are pretty vetted/solid over multiple years, and I would like to see what the errors/stacktraces were.

In addition to that, I looked into inventing a new type of BufferBlock over the weekend. I was hoping one already existed but of course it didn't. Work is in progress, but I have had a look at the core parts you have been using in an effort to optimize them even further. I still think I can streamline it one step further for Consumers. Right now, there is a channel transfer mechanism from Consumer Channel -> ChannelBlockEngine channel. I would like a dev to be able to integrate that channel directly instead, but I may need to abandon the Datablock/Actionblock pattern to do it correctly.

https://github.com/houseofcat/Tesseract/tree/feature/v1.2.1

This should in theory completely remove the memory impact. In terms of performance, the Channel integration would speed things up significantly in very hot / highly parallel scenarios, but probably not your use case.

My last tip is to retest ConnectionPool with only 1 or 2 connections on a 1.5 vCPU allocation, and be sure to bump your channel count a smidge to keep the ChannelPool saturated. I would go with 2 connections x 10 channels to start with (which really only impacts publishing when consumers use transient channels). Each connection has a lot of CPU locking and wrapping of Socket operations. Sockets generally don't perform as well on low-thread systems when they are heavily parallelized. You get a lot of blocking/stuttering.

At the end of the day, I may just have to build out a K8S lab test to see what works well. Could be a fun experiment, see how much performance I can squeak out of a single CPU.

tiggerite commented 3 years ago

> If you could, I would really like details on this. At the very least I can see if there is anything at the library level to safeguard against the crashing. This is the first case of crashing I have heard about - even on a constrained system that's a bit surprising to me. While Tesseract is new, the RabbitMQ parts are pretty vetted/solid over multiple years, and I would like to see what the errors/stacktraces were.

I'll see what I can do but it's a tricky one, as the pods just get restarted due to the pressure on resources and then the logs are gone for good. I'm not sure there are even any stack traces or exceptions.. at any rate I doubt it's your library anyway, probably more likely the SetMinThreads "hack".

> In addition to that, I looked into inventing a new type of BufferBlock over the weekend. I was hoping one already existed but of course it didn't. Work is in progress, but I have had a look at the core parts you have been using in an effort to optimize them even further. I still think I can streamline it one step further for Consumers. Right now, there is a channel transfer mechanism from Consumer Channel -> ChannelBlockEngine channel. I would like a dev to be able to integrate that channel directly instead, but I may need to abandon the Datablock/Actionblock pattern to do it correctly.
>
> https://github.com/houseofcat/Tesseract/tree/feature/v1.2.1

This looks really good, would love to try and get it integrated.

> This should in theory completely remove the memory impact. In terms of performance, the Channel integration would speed things up significantly in very hot / highly parallel scenarios, but probably not your use case.

Honestly feels like anything is worth a try! They can be quite highly parallelised with the 20 queue services albeit not all in one pipeline/dataflow.

> My last tip is to retest ConnectionPool with only 1 or 2 connections on a 1.5 vCPU allocation, and be sure to bump your channel count a smidge to keep the ChannelPool saturated. I would go with 2 connections x 10 channels to start with (which really only impacts publishing when consumers use transient channels). Each connection has a lot of CPU locking and wrapping of Socket operations. Sockets generally don't perform as well on low-thread systems when they are heavily parallelized. You get a lot of blocking/stuttering.

Interesting. I can certainly try this, it would only be a configuration change and fairly painless to deploy.

> At the end of the day, I may just have to build out a K8S lab test to see what works well. Could be a fun experiment, see how much performance I can squeak out of a single CPU.

I think I might need to create a local one as well. I have limited access to even our dev version of the actual deployment.

tiggerite commented 3 years ago

So I tried with MaxConnections 2 and MaxChannels 10 for each instance (the service connects to 2) and you are right, latency is reduced and memory usage is also down quite a bit, without any noticeable impact on publishes. 👍🏻 Before it was 5 connections and 25 channels per instance.

tiggerite commented 3 years ago

> If you could, I would really like details on this. At the very least I can see if there is anything at the library level to safeguard against the crashing. This is the first case of crashing I have heard about - even on a constrained system that's a bit surprising to me. While Tesseract is new, the RabbitMQ parts are pretty vetted/solid over multiple years, and I would like to see what the errors/stacktraces were.

I managed to get some help with this today and all we see is OOMKilled by Kubernetes as the pod went over the memory limit under high load - no errors or stacktraces. This was with the 2 connections / 10 channels as above, and with 64 worker threads, which should not have pushed it over the limit. Eventually it managed to deploy, however, after several failures.

I tried with a different instance type which has lower CPU (0.5 limit) but 1 GB memory; this then caused Kestrel issues like

{"@t":"2021-08-10T10:40:45.6279856Z","@mt":"As of \"{now}\", the heartbeat has been running for \"{heartbeatDuration}\" which is longer than \"{interval}\". This could be caused by thread pool starvation.","@l":"Warning","now":"2021-08-10T10:40:43.5291562+00:00","heartbeatDuration":"00:00:02.0987399","interval":"00:00:01","EventId":{"Id":22,"Name":"HeartbeatSlow"},"SourceContext":"Microsoft.AspNetCore.Server.Kestrel"}

... and Kubernetes continually restarted the pod as liveness/readiness probes were failing, even though it actually did manage to process some messages along the way.
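
For readers unfamiliar with the SetMinThreads "hack" and the 64 worker threads mentioned above, the sketch below shows what raising the thread pool minimum generally looks like using the standard System.Threading.ThreadPool API. The exact call used in the service is not shown in this thread, so the helper name and the way the value is applied are assumptions.

```csharp
using System;
using System.Threading;

public static class ThreadPoolFloor
{
    // Raises the worker-thread minimum to `requested` if it is currently lower.
    // Returns the worker minimum in effect afterwards.
    public static int RaiseMinWorkers(int requested)
    {
        ThreadPool.GetMinThreads(out int minWorker, out int minIo);
        ThreadPool.GetMaxThreads(out int maxWorker, out _);

        // Never lower the existing minimum and never exceed the pool maximum.
        int target = Math.Min(Math.Max(minWorker, requested), maxWorker);
        if (target > minWorker && !ThreadPool.SetMinThreads(target, minIo))
        {
            Console.WriteLine("SetMinThreads rejected the request; minimums unchanged.");
        }

        ThreadPool.GetMinThreads(out minWorker, out _);
        return minWorker;
    }

    public static void Main()
    {
        ThreadPool.GetMinThreads(out int before, out _);
        int after = RaiseMinWorkers(64); // 64 worker threads is the figure mentioned in this thread
        Console.WriteLine($"Min worker threads: {before} -> {after}");
    }
}
```

A higher minimum lets the pool create threads on demand without its usual injection delay, which can help with starvation warnings like the Kestrel one above. Every extra thread also carries its own stack, so on a tightly limited pod this may add to memory pressure rather than relieve it.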

houseofcat commented 3 years ago

@tiggerite

v1.2.1 - Both the standalone ChannelBlockEngine integration and this DirectChannelExecutionEngine appear to be fully working in lieu of using BufferBlock. The DirectChannel call specifically uses the Channel inside of the Consumer. This would be the optimal use case for lower memory allocations, the trade-off being that work stops immediately when the Consumer Channel is shut down.

await consumer
    .StartConsumerAsync()
    .ConfigureAwait(false);

_ = consumer.DirectChannelExecutionEngineAsync(
    ProcessMessageAsync,
    4,
    true,
    null);
....

private Task<bool> ProcessMessageAsync(ReceivedData data)
{
    messageCount++;
    return Task.FromResult(data.AckMessage());
}

In high Pub/Sub scenarios, it reduced total Process Memory from 600-900 MB to about a flat 400-500 MB. Actual object counts inside my library look virtually perfect. The highest object count from the StressTestClient (250,000 messages pub/subbed) was a grand total of 54, and that was RabbitMQ channels in the ChannelPool. This is also due in part to switching to the RecyclableAes/Gzip providers in the StressTestClient. The RecyclableGzip provider has had a major hotfix included to reduce total memory allocations to about 88% on average.

The majority of memory allocations appear to be under publishing. Furthermore, it seems tied to Utf8JsonProvider using memoization (involving runes/jobject property maps) and the RabbitMQ client itself seems to sit on quite large buffers that stay alive.

151 MB is simply Pivotal/VMWare's code image

I haven't had good luck reporting issues to their team though, I find them very rude and discourteous.

Newer Recyclable Compression benchmark data is found under the RecyclableGzipProvider. I honestly do not have enough time to test this any further than I have - a couple of unit tests, benchmarks, and updating the stress test client - without possibly another week or two delay. So, I am releasing it now. When I have some free time, I will review again for further polishing but if you see any exceptions/bugs, let me know in separate issues.

tiggerite commented 3 years ago

This looks amazing! Thanks so much for all your help and looking into this so deeply. I'll see what testing I can do (had to open a new PR though to override ConsumerBlock and thus use my own ConsumerDataflow because of C# nuances). Hope it will be easy enough to merge in before you release to NuGet?

I get that impression from the Pivotal/VMWare team as well, unfortunately :( I actually reached out to one of them about using the "async" branch in production code but didn't even get the courtesy of a reply. Oh well. You're doing God's work with your library so it more than makes up for it ;)

PS as for Serialization, I had to bake my own unfortunately, as in our use case it deserializes a Mongo BSON/JSON object. It's nowhere near as good as the Utf8Json implementation so I'm sure I am wasting memory there as well. I can probably share that as it's trivial:

using System;
using System.IO;
using System.Text;
using System.Threading.Tasks;
using HouseofCat.Serialization;
using Microsoft.Toolkit.HighPerformance;
using MongoDB.Bson.IO;
using MongoDB.Bson.Serialization;

namespace Serialisation
{
    public class BsonConverter : ISerializationProvider
    {
        private static readonly JsonWriterSettings DefaultWriterSettings = 
            new JsonWriterSettings { OutputMode = JsonOutputMode.Strict };

        public TOut Deserialize<TOut>(ReadOnlyMemory<byte> input) => Deserialize<TOut>(input.AsStream());

        public TOut Deserialize<TOut>(Stream inputStream)
        {
            using var strReader = new StreamReader(inputStream);
            using var jsonReader = new JsonReader(strReader);
            return BsonSerializer.Deserialize<TOut>(jsonReader);
        }

        public Task<TOut> DeserializeAsync<TOut>(Stream inputStream) => throw new NotImplementedException();

        public TOut Deserialize<TOut>(string input)
        {
            using var jsonReader = new JsonReader(input);
            return BsonSerializer.Deserialize<TOut>(jsonReader);
        }

        public byte[] Serialize<TIn>(TIn input)
        {
            using var memStream = new MemoryStream();
            Serialize(memStream, input);
            return memStream.ToArray();
        }

        public void Serialize<TIn>(Stream outputStream, TIn input)
        {
            // Note: the StreamWriter closes outputStream when it is disposed at the end of this method.
            using var strWriter = new StreamWriter(outputStream);
            using var writer = new JsonWriter(strWriter, DefaultWriterSettings);

            BsonSerializer.Serialize(writer, input);

            writer.Flush();
            outputStream.Position = 0;
        }

        public Task SerializeAsync<TIn>(Stream outputStream, TIn input) => throw new NotImplementedException();

        public string SerializeToPrettyString<TIn>(TIn input) => throw new NotImplementedException();

        public string SerializeToString<TIn>(TIn input) => Encoding.UTF8.GetString(Serialize(input));
    }
}

PS work stopping immediately when the Consumer Channel is shut down is completely fine for my use case. It is only shut down when the entire service is being shut down and I already use the immediate flag for consumers there for the dead letter prevention talked about previously 👍🏻

tiggerite commented 3 years ago

I've refactored the code to use DirectChannelExecutionEngineAsync with no pipeline/dataflow steps. I'll run a load test and see how it gets on. I have linked the task to ExecuteAsync of a BackgroundService with the cancellation token passed in, so (once all the current messages have been processed) this should immediately stop the processing as well.

tiggerite commented 3 years ago

I wrote my own ChannelReaderBlock and ChannelReaderBlockEngine inspired by your versions:

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using HouseofCat.Logger;
using HouseofCat.Utilities.Errors;
using Microsoft.Extensions.Logging;

namespace Dataflows
{
    public class ChannelReaderBlock<TOut> : ISourceBlock<TOut>
    {
        public Task Completion { get; }

        private readonly ILogger<ChannelReaderBlock<TOut>> _logger;
        private readonly ChannelReader<TOut> _channelReader;
        private readonly ITargetBlock<TOut> _targetBlock;
        private readonly ISourceBlock<TOut> _sourceFromTargetBlock;

        public ChannelReaderBlock(
            ChannelReader<TOut> channelReader, ExecutionDataflowBlockOptions executeOptions)
        {
            Guard.AgainstNull(channelReader, nameof(channelReader));
            _logger = LogHelper.LoggerFactory.CreateLogger<ChannelReaderBlock<TOut>>();
            _channelReader = channelReader;

            _targetBlock = new TransformBlock<TOut, TOut>(input => input, executeOptions);
            _sourceFromTargetBlock = (ISourceBlock<TOut>)_targetBlock;
            Completion = _targetBlock.Completion;
        }

        public async ValueTask ReadChannelAsync(CancellationToken token = default)
        {
            try
            {
                while (await _channelReader.WaitToReadAsync(token).ConfigureAwait(false))
                {
                    var message = await _channelReader.ReadAsync(token).ConfigureAwait(false);
                    if (message != null)
                    {
                        await _targetBlock.SendAsync(message, token).ConfigureAwait(false);
                    }
                }
            }
            catch (OperationCanceledException)
            {
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Reading consumer buffer threw an exception.");
            }
        }

        public void Complete() => _targetBlock.Complete();

        public void Fault(Exception exception) => _targetBlock.Fault(exception);

        public IDisposable LinkTo(ITargetBlock<TOut> target, DataflowLinkOptions linkOptions) =>
            _sourceFromTargetBlock.LinkTo(target, linkOptions);

        public TOut ConsumeMessage(
            DataflowMessageHeader messageHeader, ITargetBlock<TOut> target, out bool messageConsumed) =>
            throw new NotImplementedException();

        public void ReleaseReservation(DataflowMessageHeader messageHeader, ITargetBlock<TOut> target) =>
            throw new NotImplementedException();

        public bool ReserveMessage(DataflowMessageHeader messageHeader, ITargetBlock<TOut> target) =>
            throw new NotImplementedException();
    }
}

using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using HouseofCat.Dataflows;
using HouseofCat.Logger;
using Microsoft.Extensions.Logging;

namespace Dataflows
{
    public class ChannelReaderBlockEngine<TIn, TOut>
    {
        private readonly ILogger<ChannelReaderBlockEngine<TIn, TOut>> _logger;
        private readonly ExecutionDataflowBlockOptions _executeOptions;
        private readonly ChannelReaderBlock<TIn> _channelReaderBlock;
        private readonly ActionBlock<TIn> _workBlock;
        private readonly Func<TIn, ValueTask<TOut>> _workBodyAsync;
        private readonly Func<TOut, ValueTask> _postWorkBodyAsync;

        public ChannelReaderBlockEngine(
            ChannelReader<TIn> channelReader,
            Func<TIn, ValueTask<TOut>> workBodyAsync,
            int maxDegreeOfParallelism,
            bool ensureOrdered,
            Func<TOut, ValueTask> postWorkBodyAsync = null,
            TaskScheduler taskScheduler = null) : 
            this(workBodyAsync, maxDegreeOfParallelism, ensureOrdered, taskScheduler)
        {
            _channelReaderBlock = new ChannelReaderBlock<TIn>(channelReader, _executeOptions);
            _channelReaderBlock.LinkTo(_workBlock);

            _postWorkBodyAsync = postWorkBodyAsync;
        }

        private ChannelReaderBlockEngine(
            Func<TIn, ValueTask<TOut>> workBodyAsync,
            int maxDegreeOfParallelism,
            bool ensureOrdered,
            TaskScheduler taskScheduler)
        {
            _logger = LogHelper.GetLogger<ChannelReaderBlockEngine<TIn, TOut>>();
            _workBodyAsync = workBodyAsync ?? throw new ArgumentNullException(nameof(workBodyAsync));

            _executeOptions = new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                EnsureOrdered = ensureOrdered
            };

            if (taskScheduler != null)
            {
                _executeOptions.TaskScheduler = taskScheduler;
            }

            _workBlock = new ActionBlock<TIn>(ExecuteWorkBodyAsync, _executeOptions);
        }

        public async ValueTask ReadChannelAsync(CancellationToken token = default) =>
            await _channelReaderBlock.ReadChannelAsync(token).ConfigureAwait(false);

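        // Runs the work step for one message and, when a post-work step was supplied,
        // feeds it any non-null result (for example, to ack the message).
        private async Task ExecuteWorkBodyAsync(TIn data)
        {
            try
            {
                if (_postWorkBodyAsync != null)
                {
                    var output = await _workBodyAsync(data).ConfigureAwait(false);
                    if (output != null)
                    {
                        await _postWorkBodyAsync(output).ConfigureAwait(false);
                    }
                }
                else
                {
                    await _workBodyAsync(data).ConfigureAwait(false);
                }
            }
            catch (Exception ex)
            {
                // Log and swallow so one failing message does not fault the ActionBlock.
                _logger.LogError(ex, Constants.Dataflows.Error);
            }
        }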
    }
}

This enables me to use an extension method like so:

    public static class ConsumerExtensions
    {
        public static async ValueTask DirectChannelExecutionEngineAsync(
            this IConsumer<ReceivedData> consumer,
            Func<ReceivedData, ValueTask<IRabbitWorkState>> workBodyAsync,
            GlobalConsumerPipelineOptions globalConsumerPipelineOptions,
            Func<IRabbitWorkState, ValueTask> postWorkBodyAsync = null,
            TaskScheduler taskScheduler = null,
            CancellationToken cancellationToken = default)
        {
            var channelReaderBlockEngine = new ChannelReaderBlockEngine<ReceivedData, IRabbitWorkState>(
                consumer.GetConsumerBuffer(),
                workBodyAsync,
                globalConsumerPipelineOptions.MaxDegreesOfParallelism ?? 1,
                globalConsumerPipelineOptions.EnsureOrdered ?? true,
                postWorkBodyAsync,
                taskScheduler);

            await channelReaderBlockEngine.ReadChannelAsync(cancellationToken).ConfigureAwait(false);
        }
    }

With this, I can link the stoppingToken directly to the new function:

            await _consumers
                .ParallelForEachAsync(
                    async consumer => await consumer.StartConsumerAsync().ConfigureAwait(false),
                    Environment.ProcessorCount)
                .ConfigureAwait(false);
            await Task.WhenAll(
                _consumers.Select(async consumer =>
                {
                    await consumer
                        .DirectChannelExecutionEngineAsync(
                            ProcessMessageAsync,
                            _globalConsumerPipelineOptions,
                            AckMessageAsync,
                            taskScheduler,
                            stoppingToken)
                        .ConfigureAwait(false);
                    await consumer.StopConsumerAsync(true).ConfigureAwait(false);
                })).ConfigureAwait(false);

(ProcessMessageAsync has the signature ValueTask<IRabbitWorkState> ProcessMessageAsync(IReceivedData receivedData); it deserializes the message and then handles it.)
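
For anyone who wants to try the pattern without RabbitMQ, here is a minimal, self-contained sketch of the same idea: drain a channel into an ActionBlock that runs a work step and then an optional post-work step. It is not the library's code. `ChannelToActionBlockSketch`, `RunAsync`, and both delegates are hypothetical stand-ins (the delegates play the roles of ProcessMessageAsync and AckMessageAsync), and it pumps with a plain `await foreach` rather than the ChannelReaderBlock above.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ChannelToActionBlockSketch
{
    // Hypothetical stand-in for the engine: returns how many messages reached the post-work step.
    public static async Task<int> RunAsync(int messageCount, int maxDegreeOfParallelism)
    {
        var channel = Channel.CreateUnbounded<int>();
        var acked = 0;

        // Stand-in for ProcessMessageAsync: simulated IO, then a result object.
        Func<int, ValueTask<string>> workBodyAsync = async message =>
        {
            await Task.Delay(5).ConfigureAwait(false);
            return $"processed-{message}";
        };

        // Stand-in for AckMessageAsync: just counts completed messages.
        Func<string, ValueTask> postWorkBodyAsync = _ =>
        {
            Interlocked.Increment(ref acked);
            return default;
        };

        var workBlock = new ActionBlock<int>(
            async message =>
            {
                var output = await workBodyAsync(message).ConfigureAwait(false);
                if (output != null)
                {
                    await postWorkBodyAsync(output).ConfigureAwait(false);
                }
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism,
                EnsureOrdered = false
            });

        // Fill the buffer, then mark it complete so the read loop below terminates.
        for (var i = 0; i < messageCount; i++)
        {
            await channel.Writer.WriteAsync(i).ConfigureAwait(false);
        }
        channel.Writer.Complete();

        // Pump everything buffered in the channel into the block.
        await foreach (var message in channel.Reader.ReadAllAsync())
        {
            await workBlock.SendAsync(message).ConfigureAwait(false);
        }

        workBlock.Complete();
        await workBlock.Completion.ConfigureAwait(false);
        return acked;
    }
}
```

Calling `await ChannelToActionBlockSketch.RunAsync(20, 4)` processes 20 messages with up to 4 in flight at a time. In the real service the channel never completes on its own, which is why the stoppingToken is what ends the read loop there.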

tiggerite commented 3 years ago

I finally got the chance to run a load test on this, by the way, and both CPU and memory are down by quite a bit (80-85% memory; CPU peaks at under 40%).

I found that passing a custom TaskScheduler to TPL Dataflow improved things: lower latency, lower CPU consumption, and lower memory usage. I'd be interested in your thoughts on it. (It's the WorkStealingTaskScheduler from https://github.com/dotnet/samples/tree/main/csharp/parallel/ParallelExtensionsExtras; it also seems to help massively with reducing ThreadPool usage.) The total consumer count is a (capped) total of all consumers across all queues.

                using var taskScheduler = new WorkStealingTaskScheduler(totalConsumerCount);
                await Task
                    .WhenAll(_processors.Select(async processor =>
                        // ReSharper disable once AccessToDisposedClosure
                        await processor.StartAsync(taskScheduler, stoppingToken).ConfigureAwait(false)))
                    .ConfigureAwait(false);
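
To show the mechanism in isolation, here is a rough sketch of several dataflow blocks sharing one scheduler through ExecutionDataflowBlockOptions.TaskScheduler. It assumes a swapped-in scheduler: ConcurrentExclusiveSchedulerPair from the base class library stands in for WorkStealingTaskScheduler. It does no work stealing and only caps concurrency across every block that shares it. `SharedSchedulerSketch` and `RunAsync` are hypothetical names.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class SharedSchedulerSketch
{
    // Returns the total number of items processed across all blocks.
    public static async Task<int> RunAsync(int blockCount, int itemsPerBlock, int maxConcurrency)
    {
        // Built-in stand-in for WorkStealingTaskScheduler (assumption: any TaskScheduler
        // can be plugged in here the same way).
        var scheduler = new ConcurrentExclusiveSchedulerPair(
            TaskScheduler.Default, maxConcurrency).ConcurrentScheduler;

        // One options instance, so every block schedules its work on the same scheduler.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxConcurrency,
            TaskScheduler = scheduler
        };

        var processed = 0;
        var blocks = new ActionBlock<int>[blockCount];
        for (var b = 0; b < blockCount; b++)
        {
            blocks[b] = new ActionBlock<int>(
                _ => { Interlocked.Increment(ref processed); },
                options);

            for (var i = 0; i < itemsPerBlock; i++)
            {
                blocks[b].Post(i);
            }
            blocks[b].Complete();
        }

        await Task.WhenAll(Array.ConvertAll(blocks, block => block.Completion))
            .ConfigureAwait(false);
        return processed;
    }
}
```

The point of sharing the scheduler is that the concurrency cap applies to the process as a whole and not per block, which is presumably why the snippet above sizes it from the total consumer count.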

PS: what do you make of the .NET 6 implementation of Parallel.ForEachAsync, @houseofcat? :) https://github.com/stephentoub/runtime/blob/main/src/libraries/System.Threading.Tasks.Parallel/src/System/Threading/Tasks/Parallel.ForEachAsync.cs
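
For context, a sketch of how the .NET 6 API could stand in for a custom ParallelForEachAsync extension when starting consumers. It assumes .NET 6 or later. `ParallelForEachAsyncSketch` and `StartAllAsync` are hypothetical, and the Task.Delay is a placeholder for consumer.StartConsumerAsync().

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelForEachAsyncSketch
{
    // Returns how many (simulated) consumers were started.
    public static async Task<int> StartAllAsync(int consumerCount, CancellationToken token = default)
    {
        var started = 0;
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount,
            CancellationToken = token
        };

        await Parallel.ForEachAsync(
            Enumerable.Range(0, consumerCount),
            options,
            async (consumerId, cancellationToken) =>
            {
                // Placeholder for: await consumer.StartConsumerAsync().ConfigureAwait(false);
                await Task.Delay(1, cancellationToken).ConfigureAwait(false);
                Interlocked.Increment(ref started);
            }).ConfigureAwait(false);

        return started;
    }
}
```

Unlike the extension used earlier in the thread, the body delegate also receives a CancellationToken, so a stoppingToken can flow into each start call.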

tiggerite commented 2 years ago

Is there an ETA for version 2.0.0 on NuGet, @houseofcat? :) We're migrating everything to .NET 6, and 1.2.2 still targets .NET 5. (It's also missing some of the stuff I've merged, which isn't the end of the world, but it means a bit of duplicated code for now.)

tiggerite commented 1 year ago

With everything now merged (well, bar the really minor PR, but I'm in no rush for it) and the performance issues all ironed out, I'm happy to close this one, @houseofcat 👍🏻