Open davidmatson opened 6 years ago
Another analog that might help - how do web servers handle this problem (limiting the number of requests they're in the middle of processing)? What would you think of a web server that limited that number to 48? :)
I was digging into how to fix and rediscovered NewBatchThreshold. It may already serve this purpose...
No, it appears that NewBatchThreshold isn't solving this problem. I have it set to 10,000, but when there are millions of messages in the queue, it will only start processing ~100 of them currently. Perhaps NewBatchThreshold is re-evaluated when a message finishes processing but not before. I expect it was written to allow controlling a value between 1 and BatchSize but I suspect I didn't fully consider how to handle values above BatchSize correctly.
The quirks of using BatchSize + NewBatchThreshold for concurrency control is a usability issue we've been dealing with for a while (@davidebbo ). Using NewBatchThreshold you can define an upper limit on the number of concurrent invocations you'll allow. Until you reach the threshold, we'll continue to fetch and start new batches. Only once you've exceeded NewBatchThreshold do we begin to wait until at least one invocation completes before fetching more work (code here). Thus you can actually have NewBatchThreshold + BatchSize concurrent invocations. So while use of these two knobs is confusing to users (they'd just like a single knob) they do serve the purpose.
Regarding the current usability issue of the two knobs, I believe @davidebbo created an issue for simplifying this somewhere, I can't find it though.
I checked that, and I think it's more than just a usability issue - if I have millions of messages in the queue and set the NewBatchThreshold to 10,000, I'm only getting ~100 concurrently processing, at least at first. I checked that code as well, and I'm not entirely sure why we're not getting more messages out of the queue right away - any ideas?
Depending on the nature of the outbound connections you're making, perhaps you're hitting connection limits? See https://github.com/Azure/azure-webjobs-sdk/wiki/ServicePointManager-settings-for-WebJobs
Yeah, that's definitely important, I discovered that a while back. (And thanks for adding the warning in more recent versions.) I already have that setting in place now and am still getting this behavior.
Now I'm thinking you may be running into an issue https://github.com/Azure/azure-webjobs-sdk/issues/1467 which we recently fixed, but hasn't been released to nuget yet. Underlying issue was a location in the code where we were doing sync over async (PR here).
Recommend trying the latest v2.x version on our myget feed, version 2.3.0-beta1-11288.
I tried the latest 2.x from both the public (2.2.0) and myget (2.3.0-beta1-11332) feeds. I'm still seeing significant latency/limited parallelism draining large queues.
After doing a bit more digging, I suspect the problem may be that we're only pulling up to one batch of 32 messages at a time (batches are fetched serially rather than in parallel, when NewBatchThreshold is above QueueBatchSize and lots of messages are available).
For testing, here's what I'm using to populate a queue with 100,000 messages:
using Microsoft.Internal.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Diagnostics;
using System.Net;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
const int degreeOfParallelism = 1000;
static CloudQueue queue;
static void Main()
{
RunAsync().GetAwaiter().GetResult();
}
static async Task RunAsync()
{
ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;
CloudStorageAccount account = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=<myaccount>;AccountKey=<mykey>");
CloudQueueClient queueClient = account.CreateCloudQueueClient();
queue = queueClient.GetQueueReference("test");
await EnqueueAllAsync();
}
static async Task EnqueueAllAsync()
{
await queue.CreateIfNotExistsAsync();
const int messageCount = 100000;
ExecutionDataflowBlockOptions boundedParallelism = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfParallelism
};
ActionBlock<int> enqueueBlock = new ActionBlock<int>(EnqueueAsync, boundedParallelism);
Stopwatch watch = Stopwatch.StartNew();
for (int index = 0; index < messageCount; ++index)
{
enqueueBlock.Post(index);
}
enqueueBlock.Complete();
await enqueueBlock.Completion;
Console.WriteLine($"Enqueued in {watch.Elapsed}");
}
static Task EnqueueAsync(int value)
{
return queue.AddMessageAsync(new CloudQueueMessage(value.ToString()));
}
}
It runs fairly quickly since it does the enqueuing in parallel:
Enqueued in 00:00:37.3974646
Dequeing with the WebJobs SDK is significantly slower:
using Microsoft.Azure.WebJobs;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
namespace ProcessMessagesViaWebJobsSdk
{
class Program
{
public static ManualResetEvent DoneEvent = new ManualResetEvent(initialState: false);
static void Main(string[] args)
{
ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;
JobHostConfiguration configuration = new JobHostConfiguration
{
StorageConnectionString = "DefaultEndpointsProtocol=https;AccountName=<myaccount>;AccountKey=<mykey>",
DashboardConnectionString = null
};
configuration.Queues.BatchSize = 32;
configuration.Queues.NewBatchThreshold = 1000;
using (JobHost host = new JobHost(configuration))
{
Stopwatch watch = Stopwatch.StartNew();
host.Start();
DoneEvent.WaitOne();
Console.WriteLine($"Processed in {watch.Elapsed}");
host.Stop();
}
}
}
public class Functions
{
static int processedCount;
public static void Process([QueueTrigger("test")] CloudQueueMessage message)
{
if (Interlocked.Increment(ref processedCount) == 100000)
{
Program.DoneEvent.Set();
}
}
}
}
On 2.2.0:
Processed in 00:04:24.4446805
On 2.3.0-beta1-11332:
Processed in 00:04:35.0234503
If I use TPL DataFlow to do both get message and processing in parallel, it's significantly faster, even if I don't use any batching:
using Microsoft.Internal.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage;
using Microsoft.WindowsAzure.Storage.Queue;
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
class Program
{
const int degreeOfParallelism = 1000;
static CloudQueue queue;
static int processingCount;
static EventWaitHandle noMoreMessagesSignal = new ManualResetEvent(initialState: false);
static EventWaitHandle freeCapacitySignal = new AutoResetEvent(initialState: false);
static void Main()
{
RunAsync().GetAwaiter().GetResult();
}
static async Task RunAsync()
{
ServicePointManager.DefaultConnectionLimit = int.MaxValue;
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.Expect100Continue = false;
CloudStorageAccount account = CloudStorageAccount.Parse("DefaultEndpointsProtocol=https;AccountName=<myaccount>;AccountKey=<mykey>");
CloudQueueClient queueClient = account.CreateCloudQueueClient();
queue = queueClient.GetQueueReference("test");
await ProcessAllAsync();
}
static async Task ProcessAllAsync()
{
ExecutionDataflowBlockOptions boundedParallelism = new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = degreeOfParallelism
};
TransformBlock<bool, CloudQueueMessage> getMessageBlock = new TransformBlock<bool, CloudQueueMessage>(
new Func<bool, Task<CloudQueueMessage>>(GetMessageAsync), boundedParallelism);
ActionBlock<CloudQueueMessage> processBlock = new ActionBlock<CloudQueueMessage>(ProcessAsync,
boundedParallelism);
Stopwatch watch = Stopwatch.StartNew();
using (getMessageBlock.LinkTo(processBlock))
{
while (!noMoreMessagesSignal.WaitOne(0))
{
while (!noMoreMessagesSignal.WaitOne(0) &&
Thread.VolatileRead(ref processingCount) < degreeOfParallelism)
{
if (Interlocked.Increment(ref processingCount) > degreeOfParallelism)
{
throw new InvalidOperationException("Exceeded promised max degree of parallelism!");
}
if (!getMessageBlock.Post(true))
{
throw new InvalidOperationException("Couldn't post!");
}
}
WaitHandle.WaitAny(new WaitHandle[] { noMoreMessagesSignal, freeCapacitySignal });
}
getMessageBlock.Complete();
await getMessageBlock.Completion;
processBlock.Complete();
await processBlock.Completion;
}
Console.WriteLine($"Processed in {watch.Elapsed}");
}
static async Task<CloudQueueMessage> GetMessageAsync(bool ignore)
{
CloudQueueMessage message = await queue.GetMessageAsync();
if (message == null)
{
if (!noMoreMessagesSignal.Set())
{
throw new InvalidOperationException();
}
}
return message;
}
static async Task ProcessAsync(CloudQueueMessage message)
{
if (message == null)
{
return;
}
await queue.DeleteMessageAsync(message);
if (Interlocked.Decrement(ref processingCount) == degreeOfParallelism - 1)
{
if (!freeCapacitySignal.Set())
{
throw new InvalidOperationException();
}
}
}
}
Processed in 00:01:56.5589115
If I use batches, I had the time to around 1 min 20 sec (less than 2x the cost for enqueuing).
From my local testing, I suspect that without doing parallel GetMessage(s) calls, we're never concurrently processing more than roughly one batch of messages at a time (for functions with minimal processing time).
I think the ideal implementation for processing would take longer (total duration) than enqueuing - with enqueuing it's one roundtrip per message to enqueue, while with processing it's one roundtrip for each GetMessage(s) call plus one per message to delete. Without batching, I'd expect perhaps 2x and with batching with size 32, I'd expect perhaps 1x + 1/32x. I'll play around more with a parallel batched implementation - I think I was getting reasonably close to that mark (and significantly better than the 7x+ of the WebJobs SDK today).
Or to put it another way (from an impact standpoint), I think can't get the WebJobs SDK to process more than ~360 messages/sec on my box. Some queues I have get into the millions or even above 10 million messages pending. No matter how fast the function itself is, if I'm using the WebJobs SDK and it can't go faster than ~360 messages/sec, it'll take one host ~7.7 hours to drain the queue. If I could process at say 1250 messages/sec (TPL data flow parallel and batching with 100,000 in ~80 seconds), it would take only ~2.2 hours, if I'm doing the math correctly.
FWIW, on my machine MaxDegreeOfParallelism of 1000 seemed better than 100 or 10,000 for enqueuing and I think dequeuing, but I didn't try to get an exact ideal number - just something within an order of magnitude.
Messages/sec is probably the most standard way to look at this question. A colleague pointed out this tool today: Azure Throughput Analyzer, which reports local box connectivity to Azure - for queues, it reports messages/sec among other metrics. When I run locally with 50 threads, I get 1027/sec upload and 1210/sec download, so if we're close to that range there's a good chance the algorithm isn't getting in the way too much.
The scalability targets documentation says 2,000 message/sec for the entire queue across all hosts (assuming a 1KiB message size), which may not be possible to do from one client but still does use the messages/sec measurement.
Yes, I believe the problem is that QueueListener's serial GetMessages loop can't pull messages fast enough unless processing takes a really long time - around 3 seconds or so. Otherwise, we never reach NewBatchThreshold concurrently running functions.
Here's what I did to test concurrently running inside a function in the WebJobs SDK:
public class Functions
{
static int processingCount;
public static int HighWaterMark;
static int processedCount;
public static async Task ProcessAsync([QueueTrigger("test10")] CloudQueueMessage message)
{
int possibleNewHighWaterMark = Interlocked.Increment(ref processingCount);
int lastHighWaterMark = Thread.VolatileRead(ref HighWaterMark);
while (possibleNewHighWaterMark > lastHighWaterMark)
{
lastHighWaterMark = Interlocked.CompareExchange(ref HighWaterMark, possibleNewHighWaterMark, HighWaterMark);
}
await Task.Delay(0);
Interlocked.Decrement(ref processingCount);
if (Interlocked.Increment(ref processedCount) == 10000)
{
Program.DoneEvent.Set();
}
}
}
and in the TPL data flow code:
static int innerProcessingCount;
static int highWaterMark;
static async Task ProcessAsync(CloudQueueMessage message)
{
if (message == null)
{
return;
}
int possibleNewHighWaterMark = Interlocked.Increment(ref innerProcessingCount);
int lastHighWaterMark = Thread.VolatileRead(ref highWaterMark);
while (possibleNewHighWaterMark > lastHighWaterMark)
{
lastHighWaterMark = Interlocked.CompareExchange(ref highWaterMark, possibleNewHighWaterMark, highWaterMark);
}
await Task.Delay(10000);
Interlocked.Decrement(ref innerProcessingCount);
await queue.DeleteMessageAsync(message);
if (Interlocked.Decrement(ref processingCount) == degreeOfParallelism - 1)
{
if (!freeCapacitySignal.Set())
{
throw new InvalidOperationException();
}
}
}
With both writing the high water mark at the end.
Web Jobs SDK output:
1000 messages, 0ms delay:
Processed in 00:00:30.4255358
High water mark was 1
1000 messages, 100ms delay:
Processed in 00:00:31.3562639
High water mark was 69
1000 messages, 1sec delay:
Processed in 00:00:33.7882621
High water mark was 512
1000 messages, 3sec delay:
Processed in 00:00:36.8957239
High water mark was 1024
1000 messages, 5 sec delay:
Processed in 00:00:56.1672798
High water mark was 1024
1000 messages, 10sec delay:
Processed in 00:01:46.1016663
High water mark was 1024
TPL data flow version output:
1000 messages, 0ms delay:
Processed in 00:00:11.0257936
High water mark was 4
1000 messages, 100ms delay:
Processed in 00:00:09.4309517
High water mark was 742
1000 messages, 1sec delay:
Processed in 00:00:19.0385412
High water mark was 1000
1000 messages, 3sec delay:
Processed in 00:00:35.3293730
High water mark was 1000
1000 messages, 5sec delay:
Processed in 00:01:09.9625452
High water mark was 1000
1000 messages, 10sec delay:
Processed in 00:01:49.9800823
High water mark was 1000
Again, note that the TPL data flow version is not using any batching - this is the rather simplistic code above.
It's kind of crazy that the queue drains basically just as fast in the current QueueListener regardless of whether functions take 100ms or 3sec to execute; QueueListener just isn't reading fast enough to keep up.
Note that the 0ms test cases above aren't quite fair - they're evaluating concurrency inside the function, so without some delay, they're immediately decreasing the number just increased. There's at least some more concurrency happening with getting messages ready to execute the functions, so the 100ms case is probably closer to a fair first test case.
Another way of putting it: Increasing NewBatchThreshold doesn't seem to do much currently unless functions take quite a long time to execute. Because new batches are fetched only one at a time (serially), we usually can't get to the point that we hit a number much above BatchSize anyway.
David, thanks for all the deep analysis on this! We're definitely open to making improvements in this area. The existing listener algorithm is very old (perhaps originally written by you?) and hasn't really changed much over time. The existing algorithm seems to work well for most users now - I haven't seen issues come in for this, but it sounds like you're running at higher scale that most people run :)
Why don't you work your improvements up into a PR? We could then benchmark it as you've done - if the changes result in improvements for high scale scenarios without detrimental effects on low throughput scenarios, we can take the change.
Thanks!
This update would be very useful. I have a system that sends batches of around 80,000 messages to the queue that then need to be processed as soon as possible, 32 messages at a time is rather limiting.
How do I vote for this update?
Something to consider for it not being an issue for others: people may have believed the 32 concurrent message capacity was a fact-of-life and put in place scaling-out to compensate for it. I almost did, until I came across this thread.
I was considering scaling-out to 5-6 instances during the load, which would give me 160-192 concurrent messages. It would be much better if I could do more with a single instance (especially if I can do in the realms of 1,000 concurrent messages) and scale-out/scale-up only when I have reached the limits of the underlying resource.
@andrewbauer-bauersystems - concurrency is not bound to 32 messages at a time. What makes you think that? Please read my comments above which explain how the current algorithm works.
What level of concurrency are you seeing in your scenario? What version of the SDK are you using?
I am getting concurrency of 32 messages and am using v9.2.0
How are you verifying that you are only processing 32 messages at a time? What version of the WebJobs SDK are you using - we haven't published a v9 :)
Sorry, v2.2.0. I filled the queue with 1000 messages and then watched as it decremented at a rate of 32 messages at a time.
What do you have NewBatchThreshold set to? To increase concurrency you'll want to crank this value up. E.g. you should be able to achieve a concurrency of (NewBatchThreshold + BatchSize). Please share the settings you're applying to the host before starting (JobHostConfiguration.Queues settings).
I think I may have misunderstood NewBatchThreshold. When I attempted it, I set it to 32 believing that it couldn't be more than BatchSize. So if I set it to say 900, it will continue to get batches of 32 until it reaches the 900 mark, is that correct?
Right - not withstanding the discussion started by David above, where he's saying that in his particular scenario he's unable to push concurrency up to NewBatchThreshold when he sets if very high. It's not clear yet if there is a general issue here, or whether there is something unique in his scenario. As I said above, I haven't heard others complaining that they can't achieve high levels of concurrency. We should still verify David's scenario however - I haven't tested it myself.
Let me know what results you see after configuring NewBatchThreshold to a high number. If you are still having issues, we'll need repro code to go any further (exactly how you're configuring the host, code for a repro function, etc.)
This is an old thread. I've found it very interesting because because I too have problems in reaching a high parallelism degree with WebJob (I'm using version 3.0.19). Is something changed in the meantime in the SDK or what reported in this thread is still yet valid?
(Related to #521, but the details/context are different enough that I thought it was worth highlighting separately.)
(Also, I haven't fully investigated, but I think it's far enough along to send for further consideration.)
Azure queues allows pulling a batch of up to 32 messages at a time, and the WebJobs SDK allows controlling the queue batch size within this range. It then creates separate tasks for each message, processing them in parallel, and gets another batch once it reaches at least the halfway done point. (I'm assuming this algorithm hasn't changed significantly; let me know if I'm out-of-date.)
I thought this algorithm was a good idea at the time I wrote it, but I'm now starting to conclude it was a terrible idea (or, to be more precise, terribly limited).
As an extreme case, consider the following message processing:
If there's a queue of a million messages, how long will it take 10 machines running with the maximum configuration.Queues.BatchSize to drain the queue?
Stated another way: The existing algorithm is fine for compute-bound message processing, but it's extremely limited for message processing that uses async/IO completions. We wait for the entire task to finish before saying we'd be willing to start more work.
Or stated another way: Just because we can't pull more than 32 messages in one request, why does that need to limit how many we're processing at once? Can't we pull more batches as-needed until we hit our scaling targets?
Net, I believe that currently, scaling of queue processing in such cases is limited by the latency of processing any single item, and not at all by local resource constraints (which means for such workloads we're just wasting compute, likely by orders of magnitude).
Task Parallel Library (TPL) has a concept of MaxDegreeOfParallelism: https://msdn.microsoft.com/en-us/library/system.threading.tasks.dataflow.executiondataflowblockoptions.maxdegreeofparallelism(v=vs.110).aspx
I believe that's the better knob to expose here. Today, our MaxDegreeOfParallelism is Queues.BatchSize * 1.5, which can never be greater than 48. That's way to small to process a backlog of work that involves I/O latency - we likely get multiple batches and keep thousands running in parallel without hitting any local resource limits.
If we removed BatchSize (eventually) and only had MaxDegreeOfParallelism, we could infer BatchSize = Min(MaxDegreeOfParallelism, 32) and just keep fetching more batches until this value, rather than only fetching 1 batch (with up to 1.5 batches outstanding) at any given time.
Let me know if this idea makes sense; I'm happy to explain further. I haven't tried coming up with a simple repro yet, but I suspect the design flaw here (my own fault; it's how I wrote this algorithm years back) would be pretty easy to see with the right sample.