Azure / azure-functions-durable-python

Python library for using the Durable Functions bindings.
MIT License
134 stars 54 forks source link

Long running issue with Durable Python function #256

Closed tamathew closed 3 years ago

tamathew commented 3 years ago

I'm running Snowflake SQLs via ADF->Az function activity which calls a Python Durable Az function. When I tested with a long running SQL : "call system$wait(9, 'MINUTES)", it ran beyond 9 min.. and I aborted the job at 35th minute. The status of the statusQueryGetUri is below

Output -

{
  "name": "ExecSnowSQLDurableOrchestrator",
  "instanceId": "452918153ab741ab94e69ce37581408c",
  "runtimeStatus": "Running",
  "input": "{\"sql\": \"call system$wait(9, 'MINUTES')\", \"activity_name\": \"Runner_2\", \"factory_name\": \"gdapsandboxadf\", \"pipeline_name\": \"PL_PYTHON_DURABLE_FUNCTION_POC\"}",
  "customStatus": null,
  "output": null,
  "createdTime": "2021-01-26T19:43:26Z",
  "lastUpdatedTime": "2021-01-26T20:06:30Z"
}

The output log from Azure Monitor- DurableActivity however shows that there was an outcome of the SQL. But for some reason it was not getting updated in the webhook --> statusQueryGetUri.

This issue is also intermittent. Please let me know if you need more info.

2021-01-26 19:57:30.261 query: [call system$wait(9, 'MINUTES')] Information 2021-01-26 20:06:30.355 query execution done Information 2021-01-26 20:06:30.357 Output of the SQL execution : waited 9 minutes Information

DurableOrchestrator code -

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    try :
        status_code=400
        req_body=""
        output=None
        client = df.DurableOrchestrationClient(starter)
        req_body = req.get_body().decode()
        payload = json.loads(req_body)
        instance_id = await client.start_new(req.route_params["functionName"], client_input=payload) ### Start DurableOrechestartor func
        logging.info(f"Started orchestration with ID = '{instance_id}'.")
        # durable_orchestration_status = await client.get_status(instance_id)
        response = client.create_check_status_response(req, instance_id)
        #return client.create_check_status_response(req, instance_id)
        logging.info("Starter Response is below : ")
        logging.info(response)
        return response
    except Exception as e :
        logging.error(f"Exception occured in Starter : {str(e)}")
        json_output=json.dumps(str(e))
        log={"status":"Failed","status_code":400,"azfunc_output":json_output, "warehouse" : ""}
        return func.HttpResponse(json_output,mimetype='application/json',status_code=400)`
davidmrdavid commented 3 years ago

Hi @tmathewlulu,

Thank you for reaching out! I do have a few clarifying questions and comments, so let me take a step back.

(1) In your post, you refer to the following code as the Durable orchestrator code.

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
  try :
    status_code=400
    req_body=""
    output=None
    client = df.DurableOrchestrationClient(starter)
    req_body = req.get_body().decode()
    payload = json.loads(req_body)
    instance_id = await client.start_new(req.route_params["functionName"], client_input=payload) ### Start DurableOrechestartor func
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    # durable_orchestration_status = await client.get_status(instance_id)
    response = client.create_check_status_response(req, instance_id)
    #return client.create_check_status_response(req, instance_id)
    logging.info("Starter Response is below : ")
    logging.info(response)
    return response
except Exception as e :
    logging.error(f"Exception occured in Starter : {str(e)}")
    json_output=json.dumps(str(e))
    log={"status":"Failed","status_code":400,"azfunc_output":json_output, "warehouse" : ""}
    return func.HttpResponse(json_output,mimetype='application/json',status_code=400)`_

This is a common misconception, but this is not an orchestrator :) . This is an orchestrator client. In particular, it appears to be an HTTPTrigger'ed orchestrator client, which starts some orchestrator code defined elsewhere. I just wanted to make sure we're using the same terminology!

(2) Could you provide us with a minimal reproducer of this issue? A zip file or linked repo would do the trick. Otherwise, pseudocode would help. In particular, understanding how your orchestrator code utilizes the durable API is key to investigating this.

(3) For us to look at our logs, we'll need the following info about your app:

And that should be it for now! Thanks again for reaching out and for trying Durable Python ⚡ ⚡

tamathew commented 3 years ago

Thank you @davidmrdavid for acknowledging the issue. PFA the code. I have NOT included the local settings.

You can reproduce the issue by running a wait or sleep inside activity. In my case it is a Snowflake SQL execution through Snowflake SQL connector.

Timeframe issue observed: 1/26/2021 - All day Function App name: lllcddsedwfapp-py-v1 Function name(s):ExecSnowSQLDurableActivity, ExecSnowSQLDurableOrchestrator, ExecSnowSQLStarter Azure region: East US 2 Azure storage account name: lllcddsedwraw1 Orchestration instance ID(s): Inside zip file : OrchestrationID_and_Response.json

DurableTrio.zip

tamathew commented 3 years ago

The Csharp durable took exactly 14 min where as Python Durable ran longer.

Durable_Csharp_Output Durable_Python_Output
tamathew commented 3 years ago

@davidmrdavid - What do you think about the issue ? Is it going to take time ? Any ETA ?

davidmrdavid commented 3 years ago

Hi @tmathewlulu, I'll be taking a preliminary look at this now :) , starting with the reproducer and a look at the logs. My ETA for an update is no later than 5pm PDT, certainly most likely earlier, though I can't guarantee we will have identified the issue by then.

I'm still a bit unsure about the behaviour I'm attempting to reproduce, so help me understand two things (1) you're saying I can reproduce this by replacing your activity with a simple sleep statement, is that right? So just have it wait for 10 minutes? So I don't really need SnowflakeSQL, right?

(2) Assuming the answer to (1) is "yes", let's say I replace the activity with a 10 minute-long sleep statement, and then I visit the statusQueryGetUri link as soon as the orchestrator starts, what exactly is the issue I should be seeing? And what is the correct behaviour you'd expect to see?

Thanks! ⚡ ⚡

tamathew commented 3 years ago

Here is the test scenario - Input : Issue a sleep statement for 10min, You may also run a parallel copy of the job for sleep(25 min) to save testing time. Because this issue is intermittent. Expected result : The statusQueryGetUri.runtimeStatus should return "Completed" at the end of 10 minutes and hence poller will mark the process as done. The issue is statusQueryGetUri.runtimeStatus is still in "Pending" or "Running" beyond 10 minutes and hence the client poller cannot determine the end state of the Azure Function [Check my second screenshot above ] and keeps polling.

tamathew commented 3 years ago

FYI - Only my starter HTTP is async function. Does it make a difference ? Also I have NOT defined a FUNCTIONS_WORKER_PROCESS_COUNT variable.

davidmrdavid commented 3 years ago

Just a heads up that will provide an update just slightly past 5pm PDT, got pulled into urgent meeting

tamathew commented 3 years ago

Perfectly fine. Thanks @davidmrdavid

tamathew commented 3 years ago

Also, I'm happy to change my design if there is any alternative option. My business requirement is to just make an API call which can take a long time to get a response. To be precise, I will have to execute a Snowflake SQL procedure which loads a table and takes 30 minutes to process. I want to orchestrate this flow in Azure Data Factory through Azure Function activity(ADF does not support running Snowflake SQL through their native connectors). Azure Function activity cannot run more than 230 seconds due to Azure Load Balancer timeout and that is the reason why I chose durable function.

davidmrdavid commented 3 years ago

Hi @tmathewlulu,

Thanks for your patience, After looking at your latest execution logs, and after speaking with your microsoft-internal point of contact, it seems that after changing your activities to be async, your activity function is now during for the expected amount of time and now it's just your orchestrator that seems to be running for longer than expected. I'll need to sync up w/ the rest of the Durable Functions team to understand if this is expect and/or what could be causing this. I'll loop them in to this GitHub thread tomorrow. I'll also make sure they chime in with potential alternative designs in the meantime.

In the past, we've seen delays in Durable Python due to configuration issues that led the functions host to crash repeatedly. There's a good thread with advice in the last comment of this thread. I really recommend reviewing that thread in case any of that advice applies to you

Unfortunately today I wasn't able to dive too deeply into this, but we are starting a Durable Python sprint this coming Monday and I can make this investigation into a P1 priority for it. I will still continue to investigate this before then, but I'm a little strapped for time until the end of the week.

@ConnorMcMahon, @cgillum . I'd love your thoughts on this thread, let's discuss tomorrow, I'll bring y'all up to speed :)

cgillum commented 3 years ago

I took a quick look at the orchestration instance in question and according to our telemetry it completed successfully (time is UTC):

PreciseTimeStamp RoleInstance AppName InstanceId TaskName FunctionName
10:06:01 PM pl0SmallDedicatedLinuxWebWorkerRole_IN_1778 lllcddsedwfapp-py-v1 435e6be91fd54b4fb055e1003a0458e7 FunctionScheduled ExecSnowSQLDurableOrchestrator
10:06:03 PM pl0SmallDedicatedLinuxWebWorkerRole_IN_1778 lllcddsedwfapp-py-v1 435e6be91fd54b4fb055e1003a0458e7 FunctionStarting ExecSnowSQLDurableOrchestrator
10:48:02 PM pl0SmallDedicatedLinuxWebWorkerRole_IN_1778 lllcddsedwfapp-py-v1 435e6be91fd54b4fb055e1003a0458e7 FunctionScheduled ExecSnowSQLDurableActivity
10:48:04 PM pl0SmallDedicatedLinuxWebWorkerRole_IN_1778 lllcddsedwfapp-py-v1 435e6be91fd54b4fb055e1003a0458e7 FunctionStarting ExecSnowSQLDurableActivity
11:09:02 PM pl0SmallDedicatedLinuxWebWorkerRole_IN_1778 lllcddsedwfapp-py-v1 435e6be91fd54b4fb055e1003a0458e7 FunctionCompleted ExecSnowSQLDurableActivity
11:24:09 PM pl1SmallDedicatedLinuxWebWorkerRole_IN_31 lllcddsedwfapp-py-v1 435e6be91fd54b4fb055e1003a0458e7 FunctionCompleted ExecSnowSQLDurableOrchestrator

I'm not exactly sure why there was such a long delay between the activity completing and the orchestrator completing. We can look into that further. @tmathew1000 can you confirm whether your orchestration eventually finished, according to your poller?

tamathew commented 3 years ago

@cgillum Yes , my orchestration and ADF pipeline eventually finished. My testing involved calling same Durable function (lllcddsedwfapp-py-v1) concurrently from different client ADF pipelines. Most of them completed on time and very few seems to run longer might be... because of a lock in underlying client orchestrating code(per MS Support engineer ). The orchestrating client code is not "async".

Another most recent example is here --> instance id : 36d0474fbaeb4498a73d4ac82f3a54b8. This was supposed to run for 11 min (the artificial delay set in my activity) but it ran for 47 min and finished successfully with an output message : waited 11 minutes. There were other jobs running at the same time invoking the same function.

I have also triggered an ADF pipeline at the same time as above - a C# durable function with same artificial delay of 11min and that function however finished on time - Instance ID for C# durable : 0e15feb07e2740b6a09616d566583711

Output of Python Durable pipeline -

{
    "name": "ExecSnowSQLDurableOrchestrator",
    "instanceId": "36d0474fbaeb4498a73d4ac82f3a54b8",
    "runtimeStatus": "Completed",
    "input": "{\"sql\": \"call system$wait(11, 'MINUTES')\", \"activity_name\": \"Runner_2\", \"factory_name\": \"gdapsandboxadf\", \"pipeline_name\": \"PL_PYTHON_DURABLE_FUNCTION_POC\"}",
    "customStatus": null,
    "output": {
        "status": "Success",
        "status_code": 200,
        "azfunc_output": "waited 11 minutes",
        "warehouse": "ETL_S_DW"
    },
    "createdTime": "2021-01-28T01:20:29Z",
    "lastUpdatedTime": "2021-01-28T02:07:28Z",
    "ADFWebActivityResponseHeaders": {
        "Request-Context": "appId=cid-v1:e6f47123-a861-4198-8964-30e19f2f9fc6",
        "Date": "Thu, 28 Jan 2021 02:07:34 GMT",
        "Server": "Kestrel",
        "Content-Length": "505",
        "Content-Type": "application/json; charset=utf-8"
    },
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US 2)",
    "executionDuration": 2824,
    "durationInQueue": {
        "integrationRuntimeQueue": 0
    },
    "billingReference": {
        "activityType": "ExternalActivity",
        "billableDuration": [
            {
                "meterType": "AzureIR",
                "duration": 0.7999999999999999,
                "unit": "Hours"
            }
        ]
    }
}

Output of C# Durable pipeline - #################################################

{
    "name": "longrunningorchestrator",
    "instanceId": "0e15feb07e2740b6a09616d566583711",
    "runtimeStatus": "Completed",
    "input": "call system$wait(11, 'MINUTES')",
    "customStatus": null,
    "output": "{\"status\":waited 11 minutes}",
    "createdTime": "2021-01-28T01:20:40Z",
    "lastUpdatedTime": "2021-01-28T01:31:41Z",
    "ADFWebActivityResponseHeaders": {
        "Request-Context": "appId=cid-v1:eee61f03-de99-4f77-a539-2da0d3f34b7a",
        "Date": "Thu, 28 Jan 2021 01:31:44 GMT",
        "Set-Cookie": "ARRAffinity=55310c53a2a6660a2f3721faf903f53afc86446dd328fe2381776d215e58007b;Path=/;HttpOnly;Domain=lllcddsedwfap9.azurewebsites.net",
        "Content-Length": "292",
        "Content-Type": "application/json; charset=utf-8"
    },
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (East US 2)",
    "executionDuration": 661,
    "durationInQueue": {
        "integrationRuntimeQueue": 0
    },
    "billingReference": {
        "activityType": "ExternalActivity",
        "billableDuration": [
            {
                "meterType": "AzureIR",
                "duration": 0.19999999999999998,
                "unit": "Hours"
            }
        ]
    }
}
cgillum commented 3 years ago

Thanks, instance 36d0474fbaeb4498a73d4ac82f3a54b8 looks the same from my end as well. I did notice, however, that from our telemetry, it looks like ExecSnowSQLDurableActivity actually took about 26 minutes to complete (not 11) - not sure if that's expected or relevant. The orchestration then took another 18 minutes to transition into the Completed state.

I looked at the DTFx logs for your app and I wasn't able to find an explanation for this 18 minute delay. I can see that the framework scheduled your orchestration to run and it wasn't until 18 minutes later until it actually finished running.

There are two possible explanations I can think of:

  1. There's something in your orchestrator code that is blocking the completion for these 18 minutes.
  2. The Python worker is preventing the orchestrator code from actually starting its execution, possibly because it's waiting for some other non-async function execution to complete.

Do any of these sound likely to you? Regarding (2), this is actually quite difficult for me to determine because our telemetry does not make this obvious. However, I have seen problems like this for other customers who use Python functions (though not Durable Functions). This explanation also only makes sense if you see this delay only sometimes, or if it's highly variable. I suspect tweaking some of your concurrency settings could resolve this problem, if it is actually the root cause. Or, if you have any non-async Python functions, converting them to be async to free up the main Python execution thread.

tamathew commented 3 years ago

I suspect behavior 2 because I was running multiple instances of same Python Durable function job concurrently.

I have a different example to substantiate this theory. Attached xlsx file has the details.

Night King was blocking Arya for about 36 min :)

Arya_NightKing_AzureFuncIssue.xlsx

I have also set up “FUNCTIONS_WORKER_PROCESS_COUNT” = “10”.
My Durable Activity and Durable starter and async.

cgillum commented 3 years ago

Try setting both maxConcurrentActivityFunctions and maxConcurrentOrchestratorFunctions in host.json to 5. That will help ensure you don’t run out of Python worker processes on any single node.

https://docs.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-perf-and-scale#concurrency-throttles

tamathew commented 3 years ago

@cgillum I tried setting the limit as you said. PFB my host.json. But it did not work. Few of my pipelines even failed with 504 - bad gateway error.

I'm still seeing long running jobs. Instance ID : a05fe03360aa4614bd5dc25c8d946e51. - This was supposed to complete in 1 min, but might got locked with some other parallel process which I was running. It took almost 48 min (I had initiated parallel job with a sleep of 48 min(artificial delay) ).

{
  "version": "2.0",
  "functionTimeout": "02:00:00",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  },
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 5,
      "maxConcurrentOrchestratorFunctions": 5
    }
  }
}
cgillum commented 3 years ago

@tmathewlulu thanks for trying this out and setting up a new test. Here are the logs I found (which I expect you also have in your App Insights):

EventTimestamp InstanceId TaskName FunctionName
4:28:29 PM a05fe03360aa4614bd5dc25c8d946e51 FunctionScheduled ExecSnowSQLDurableOrchestrator
4:28:29 PM a05fe03360aa4614bd5dc25c8d946e51 FunctionStarting ExecSnowSQLDurableOrchestrator
4:28:29 PM a05fe03360aa4614bd5dc25c8d946e51 FunctionScheduled ExecSnowSQLDurableActivity
4:28:29 PM a05fe03360aa4614bd5dc25c8d946e51 FunctionStarting ExecSnowSQLDurableActivity
5:17:17 PM a05fe03360aa4614bd5dc25c8d946e51 FunctionCompleted ExecSnowSQLDurableActivity
5:17:17 PM a05fe03360aa4614bd5dc25c8d946e51 FunctionCompleted ExecSnowSQLDurableOrchestrator

The good news (if we can call it that) is that the orchestrator completed immediately after the activity finished. This all further supports the theory that this is a Python concurrency issue.

I'm trying to reason about why my workaround wasn't sufficient. I am able to see that there were 10 Python workers allocated on this node at around 4:27 PM, so that shouldn't be the issue. There are also host.json settings named controlQueueBatchSize and controlQueueBufferThreshold that default to large numbers and could cause orchestrations to get blocked, but it doesn't explain why ExecSnowSQLDurableActivity got blocked (we don't prefetch activity function messages).

Are you sure that this instance of ExecSnowSQLDurableActivity was only supposed to take a minute to execute? Unfortunately I don't have any information about when the Python worker actually started or finished executing it. Only your logs would contain that information.

tamathew commented 3 years ago

@cgillum

Are you sure that this instance of ExecSnowSQLDurableActivity was only supposed to take a minute to execute? - Yes. I have a database SQL command which waits for 1 min (call system$wait(1, 'MINUTES')).

PFB the activity log .

Activity_log01292021_428pm.xlsx

tamathew commented 3 years ago

Below is the skeleton version of ExecSnowSQLDurableActivity

###Activity 
async def main(payload: str) -> str:

    try:
        await execute_snow_sql("CALL SYSTEM$WAIT(1,MINUTES)")  
    except Exception as e:
        logging.error('Error')

async def execute_snow_sql(sql: str) -> str:

    try:
        api_call_to_snowflake_database(sql) 
    except Exception as e:
        logging.error('Error')
davidmrdavid commented 3 years ago

Just as a sanity check, can we replace the call to execute_snow_sql in main with a time.sleep(60)? If we can't replicate the error with it, then the issue might be caused by the connection to snowflakeDB.

tamathew commented 3 years ago

Hi @davidmrdavid and @cgillum ,

I'm currently running the time.sleep instead of Snowflake API and looks like the issue persists.

The instance id - a536716bab684939af43511b96f7ad37 was supposed to finish in 11 minutes(660 seconds) but running since 40 min. And I'm sure it is because of another concurrent job (instance id : f7a464ced93645fa942f0a5eed21c62f )which is supposed to sleep for 50 min(3000 seconds) is still running.

I noticed yet another thing. The runtimeStatus shows as "Pending" instead of "Running".

INSTANCE : a536716bab684939af43511b96f7ad37. - 11 MIN SLEEP {"name":"ExecSnowSQLDurableOrchestrator","instanceId":"a536716bab684939af43511b96f7ad37","runtimeStatus":"Pending","input":"{\"sleep_time\": \"660\", \"activity_name\": \"Runner_2\", \"factory_name\": \"gdapsandboxadf\", \"pipeline_name\": \"PL_PYTHON_DURABLE_FUNCTION_POC\"}","customStatus":null,"output":null,"createdTime":"2021-01-29T22:16:47Z","lastUpdatedTime":"2021-01-29T22:16:47Z"}

INSTANCE : f7a464ced93645fa942f0a5eed21c62f. - 50 MIN SLEEP {"name":"ExecSnowSQLDurableOrchestrator","instanceId":"f7a464ced93645fa942f0a5eed21c62f","runtimeStatus":"Running","input":"{\"sleep_time\": \"3000\", \"activity_name\": \"Runner_2\", \"factory_name\": \"gdapsandboxadf\", \"pipeline_name\": \"PL_PYTHON_DURABLE_FUNCTION_POC\"}","customStatus":null,"output":null,"createdTime":"2021-01-29T22:21:08Z","lastUpdatedTime":"2021-01-29T22:21:08Z"}

tamathew commented 3 years ago

I used the below Activity code

import logging
from .execute_snow_sql import execute_snow_sql
#import azure.functions as func
import os
import json
import time

async def main(payload: str) -> str:
    req_body = payload
    sec = req_body.get('sleep_time')
    dummy_snowflake_sql={
        "status": "Success",
        "status_code": 200,
        "azfunc_output": f"Slept for {int(sec)} seconds",
        "warehouse": "dummy warehouse"
        }
    time.sleep(int(sec))
    return dummy_snowflake_sql
tamathew commented 3 years ago

a536716bab684939af43511b96f7ad37 (Supposed to run 11 min)- This finished at 1/29/21, 3:23:43 PM PST (Took almost 1 hr). The instance : f7a464ced93645fa942f0a5eed21c62f which I suspected blocking the "11 min" one finished around 1/29/21, 3:12:45 PM (Took 54 min - almost expected time ). Having said, there is a 10 min gap between the above run finish times and the "blocking theory" is not holding 100 percent true for this.

tamathew commented 3 years ago

@cgillum @davidmrdavid Any update ?

cgillum commented 3 years ago

Hey @tmathewlulu sorry for the delay. Thanks for providing those activity logs. I'm looking at the first few entries:

timestamp [UTC] message
1/29/2021, 4:28:29.096 PM Executing 'Functions.ExecSnowSQLDurableActivity' (Reason='(null)', Id=2145600b-35b1-47f1-b247-9d1abb0d49dd)
1/29/2021, 5:16:14.347 PM #################################

Can you confirm that the ##### log statement is the first thing the activity function does before doing any real work?

This is definitely looking like a Python concurrency issue, but I'm a little puzzled as to why we're having such a hard time working around it. I think I'll just need to roll up my sleeves and try to reproduce this problem locally in order to figure out exactly what's going on and run some experiments...

tamathew commented 3 years ago

@cgillum - You are right, if you are looking at an older log. In most recent version of the code I have commented out the Snowflake API call part(which also have the ####### comment ). Having said, If I run the function now, you may not see the Python comment(##########...) . Here is the current activity code for your ref. In this code I'm just using Python sleep statement instead of the Snowflake execution API call. The sleep seconds value is passed from my Client code(Azure Data Factory orchestrator)

# This function is not intended to be invoked directly. Instead it will be
# triggered by an orchestrator function.
# Before running this sample, please:
# - create a Durable orchestration function
# - create a Durable HTTP starter function
# - add azure-functions-durable to requirements.txt
# - run pip install -r requirements.txt

import logging
from .execute_snow_sql import execute_snow_sql
#import azure.functions as func
import os
import json
import time

async def main(payload: str) -> str:
    req_body = payload
    sec = req_body.get('sleep_time')
    dummy_snowflake_sql={
        "status": "Success",
        "status_code": 200,
        "azfunc_output": f"Slept for {int(sec)} seconds",
        "warehouse": "dummy warehouse"
        }
    time.sleep(int(sec))
    return dummy_snowflake_sql

# async def execute_snow_sql_dummy(seconds):
#     dummy_snowflake_sql={
#         "status": "Success",
#         "status_code": 200,
#         "azfunc_output": f"Slept for {int(seconds)} seconds",
#         "warehouse": "dummy warehouse"
#         }
#     time.sleep(int(seconds))
#     return dummy_snowflake_sql

# async def main(payload: str) -> str:

#     try:
#         ## Variable initialization

#         sql=""
#         warehouse_override=""
#         warehouse_value_from_config=""
#         warehouse_value=""
#         default_warehouse = os.environ["KV_BI_SNW_DEFAULT_WH"]  # Get from Key vault.
#         config_table=os.environ["KV_BI_WAREHOUSE_CONFIG_TABLE"]

#         ### Get HTTP Body params - This is from ADF Az Func activity
#         req_body = payload
#         sql = req_body.get('sql')
#         warehouse_override = req_body.get('warehouse_override')
#         factory_name = req_body.get('factory_name')
#         pipeline_name = req_body.get('pipeline_name')
#         activity_name = req_body.get('activity_name')

#         logging.info("#################################")
#         logging.info(f"Values passed from ADF : ")
#         logging.info("#################################")
#         logging.info(f"SQL :{sql}")
#         logging.info(f"warehouse_override :{warehouse_override}")
#         logging.info(f"factory_name :{factory_name}")
#         logging.info(f"pipeline_name :{pipeline_name}")
#         logging.info(f"activity_name :{activity_name}")
#         logging.info("#################################")

#         dummy_snowflake_sql={
#         "status": "Success",
#         "status_code": 200,
#         "azfunc_output": "dummy output",
#         "warehouse": "dummy warehouse"
#         }

#         if warehouse_override is not None:
#             warehouse_override=warehouse_override.strip()

#         if warehouse_override is not None or warehouse_override!="":
#             warehouse_value=warehouse_override   ### First check for warehouse value override from ADF### If no override, use the default warehouse config for finding the warehouse config info for executing stored proc.

#         else:
#             warehouse_value= default_warehouse  

#         if sql is None or sql=='' or factory_name is None or factory_name.strip()==''  or pipeline_name is None or pipeline_name.strip()=='' or activity_name is None or activity_name.strip()==''  : ### Check if body has all data.
#            log={"status":"Failed","status_code":400,"azfunc_output":"Please pass valid SQL,Data factory name,Pipeline name,Activity name in the body ", "warehouse" : warehouse_value}
#            return log
#         else :
#             ## Get the warehouse configuration from metadata table.
#             sql_to_get_warehouse = f'''
#             select 
#                   warehouse_name
#             from  {config_table} 
#             where factory_name='{factory_name}' and pipeline_name='{pipeline_name}' and activity_name='{activity_name}' and enabled = true 

#             '''

#             logging.info(f"SQL to get the warehouse : {sql_to_get_warehouse}")
#             logging.info(f'Snowflake SQL : {str(sql)}')  

#             #seconds=sql_to_get_warehouse  
#             output_for_wh = await execute_snow_sql(sql_to_get_warehouse,warehouse_value)  ## Use default warehouse

#             warehouse_value_from_config = output_for_wh['azfunc_output']
#             if warehouse_value_from_config is not None:
#                 warehouse_value_from_config=warehouse_value_from_config.strip()

#     except Exception as e:
#         logging.error(f"Exception occured in Activity to get warehouse config : {e}")
#         log={"status":"Failed","status_code":500,"azfunc_output":f" {e} Failed to execute SQL: {sql}", "warehouse" : warehouse_value}
#         logging.error(log)
#         return log

#     try :  ## Use the warehouse configuration to execute the SQL.
#             if warehouse_override is not None and warehouse_override!="":
#                 warehouse_value=warehouse_override   ### First check for warehouse value override from ADF
#                 logging.info(f"Using warehouse override value : {warehouse_override}")
#             elif warehouse_value_from_config is not None and warehouse_value_from_config!="":
#                 warehouse_value=warehouse_value_from_config  ## If override is not present, check for warehouse value from config table
#                 logging.info(f"Using warehouse config table value : {warehouse_value_from_config}")
#             else:
#                 warehouse_value=default_warehouse ## Use default from key vault if there is no override or config entry
#                 logging.info(f"Using default warehouse value : {default_warehouse}")

#             #seconds=sql_to_get_warehouse
#             output = output_for_wh = await execute_snow_sql(sql,warehouse_value)

#             logging.info("Output response : ")
#             logging.info(output)
#             return output
#     except Exception as e:
#         logging.error(f"Exception occured in Az Function Durable Activity : {str(e)}")
#         log={"status":"Failed","status_code":500,"azfunc_output":f" {e} Failed to execute SQL: {sql}", "warehouse" : warehouse_value}
#         logging.error(log)
#         return log
cgillum commented 3 years ago

Hey @tmathewlulu just wanted to let you know I'm still looking into this. I found a few interesting but unrelated bugs while trying to set up this repro that have sidetracked me a bit.

tamathew commented 3 years ago

@cgillum - Thanks for keeping me updated. Fingers crossed for the RCA...:)

cgillum commented 3 years ago

Alright, I set up a very simple repro app but I'm not seeing any unexpected delays in activity function execution. Here is my setup:

host.json:

{
  "version": "2.0",
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  },
  "extensions": {
    "durableTask": {
      "maxConcurrentActivityFunctions": 5,
      "maxConcurrentOrchestratorFunctions": 5
    }
  }

Starter

import logging

import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("DurableFunctionsOrchestrator1", None, None)
    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

Orchestration

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity('DurableActivity1', "Tokyo")
    return result1

main = df.Orchestrator.create(orchestrator_function)

Activity

import logging
import time

def main(name: str) -> str:
    logging.info(f"Started activity with input = '{name}'.")
    time.sleep(60)
    logging.info(f"Finished activity with input = '{name}'.")
    return f"Hello {name}"

To start the test, I send 10 concurrent HTTP requests to start 10 orchestrations. I observe that all 10 get scheduled almost immediately and then 5 activity functions start running. Those 5 activity functions block for 60 seconds and then complete. Shortly after that, the next 5 start, block for 60 seconds and then complete. All 10 activity functions report executing for 60 seconds in spite of the blocking behavior because the thottling logic in host.json ensures that I never start more work than I can process concurrently.

[2021-02-04T19:17:29.711] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=5c7b6da9-e2d2-4c22-b148-7e57f25ce05c)
[2021-02-04T19:17:29.769] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=dfaba515-987b-4f07-be7d-12ab40bda8a7)
[2021-02-04T19:17:29.791] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=91c89f71-d338-4182-b9e4-d523dd5ae18a)
[2021-02-04T19:17:29.804] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=e7f4bec8-068c-40ed-861a-4cc2efc9387e)
[2021-02-04T19:17:29.834] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=1b55089e-2361-46e2-ace4-f08d9116f393)
[2021-02-04T19:18:29.744] Executed 'Functions.DurableActivity1' (Succeeded, Id=5c7b6da9-e2d2-4c22-b148-7e57f25ce05c, Duration=60032ms)
[2021-02-04T19:18:29.812] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=7ab96feb-e3f4-49e2-9da8-31a4590b553b)
[2021-02-04T19:18:29.828] Executed 'Functions.DurableActivity1' (Succeeded, Id=91c89f71-d338-4182-b9e4-d523dd5ae18a, Duration=60036ms)
[2021-02-04T19:18:29.826] Executed 'Functions.DurableActivity1' (Succeeded, Id=dfaba515-987b-4f07-be7d-12ab40bda8a7, Duration=60057ms)
[2021-02-04T19:18:29.829] Executed 'Functions.DurableActivity1' (Succeeded, Id=e7f4bec8-068c-40ed-861a-4cc2efc9387e, Duration=60024ms)
[2021-02-04T19:18:29.923] Executed 'Functions.DurableActivity1' (Succeeded, Id=1b55089e-2361-46e2-ace4-f08d9116f393, Duration=60088ms)
[2021-02-04T19:18:29.951] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=a2a04018-379d-47fd-b51e-b2b03cc70a44)
[2021-02-04T19:18:29.992] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=1145d902-d70c-4dec-8d8d-592af97326ea)
[2021-02-04T19:18:30.046] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=424297c3-3366-4b47-bf4e-94661a92b2dd)
[2021-02-04T19:18:30.101] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=f449d6db-c0fb-42cb-b34e-f565a919b69b)
[2021-02-04T19:19:29.846] Executed 'Functions.DurableActivity1' (Succeeded, Id=7ab96feb-e3f4-49e2-9da8-31a4590b553b, Duration=60033ms)
[2021-02-04T19:19:30.014] Executed 'Functions.DurableActivity1' (Succeeded, Id=a2a04018-379d-47fd-b51e-b2b03cc70a44, Duration=60062ms)
[2021-02-04T19:19:30.015] Executed 'Functions.DurableActivity1' (Succeeded, Id=1145d902-d70c-4dec-8d8d-592af97326ea, Duration=60023ms)
[2021-02-04T19:19:30.111] Executed 'Functions.DurableActivity1' (Succeeded, Id=424297c3-3366-4b47-bf4e-94661a92b2dd, Duration=60065ms)
[2021-02-04T19:19:30.171] Executed 'Functions.DurableActivity1' (Succeeded, Id=f449d6db-c0fb-42cb-b34e-f565a919b69b, Duration=60069ms)

Is this test fundamentally different from yours in any way?

cgillum commented 3 years ago

One important thing I discovered which might be related. The behavior of FUNCTIONS_WORKER_PROCESS_COUNT might be responsible for this. When I ran the above test, all of my python language worker processes had already been initialized. However, it actually takes several minutes for all these language workers to be initialized on any given instance, according to this documentation: https://docs.microsoft.com/en-us/azure/azure-functions/functions-app-settings#functions_worker_process_count

Language worker processes are spawned every 10 seconds until the count set by FUNCTIONS_WORKER_PROCESS_COUNT is reached.

It turns out that this documentation is actually slightly wrong, and that it takes much longer than this (over 6 minutes) to get all the language worker processes up and running. That was one of the distractions I was chasing down with the engineering team yesterday. Some fixes will be coming in later releases that improve this, but it may take some time due to slow release cycles.

In any case, I ran this same test again immediately after restarting my functions host and saw very clear evidence of a problem. Here were the new activity execution logs:

[2021-02-04T19:44:09.915] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=df51fcaa-15bf-40ef-9712-831aee18bc04)
[2021-02-04T19:44:09.918] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=fbe165cb-4550-4472-aaa7-50c076edd572)
[2021-02-04T19:44:09.938] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=0417bf9a-acb8-44f0-bd87-313ea3511418)
[2021-02-04T19:44:09.955] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=87e6aa6c-e8c3-4167-a0de-f9fcbd26b2e5)
[2021-02-04T19:44:09.972] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=17786906-8ca5-4bf7-bcb9-1733b4e537e3)
[2021-02-04T19:45:09.977] Executed 'Functions.DurableActivity1' (Succeeded, Id=fbe165cb-4550-4472-aaa7-50c076edd572, Duration=60058ms)
[2021-02-04T19:46:09.966] Executed 'Functions.DurableActivity1' (Succeeded, Id=df51fcaa-15bf-40ef-9712-831aee18bc04, Duration=120057ms)
[2021-02-04T19:47:09.970] Executed 'Functions.DurableActivity1' (Succeeded, Id=0417bf9a-acb8-44f0-bd87-313ea3511418, Duration=180032ms)
[2021-02-04T19:48:09.979] Executed 'Functions.DurableActivity1' (Succeeded, Id=87e6aa6c-e8c3-4167-a0de-f9fcbd26b2e5, Duration=240024ms)
[2021-02-04T19:49:09.981] Executed 'Functions.DurableActivity1' (Succeeded, Id=17786906-8ca5-4bf7-bcb9-1733b4e537e3, Duration=300008ms)
[2021-02-04T19:49:10.034] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=a22ef351-5252-4f5c-8f2c-a5909a689a71)
[2021-02-04T19:49:10.092] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=d5576fa5-d923-4b9a-9702-450b633e017b)
[2021-02-04T19:49:10.130] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=0e22dbc9-aa79-47e8-848b-b9be95b36ad9)
[2021-02-04T19:49:10.149] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=01557f42-0c5e-45d7-a0f6-8742c7239b02)
[2021-02-04T19:49:10.177] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=15a2998e-c064-4ab5-a797-870926c6ed5b)
[2021-02-04T19:50:10.081] Executed 'Functions.DurableActivity1' (Succeeded, Id=a22ef351-5252-4f5c-8f2c-a5909a689a71, Duration=60046ms)
[2021-02-04T19:50:10.153] Executed 'Functions.DurableActivity1' (Succeeded, Id=0e22dbc9-aa79-47e8-848b-b9be95b36ad9, Duration=60023ms)
[2021-02-04T19:50:10.154] Executed 'Functions.DurableActivity1' (Succeeded, Id=d5576fa5-d923-4b9a-9702-450b633e017b, Duration=60061ms)
[2021-02-04T19:50:10.202] Executed 'Functions.DurableActivity1' (Succeeded, Id=01557f42-0c5e-45d7-a0f6-8742c7239b02, Duration=60052ms)
[2021-02-04T19:50:10.214] Executed 'Functions.DurableActivity1' (Succeeded, Id=15a2998e-c064-4ab5-a797-870926c6ed5b, Duration=60037ms)

If you scroll to the right, you can see several of the initial activities report taking several minutes to execute. This is because Durable Functions tried to start 5 concurrently but only one or two language workers were ready at the time, thus those activities got backlogged behind the others that had already started running. Unfortunately the Functions default logging can't distinguish this when reporting latencies so it's difficult to understand.

Long story short, I think this might be what you're actually running into. It's a design flaw of the Functions host when using concurrency limited languages like Python. If I'm correct about this, the only way to guarantee that you never run into this would be to either make your activity functions async or set maxConcurrentActivityFunctions to 1. Give that a try and see if it resolves the issue for you. In the meantime, I'm working with the core Functions team to try and find a way to correct this design flaw.

tamathew commented 3 years ago

Hi @cgillum
Thanks for your analysis. I was already running my "Activity" as async . I changed maxConcurrentActivityFunctions to "1" from "5" and tested. I ran concurrent jobs as usual and I can still see the long running behaviour . PFB the instance ID. It was supposed to finish in 6 min(360sec) but ran for 12 min.. probably due to another parallel process.

I'm running my testing by triggering multiple client instances with different sleep times (2min, 5min, 10 min, 15 min etc)

Instance ID : b2b1d7f601d043899b4f4dc5803afa3f

cgillum commented 3 years ago

@tmathewlulu It's not enough to simply declare the function as async. You need to do that and also use await when doing any blocking operation.

https://stackoverflow.com/questions/50757497/simplest-async-await-example-possible-in-python

tamathew commented 3 years ago

Below is the high level format of the Durable functions. I have placed await and async where it let me to.

starter --> async def main(): instance_id = await client.start_new()

orchestrator -->. (Cannot be asyc as per @davidmrdavid ) def orchestrator_function(): result = yield context.call_activity()

activity ---> async def main(): sleep()

davidmrdavid commented 3 years ago

@tmathewlulu , I think your sleep statement should read await asyncio.sleep(5) from https://stackoverflow.com/questions/56729764/python-3-7-asyncio-sleep-and-time-sleep

tamathew commented 3 years ago

@davidmrdavid
I used await asyncio.sleep() but seems like the issue persists

ID : 4287366d86a3498db19c3436a62b99ca - Supposed to finish in 4 min, but took 7 min.

@cgillum - Can you explain why this issue is not there for C# Az Func but only for Python ?

cgillum commented 3 years ago

@tmathewlulu I used asyncio.sleep() and I couldn't reproduce the issue anymore.

import asyncio
import logging
import time

async def main(name: str) -> str:
    logging.info(f"Started activity with input = '{name}'. ")
    await asyncio.sleep(60)
    logging.info(f"Finished activity with input = '{name}'.") 
    return f"Hello {name}"

I then ran 14 orchestrations concurrently and all activities completed in exactly 60 seconds, no delays:

[2021-02-05T18:50:19.831] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=9abdc73e-ba17-45d6-8b0e-190cedd5b472)
[2021-02-05T18:50:19.850] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=37179a92-4c4a-4d14-9dfa-27436fcf19c9)
[2021-02-05T18:50:19.869] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=802f8eb3-6b9f-4768-ad89-cd8fa03f0b5f)
[2021-02-05T18:50:19.888] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=9cd74ec1-41c6-4394-a0bf-cb761c9205d1)
[2021-02-05T18:50:19.910] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=1e3945d7-b8e4-4068-9b8a-8e9507bf3a04)
[2021-02-05T18:51:19.847] Executed 'Functions.DurableActivity1' (Succeeded, Id=9abdc73e-ba17-45d6-8b0e-190cedd5b472, Duration=60017ms)
[2021-02-05T18:51:19.880] Executed 'Functions.DurableActivity1' (Succeeded, Id=802f8eb3-6b9f-4768-ad89-cd8fa03f0b5f, Duration=60010ms)
[2021-02-05T18:51:19.883] Executed 'Functions.DurableActivity1' (Succeeded, Id=37179a92-4c4a-4d14-9dfa-27436fcf19c9, Duration=60033ms)
[2021-02-05T18:51:20.791] Executed 'Functions.DurableActivity1' (Succeeded, Id=9cd74ec1-41c6-4394-a0bf-cb761c9205d1, Duration=60903ms)
[2021-02-05T18:51:20.801] Executed 'Functions.DurableActivity1' (Succeeded, Id=1e3945d7-b8e4-4068-9b8a-8e9507bf3a04, Duration=60890ms)
[2021-02-05T18:51:21.763] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=37edcab7-7fc3-4515-af7c-a39d4f8bbd12)
[2021-02-05T18:51:21.765] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=75b22b4e-796c-4c35-bbf7-c0837846f235)
[2021-02-05T18:51:22.757] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=cc1f5455-94ac-4562-bc90-4ca71b229b2c)
[2021-02-05T18:51:22.771] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=3d6afa6e-3ecf-45f8-a01c-edbd84bb168b)
[2021-02-05T18:51:22.825] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=cdce2550-bef3-4902-9e0f-141f11aadf1b)
[2021-02-05T18:52:22.781] Executed 'Functions.DurableActivity1' (Succeeded, Id=75b22b4e-796c-4c35-bbf7-c0837846f235, Duration=61016ms)
[2021-02-05T18:52:22.788] Executed 'Functions.DurableActivity1' (Succeeded, Id=cc1f5455-94ac-4562-bc90-4ca71b229b2c, Duration=60031ms)
[2021-02-05T18:52:22.796] Executed 'Functions.DurableActivity1' (Succeeded, Id=37edcab7-7fc3-4515-af7c-a39d4f8bbd12, Duration=61032ms)
[2021-02-05T18:52:22.808] Executed 'Functions.DurableActivity1' (Succeeded, Id=3d6afa6e-3ecf-45f8-a01c-edbd84bb168b, Duration=60037ms)
[2021-02-05T18:52:22.875] Executed 'Functions.DurableActivity1' (Succeeded, Id=cdce2550-bef3-4902-9e0f-141f11aadf1b, Duration=60050ms)
[2021-02-05T18:52:22.924] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=141febc3-ce15-47e4-90d9-e2c1d6f80b2e)
[2021-02-05T18:52:22.952] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=1ce6566a-c332-4635-beef-312f04a0aa08)
[2021-02-05T18:52:22.984] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=54423167-77bd-4fa4-b0b9-4953604ef9df)
[2021-02-05T18:52:22.999] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=f50f0eff-a59a-4caf-80e9-8077879f6bb0)
[2021-02-05T18:53:22.982] Executed 'Functions.DurableActivity1' (Succeeded, Id=1ce6566a-c332-4635-beef-312f04a0aa08, Duration=60031ms)
[2021-02-05T18:53:22.995] Executed 'Functions.DurableActivity1' (Succeeded, Id=141febc3-ce15-47e4-90d9-e2c1d6f80b2e, Duration=60071ms)
[2021-02-05T18:53:23.053] Executed 'Functions.DurableActivity1' (Succeeded, Id=f50f0eff-a59a-4caf-80e9-8077879f6bb0, Duration=60054ms)
[2021-02-05T18:53:23.053] Executed 'Functions.DurableActivity1' (Succeeded, Id=54423167-77bd-4fa4-b0b9-4953604ef9df, Duration=60069ms)

Can you explain why this issue is not there for C# Az Func but only for Python ?

Independent of Azure Functions, Python was designed as a single-threaded runtime. This means that only one thread can be used to execute Python code at a time. If you block that thread, then no other Python function can run until the thread gets unblocked. Node.js is the same way. However, unlike Python, node.js APIs never block threads so it's much less of an issue. C# and Java don't have this issue because these runtimes are multi-threaded.

More information on Python performance and scalability here: https://docs.microsoft.com/en-us/azure/azure-functions/python-scale-performance-reference

So I guess the next thing we need to figure out is why you are seeing a delay but I am not?

tamathew commented 3 years ago

Hi @cgillum - Thanks for the detailed analysis. Can you retry your orchestrations with different sleep durations ? Eg: 1 min, 2 min, 4 min, 8 min, 16 min 1min, 3 min, 9 min, 27 min.

I too tried with fourteen 60 seconds orchestrations - But still facing performance issues.

How I tested ? : I ran my Azure DataFactory pipeline with Az function activity within it, triggering one pipeline after other upto 14 pipelines with a parameter = 60 seconds for sleep.

My next test would be to create a brand new azure function and try to orchestrate with async changes and test(Will try to use plain simple code from template). I will also try to change my ADF. This will give me a brand new env set up. Will keep you posted.

tamathew commented 3 years ago

@cgillum - Also one more question. Regarding the issue with Python's single threaded runtime . What design work-around could solve this issue ( We can't change the implementation of Python anyways!) ?

cgillum commented 3 years ago

@tmathewlulu I ran another test with varying delays. 3 x 1 minute, 3 x 2 minutes, 3 x 4 minutes, 3 x 8 minutes, and 3 x 16 minutes. All 15 orchestrations were started at the same time and all executed in exactly the sleep time specified. Here is the trigger, orchestration, and activity code I used:

HTTP trigger

import logging
import azure.functions as func
import azure.durable_functions as df

async def main(req: func.HttpRequest, starter: str) -> func.HttpResponse:
    client = df.DurableOrchestrationClient(starter)
    sleep_time = float(req.params.get('sleep'))
    instance_id = await client.start_new("DurableFunctionsOrchestrator1", None, sleep_time)
    logging.info(f"Started orchestration with ID = '{instance_id}', sleep_time = {sleep_time}.")
    return client.create_check_status_response(req, instance_id)

Orchestration

import azure.functions as func
import azure.durable_functions as df

def orchestrator_function(context: df.DurableOrchestrationContext):
    sleep_time = context.get_input()
    yield context.call_activity('DurableActivity1', sleep_time)
    return "done"

main = df.Orchestrator.create(orchestrator_function)

Activity

import asyncio
import logging

async def main(sleep: float):
    logging.info(f"Started activity with input = '{sleep}'. ")
    await asyncio.sleep(sleep)
    logging.info(f"Finished activity with input = '{sleep}'.") 

Here are the logs for the activity executions. They all completed in the expected amount of time and started out with only one or maybe two Python worker processes initialized.

[2021-02-06T17:28:08.851] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=8068bf01-591e-4e2b-9925-b66d9922a876)
[2021-02-06T17:28:09.070] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=12858288-0f0c-4aa2-a207-da544511db4a)
[2021-02-06T17:28:17.884] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=cf5a12a1-0646-4fde-b00c-1baa33b2f556)
[2021-02-06T17:28:17.902] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=8e1e4a4b-69c9-4571-bf74-27cf85a40f09)
[2021-02-06T17:28:18.128] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=f16850ee-01b9-492c-bafb-2a7c843d7c50)
[2021-02-06T17:29:17.887] Executed 'Functions.DurableActivity1' (Succeeded, Id=cf5a12a1-0646-4fde-b00c-1baa33b2f556, Duration=60003ms)
[2021-02-06T17:29:17.946] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=69b3ff80-dba7-4ee6-8de3-924182fe0f21)
[2021-02-06T17:29:18.151] Executed 'Functions.DurableActivity1' (Succeeded, Id=f16850ee-01b9-492c-bafb-2a7c843d7c50, Duration=60023ms)
[2021-02-06T17:29:18.196] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=9b2072e5-045f-4cbb-a341-231c6c96adb6)
[2021-02-06T17:30:08.859] Executed 'Functions.DurableActivity1' (Succeeded, Id=8068bf01-591e-4e2b-9925-b66d9922a876, Duration=120007ms)
[2021-02-06T17:30:08.911] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=b22c9d07-7e7a-4211-b00e-d6a8861610e1)
[2021-02-06T17:30:09.091] Executed 'Functions.DurableActivity1' (Succeeded, Id=12858288-0f0c-4aa2-a207-da544511db4a, Duration=120021ms)
[2021-02-06T17:30:09.144] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=8b57897e-87a5-46c4-9fa8-50545f299634)
[2021-02-06T17:30:17.933] Executed 'Functions.DurableActivity1' (Succeeded, Id=8e1e4a4b-69c9-4571-bf74-27cf85a40f09, Duration=120030ms)
[2021-02-06T17:30:17.982] Executed 'Functions.DurableActivity1' (Succeeded, Id=69b3ff80-dba7-4ee6-8de3-924182fe0f21, Duration=60036ms)
[2021-02-06T17:30:18.004] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=c052e55f-4329-4d3b-bea6-b48f6cd14542)
[2021-02-06T17:30:18.046] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=40992d6d-6c1d-4ee0-93f7-283b1037054c)
[2021-02-06T17:33:18.215] Executed 'Functions.DurableActivity1' (Succeeded, Id=9b2072e5-045f-4cbb-a341-231c6c96adb6, Duration=240019ms)
[2021-02-06T17:33:18.271] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=08b8d4df-1733-4468-bc69-2e5427e5fee5)
[2021-02-06T17:34:08.939] Executed 'Functions.DurableActivity1' (Succeeded, Id=b22c9d07-7e7a-4211-b00e-d6a8861610e1, Duration=240028ms)
[2021-02-06T17:34:09.017] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=2be48575-c4a4-45e1-88f6-19a72ef70f2b)
[2021-02-06T17:34:09.175] Executed 'Functions.DurableActivity1' (Succeeded, Id=8b57897e-87a5-46c4-9fa8-50545f299634, Duration=240031ms)
[2021-02-06T17:34:09.227] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=1443ec7b-9efb-4f9b-bb49-c9d0de101909)
[2021-02-06T17:38:18.047] Executed 'Functions.DurableActivity1' (Succeeded, Id=c052e55f-4329-4d3b-bea6-b48f6cd14542, Duration=480043ms)
[2021-02-06T17:38:18.061] Executed 'Functions.DurableActivity1' (Succeeded, Id=40992d6d-6c1d-4ee0-93f7-283b1037054c, Duration=480015ms)
[2021-02-06T17:38:18.127] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=3d52a08e-231c-41b5-a1e9-2397302c0dcf)
[2021-02-06T17:41:18.305] Executed 'Functions.DurableActivity1' (Succeeded, Id=08b8d4df-1733-4468-bc69-2e5427e5fee5, Duration=480034ms)
[2021-02-06T17:50:09.057] Executed 'Functions.DurableActivity1' (Succeeded, Id=2be48575-c4a4-45e1-88f6-19a72ef70f2b, Duration=960040ms)
[2021-02-06T17:50:09.277] Executed 'Functions.DurableActivity1' (Succeeded, Id=1443ec7b-9efb-4f9b-bb49-c9d0de101909, Duration=960050ms)
[2021-02-06T17:54:18.164] Executed 'Functions.DurableActivity1' (Succeeded, Id=3d52a08e-231c-41b5-a1e9-2397302c0dcf, Duration=960037ms)

I'm wondering if your ADF data pipeline code isn't actually running async? Running async is the key to ensuring you don't run into the delay problem if executing multiple activity functions in parallel.

ConnorMcMahon commented 3 years ago

@cgillum and @tmathewlulu, what versions of Python are you both running?

One thing to note is that according to this link Python 3.6-3.8 set the value of maximum synchronous workers to 1, but in Python 3.9, this value is not set by default, as it presumably handles it in a more intelligent fashion.

This should have less of an impact if all of your code is running async now, but this could explain the difference you were seeing before. Changing the value of PYTHON_THREADPOOL_THREAD_COUNT on Python 3.6-3.8 could have a substantial impact on functions that are forced to have blocking IO. It's not clear to me why 1 is the default when we recommend in our docs to set it to 4+number of available cores.

cgillum commented 3 years ago

@ConnorMcMahon I'm using Python 3.7. I tried setting PYTHON_THREADPOOL_THREAD_COUNT to 10 and running a test where I ran 10 concurrent orchestrations with activities that do a blocking time.sleep(30) but I still got the same backlogging behavior where some activity functions reported running for 60, 120, or even 150 seconds. It's not clear to me whether changing the thread count had any affect at all.

cgillum commented 3 years ago

Update!

So it turns out that PYTHON_THREADPOOL_THREAD_COUNT does allow blocking functions to run in parallel! The reason that it didn't work in my test is because I did not remove the async keyword from my activity function declaration. As soon as I removed the async keyword I was able to run all my activity functions in parallel successfully.

Here is another test that uses 10 second delays and only a single Python language worker process:

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_PROCESS_COUNT": 1,
    "PYTHON_THREADPOOL_THREAD_COUNT": 10,
    "FUNCTIONS_WORKER_RUNTIME": "python"
  }
}

Activity

import logging
import time

def main(sleep: float):
    logging.info(f"Started activity with input = '{sleep}'. ")
    time.sleep(sleep)
    logging.info(f"Finished activity with input = '{sleep}'.")
    return "done"

Activity logs

[2021-02-08T23:50:24.690Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=f2da3b64-face-4180-a9d7-72a11aba30d2)
[2021-02-08T23:50:24.803Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=ae8e06e4-0917-4034-8bf8-d4220600af7e)
[2021-02-08T23:50:24.854Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=94de84e0-6dda-48c6-be48-9a840b145a98)
[2021-02-08T23:50:24.890Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=9c30524f-c730-41ac-bf5c-b153c24111d4)
[2021-02-08T23:50:24.941Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=38c5c303-87c9-4d14-af37-ccc5abb71ea5)
[2021-02-08T23:50:34.798Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=f2da3b64-face-4180-a9d7-72a11aba30d2, Duration=10108ms)
[2021-02-08T23:50:34.858Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=ae8e06e4-0917-4034-8bf8-d4220600af7e, Duration=10055ms)
[2021-02-08T23:50:34.859Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=6552b7a5-c2d6-42d9-a37f-5b92d292a049)
[2021-02-08T23:50:34.923Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=2b4ac96e-d96c-4e45-844a-5fd435327f56)
[2021-02-08T23:50:34.935Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=94de84e0-6dda-48c6-be48-9a840b145a98, Duration=10081ms)
[2021-02-08T23:50:34.955Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=9c30524f-c730-41ac-bf5c-b153c24111d4, Duration=10065ms)
[2021-02-08T23:50:34.966Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=38c5c303-87c9-4d14-af37-ccc5abb71ea5, Duration=10025ms)
[2021-02-08T23:50:35.006Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=8f8d2851-ed74-43d0-816b-bd9f3fd33af5)
[2021-02-08T23:50:35.044Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=ab975adb-ec35-48f5-8dac-c6006c07caff)
[2021-02-08T23:50:35.065Z] Executing 'Functions.DurableActivity1' (Reason='(null)', Id=a7d2e4fc-2212-4c39-b1dd-f7d4460c0fd3)
[2021-02-08T23:50:44.878Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=6552b7a5-c2d6-42d9-a37f-5b92d292a049, Duration=10018ms)
[2021-02-08T23:50:44.942Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=2b4ac96e-d96c-4e45-844a-5fd435327f56, Duration=10018ms)
[2021-02-08T23:50:45.048Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=8f8d2851-ed74-43d0-816b-bd9f3fd33af5, Duration=10042ms)
[2021-02-08T23:50:45.076Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=ab975adb-ec35-48f5-8dac-c6006c07caff, Duration=10031ms)
[2021-02-08T23:50:45.112Z] Executed 'Functions.DurableActivity1' (Succeeded, Id=a7d2e4fc-2212-4c39-b1dd-f7d4460c0fd3, Duration=10047ms)

As shown above, they all completed in 10 seconds, even with just 1 language worker.

@tmathewlulu can you try setting PYTHON_THREADPOOL_THREAD_COUNT to 10 and trying your scenario again (without the async keyword)?

tamathew commented 3 years ago

@cgillum - PYTHON_THREADPOOL_THREAD_COUNT to 10 and no async on activity. That did not work. Would you mind sharing your entire working code in a zip file. I will create a brand new "Durable - Premium" function with that code and try to call from my data factory.

cgillum commented 3 years ago

@tmathewlulu Here is my project in a zip file. TestPythonApp.zip

tamathew commented 3 years ago

@cgillum @ConnorMcMahon - My funcapp is Py3.8 runtime with Elastic Premium plan. Which one do you want me to try ? I'm planning to start from scratch.

tamathew commented 3 years ago

Hi @cgillum , I have created a brand new funcapp (premium) and used your TestPythonApp code. However I'm still facing he issue. Here is a sample run id - Supposed to finish in 720seconds , but running for more than an hour. I think if we don't use async in activity, the performance is real bad.

{"name":"ChrisDurableFunctionsOrchestrator","instanceId":"2e49a97c9cbb44968348514d8128e287","runtimeStatus":"Running","input":"720.0","customStatus":null,"output":null,"createdTime":"2021-02-15T00:07:26Z","lastUpdatedTime":"2021-02-15T00:09:13Z"}

tamathew commented 3 years ago

Also, @cgillum ,

I have a different ADF batch job ( not the "sleep" one )

I noticed that the issue is not limited to concurrent jobs. As you said before, the language worker process availability might be a reason. Below orchestrator-client output shows the starting time as 18:32 and ending time as 18:38. The actual activity was started around 18:37 - Which means it took almost 5 min to start the activity. My host.json settings are default. PFA the logs.

OPeration_d379eeb7cf9dc54990407923ac0bb35c.xlsx

Orchestrator client response ------- >

{
  "name": "ADFMetaExtractorOrchestrator",
  "instanceId": "af450c5387364f1d8aee5325e31a4522",
  "runtimeStatus": "Completed",
  "input": "{\"api_name\": \"GetTriggerRuns\", \"resource_group\": \"cdp-sedw.rg\", \"factory_name\": \"lllcdpsedwadf1\", \"api_limit\": 999, \"watermark_offset\": 24}",
  "customStatus": null,
  "output": {
    "status": "Success",
    "status_code": 200,
    "function_name": "get_trigger_runs",
    "message": {
      "Impacted rows on  META_DB.CONFIG.T_ADF_META_TRIGGER_RUNS_STG": 7
    }
  },
  "createdTime": "2021-02-15T18:32:21Z",
  "lastUpdatedTime": "2021-02-15T18:38:18Z"
}

Activity AppInsight log -------->

2/15/2021, 6:37:37.428 PM         Inside Meta Extractor Activity
2/15/2021, 6:37:37.401 PM         Executing 'Functions.ADFMetaExtractorActivity' (Reason='(null)', Id=5045dd6a-008b-4f69-88b3-5bedfcca763a)
2/15/2021, 6:37:37.398 PM 071a33d5cf5c244c ADFMetaExtractorActivity   TRUE  

Do you have any work around available for this issue ? Does restarting the az function prior to running the clients help ?

cgillum commented 3 years ago

I took a look at 2e49a97c9cbb44968348514d8128e287 and it appears to have gotten stuck - not related to performance. This would be a good one for us to follow up on separately, as I saw another case of a stuck orchestration that looks like this one recently.

Still need to look at your more recent instance...

cgillum commented 3 years ago

I tried looking at af450c5387364f1d8aee5325e31a4522 but I couldn't find any Durable-specific logs. I did find some generic functions logs but they had some unexpected data in them which makes me think there might be a logging issue with this app (@davidmrdavid you may want to take a look at this).

Using the logs you provided in the .xslx file (which are the most helpful - thank you!), it looks to me that the activity function was not delayed at all.

2/15/2021, 6:37:37.401 PM Executing 'Functions.ADFMetaExtractorActivity' (Reason='(null)', Id=5045dd6a-008b-4f69-88b3-5bedfcca763a)
2/15/2021, 6:37:37.428 PM Inside Meta Extractor Activity

Normally if it was delayed, you'd see a large gap between the "Executing {name}" log statement and the first log statement emitted by the function ("Inside Meta Extractor Activity", in this case). These logs occur within a few milliseconds so there doesn't appear to be a Python concurrency issue. Perhaps what you're observing is the expected behavior of maxConcurrentActivityFunctions? The difference between the behavior for Python concurrency issues and explicitly configured throttling is that explicitly configured throttling should result in accurate latency reporting and an improved ability to scale-out when the Scale Controller sees that there are unprocessed activity messages in the work-item queue. Let me know if I can help clarify this further.