Azure / azure-functions-durable-python

Python library for using the Durable Functions bindings.
MIT License
136 stars 55 forks source link

Orchestration Trigger via FastAPI #444

Open danielniccoli opened 1 year ago

danielniccoli commented 1 year ago

Is your question related to a specific version? If so, please specify:

V2 Model

What binding does your question apply to, if any? (e.g. Blob Trigger, Event Hub Binding, etc)

Azure Durable Functions Orchestration Triggers

Question

We're planning to write several API endpoints using Azure Durable Functions. We'd like to use FastAPI and its several features like OpenAPI, JSON Schema validation. We cannot use non-Durable functions, because we have potentially long-running tasks such as approval flows, or tasks may intermittently fail and have to be retried until completed.

I've found an article how to use FastAPI with an HTTP Trigger, but you can't just use that information and do the same with the Orchestration Trigger.

The conceptual flow or the orchestration is this:

  1. Client sends JSON to Orchestration Trigger HTTP endpoint to start the orchestration e.g., to request the creation of an object in a cloud-application.
  2. The JSON describes what activies the orchestrator calls, in which order and what parameters is passes.
  3. If the JSON body is invalid, the orchestration trigger should return an error message and not start the orchestration at all.

And also, we want to provide a generated OpenAPI/Swagger documentation for the orchestration triggers.

Is this even possible at the moment?

bhagyshricompany commented 1 year ago

Thanks for informing will update on this.

danielniccoli commented 1 year ago

Hi @bhagyshricompany, do you know if this is possible at the moment, or functionality that must be added to the module?

ricpark commented 1 year ago

I've found that you can do something like this:

from fastapi_app import app as fastapi_app

app = df.DFApp(func.AuthLevel.ANONYMOUS)

@app.route(route="{*route}", auth_level=func.AuthLevel.ANONYMOUS)
@app.durable_client_input(client_name="client")
async def http_trigger(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(req)
    # Do activity or orchestration based on response
    return response

host.json will need this attribute:

  "extensions": {
    "http": {
        "routePrefix": ""
    }
  }

With this implementation, I get autogenerated openapi docs (http://localhost:7071/docs) and informative ValidationErrors from Pydantic that return before the orchestrations begin. Your business logic ends up living outside of the fastapi_app, but I think the end result for the user can be the same. It would be ideal if one could inject durable functions as middleware into fastapi, but I don't know how to do that.

AndreRicardo-Zoetis commented 11 months ago

I've found that you can do something like this:

from fastapi_app import app as fastapi_app

app = df.DFApp(func.AuthLevel.ANONYMOUS)

@app.route(route="{*route}", auth_level=func.AuthLevel.ANONYMOUS)
@app.durable_client_input(client_name="client")
async def http_trigger(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(req)
    # Do activity or orchestration based on response
    return response

host.json will need this attribute:

  "extensions": {
    "http": {
        "routePrefix": ""
    }
  }

With this implementation, I get autogenerated openapi docs (http://localhost:7071/docs) and informative ValidationErrors from Pydantic that return before the orchestrations begin. Your business logic ends up living outside of the fastapi_app, but I think the end result for the user can be the same. It would be ideal if one could inject durable functions as middleware into fastapi, but I don't know how to do that.

I've tried this approach with function_app.py

import azure.durable_functions as df
import azure.functions as func

from durable_blueprints import bp
from WrapperFunction import app as fastapi_app

app = df.DFApp(func.AuthLevel.ANONYMOUS)
app.register_blueprint(bp)

@app.route(route="{*route}", auth_level=func.AuthLevel.ANONYMOUS)
@app.durable_client_input(client_name="client")
async def http_trigger(
    req: func.HttpRequest, client: df.DurableOrchestrationClient
) -> func.HttpResponse:
    response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(
        req
    )
    # Do activity or orchestration based on response
    return response

And durable_blueprints.py

import logging

import azure.durable_functions as df
import azure.functions as func

# To learn more about blueprints in the Python prog model V2,
# see: https://learn.microsoft.com/en-us/azure/azure-functions/functions-reference-python?tabs=asgi%2Capplication-level&pivots=python-mode-decorators#blueprints

# Note, the `func` namespace does not contain Durable Functions triggers and bindings, so to register blueprints of
# DF we need to use the `df` package's version of blueprints.
bp = df.Blueprint()

# We define a standard function-chaining DF pattern

@bp.route(route="startOrchestrator")
@bp.durable_client_input(client_name="client")
async def start_orchestrator(req: func.HttpRequest, client):
    instance_id = await client.start_new("my_orchestrator")

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

@bp.orchestration_trigger(context_name="context")
def my_orchestrator(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

@bp.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    return f"Hello {city}!"

But when I try the url http://localhost:7071/startOrchestrator Just get 404 {"detail":"Not Found"}

What am I doing wrong?

AndreRicardo-Zoetis commented 11 months ago

I finally got a working version to call the Orchestrator using FastAPI, model v2!

Any comments on how to improve welcome! I could not create a FastAPI middleware to pass the starter.

function_app.py

import logging

import azure.durable_functions as df
import azure.functions as func
from fastapi import Depends, Request

from WrapperFunction import app as fastapi_app

df_app = df.DFApp(func.AuthLevel.ANONYMOUS)

def get_starter(request: Request) -> str:
    starter = request.scope["azure_functions.trace_context"].attributes["starter"]
    return starter

# This is the FastAPI version of the `route="startOrchestrator"`
# `async def start_orchestrator(req: func.HttpRequest, client):`
@fastapi_app.get(path="/fast_orchestrator")
async def fast_orchestrator(starter: str = Depends(get_starter)):
    client = df.DurableOrchestrationClient(starter)
    instance_id = await client.start_new("my_orchestrator")

    reply = f"Started orchestration with ID = '{instance_id}'."
    logging.info(reply)
    # Don't know how to convert this back to Azure func.HttpRequest
    # return client.create_check_status_response(req, instance_id)

    # Here we should build the HTTP 202 with location header to the /status/{instance_id}

    return reply

@fastapi_app.get("/status/{instance_id}")
async def status(instance_id, starter: str = Depends(get_starter)):
    client = df.DurableOrchestrationClient(starter)
    result = await client.get_status(instance_id=instance_id)
    return result

@df_app.orchestration_trigger(context_name="context")
def my_orchestrator(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

@df_app.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    return f"Hello {city}!"

@df_app.route(route="{*route}", auth_level=func.AuthLevel.ANONYMOUS)
@df_app.generic_input_binding(arg_name="starter", type="durableClient")
async def http_trigger(
    req: func.HttpRequest, context: func.Context, starter: str
) -> func.HttpResponse:
    context.trace_context.attributes["starter"] = starter
    response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(
        req, context
    )
    # Do activity or orchestration based on response
    return response

First call http://localhost:7071/fast_orchestrator

image

Then get the instance_id and call http://localhost:7071/status/3bbeeef84e484a3f8bdab2db520ef317 image

Using OpenAPI documentation

image

PS: I ended up also refactoring the dependency to get a df.DurableOrchestrationClient

def get_client(request: Request) -> df.DurableOrchestrationClient:
    starter = request.scope["azure_functions.trace_context"].attributes["starter"]
    client = df.DurableOrchestrationClient(starter)
    return client

And modified the response to be similar to the default Azure functions endpoint

    location = f"{request.base_url}status/{instance_id}"
    headers = {"Location": str(location)}
    content = {"id": instance_id}
    return JSONResponse(status_code=202, headers=headers, content=content)
danielniccoli commented 10 months ago

I've found that you can do something like this:

from fastapi_app import app as fastapi_app

app = df.DFApp(func.AuthLevel.ANONYMOUS)

@app.route(route="{*route}", auth_level=func.AuthLevel.ANONYMOUS)
@app.durable_client_input(client_name="client")
async def http_trigger(req: func.HttpRequest, client: df.DurableOrchestrationClient) -> func.HttpResponse:
    response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(req)
    # Do activity or orchestration based on response
    return response

host.json will need this attribute:

  "extensions": {
    "http": {
        "routePrefix": ""
    }
  }

With this implementation, I get autogenerated openapi docs (http://localhost:7071/docs) and informative ValidationErrors from Pydantic that return before the orchestrations begin. Your business logic ends up living outside of the fastapi_app, but I think the end result for the user can be the same. It would be ideal if one could inject durable functions as middleware into fastapi, but I don't know how to do that.

I've tried this approach with function_app.py

import azure.durable_functions as df
import azure.functions as func

from durable_blueprints import bp
from WrapperFunction import app as fastapi_app

app = df.DFApp(func.AuthLevel.ANONYMOUS)
app.register_blueprint(bp)

@app.route(route="{*route}", auth_level=func.AuthLevel.ANONYMOUS)
@app.durable_client_input(client_name="client")
async def http_trigger(
    req: func.HttpRequest, client: df.DurableOrchestrationClient
) -> func.HttpResponse:
    response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(
        req
    )
    # Do activity or orchestration based on response
    return response

And durable_blueprints.py

import logging

import azure.durable_functions as df
import azure.functions as func

# To learn more about blueprints in the Python prog model V2,
# see: https://learn.microsoft.com/en-us/azure/azure-functions/functions-reference-python?tabs=asgi%2Capplication-level&pivots=python-mode-decorators#blueprints

# Note, the `func` namespace does not contain Durable Functions triggers and bindings, so to register blueprints of
# DF we need to use the `df` package's version of blueprints.
bp = df.Blueprint()

# We define a standard function-chaining DF pattern

@bp.route(route="startOrchestrator")
@bp.durable_client_input(client_name="client")
async def start_orchestrator(req: func.HttpRequest, client):
    instance_id = await client.start_new("my_orchestrator")

    logging.info(f"Started orchestration with ID = '{instance_id}'.")
    return client.create_check_status_response(req, instance_id)

@bp.orchestration_trigger(context_name="context")
def my_orchestrator(context: df.DurableOrchestrationContext):
    result1 = yield context.call_activity("say_hello", "Tokyo")
    result2 = yield context.call_activity("say_hello", "Seattle")
    result3 = yield context.call_activity("say_hello", "London")
    return [result1, result2, result3]

@bp.activity_trigger(input_name="city")
def say_hello(city: str) -> str:
    return f"Hello {city}!"

But when I try the url http://localhost:7071/startOrchestrator Just get 404 {"detail":"Not Found"}

What am I doing wrong?

This is probably because you do not have a FastAPI route for /startOrchestrator.

With response: func.HttpResponse = await func.AsgiMiddleware(fastapi_app).handle_async(req) you pass the http request to FastAPI. handle_async() converts the af-http-request to a normal http request, passes it to fastapi_app, takes its return value and converts it back to an af-http-request. FastAPI and its routes are used as a middleware.