vert-x3 / vertx-rx

Reactive Extensions for Vert.x
Apache License 2.0
147 stars 72 forks source link

Race condition when using AsyncResultSingle #238

Closed marcinus closed 4 years ago

marcinus commented 4 years ago

Version

Vertx 3.9.2. Seems to be valid in 4.0.0 and current master as well.

Context

When looking for best error handling strategy for Knot.x project, I was checking code generated by @RxGen annotation. I've found that function generated for FUTURE kind of method has the following code:

public Single<io.knotx.fragments.api.FragmentResult> rxApply(io.knotx.fragments.api.FragmentContext fragmentContext) { 
    return AsyncResultSingle.toSingle(handler -> {
      apply(fragmentContext, handler);
    });
  }

Checking out AsyncResultSingle I've investigated its subscribeActual method:

this.subscriptionConsumer.accept((ar) -> {
          if (!disposed.getAndSet(true)) {
            if (ar.succeeded()) {
              try {
                observer.onSuccess(ar.result());
              } catch (Throwable var5) {
                // Error handling
              }
            } else if (ar.failed()) {
              try {
                observer.onError(ar.cause());
              } catch (Throwable var4) {
                // Error handling
              }
            }
          }
        });

Where this.subscriptionConsumer is set in factory method called by the generated function above. As you can see, the ar (asynchronous result) gets propagated either to onSuccess with result or onError with cause. However, at the time of subscription it is possible that neither ar.succeeded() nor ar.failed() returns true!. This may lead to an uncompleted, hanging Single connected with further complex logic.

Possibly AsyncResultMaybe and AsyncResultCompletable are also impacted.

Do you have a reproducer?

Yes, a detailed unit test can be seen on my GitHub repository: vertx-rx-java2-race-condition

Steps to reproduce

See reproducer

Extra

Windows 10.0.0.18363, Java 1.8.0_202

marcinus commented 4 years ago

Now that I look at it, the issue occurs only when the rxfied method void apply(FragmentContext fragmentContext, Handler<AsyncResult<FragmentResult>>) calls handler before AsyncResult is finished.

So whenever this method returns Future<FragmentResult> on which it previously calls future.setHandler(handler), this does not occur. Probably this is the reason why it was not spotted earlier.

Question: is it a correct behaviour for a method to call handler.handle(result) before result has finished processing?

vietj commented 4 years ago

@marcinus it is incorrect, the Future type should not extend the AsyncResult type, this was an initial design mistake leading to this situation

vietj commented 4 years ago

@marcinus how does your reproducer connect to an actual use case ? in practice AsyncResult should be implemented by io.vertx.core.Future and the corresponding handler should be called when the promise of this future is completed, so I would like to see how this happens in practice

marcinus commented 4 years ago

@vietj no, this is just a vulnerability I've spotted when checking what errors may occur when calling rxApply of the interface FragmentOperation from Knotx, implemented by

We're going to mitigate this vulnerability by providing more specific interfaces extending FragmentOperation, returning Future<T>, Single<T> or just T instead of providing Handler as an argument.

vietj commented 4 years ago

I would then advice either to force the user to return a Future or use the Promise interface instead:

default void apply(FragmentContext fragmentContext, Handler<AsyncResult<FragmentResult>> resultHandler) {
   Promise<FragmentResult> promise = Promise.promise();
   promise.future().onComplete(resultHandler);
   apply(fragmentContext, promise);
}

void apply2(FragmentContext fragmentContext, Promise<FragmentResult> resultPromise);

or

default void apply(FragmentContext fragmentContext, Handler<AsyncResult<FragmentResult>> resultHandler) {
   Future<FragmentResult> future = apply(fragmentContext);
   future.onComplete(resultHandler);
}

Future<FragmentResult> apply(FragmentContext fragmentContext);

Another way it could be mitigated is by passing an handler that will do this work, i.e in Knot.x code

apply(ctx, ar -> {
  if (ar.succeeded() || ar.failed()) {
    handler.handle(ar);
  } else {
    Future< FragmentResult> fut = (Future)ar;
    fut.onComplete(handler);
  }
});

You can also combine both I think for more safety.

vietj commented 4 years ago

I will close this issue then.