Closed: mukundansundar closed this issue 7 months ago.
+1. Please make sure we also have `waitForInstanceCompletion` like the Java SDK.
@berndverst @cgillum @DeepanshuA A specific question regarding the initialization of the workflow runtime instance and the registration of workflows and activities against a particular runtime. As of now we always have only a single workflow runtime instance, with a connection specified by host and port (defaulting to the Dapr host and port).
I would like to modify the decorator
`@wfr.workflow.register`
to something like
`@workflow.register(name=None, workflow_runtime=None)`
or simply
`@workflow(name=None, workflow_runtime=None)`
and likewise for activities: instead of
`@wfr.activity.register`
use either
`@activity.register(name=None, workflow_runtime=None)`
or
`@activity(name=None, workflow_runtime=None)`
The default workflow runtime instance would be created at startup, based on the Dapr host and port values from settings.py. If needed, it can be overridden by passing a different `workflow_runtime` value when the user creates a new one.
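To illustrate the override behaviour, here is a minimal sketch; the dict-based runtime and the `register` helper are stand-ins for illustration only, not the actual SDK API:

```python
# Stand-in for the default runtime built from settings.py values at startup.
DEFAULT_RUNTIME = {"workflows": {}}


def register(name=None, workflow_runtime=None):
    # Use the caller-supplied runtime if given, otherwise fall back to the default.
    runtime = workflow_runtime if workflow_runtime is not None else DEFAULT_RUNTIME

    def decorator(fn):
        runtime["workflows"][name or fn.__name__] = fn
        return fn

    return decorator


@register(name="hello_world")
def hello_world_wf(ctx, input):
    yield  # workflow body elided


custom_runtime = {"workflows": {}}


@register(workflow_runtime=custom_runtime)
def other_wf(ctx, input):
    yield  # registered against the user-supplied runtime instead
```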
This in turn means that a default workflow runtime instance is created if either the workflow or activity decorators are used, only that instance is used, and it cannot be changed later on, i.e. the `WorkflowRuntime` class becomes a singleton. If the `WorkflowRuntime` class is made a singleton, then we can make its other two methods, `start` and `shutdown`, `classmethod`s, and thus we should be able to call them on the class itself directly.
I would like your thoughts on this change.
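The singleton-with-classmethods idea could be sketched roughly as follows (a minimal sketch, not the real SDK implementation; host/port handling is simplified):

```python
class WorkflowRuntime:
    """Sketch: one shared instance; start/shutdown callable on the class."""

    _instance = None

    def __new__(cls, host="localhost", port=50001):
        # First construction wins; later calls return the same instance.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.host = host
            cls._instance.port = port
            cls._instance._running = False
        return cls._instance

    @classmethod
    def start(cls):
        # cls() returns the shared instance, so this works without a reference.
        cls()._running = True

    @classmethod
    def shutdown(cls):
        cls()._running = False
```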
After the change:

```python
## default workflow_runtime is used
@workflow.register(name="hello_world")
@workflow.serializer(CustomInputSerializer())
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)


@activity.register(name="hello")
@activity.retry(max_number_of_attempts=3)
def hello_act(ctx: WorkflowActivityContext, input):
    global counter
    counter += input
    print(f'New counter value is: {counter}!', flush=True)


def main():
    WorkflowRuntime.start()
    # ... start workflows as normal
    WorkflowRuntime.shutdown()
```
There is initial draft code in #651. The pending work is adding tests, plus a TODO for creating a default init for `WorkflowRuntime`.
Thanks @mukundansundar. Of the options you mentioned, I'd actually prefer the more explicit approach which doesn't include a default `WorkflowRuntime` singleton. My reasoning is the following:
That said, I do value simplicity, and one simplification I'd make to your initial proposal is to combine the multiple decorators into just one. I also wonder if the user should be presented with `WorkflowApp` instead of `WorkflowRuntime` as the root concept.
```python
wfapp = WorkflowApp(host, port)


@wfapp.workflow(name="hello_world", serializer=CustomInputSerializer)
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    yield ctx.call_activity(hello_act, input=10)
    yield ctx.wait_for_external_event("event1")
    yield ctx.call_activity(hello_act, input=100)
    yield ctx.call_activity(hello_act, input=1000)


activity_retry_policy = ...


@wfapp.activity(name="hello", retry_policy=activity_retry_policy)
def hello_act(ctx: WorkflowActivityContext, input):
    print(f'Activity input: {input}', flush=True)


def main():
    wfapp.start()
    # ... start workflows as normal
```
Let me know what you think.
I think having the `WorkflowRuntime` (`WorkflowApp`) initialized explicitly does add more clarity to the flow.
For the renaming part, my one concern is with `wfapp.start()`. Could that give the incorrect notion that the workflow app has started and the workflow itself has started? When it is `workflow_runtime.start()`, that clearly says it's the runtime you are starting, not the app/workflow.
@cgillum Can the runtime be started and stopped multiple times in a given application? If so, can we consider prioritizing the `with` statement and use that instead of `start` and `shutdown`?
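If the runtime can be restarted, the `with` usage might look like this. This is a sketch under the assumption that `WorkflowRuntime` grows `__enter__`/`__exit__` support; the class below is a toy stand-in, not the SDK's implementation:

```python
class WorkflowRuntime:
    """Sketch: a restartable runtime usable as a context manager."""

    def __init__(self, host="localhost", port=50001):
        self.host, self.port = host, port
        self.running = False

    def start(self):
        self.running = True

    def shutdown(self):
        self.running = False

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Runs on normal exit and on exceptions, so shutdown is never skipped.
        self.shutdown()
        return False


with WorkflowRuntime() as wfr:
    # schedule workflows and wait for completion here
    assert wfr.running
```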
A concern I have with the wfapp sample in https://github.com/dapr/python-sdk/issues/635#issuecomment-1852530911 is that it suggests that the module that attaches functions to the wfapp is the same module that runs the wfapp, which is not the case for my application.
As I mentioned in https://github.com/dapr/python-sdk/pull/651 I have multiple modules with activities and workflows, but only one workflow runtime that will be instantiated at the top-most level.
I think in this example, WorkflowApp has the same issues as WorkflowRuntime (because in this example WorkflowApp also takes host and port args).
Would you consider the FastAPI approach, where `FastAPI()` can be instantiated as a lightweight container to which functions get attached?
Then a higher-level module can import the 'app' from each child module and mount them onto the top-most `FastAPI()` instance, which is the one that actually does the work.
What if WorkflowApp were only a container for registered workflows and activities, and didn't create a GrpcEndpoint or a TaskHubGrpcWorker until the `start()` method was called?
```python
# module1.py
wfapp = WorkflowApp()  # a lightweight container


@wfapp.workflow(name="hello_world", serializer=CustomInputSerializer)
def hello_world_wf(ctx: DaprWorkflowContext, input):
    print(f'{input}')
    yield ctx.call_activity(hello_act, input=1)
    ...
```
Then, optionally, a higher-level module can combine multiple WorkflowApps together:
```python
# main.py
from module1 import wfapp as module1_wfapp
from module2 import wfapp as module2_wfapp

wfapp = WorkflowApp()  # a lightweight container
wfapp.attach(module1_wfapp)
wfapp.attach(module2_wfapp)


def main():
    # instantiate WorkflowRuntime, register workflows and activities,
    # and call wf_runtime.start()
    wf_runtime = wfapp.start(host, port)
    # do stuff
    wf_runtime.shutdown()
```
Alternatively, if the workflow runtime is also going to be a context manager, then maybe this approach:
```python
# main.py
from module1 import wfapp as module1_wfapp
from module2 import wfapp as module2_wfapp

wfapp = WorkflowApp()  # a lightweight container
wfapp.attach(module1_wfapp)
wfapp.attach(module2_wfapp)


def main():
    # instantiate WorkflowRuntime, register workflows and activities
    with wfapp.create_runtime(host, port) as wf_runtime:
        wf_runtime.start()
        # do other stuff or sleep or whatever
```

`shutdown()` is automatically called when the wf_runtime context exits. Alternatively, `create_runtime()` can call `wf_runtime.start()` itself, in which case `wfapp.start()` makes more sense, as in the first example.
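The container-plus-attach idea sketched above could work roughly like this. All names here (`WorkflowApp`, `attach`, the registry dicts) are hypothetical, modeled on FastAPI-style mounting, not on the actual SDK:

```python
class WorkflowApp:
    """Sketch of a registration-only container (hypothetical API)."""

    def __init__(self):
        # No gRPC endpoint or worker is created here; just registries.
        self.workflows = {}
        self.activities = {}

    def workflow(self, name=None):
        def decorator(fn):
            self.workflows[name or fn.__name__] = fn
            return fn
        return decorator

    def activity(self, name=None):
        def decorator(fn):
            self.activities[name or fn.__name__] = fn
            return fn
        return decorator

    def attach(self, other):
        # FastAPI-style mounting: merge a child app's registrations into this one.
        self.workflows.update(other.workflows)
        self.activities.update(other.activities)


# module-level app, as in module1.py above
module_app = WorkflowApp()


@module_app.workflow(name="hello_world")
def hello_world_wf(ctx, input):
    yield  # workflow body elided


# top-level app that aggregates child apps
root_app = WorkflowApp()
root_app.attach(module_app)
```

Only the top-level app would ever be handed to the runtime, which keeps the child modules free of any connection details.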
I've been experimenting with decorators for Dapr Workflow for a recent project and have just been pointed to this issue.
I have created a repo with the decorator code extracted if you're interested: https://github.com/stuartleeks/dapr-workflow-exploration-python
Closing this issue since the initial set of decorators is implemented in #651; any further enhancements will be tracked as separate issues.
Describe the proposal
The current Python SDK implementation of Workflow provides a set of functions to operate the workflows in your code / long-running scenarios.
e.g.: `schedule_new_workflow()`, `get_workflow_state()`, `wait_for_workflow_start()`
The proposal is to leverage the strengths of the Python programming model to further simplify developers' effort by introducing the decorator pattern, which adds features/functionality to existing functions without modifying the original structure of the code.
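For readers less familiar with the pattern, a registration decorator adds behaviour (here, recording the function in a registry) while returning the function unchanged. A generic sketch, unrelated to any specific SDK API:

```python
registry = {}


def register(name=None):
    # A decorator factory: configures and returns the actual decorator.
    def decorator(fn):
        registry[name or fn.__name__] = fn
        return fn  # the original function is returned unmodified
    return decorator


@register(name="hello_world")
def hello_world_wf(ctx, input):
    yield  # workflow body elided
```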
Proposed Solution
The Workflow lifecycle involves multiple steps, as shown below, and we have multiple functions for each of these steps.
Currently in the Python workflow SDK, we have the workflow function definition and the activity function definitions, followed by registration of the workflow and activity functions against the `WorkflowRuntime` instance. For now, only one `WorkflowRuntime` instance is created, which interacts with the Dapr sidecar as shown below. Given that workflow runtime instance (`wfr`), we can register multiple workflow functions against it, as well as multiple activity functions.

- `@wfr.workflow.register(name)` – Register the workflow with the Dapr sidecar. `name` is related to the GitHub issue.
- `@wfr.workflow.serializer(serializer)` / `@wfr.workflow.deserializer(deserializer)` – Set serialization/deserialization logic for workflow input/output.
- `@wfr.workflow.retry()` – Resiliency, to apply retries on the workflow.
- `@wfr.activity.register(name)` – Register an activity with the workflow runtime. `name` is related to the GitHub issue.
- `@wfr.activity.serializer(serializer)` / `@wfr.activity.deserializer(deserializer)` – Set serialization/deserialization logic for activity input/output.
- `@wfr.activity.retry()` – Resiliency, to apply retries on the activity. Currently an activity can be called with a `RetryPolicy`, based on the code in the `durabletask-python` library. This decorator is to be able to set a default retry policy for the activity even if a retry policy is not specified at the call site.

Example code with workflow and activity level decorators:
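A minimal sketch of the decorator surface described above; the `WorkflowRuntime` here is a stub that only records registrations, so every name beyond the decorator shapes listed is illustrative:

```python
class _Registry:
    # Gives the runtime the `wfr.workflow.register(...)` / `wfr.activity.register(...)` shape.
    def __init__(self):
        self.registered = {}

    def register(self, name=None):
        def decorator(fn):
            self.registered[name or fn.__name__] = fn
            return fn
        return decorator


class WorkflowRuntime:
    """Stub mirroring only the decorator surface; not the real SDK class."""

    def __init__(self):
        self.workflow = _Registry()
        self.activity = _Registry()


wfr = WorkflowRuntime()


@wfr.activity.register(name="hello")
def hello_act(ctx, input):
    return input + 1


@wfr.workflow.register(name="hello_world")
def hello_world_wf(ctx, input):
    yield ctx.call_activity(hello_act, input=1)
```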
Potential changes needed to codebase

- Changes in `dapr/python-sdk` to mirror what is there in the `microsoft/durabletask-python` library.
- Modify `WorkflowRuntime`, or add a new class, to define the decorators stated above.
- For the `register_activity` and `register_workflow` methods in the class, we can reuse the code that is currently in the `WorkflowRuntime` class and extend it to add a decorator for the same.
- To support `serializer`/`deserializer` at the activity and workflow levels, changes are needed in the `microsoft/durabletask-python` library first, which then need to be propagated into the `dapr/python-sdk` workflow code.
- If the `WorkflowRuntime` instance is going to be a singleton for the application, we can potentially do away with explicit creation of the instance and also simplify the decorators to `workflow.register(name)` and `activity.register(name)`, which then register against the singleton `WorkflowRuntime` instance created when the app starts.