Azure / azure-functions-durable-js

JavaScript library for using the Durable Functions bindings
https://www.npmjs.com/package/durable-functions
MIT License

Failed to run n sub-orchestrators in parallel #494

Open ZavalichiR opened 1 year ago

ZavalichiR commented 1 year ago

Hello!

I have developed an ETL service using Azure Durable Functions. The service includes:

The workflow is as follows:

The main orchestrator creates three groups of etlOrchestrators. When I start the service, it successfully runs two groups of etlOrchestrators. However, for the third group, the latency increases and I receive numerous errors:

These errors occur randomly, but every time the 3rd group needs to run, the latency increases and errors occur.

Code example

  // Create one sub-orchestrator task per chunk.
  const etlOrchestrators: Task[] = [];

  for (let i = 0; i < chunks.length; i += 1) {
    etlOrchestrators.push(context.df.callSubOrchestrator('etl-orchestrator', chunks[i]));
  }

  // Split the tasks into groups of ENV.DATA_ETL_CONCURRENT_SIZE.
  const groupedEtlOrchestrators: Task[][] = [];

  for (let i = 0; i < etlOrchestrators.length; i += ENV.DATA_ETL_CONCURRENT_SIZE) {
    const slicedArray = etlOrchestrators.slice(i, i + ENV.DATA_ETL_CONCURRENT_SIZE);
    groupedEtlOrchestrators.push(slicedArray);
  }
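For context, a minimal sketch of how these groups might then be awaited inside the orchestrator (this yield loop is my assumption about the rest of the code, not taken from the issue):

  // Hypothetical continuation: wait for each group of sub-orchestrator
  // tasks in turn before moving on to the next group.
  for (const group of groupedEtlOrchestrators) {
    yield context.df.Task.all(group);
  }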

Host.json

{
  "version": "2.0",
  "logging": {
    "console": {
      "isEnabled": true,
      "DisableColors": false
    },
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[3.*, 4.0.0)"
  },
  "extensions": {
    "durableTask": {
      "tracing": {
        "traceInputsAndOutputs": false,
        "traceReplayEvents": false
      },
      "maxConcurrentActivityFunctions": 20,
      "maxConcurrentOrchestratorFunctions": 10
    }
  }
}

Memory/CPU usage (screenshot attached)

Investigative information

- "@azure/functions": "^3.5.0"
- "durable-functions": "^2.1.1"
- Node.js version: 16.10
- Language: TypeScript
- Runtime version: ~4

Additional context

Note: As this is a development environment, I occasionally delete tables, queues, and containers (excluding those ending in '-leases' and '-applease') to clear the history and re-run the service.

ZavalichiR commented 1 year ago

It is quite strange that at the beginning the reader, transformer, and writer activities are extremely fast, but when only a few writers remain to be executed they run very slowly.

In this test I started only 30 sub-orchestrators in parallel, and each one started 1 reader, 1 transformer, and 3 writers (the writer is an empty function). After the 30 readers and 30 transformers had executed, 60% of the writers still had to run, and that remaining execution took far too long.

Also the 'Possible thread pool starvation detected.' error occurred.

(screenshot attached)

ZavalichiR commented 1 year ago

(screenshot attached)

davidmrdavid commented 1 year ago

Hi @ZavalichiR:

Thanks for reaching out. The errors you report sound concerning. I'll need time to dig deeply into this, but I wanted to clear out a few easy possibilities first.

A few questions to help give me some context:

1) Can you confirm that these issues only happen with the 3rd group of sub-orchestrators?
2) Are you passing relatively large inputs and outputs to and from the sub-orchestrator APIs? I wonder if we could be hitting some memory limit by the time we reach the third sub-orchestrator group. This would be my main theory at this point.

razvan-zavalichi commented 1 year ago

Hi @davidmrdavid

I solved the errors; the problem was that I used '.map' to generate the groups of sub-orchestrators, and the resulting order of the tasks wasn't the same.

Anyway, the scaling-out problem still persists. For the first two groups all activity functions finish pretty fast, but for the 3rd group there is a delay in the activity executions. The biggest problem is that, after all groups are done, it takes up to 10 minutes to start the next activities or the durable entity.

Sometimes I see this error: 'Singleton lock renewal failed for blob 'xxxx/host' with error code 409'.

I use the default configuration for host.json and maxConcurrent...Orchestrators/Activities = 5.

At the moment the sub-orchestrator calls 3 activities sequentially and fans out to two durable entities.

I still use 10 instances for the Function App, but not all of them are used.

ZavalichiR commented 1 year ago

I added here more details regarding the performance issue: https://learn.microsoft.com/en-us/answers/questions/1194468/azure-durable-functions-fan-out-slow-performance

davidmrdavid commented 1 year ago

@ZavalichiR: I replied to your MSLearn question, but you can reach me more reliably here. Please let me know if you're able to run the experiment I suggested. Thanks!

razvan-zavalichi commented 1 year ago

Thank you for your support,

Here is my answer:

The orchestrator doesn't receive a heavy payload; instead, it gets two numbers that specify a batch configuration (readFrom and readTo). Based on this configuration, the reader retrieves 1000 JSONs, which are then transformed and processed by the writer and temporary storage. Once the orchestrator has finished its tasks, I believe the context should be destroyed. Recently, I ran another test using the same configuration as the last one. Interestingly, there were no gaps between executions. However, in the previous test, there were three gaps: after the first three groups, after the fourth group, and after all groups had been processed (before starting the last activity functions in the main orchestrator).

davidmrdavid commented 1 year ago

@razvan-zavalichi: Just to confirm - do JSONs ever get returned to the orchestrator? Internally, I can see some orchestrators and entities in the past 24 hours that have several payloads over ~10'000 kb. I can share that data in this thread: just the instanceIDs and the number of messages with those moderate payloads, not the actual contents of course.

I also noticed you're using an entity in this workflow: can you tell me more about how it fits into your system?

razvan-zavalichi commented 1 year ago

@davidmrdavid, yes, the JSONs are returned to the orchestrator, and it uses two entity instances that store additional data. Whenever new data is received, each entity deduplicates it and stores it in its state. Each request can include a maximum of 1000 additional data items (up to 35 MB), and each entity can store up to 400 MB of additional data.

The system is configurable, allowing me to decrease the number of JSONs read by a data reader and limit the number of JSONs within the orchestrator to a maximum of n (where 1000 JSONs corresponds to 10-15 MB). However, reducing the number of JSONs increases the number of sub-orchestrators running concurrently. The current configuration ("maxConcurrentActivityFunctions": 2, "maxConcurrentOrchestratorFunctions": 1) may not be optimal in this case, as a single instance can handle more than two activity functions due to the reduced payload. As a result, when a lot of transformers are sent to be processed by the same instance, workers may have to wait for an extended period of time, potentially causing issues.

import * as df from 'durable-functions';
import { IOrchestrationFunctionContext, Task } from 'durable-functions';
// DataChunk, VehiclesModel, DataContainers, Logger, TEMP_STORAGE_KEY and
// TEMP_STORAGE_OPERATION are project-specific types/constants (imports omitted here).

export default function* etlOrchestrator(context: IOrchestrationFunctionContext) {
  const dataChunk: DataChunk = context.df.getInput();

  try {
    // Read one batch of raw vehicle documents.
    const vehiclesModel: VehiclesModel[] = yield context.df.callActivity('DataReader', dataChunk);

    if (!vehiclesModel?.length) {
      return;
    }

    // Transform the batch, then persist the vehicles.
    const transformedData: DataContainers = yield context.df.callActivity('DataTransformer', vehiclesModel);

    yield context.df.callActivity('DataWriter', { containerName: 'vehicles', documents: transformedData.vehicles });

    // Fan out to the two TempStorage entities that deduplicate manufacturers and models.
    const addManufacturersTempStorage: Task = context.df.callEntity(
      new df.EntityId('TempStorage', TEMP_STORAGE_KEY.MANUFACTURERS),
      TEMP_STORAGE_OPERATION.ADD,
      {
        manufacturers: transformedData.manufacturers,
      },
    );
    const addModelsTempStorage: Task = context.df.callEntity(
      new df.EntityId('TempStorage', TEMP_STORAGE_KEY.MODELS),
      TEMP_STORAGE_OPERATION.ADD,
      {
        models: transformedData.models,
      },
    );

    yield context.df.Task.all([addManufacturersTempStorage, addModelsTempStorage]);
  } catch (error) {
    new Logger('EtlOrchestrator').error(error);
  }
}
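For completeness: in the durable-functions v2.x programming model, a generator like this is normally wrapped with df.orchestrator in the function's entry point so the Functions host can drive it. A minimal sketch of what that wrapper could look like (the file layout here is an assumption, not taken from the issue):

  // etl-orchestrator/index.ts (hypothetical entry point)
  import * as df from 'durable-functions';
  import etlOrchestrator from './etlOrchestrator';

  // df.orchestrator turns the generator into an Azure Function the host can replay.
  export default df.orchestrator(etlOrchestrator);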

If there is an issue with the EtlOrchestrator, I am still confused because, even after all EtlOrchestrators have finished, there can be a delay of up to 10 minutes before the main orchestrator moves on to the next activity functions and starts processing them.

razvan-zavalichi commented 1 year ago

@davidmrdavid

Any update?

davidmrdavid commented 1 year ago

Hi @ZavalichiR:

Apologies for the delay here.

In terms of entity storage size: I'm definitely a bit concerned about storing up to ~400 MB of data in a single Durable Entity. This is definitely not enough to cause the system to halt by any means, but I think it can lead to slower processing times.

I still don't have a good grasp of exactly what is happening with your app performance-wise, and I understand some concurrency settings and errors were fixed since the original issue was posted. To help clear the picture, it would help me to have up-to-date telemetry for your app.

Can you please provide me with a recent timerange in UTC and the instanceID of the main orchestrator demonstrating this issue? I think it would be best if we can focus on a single repro (so just a single instanceID in a given timeframe) where we can break down exactly when in the main workflow the delays start.

razvan-zavalichi commented 1 year ago

Hello @davidmrdavid, Thank you again for your support.

Here are the Operation IDs for this month's executions:

I would like to mention that there have been no changes to the code or configuration between these executions.

  "extensions": {
    "durableTask": {
      "tracing": {
        "traceInputsAndOutputs": false,
        "traceReplayEvents": false
      },
      "maxConcurrentActivityFunctions": 2,
      "maxConcurrentOrchestratorFunctions": 1
    }
  }
davidmrdavid commented 1 year ago

Hi @razvan-zavalichi:

I wanted to focus on that first graph so I looked at our telemetry for dev-vehicleservice-func on 2023-04-27, and I do not see any activity on 4/27 UTC. I only see an orchestrator from 4/26 00:02 to 4/26 06:19 UTC with instanceId "7e477903d8a64760ae20159d2f0327cd", but I would have expected to see sub-orchestrators as well so I believe I may be looking at the wrong app now.

Can you please confirm the appName for the first graph, that the timerange is in UTC, as well as provide me with the instanceId (not the operationId) of the top-level orchestrator for that 1st graph?

razvan-zavalichi commented 1 year ago

Hello @davidmrdavid

Sorry, I forgot to mention: The dev-vehicleservice-func function app is no longer relevant as it contains different functions. Currently, only the production-vehicleservice-func function-app is relevant. These are the instanceIds:

davidmrdavid commented 1 year ago

Thanks @razvan-zavalichi.

I'm seeing the same gaps as you reported for instanceId 562285983f324db18230788f05f156e7 on 04/27. Right now I'm focusing on the first gap, which happens between 2023-04-27 00:18:20.0000000 to 2023-04-27 00:24:20.0000000 UTC ( about 5 min ).

With respect to the timeline, here's what I see:

(1) 2023-04-27 00:18:24.0033336: worker 11827 sends a subOrchestratorCompleted message to 562285983f324db18230788f05f156e7, which is assigned to control queue productionvehicleservicefunc-control-00. That queue is being listened to by worker 4765.

(2) 2023-04-27 00:18:38.8823380: worker 4765 (the one listening to productionvehicleservicefunc-control-00) starts shutting down. I'm unsure why, but as a result it stops listening to that control queue.

(3) 2023-04-27 00:19:42.2213706: worker 11827 starts listening to the control queue.

(4) 2023-04-27 00:19:42.2251426: worker 11827 logs that it's waiting for messages from the control queue. I'm not sure if this means that the message we sent at the very beginning of the timeline has not shown up yet. It's been about a minute, so I would expect it to be there, but this is an Azure Storage dependency so we don't handle its visibility directly.

(5) 2023-04-27 00:21:47.1448322: worker 11827 also starts shutting down (also unsure why at this point). Therefore, it stops listening to the control queue.

(Now we'll see a pattern similar to what I just described in steps 3, 4, and 5.)

(6) 2023-04-27 00:22:13.2176417: worker 11844 starts listening to the control queue.

(7) 2023-04-27 00:22:13.2358653: worker 11844 logs that it's waiting for messages from the control queue. Again, it's unclear to me at this point whether this means the message we sent at the beginning just isn't visible yet.

(8) 2023-04-27 00:23:12.8651771: worker 11844 also starts shutting down. Therefore, it stops listening to the control queue.

(And now the pattern breaks.)

(9) 2023-04-27 00:23:37.6859395: worker 4765 starts listening to the control queue. A few milliseconds later, it receives the message from step (1).

So there are two things here that we need to answer: (1) When we see these "waiting for messages" logs, does that mean the message we sent isn't visible? That's something I can find out from conversations with my team. I'll take care of this piece.

(2) Why are these workers repeatedly shutting down? I need to look at more logs to try and find out the cause, but I wonder if this is something you may already have context on: do you know if at around that time your application went through a manual, or automated, site restart? Just trying to figure out if this is something that may have been initiated on your end somehow. Any context would help.

That's all for now; I'll continue searching through the logs. Thank you for your patience on this.

davidmrdavid commented 1 year ago

Hmm, I have a theory as to why these workers are shutting down.

Starting at about 2023-04-27T00:15:00Z to 2023-04-27T00:19:00Z, your application seems to be scaling in rather aggressively, most likely because you just finished a sub-orchestrator fan-out (am I right about that?) and now you only have 1 pending invocation to complete, the parent orchestrator, after which you'll probably fan out again and therefore get more workers. I wonder if your queue is being assigned to workers that are in the process of being removed from your app, inadvertently delaying the orchestrator invocation.

I need to dig deeper into this to confirm what I'm seeing. It might take me a bit of time to get more clarity here but, in the meantime, I was wondering if we could do an experiment on your end: do you find that you're able to mitigate these gaps in processing by increasing your minimum instance count? Right now that minimum is 1, but I wonder if you'd find smaller gaps if you had more workers always available, like 3 or 4. Is that an experiment we could attempt? The goal would be to validate that these gaps occur during scale-in operations.

Also, if your app is behind a VNET, you may want to enable "Runtime Scale Monitoring" which we need in order to make accurate scaling decisions. Please see here for instructions: https://learn.microsoft.com/en-us/azure/azure-functions/functions-networking-options?tabs=azure-cli#premium-plan-with-virtual-network-triggers

Finally, it would also be good to make your workers a bit busier, to prevent unnecessary scaling behavior. What's your current value of FUNCTIONS_WORKER_PROCESS_COUNT (https://learn.microsoft.com/en-us/azure/azure-functions/performance-reliability#use-multiple-worker-processes)? You may want to set it somewhere between 3 and 6, so that each VM can handle more concurrent work. Naturally, you'll also have to adjust your maxConcurrentActivityFunctions and maxConcurrentOrchestratorFunctions. A good heuristic is to make all three of these settings the same value. Again, to re-iterate, the goal with this would be to make each VM busier, to try and see if your scaling behavior is less aggressive, which I believe is correlated with these gaps. If you want a recommendation: make them all 4 to start with and check that your VMs are not overly resource-pressured by that. Adjust as necessary.
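For illustration, the suggested experiment could look roughly like this; the value 4 is just the starting point mentioned above, not a verified tuning.

host.json (excerpt):

  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 4,
      "maxConcurrentOrchestratorFunctions": 4
    }
  }

FUNCTIONS_WORKER_PROCESS_COUNT is an application setting, so it can be set in the portal or, for example, with the Azure CLI (replace the placeholders with your app's names):

  az functionapp config appsettings set \
    --name <function-app-name> \
    --resource-group <resource-group> \
    --settings FUNCTIONS_WORKER_PROCESS_COUNT=4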

Any chance you could incorporate these recommendations and let me know what you find?

razvan-zavalichi commented 1 year ago

(2) Why are these workers repeatedly shutting down? I need to look at more logs to try and find out the cause, but I wonder if this is something you may already have context on: do you know if at around that time your application went through a manual, or automated, site restart? Just trying to figure out if this is something that may have been initiated on your end somehow. Any context would help.

I need to dig deeper into this to confirm what I'm seeing. It might take me a bit of time to get more clarity here but, in the meantime, I was wondering if we could do an experiment on your end: do you find that you're able to mitigate these gaps in processing by increasing your minimum instance count? Right now that minimum is 1, but I wonder if you'd find smaller gaps if you had more workers always available, like 3 or 4. Is that an experiment we could attempt? The goal would be to validate that these gaps occur during scale-in operations.

I will activate 'Runtime Scale Monitoring'. If I update FUNCTIONS_WORKER_PROCESS_COUNT, maxConcurrentActivityFunctions, and maxConcurrentOrchestratorFunctions to 4, I believe I will encounter the same errors related to pool starvation.

davidmrdavid commented 1 year ago

@ZavalichiR: thanks for the update.

Regarding this:

After conducting several tests, I discovered that the optimal configuration is "maxConcurrentActivityFunctions": 2, "maxConcurrentOrchestratorFunctions": 1

I see, thank you for clarifying. If you've already concluded that having larger concurrency values for these settings is counter-productive, then I think we can trust the results of your benchmarking. You can disregard my advice on that.

Finally, please let me know if you're also experimenting with a larger minimum instanceCount on your Elastic Premium plan. I think that would have the largest effect on reducing these gaps, assuming my theory is correct.

razvan-zavalichi commented 1 year ago

@davidmrdavid, I have a question regarding the Elastic Premium plan pricing. Specifically, I'm wondering what happens to the pricing if I change the minimum instance count. Does the pricing increase automatically? (is there any formula?)

The service runs once a week, and if I need to increase the minimum instance count, it can become expensive. Additionally, since I'm only running the service four times a month, I'm billed for an entire month despite the low frequency of execution.

Is the 'Minimum Instances' value increased automatically based on 'Always Ready Instances'?

(screenshot attached)

razvan-zavalichi commented 1 year ago

Hello @davidmrdavid,

I want to provide you with an update on the ideas you suggested and share the results of the testing:

  1. Runtime Scale Monitoring: I activated it as you recommended, but unfortunately, it did not yield any noticeable difference in the results.
  2. Minimum Instances = 3: I set the Minimum Instances to 3, and I'm happy to report that it worked as expected. There were no gaps in the execution.

(screenshots attached)

It appears that the main issue lies in spinning up new instances when new sub-orchestrators are executed in parallel. While setting the Minimum Instances to 3 resolves this problem, I am wondering if there is a way to dynamically change the minimum instances at runtime. Alternatively, if you have any other solutions in mind, please let me know.

The reason I ask is that the service runs once a week, and the process takes approximately 1 hour. Ideally, we would like the Azure Functions to scale out automatically without encountering this issue. I am also curious if this behavior is expected, or if there might be an issue with the code and the way durable functions are supposed to work.

P.S. I did these tests using a separate function app in a development environment. However, it is important to note that the configuration of this function app is identical to the environment I previously shared with you, with the exception of the specific changes made for the purpose of this test.

davidmrdavid commented 1 year ago

Hi @razvan-zavalichi,

I'm very glad to hear that raising the minimum number of instances worked for you. If that works, then I think we can safely conclude that the root issue is that, during scale operations, your partitions/control queues are getting moved from their previous workers, and that the gaps you're seeing are the time it takes for this reallocation to complete.

Now let me answer your questions one by one.

The service runs once a week, and if I need to increase the minimum instance count, it can become expensive. Additionally, since I'm only running the service four times a month, I'm billed for an entire month despite the low frequency of execution.

Yes, I believe that increasing the number of always ready instances also increases the cost. I'm unsure about how pricing is exactly determined, but I can find out. Did you still need this information? I'm asking because you went ahead with the experiment so perhaps you already found this data.

Is the 'Minimum Instances' value increased automatically based on 'Always Ready Instances'? Yes, see here: https://learn.microsoft.com/en-us/azure/azure-functions/functions-premium-plan?tabs=portal#always-ready-instances

It appears that the main issue lies in spinning up new instances when new sub-orchestrators are executed in parallel.

This isn't exactly how I understand the issue. From my last look at the telemetry, these gaps were explained by scale-in operations causing your control queues to get load balanced away from their assigned workers, which created delays. Perhaps this also coincides with your sub-orchestrator fan-out step, though.

While setting the Minimum Instances to 3 resolves this problem, I am wondering if there is a way to dynamically change the minimum instances at runtime. Alternatively, if you have any other solutions in mind, please let me know.

I'm not aware of a way to dynamically change the minimum instances at runtime.

I also worry that doing this could re-introduce the delays. To understand this, I'll describe my mental model of the problem below.

In Durable Functions, orchestrators are partitioned which simply means that they are assigned to one of a finite number of queues (sometimes called control queues) which are load balanced across workers. By default, the number of partitions is 4, and this is controlled by the partitionCount parameter in host.json.

For the rest of this explanation, let's assume that you have set your minimum instance count to 4.

When the app starts, your partitions are load balanced across your workers. This means you'll have one partition per always-ready worker. Therefore, no matter how much you scale out or scale in, so long as those first 4 VMs remain healthy, there should be no reason for your partitions to get reassigned to other workers. That's the reason why there are no gaps.

This argument won't apply if your minimum instanceCount is less than your partitionCount. So if you're dynamically changing the minimum instanceCount, then you'll re-introduce the possibility of gaps in execution. Does that make sense?

To learn more about partitions and how they're load balanced, please see this doc: https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-azure-storage-provider#orchestrator-scale-out

Also, this argument implies that you may want to experiment with setting your partitionCount to match your minimum instance count. So you can either reduce your partitionCount or increase your minimum instance count. I think the former is the cheaper option :-)
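For reference, a sketch of where that setting lives with the default Azure Storage backend (in the Durable Functions v2.x extension, partitionCount sits under storageProvider in host.json; the value 3 below is only an example matching a minimum instance count of 3):

  "extensions": {
    "durableTask": {
      "storageProvider": {
        "partitionCount": 3
      }
    }
  }

Keep in mind that, as far as I know, the partition count of an existing task hub can't be changed in place; you'd need a new task hub (or cleared storage) for a new value to take effect.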

I am also curious if this behavior is expected, or if there might be an issue with the code and the way durable functions are supposed to work.

I'd categorize this as a known performance risk on the Durable Functions side. I'm looking into how to address this, but it requires infrastructure support so it's not a change we have direct control over. We would need a way to tell Azure that, during scale in, it should first remove VMs that do not have DF partitions assigned to them. That's going to require a bit of planning.

Hope this answers your questions!

razvan-zavalichi commented 1 year ago

Hi @davidmrdavid,

Thank you very much for your assistance; this will be very helpful when deciding between Azure Functions with Service Bus and Durable Functions.

Yes, I believe that increasing the number of always ready instances also increases the cost. I'm unsure about how pricing is exactly determined, but I can find out. Did you still need this information? I'm asking because you went ahead with the experiment so perhaps you already found this data.

I still need this information; unfortunately, I didn't find it.

JonnyyJ commented 10 months ago

(JonnyyJ quoted @davidmrdavid's reply above in full.)

Do we have any update on the plan to address this issue, @davidmrdavid? We are facing this issue after migrating from runtime V3 to V4 while running multiple sub-orchestrators during scaling. We will try to use context.df.continueAsNew in the catch block to see if that can re-initiate the operation that failed due to the node crash.
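For anyone following along, here is a rough sketch of the continueAsNew-in-the-catch-block idea (hypothetical code assuming the durable-functions v2.x API; the 'etl-orchestrator' name mirrors the snippets above, and whether restarting the whole orchestration is the right recovery strategy depends on the workflow):

  import * as df from 'durable-functions';
  import { IOrchestrationFunctionContext, Task } from 'durable-functions';

  export default df.orchestrator(function* (context: IOrchestrationFunctionContext) {
    const chunks = context.df.getInput() as unknown[];

    try {
      // Fan out to one sub-orchestrator per chunk and wait for all of them.
      const tasks: Task[] = [];
      for (const chunk of chunks) {
        tasks.push(context.df.callSubOrchestrator('etl-orchestrator', chunk));
      }
      yield context.df.Task.all(tasks);
    } catch (error) {
      // Restart this orchestration from scratch with its original input.
      // continueAsNew truncates the history, so the failed fan-out is retried
      // as a brand-new execution instead of being replayed.
      context.df.continueAsNew(chunks);
    }
  });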