serverlessworkflow / synapse

Serverless Workflow Management System (WFMS)
https://serverlessworkflow.io
Apache License 2.0

Dynamic contextAttributeName #356

Closed brampurnot closed 2 months ago

brampurnot commented 11 months ago

Hi all,

I've got another issue that I'm struggling with, and I'd like to know whether this is supported in Synapse. We currently have a workflow that is started on a schedule (every 15 minutes). We first call an API to trigger some async logic. That logic gathers data from multiple sources, aggregates it, and returns it. To receive the result, we use an event that the API emits.

What we want to do now is set up the correct correlation, since in theory multiple workflow instances can be running at the same time. My idea was to set a contextAttributeValue in the workflow definition that uses the workflow instance ID, something like this: "correlation": [ { "contextAttributeName": "wfid", "contextAttributeValue": "${ $WORKFLOW.workflow.instanceId }" } ]

The API also receives this workflow instance ID and includes it when generating the event. When I try this out, the contextAttributeValue appears to be empty in the workflow instance, so my guess is that this is not supported?

Are there other ways of doing this, or is this planned for the future?

Thanks, Bram

cdavernas commented 11 months ago

AFAIR, runtime expressions in correlation attributes are indeed supported by Synapse, but they only get populated when consuming an event of the source/type defined in the correlation's parent.

cdavernas commented 11 months ago

On a side note, you'd probably be better off using the correlationid extension attribute defined for such use cases by the CloudEvents spec.

In case that does not apply and/or is not ubiquitous in your domain, I'd then use causationid, which is a commonly used event-driven term to identify causation.
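To make the suggestion above concrete, here is a minimal Python sketch of a CloudEvents 1.0 JSON envelope that carries the correlation value in a `correlationid` extension attribute instead of a custom `wfid`. The `build_event` helper is hypothetical, the source, type, and payload are copied from the example in this thread, and whether Synapse correlates on this attribute is not confirmed here.

```python
import json
import uuid


def build_event(correlation_id: str) -> dict:
    """Build a CloudEvents 1.0 JSON envelope with a correlation extension attribute.

    Hypothetical helper for illustration; not part of Synapse or any SDK.
    """
    return {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "https://workflow-service.com",
        "type": "readFiles",
        # Extension attribute names must be lowercase letters and digits only.
        "correlationid": correlation_id,
        "data": {"status": 200, "data": "Completed"},
    }


event = build_event("6fd0debf-f957-4e36-b1bb-e1642d584a0b-7mqrecyebe2guk8bwfh6gg")
print(json.dumps(event, indent=2))
```

The workflow definition would then use "correlationid" as its contextAttributeName, so producer and consumer agree on one attribute name.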

brampurnot commented 11 months ago

Thanks for that tip! I’ll have a look at it.

brampurnot commented 11 months ago

So what's happening is a bit funky, and I'm not 100% sure I'm reading the UI correctly either.

I'm doing this in the workflow definition file: "events": [ { "name": "dc21fe53-c154-4710-8a7c-66bfc0875caa", "source": "https://workflow-service.com", "type": "readFiles", "correlation": [ { "contextAttributeName": "wfid", "contextAttributeValue": "${ $WORKFLOW.workflow.instanceId }" } ] } ]

So basically, I'm first calling an API in the operation. Then transition to an event in the workflow. The API will return an event with that workflow instance ID as an extension attribute in the Cloud Event; here is an example: { "id": "test-1234", "specversion" : "1.0", "subject": "readFiles", "type" : "readFiles", "source" : "https://workflow-service.com", "wfid": "6fd0debf-f957-4e36-b1bb-e1642d584a0b-7mqrecyebe2guk8bwfh6gg", "data" : { "status": 200, "data": "Completed" } }

However, the Synapse server throws an error saying it couldn't determine the correlation. If I look at the correlation in the UI, I see this: [Screenshot 2023-10-23 at 15 05 53]

How should I read this screen? Because it seems the wfid is passed and should be correlated, no?

cdavernas commented 11 months ago

You are right indeed, the mapping exists and is set to what seems a correct value, so it should now consume and correlate any events with the specified source/type/wfid.

What is the error/stack that the server is throwing?

It's a bit difficult to understand what's happening without a simple, concrete example that I can test/debug on my own.
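For reference, the matching rule described in this comment (same source, same type, same wfid) can be sketched in Python as follows. This is only an illustration of the expected behavior, not Synapse's actual correlation code: the `matches` function and the `resolved` mapping are hypothetical, and treating an unresolved attribute as a non-match is an assumption of this sketch.

```python
def matches(event_def: dict, resolved: dict, cloud_event: dict) -> bool:
    """Return True if cloud_event satisfies the event definition.

    `resolved` maps each contextAttributeName to the value its runtime
    expression evaluated to for one workflow instance (hypothetical shape).
    """
    if cloud_event.get("source") != event_def["source"]:
        return False
    if cloud_event.get("type") != event_def["type"]:
        return False
    for corr in event_def.get("correlation", []):
        name = corr["contextAttributeName"]
        expected = resolved.get(name)
        # Assumption: an attribute with no resolved value never matches.
        if expected is None or cloud_event.get(name) != expected:
            return False
    return True


# Event definition and incoming event as posted earlier in the thread.
event_def = {
    "name": "dc21fe53-c154-4710-8a7c-66bfc0875caa",
    "source": "https://workflow-service.com",
    "type": "readFiles",
    "correlation": [
        {
            "contextAttributeName": "wfid",
            "contextAttributeValue": "${ $WORKFLOW.workflow.instanceId }",
        }
    ],
}
instance_id = "6fd0debf-f957-4e36-b1bb-e1642d584a0b-7mqrecyebe2guk8bwfh6gg"
incoming = {
    "id": "test-1234",
    "specversion": "1.0",
    "subject": "readFiles",
    "type": "readFiles",
    "source": "https://workflow-service.com",
    "wfid": instance_id,
    "data": {"status": 200, "data": "Completed"},
}

is_match = matches(event_def, {"wfid": instance_id}, incoming)
print(is_match)
```

Under this rule the posted event should correlate with the instance whose ID it carries, which is why the server-side error is surprising.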

brampurnot commented 11 months ago

This is the error I'm getting:

[10/23/2023 13:22:48] fail: Synapse.Application.Services.CloudEventCorrelator[0]
An error occured while processing an incoming cloud event: System.Exception: Failed to correlate event
   at Synapse.Application.Commands.Correlations.V1CorrelateEventCommandHandler.HandleAsync(V1CorrelateEventCommand command, CancellationToken cancellationToken) in /src/src/core/Synapse.Application/Commands/Correlations/v1/V1CorrelateEventCommand.cs:line 111
   at Synapse.Application.Commands.Correlations.V1CorrelateEventCommandHandler.HandleAsync(V1CorrelateEventCommand command, CancellationToken cancellationToken) in /src/src/core/Synapse.Application/Commands/Correlations/v1/V1CorrelateEventCommand.cs:line 95
   at Neuroglia.Mediation.FluentValidationMiddleware2.HandleAsync(TRequest request, RequestHandlerDelegate1 next, CancellationToken cancellationToken)
   at Neuroglia.Mediation.DomainExceptionHandlingMiddleware2.HandleAsync(TRequest request, RequestHandlerDelegate1 next, CancellationToken cancellationToken)
   at Neuroglia.Mediation.RequestPipeline2.HandleAsync(IRequest1 request, IServiceProvider serviceProvider, CancellationToken cancellationToken)
   at Neuroglia.Mediation.Mediator.ExecuteAsync[TResult](IRequest1 request, CancellationToken cancellationToken)
   at Synapse.IMediatorExtensions.ExecuteAndUnwrapAsync[TResult](IMediator mediator, ICommand1 request, CancellationToken cancellationToken) in /src/src/core/Synapse.Application/Extensions/IMediatorExtensions.cs:line 42
   at Synapse.Application.Services.CloudEventCorrelator.CorrelateAsync(IServiceProvider serviceProvider, CloudEvent e, CancellationToken cancellationToken) in /src/src/core/Synapse.Application/Services/CloudEventCorrelator.cs:line 92
[10/23/2023 13:22:48] info: Microsoft.AspNetCore.Hosting.Diagnostics[2]
Request finished HTTP/1.1 POST http://synapse.bcb5a7d.kyma.ondemand.com/events/publish application/cloudevents+json 297 - 202 0 - 11.7331ms

I'll see if I can send over an example.

brampurnot commented 11 months ago

Sorry for the delay. Here is an example workflow. As you can see, I'm calling the HttpMock endpoint and passing in the uniqueId ($WORKFLOW.workflow.instanceId).

In the HttpMock API, I wait for 5 seconds before generating an event, and I add the uniqueId to it as the wfid. Let me know if this works or if you need additional input.
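The mock's behavior described above could look roughly like the following Python sketch, using only the standard library. The actual HttpMock implementation was not posted, so the function names and the injectable `send` parameter are hypothetical. The /events/publish path and the application/cloudevents+json content type are taken from the server log earlier in this thread; the base URL is whatever your Synapse deployment exposes.

```python
import json
import time
import urllib.request
import uuid


def build_publish_request(wfid: str, publish_url: str) -> urllib.request.Request:
    """Build (but do not send) the POST that publishes the completion event."""
    event = {
        "specversion": "1.0",
        "id": str(uuid.uuid4()),
        "source": "https://workflow-service.com",
        "type": "readFiles",
        "subject": "readFiles",
        "wfid": wfid,  # extension attribute the workflow correlates on
        "data": {"status": 200, "data": "Completed"},
    }
    return urllib.request.Request(
        publish_url,
        data=json.dumps(event).encode("utf-8"),
        headers={"Content-Type": "application/cloudevents+json"},
        method="POST",
    )


def emit_after_delay(wfid, publish_url, delay_seconds=5.0, send=urllib.request.urlopen):
    """Wait, then publish the event; `send` is injectable so tests can avoid the network."""
    time.sleep(delay_seconds)
    return send(build_publish_request(wfid, publish_url))
```

A call such as emit_after_delay(unique_id, "http://localhost:8080/events/publish") would reproduce the 5-second delay, where the host and port are placeholders.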

test-workflow.json

cdavernas commented 2 months ago

@brampurnot I'm closing this as it is stale in 1.0.0-alpha. The correlation engine has been refactored to use the new correlation key definitions defined in the DSL v1.0.0-alpha*.