cschleiden / go-workflows

Embedded durable workflows for Golang similar to DTFx/Cadence/Temporal
https://cschleiden.github.io/go-workflows/
MIT License

Sample concurrent not working correctly #236

Closed lovromazgon closed 1 year ago

lovromazgon commented 1 year ago

I was wondering how I could execute activities in parallel and safely collect the results from all of them. I found the concurrent sample, but it has an issue: it collects the result of activity 1 twice. In the log output below, the line `A1 result` is logged twice, and there is no line logging the A2 result.

I'm not sure if the sample is incorrect or if this is a bug in the library itself.

Note that I tested this using the sqlite and memory backends.

Logs

```
$ go run concurrent.go -backend sqlite
time=2023-09-11T16:14:52.294+02:00 level=DEBUG msg="Created workflow instance" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
2023/09/11 16:14:52 Started workflow 9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e
time=2023-09-11T16:14:52.297+02:00 level=DEBUG msg="Executing workflow task" workflows.task.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 workflows.task.last_sequence_id=0
time=2023-09-11T16:14:52.297+02:00 level=DEBUG msg="Executing event" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.event.id=c47054f5-b175-4d87-81cb-65b99f43e7c6 workflows.seq_id=0 workflows.event.type=WorkflowTaskStarted workflows.event.schedule_event_id=0 workflows.is_replaying=false
time=2023-09-11T16:14:52.297+02:00 level=DEBUG msg="Executing event" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.event.id=8070d1ae-1200-473c-8e5c-11fd04534eb3 workflows.seq_id=0 workflows.event.type=WorkflowExecutionStarted workflows.event.schedule_event_id=0 workflows.is_replaying=false workflows.workflow.name=Workflow1
time=2023-09-11T16:14:52.298+02:00 level=DEBUG msg="Entering Workflow1" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
time=2023-09-11T16:14:52.298+02:00 level=DEBUG msg="\tWorkflow instance input:" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 msg="Hello world"
time=2023-09-11T16:14:52.298+02:00 level=DEBUG msg=Selecting... workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
time=2023-09-11T16:14:52.298+02:00 level=DEBUG msg="Finished workflow task" workflows.task.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 workflows.task.executed_events=4 workflows.task.last_sequence_id=4 workflows.task.workflow_state=0
2023/09/11 16:14:52 Entering Activity1
2023/09/11 16:14:52 Leaving Activity1
2023/09/11 16:14:52 Entering Activity2
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="Executing workflow task" workflows.task.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 workflows.task.last_sequence_id=4
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="Executing event" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.event.id=a2555405-ee6e-47fe-9ebd-aab95a7b969d workflows.seq_id=0 workflows.event.type=WorkflowTaskStarted workflows.event.schedule_event_id=0 workflows.is_replaying=false
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="Executing event" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.event.id=e8b1872c-abd2-4f1b-a273-a1542a343e51 workflows.seq_id=0 workflows.event.type=ActivityCompleted workflows.event.schedule_event_id=1 workflows.is_replaying=false
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="A1 result" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 r=47
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg=Selected workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg=Selecting... workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="A1 result" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 r=47
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg=Selected workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="Leaving Workflow1" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624
time=2023-09-11T16:14:52.302+02:00 level=DEBUG msg="Finished workflow task" workflows.task.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 workflows.task.executed_events=2 workflows.task.last_sequence_id=6 workflows.task.workflow_state=0
2023/09/11 16:14:57 Leaving Activity2
time=2023-09-11T16:14:57.305+02:00 level=DEBUG msg="Executing workflow task" workflows.task.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 workflows.task.last_sequence_id=6
time=2023-09-11T16:14:57.305+02:00 level=DEBUG msg="Executing event" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.event.id=566f942f-c644-435a-ba72-b6120cca2bb1 workflows.seq_id=0 workflows.event.type=WorkflowTaskStarted workflows.event.schedule_event_id=0 workflows.is_replaying=false
time=2023-09-11T16:14:57.305+02:00 level=DEBUG msg="Executing event" workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.event.id=ba603681-d745-4d21-bd6a-795e329076cc workflows.seq_id=0 workflows.event.type=ActivityCompleted workflows.event.schedule_event_id=2 workflows.is_replaying=false
time=2023-09-11T16:14:57.305+02:00 level=DEBUG msg="Finished workflow task" workflows.task.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.instance.id=9e0d5e6f-4e06-4694-9e93-bec7ec1d6a7e workflows.execution.id=d8235900-01c2-407e-84d4-b3d420bef624 workflows.task.executed_events=3 workflows.task.last_sequence_id=9 workflows.task.workflow_state=2
```

cschleiden commented 1 year ago

It's a bug in the sample. The selector used to be stateful and removed cases, but it looks like I didn't update the sample when that changed.

Depending on the scenario, there are several ways to wait for multiple activities. For example, something like this would work:

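```go
// Schedule both activities up front so they run in parallel.
a2 := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity2)
a1 := workflow.ExecuteActivity[int](ctx, workflow.DefaultActivityOptions, Activity1, 35, 12)

// Channel used to fan the results back in to the workflow.
results := workflow.NewChannel[int]()

for _, f := range []workflow.Future[int]{a1, a2} {
	f := f // capture a per-iteration copy for the closure
	workflow.Go(ctx, func(ctx workflow.Context) {
		// Wait for this future only, then forward its result.
		r, err := f.Get(ctx)
		if err != nil {
			panic(err)
		}

		logger.Debug("Received results", "result", r)

		results.Send(ctx, r)
	})
}

// Receive once per scheduled activity.
results.Receive(ctx)
results.Receive(ctx)
```
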
lovromazgon commented 1 year ago

Thanks for this. I'll try to update the sample and open a PR if I get the time.