microsoft / durabletask-java

Java SDK for Durable Functions and the Durable Task Framework
MIT License
13 stars 7 forks source link

Ability to PATCH an instance of an Orchestrator #110

Closed kanupriya15025 closed 8 months ago

kanupriya15025 commented 1 year ago

We use Eternal and Regular Orchestrator Functions to run our workflows. Now, we have a use case where we need to PATCH the payload used by an existing running orchestration. There's no direct way to do it via APIs, however, I was looking if there are any work arounds. But couldn't find any. There were two approaches I thought :

  1. Delete the existing orchestration instance and recreate the with same instanceId. And this operation needs to be atomic. (Problem with this approach is that, the delete operation is async and recreation can't happen if the delete takes time)
  2. Restart the instance with HTTP API /restart. But the problem with this is that I can't set a new payload for the POST call.

Can anyone help me with what might be the best way to achieve this?

kanupriya15025 commented 1 year ago

If this functionality (https://github.com/microsoft/durabletask-java/issues/111) is catered to, PATCH might work

cgillum commented 1 year ago

To be clear, you're looking to patch the orchestration input?

I think approach (2) might not work for you because the restart API is designed to work with completed, failed, or terminated instances. However, it sounds like you want to patch a running orchestration.

If you're using eternal orchestrations, is there an option (3) where you use continueAsNew when a specific external event is received to restart the orchestration with a new input?

ChrisRomp commented 1 year ago

@cgillum From your idea option (3), do you think an external event could be sent with the new params and those could pass to continueAsNew?

ChrisRomp commented 1 year ago

Er, excuse my poor phrasing (Monday morning) -- you said external event, but you mean to pass the new params, right?

cgillum commented 1 year ago

From your idea option (3), do you think an external event could be sent with the new params and those could pass to continueAsNew?

Yes, exactly this.

kanupriya15025 commented 1 year ago

@cgillum / @ChrisRomp Do you think there can an example to explain this better? I was thinking, in order to receive an event, the function must be waiting on it and that would stop the execution of the function that "await event" line, right? (Correct me if that's not right)

If that's so, we don't want to stop the functioning of the logic anywhere.

cgillum commented 1 year ago

I think a way you can accomplish this is using the ctx.anyOf(...) method, where one of the parameters is the external event task and the other(s) are any other task that the orchestration is currently awaiting.

@kaibocai @shreyas-gopalakrishna we may want to look into ways of making this easier. Adding methods like thenAccept(Consumer<? super T> action) to the Task class might be a way to enable a simpler way of handling external events in a way that doesn't require any blocking of the orchestration logic.

kanupriya15025 commented 1 year ago

@cgillum In my scenario, my eternal orchestrator function isn't waiting for any other external event. I just expect a PATCH call, that too isn't necessary that it will receive in its lifetime.

This is essentially my core logic : 1. Start the eternal orchestrator with some input load including a cron expression. 2. Evaluate the next execution time based on cron. 3. Add a delay to the orchestrator and sleep for that time. 4. Wake up and call an activity with the payload provided. 5. Call continueAsNew(payload)

During this course, it can receive a call to patch anything in the payload.

cgillum commented 1 year ago

That makes things fairly simple: just use ctx.anyOf(…).await() and pass in the timer delay task and the external events task as arguments. Depending on which task is completed, you’ll call continueAsNew with an appropriate payload.

kanupriya15025 commented 1 year ago

@cgillum @ChrisRomp I was trying this out like this : `

        Task<?> patchEvent = ctx.waitForExternalEvent("patchEvent");  //this expects new json payload
        Task<Void> timerElapsed = ctx.createTimer(timeToNext); //this is a timer event
        Task<?> winnerEvent = ctx.anyOf(timerElapsed, patchEvent).await();
        if (winnerEvent == patchEvent) {

           jsonPayload = winnerEvent.someFunctionThatReturnsThePayloadFromEvent()  //here I want to fetch the event payload if event type is patchEvent
       } 

`

Here I can't see any function that helps me get the payload of the event. Neither could I see it in the SDK. Can you help me if I am missing something?

cgillum commented 1 year ago

@kanupriya15025 try this:

Task<String> patchEvent = ctx.waitForExternalEvent("patchEvent", String.class);  //this expects new json payload
Task<Void> timerElapsed = ctx.createTimer(timeToNext); //this is a timer event
Task<?> winnerEvent = ctx.anyOf(timerElapsed, patchEvent).await();

String jsonPayload = "";
if (winnerEvent == patchEvent) {
   jsonPayload = patchEvent.await();
}
kanupriya15025 commented 1 year ago

@cgillum Thanks this does work.

        Task<JsonNode> patchEvent = ctx.waitForExternalEvent("patchEvent", JsonNode.class);
        Task<Void> timerElapsed = ctx.createTimer(timeToNext);
        Task<?> winnerEvent = ctx.anyOf(timerElapsed, patchEvent).await();

        if (winnerEvent == patchEvent) {
            JsonNode jsonPayload = patchEvent.await();
            if (!ctx.getIsReplaying()) {
                LOGGER.info("Payload : {}", jsonPayload);
            }            
        }

However, I had one followup question. This log line is printed twice despite that the event was raised only once. Does that mean the event was processed twice? Do you think this will make any difference?

cgillum commented 1 year ago

This log line is printed twice despite that the event was raised only once. Does that mean the event was processed twice?

That's not expected. Do you see this consistently or was it a one-time duplicate log? If you add the instance ID to the log, do you see the same value for each?

if (!ctx.getIsReplaying()) {
    LOGGER.info("Orchestration : {}, Payload : {}" ctx.getInstanceId(), jsonPayload);
}
kanupriya15025 commented 1 year ago

@cgillum I think it was something one of may be running multiple tests. I don't see that line repeating again. Thanks for the help!

kanupriya15025 commented 1 year ago

Reopening this as https://github.com/microsoft/durabletask-java/issues/136 blocks this implementation

kaibocai commented 12 months ago

In my scenario, my eternal orchestrator function isn't waiting for any other external event. I just expect a PATCH call, that too isn't necessary that it will receive in its lifetime.

This is essentially my core logic : 1. Start the eternal orchestrator with some input load including a cron expression. 2. Evaluate the next execution time based on cron. 3. Add a delay to the orchestrator and sleep for that time. 4. Wake up and call an activity with the payload provided. 5. Call continueAsNew(payload)

During this course, it can receive a call to patch anything in the payload.

@kanupriya15025, we have just released java SDK version v1.2.0, which now support chaining the Task using thenApply/thenAccept. Please check it out if it helps for your use case. Thanks.

kamperiadis commented 10 months ago

@kanupriya15025 We have just released SDK v1.4.0 which contains the fix for #136. This should unblock your implementation above. Can you please let us know if this is still an issue?