restatedev / sdk-java

Restate SDK for JVM Languages
https://restate.dev
MIT License

Integrating with popular futures/flow libraries #6

Open slinkydeveloper opened 1 year ago

slinkydeveloper commented 1 year ago

This is an umbrella issue covering the common problems we hit when integrating our SDK with popular flow/futures frameworks such as Reactor, Mutiny and RxJava, and, to some extent, Kotlin coroutines as well.

Deterministic future completions

Futures need to be completed in a deterministic order when replaying. This is required to make sure combinator operators produce a deterministic result, assuming their implementation is itself deterministic. We have checked Reactor, RxJava and Kotlin coroutines, and their combinator implementations are guaranteed to be deterministic.
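To make the requirement concrete, here is a minimal sketch using only `java.util.concurrent.CompletableFuture`. It is not necessarily either of the ideas discussed in this issue: `OrderedReplay`, `register`, `bufferResult` and `release` are hypothetical names, and the sketch assumes the completion order of the first execution was recorded somewhere and that the combinator is subscribed before any future is completed.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, not part of the SDK: buffers results and releases them
// to the user-visible futures in the order recorded during the first execution.
final class OrderedReplay {
    private final Map<Integer, CompletableFuture<String>> futures = new HashMap<>();
    private final Map<Integer, String> buffered = new HashMap<>();

    // Returns the future associated with a journal entry, creating it if needed.
    CompletableFuture<String> register(int entryIndex) {
        return futures.computeIfAbsent(entryIndex, i -> new CompletableFuture<>());
    }

    // Results may arrive in any order; they are only stored here, not delivered.
    void bufferResult(int entryIndex, String value) {
        buffered.put(entryIndex, value);
    }

    // Completes the futures one by one, following the recorded completion order.
    void release(List<Integer> recordedOrder) {
        for (int entryIndex : recordedOrder) {
            register(entryIndex).complete(buffered.get(entryIndex));
        }
    }

    // Demo: the combinator is created while both futures are still pending.
    static String demo(List<Integer> recordedOrder) {
        OrderedReplay replay = new OrderedReplay();
        CompletableFuture<String> a1 = replay.register(1);
        CompletableFuture<String> a2 = replay.register(2);
        CompletableFuture<Object> any = CompletableFuture.anyOf(a1, a2);
        replay.bufferResult(1, "FRANCESCO");
        replay.bufferResult(2, "TILL");
        replay.release(recordedOrder);
        return (String) any.join();
    }
}
```

With a recorded order of `[2, 1]`, `demo` returns `"TILL"`; with `[1, 2]` it returns `"FRANCESCO"`. The combinator's result follows the recorded order only because it was subscribed before the completions were released.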

We have brainstormed 2 ideas to solve this issue:

When, if and how to propagate suspension points

The network layer is the component responsible for triggering suspensions, which can happen for any of these reasons: runtime closed the connection, invocation is in request response/lambda mode and we already wrote the response, ...

Because the wiring between the network layer and the state machine is push based (that is, the network layer pushes events to the state machine), we can only react to these events by propagating them to the user code layer whenever appropriate. This push-based design is mostly dictated by the way Vert.x and other reactive frameworks work on the JVM, but also by the way we communicate with the user layer, whether through futures, callbacks, Java Flow or whatever other style we use.

Although some of these frameworks support the concept of cancellation, which we could hook in, we still cannot "blindly" discard a computation (e.g. a Mono chain) when we hit a suspension point, as it could very well be that the read channel is closed, but the write channel is still open and we can complete the invocation by writing the output.

Having excluded the alternatives, this means we need to propagate the suspension point upward through the user code and subscribe on its output. The problem with this approach is that we might lose the suspension exception, or find it hidden inside some other exception, e.g. see the case with Mono.firstWithSignal. We might be able to unpack these exceptions, but that requires the sdk core to have specific knowledge of the reactive/futures framework in use (perhaps this could be registered as an SPI?). E.g. https://projectreactor.io/docs/core/release/api/reactor/core/Exceptions.html#unwrapMultiple-java.lang.Throwable- and http://reactivex.io/RxJava/3.x/javadoc/
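A framework-agnostic version of this unpacking could look like the sketch below. It uses only the JDK: `SuspendedException` and `SuspensionUnwrapper` are hypothetical names, not SDK types, and the sketch assumes the wrapping exception exposes the inner errors through `getCause()` or `getSuppressed()`. Composite exception types that expose their inner errors differently would still need framework-specific handling.

```java
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Deque;
import java.util.IdentityHashMap;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical marker exception; the real SDK type may differ.
final class SuspendedException extends RuntimeException {
    SuspendedException() {
        super("invocation suspended");
    }
}

final class SuspensionUnwrapper {
    // Walks causes and suppressed exceptions looking for the suspension marker.
    // The identity set guards against cycles in the exception graph.
    static Optional<SuspendedException> find(Throwable root) {
        Set<Throwable> seen =
            Collections.newSetFromMap(new IdentityHashMap<Throwable, Boolean>());
        Deque<Throwable> stack = new ArrayDeque<>();
        if (root != null) {
            stack.push(root);
        }
        while (!stack.isEmpty()) {
            Throwable t = stack.pop();
            if (!seen.add(t)) {
                continue; // already visited
            }
            if (t instanceof SuspendedException) {
                return Optional.of((SuspendedException) t);
            }
            if (t.getCause() != null) {
                stack.push(t.getCause());
            }
            for (Throwable suppressed : t.getSuppressed()) {
                stack.push(suppressed);
            }
        }
        return Optional.empty();
    }

    // Demo: a dependent stage hides the suspension inside a CompletionException.
    static boolean demo() {
        CompletableFuture<String> call = new CompletableFuture<>();
        CompletableFuture<String> chained = call.thenApply(String::toUpperCase);
        call.completeExceptionally(new SuspendedException());
        try {
            chained.join();
            return false; // not reached: the chain completed exceptionally
        } catch (CompletionException e) {
            // The suspension is only reachable through e.getCause().
            return find(e).isPresent();
        }
    }
}
```

Even plain `CompletableFuture` chains wrap the original error in a `CompletionException`, so some form of unwrapping is needed regardless of which framework sits on top.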

Propagating suspension information to the runtime

This topic is relevant for future optimizations of the runtime. If we allow users to develop their applications with combinators from other reactive frameworks, we won't be able to optimize the runtime to re-invoke the service only after a certain "await" condition has been reached, as this information won't be available in the state machine.

slinkydeveloper commented 1 year ago

For the time being we have given up on this specific issue: after a first attempt at implementing solution 2, we found that the following case fails:

// Service code
RestateContext context = RestateContext.current();

Awaitable<GreetingResponse> a1 = context.call(GreeterGrpc.getGreetMethod(), GreetingRequest.getDefaultInstance());
Awaitable<GreetingResponse> a2 = context.call(GreeterGrpc.getGreetMethod(), GreetingRequest.getDefaultInstance());

responseObserver.onNext((GreetingResponse) Awaitable.anyOf(a1, a2).await());
responseObserver.onCompleted();

// Test
testInvocation(new AnyCalls(), GreeterGrpc.getGreetMethod())
    .withInput(
            startMessage(5),
            inputMessage(GreetingRequest.getDefaultInstance()),
            orderMessage(0),
            invokeMessage(GreeterGrpc.getGreetMethod(), GreetingRequest.getDefaultInstance(), FRANCESCO_RESPONSE),
            invokeMessage(GreeterGrpc.getGreetMethod(), GreetingRequest.getDefaultInstance(), TILL_RESPONSE),
            orderMessage(3)
    )
    .usingAllThreadingModels()
    .expectingOutput(
            // TODO Are we sure this needs to be here? Ignore this now
            //orderMessage(2),
            outputMessage(TILL_RESPONSE))
    .named("any(a1, a2), both completed, with a2 completed before a1")

This test does not pass: the first time the code was executed it returned TILL_RESPONSE, but on replay it returns FRANCESCO_RESPONSE.

The reason is that, on replay, we complete the futures before we reach CompletableFuture.anyOf. The anyOf implementation then just picks the first already-completed future among its arguments, which on replay is always a1.
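The behaviour can be reproduced with plain `CompletableFuture`, without the SDK. The sketch below is a simplified stand-in for the test above (`AnyOfReplayDemo` and the string values are made up for illustration). The result shown for the replay case is what OpenJDK's implementation does. The Javadoc for `anyOf` does not specify which result is chosen when several futures are already complete.

```java
import java.util.concurrent.CompletableFuture;

final class AnyOfReplayDemo {
    // First execution: anyOf is subscribed while both futures are pending,
    // so it observes the real completion order (a2 before a1).
    static Object firstExecution() {
        CompletableFuture<String> a1 = new CompletableFuture<>();
        CompletableFuture<String> a2 = new CompletableFuture<>();
        CompletableFuture<Object> any = CompletableFuture.anyOf(a1, a2);
        a2.complete("TILL");
        a1.complete("FRANCESCO");
        return any.join();
    }

    // Replay: both futures are completed (still a2 before a1) before anyOf is
    // called, so anyOf can no longer see the order and scans its arguments.
    static Object replay() {
        CompletableFuture<String> a1 = new CompletableFuture<>();
        CompletableFuture<String> a2 = new CompletableFuture<>();
        a2.complete("TILL");
        a1.complete("FRANCESCO");
        return CompletableFuture.anyOf(a1, a2).join();
    }

    public static void main(String[] args) {
        System.out.println(firstExecution()); // TILL
        System.out.println(replay());         // FRANCESCO
    }
}
```

Completing the futures in the recorded order is therefore not enough on its own: the order is only observable by combinators that are already subscribed when the completions happen.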