temporalio / sdk-java

Temporal Java SDK
https://temporal.io
Apache License 2.0
Failed promise before calling allOf is not failing the wrapped promise. #2046

Open awx-michael-wang opened 5 months ago

awx-michael-wang commented 5 months ago

Expected Behavior

Promise.allOf(promises) should fail if any of the promises fails, regardless of whether that promise failed before or after allOf was called.

Actual Behavior

If one Promise in promises has already failed before allOf is called, its failure is ignored.

How to reproduce

A workflow like the following will complete successfully:

        val promise1 = Async.procedure { throw RuntimeException() }
        val promise2 = Async.procedure {
            // do something else
        }
        someActivity.doSomething()

        Promise.allOf(promise1, promise2).get()

Analysis

AllOfPromise#addPromise only handles promises that have not yet completed, so a promise that has already failed is skipped.

Quinn-With-Two-Ns commented 5 months ago

Yes, this is a bug. It looks like we ignore the promise if it has completed, regardless of how it completed:

https://github.com/temporalio/sdk-java/blob/ed211fa611112288b576a2c979be9284e17fec89/temporal-sdk/src/main/java/io/temporal/internal/sync/AllOfPromise.java#L54

We should check if it has completed exceptionally or not.
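The difference between skipping completed promises and checking how they completed can be sketched outside the SDK. The following is only a simplified model, not the actual AllOfPromise source: it uses `java.util.concurrent.CompletableFuture` as a stand-in for Temporal's `Promise`, and the names `AllOfSketch`, `buggyAllOf`, `fixedAllOf`, and `awaitAll` are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of the reported behavior; CompletableFuture stands in for Promise. */
final class AllOfSketch {

  /** Completes when every input succeeds, or fails as soon as one input fails. */
  private static CompletableFuture<Void> awaitAll(List<CompletableFuture<?>> inputs) {
    CompletableFuture<Void> result = new CompletableFuture<>();
    if (inputs.isEmpty()) {
      result.complete(null);
      return result;
    }
    AtomicInteger remaining = new AtomicInteger(inputs.size());
    for (CompletableFuture<?> f : inputs) {
      // For an already-completed input, the callback runs immediately.
      f.whenComplete((value, error) -> {
        if (error != null) {
          result.completeExceptionally(error);
        } else if (remaining.decrementAndGet() == 0) {
          result.complete(null);
        }
      });
    }
    return result;
  }

  /** Models the bug: inputs that are already done are dropped, even if they failed. */
  static CompletableFuture<Void> buggyAllOf(List<CompletableFuture<?>> inputs) {
    List<CompletableFuture<?>> notDone = new ArrayList<>();
    for (CompletableFuture<?> f : inputs) {
      if (!f.isDone()) {
        notDone.add(f);
      }
    }
    return awaitAll(notDone);
  }

  /** Models the fix: completed inputs are inspected too, so an earlier failure propagates. */
  static CompletableFuture<Void> fixedAllOf(List<CompletableFuture<?>> inputs) {
    return awaitAll(inputs);
  }

  public static void main(String[] args) {
    CompletableFuture<String> alreadyFailed = new CompletableFuture<>();
    alreadyFailed.completeExceptionally(new RuntimeException("boom"));
    CompletableFuture<String> stillRunning = new CompletableFuture<>();
    List<CompletableFuture<?>> inputs = List.of(alreadyFailed, stillRunning);

    CompletableFuture<Void> buggy = buggyAllOf(inputs);
    CompletableFuture<Void> fixed = fixedAllOf(inputs);
    stillRunning.complete("ok");

    System.out.println("buggy failed: " + buggy.isCompletedExceptionally());
    System.out.println("fixed failed: " + fixed.isCompletedExceptionally());
  }
}
```

In this model the buggy variant completes normally once the remaining input finishes, while the fixed variant fails immediately with the earlier exception, which matches the expected behavior described in the issue.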

Fixing this, though, could break backwards compatibility, since it could affect existing workflow code. Any fix will need to preserve history compatibility with workflows that hit this scenario.

Quinn-With-Two-Ns commented 5 months ago

In the meantime, I think the easiest way to work around this bug is to implement a wrapper around promiseAllOf that checks whether any promise passed in has already failed:

static Promise<Void> allOf(Promise<?>... promises) {
  // Fail fast if any promise has already completed with a failure.
  for (Promise<?> p : promises) {
    if (p.isCompleted() && p.getFailure() != null) {
      return Workflow.newFailedPromise(p.getFailure());
    }
  }
  // Otherwise defer to the SDK's allOf for the remaining cases.
  return WorkflowInternal.promiseAllOf(promises);
}

awx-michael-wang commented 4 months ago

Thank you. I did implement a similar wrapper:

fun <T> List<Promise<T>>.safeAllOf(): Promise<Void> {
    val existFailure = this.filter { it.isCompleted }
        .firstNotNullOfOrNull { it.failure }

    return if (existFailure != null) {
        Workflow.newFailedPromise(existFailure)
    } else {
        Promise.allOf(this)
    }
}