reactor / reactor-core

Non-Blocking Reactive Foundation for the JVM
http://projectreactor.io
Apache License 2.0

Last emitted item & context update in Retry #2556

Closed kitkars closed 3 years ago

kitkars commented 3 years ago

Please provide a mechanism to access the last emitted value as part of RetrySpec and to update the Context with it. This would let a retry tell the publisher, via the Context, where to start from again.

simonbasle commented 3 years ago

How do you envision this would help the Publisher, since it couldn't differentiate between an initial subscription vs a re-subscription due to retry?

I don't think Retry and retryWhen operators should be in the business of memorizing onNext elements, and it has further implications when you think about cases like reactor-netty, off-heap objects and leak issues.

That said, I have a few ideas, although they focus more on the general Context (deferContextual and contextWrite). I'd still like to hear more about your idea and any related use case behind it, to see if we can come up with something practical and versatile enough.

simonbasle commented 3 years ago

This can be done with pure reactor-core code, using an AtomicReference<T> in the Context (see Flux.contextWrite(Function) and Flux.deferContextual). Here is a commented example, hope this helps:

    @Test
    @DisplayName("bootstrapping from context storing POJOs")
    void bootstrapSimpleCase() {
        //In this experiment the publisher is one of POJOs. These POJOs are retained
        //as-is for bootstrapping purpose.

        //this is used so that errors stop on the 3rd subscription (i.e. after 2 re-subscriptions)
        AtomicLong transientErrorCount = new AtomicLong(3);

        //the deferContextual will search for an AtomicReference with that key in the Context
        final String CONTEXT_BOOTSTRAP_KEY = "CONTEXT_BOOTSTRAP_KEY";

        Flux<Integer> bootstrap = Flux
                .deferContextual(ctx -> {

                    //simulate a source that errors after 4 elements...
                    final AtomicInteger errorCountdown = new AtomicInteger(4);
                    //...but that error is transient / stops after x cycles
                    if (transientErrorCount.decrementAndGet() == 0) {
                        errorCountdown.set(1000);
                    }

                    //we'll look for an AtomicReference in the Context
                    AtomicReference<Integer> bootstrapHolder;
                    //from that, we'll get a business value that allows us to "restart from a known point"
                    int skipX = 0;
                    //we need to cover the case where no such holder is set up, so the publisher becomes a classic cold publisher
                    if (!ctx.hasKey(CONTEXT_BOOTSTRAP_KEY)) {
                        //here we create a fake holder so that the rest of the code is unchanged, but even if the holder
                        //is updated with a seed, it won't be reused
                        bootstrapHolder = new AtomicReference<>(null); //unused null object
                        System.out.println("WARNING: No bootstrap key " + CONTEXT_BOOTSTRAP_KEY + " found in Context, will always start the sequence from scratch");
                    }
                    //here is the meat of the bootstrapping:
                    else {
                        bootstrapHolder = ctx.get(CONTEXT_BOOTSTRAP_KEY);
                        Integer bootstrapFromCtx = bootstrapHolder.get();
                        //if there is a meaningful "last known point", reshape the source to take it into account
                        if (bootstrapFromCtx != null && bootstrapFromCtx > 0) {
                            skipX = bootstrapFromCtx;
                            System.err.println("bootstrapping by skipping " + skipX);
                        }
                        //otherwise, initial subscription, full dataset. here the business value of skipX = 0 is sufficient
                        else {
                            System.err.println("bootstrapping from scratch");
                        }
                    }

                    //this generates the source from the known point / seed extracted from Context.
                    Flux<Integer> source = Flux
                            //in reality this could be a parameterized db query for instance...
                            .range(skipX+1, 10-skipX)
                            //we also trigger the error from the countdown (simulating the transient error)
                            .doOnNext(id -> {
                                if (errorCountdown.decrementAndGet() < 1) {
                                    System.err.println("Error triggered in " + id);
                                    throw new IllegalStateException("Error triggered in " + id);
                                }
                            });

                    //this is the source, but taking into account the fact that we need to update the seed
                    return source
                            //the meat of it is that when the source emits, we update the seed in the AtomicReference
                            .doOnNext(newSeed -> {
                                if (ctx.hasKey(CONTEXT_BOOTSTRAP_KEY)) {
                                    bootstrapHolder.set(newSeed);
                                }
                            })
                            .hide();
                });

        //this simulates a retry applied to the bootstrapping Flux
        Flux<String> using = bootstrap
                //we now can retry the bootstrapped Flux
                .retry(4)
                //but we need to set up the Context so that the mutable seed holder is there
                .contextWrite(ctx -> ctx.put(CONTEXT_BOOTSTRAP_KEY, new AtomicReference<Integer>()))
                //we use the source as any other publisher
                .map(i -> "value" + i);

        StepVerifier.create(using.doOnNext(v -> System.out.println("Seen " + v)).collectList())
                    .assertNext(l -> assertThat(l).containsExactly("value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", "value10"))
                    .verifyComplete();

        //we can even subscribe multiple times to the outer sequence, each subscription gets its own seed thanks to the laziness of contextWrite(Function)
        StepVerifier.create(using.doOnNext(v -> System.out.println("Seen " + v)).collectList())
                    .assertNext(l -> assertThat(l).containsExactly("value1", "value2", "value3", "value4", "value5", "value6", "value7", "value8", "value9", "value10"))
                    .verifyComplete();

        System.out.println("\nseparate usage\n");

        Flux<Integer> using2 = bootstrap
                .doOnNext(v -> System.out.println("Different usage of " + v))
                //again, the only constraint is that we prepare that particular sequence's Context
                //careful about contextWrite(ContextView) simplified API: past this point any subscription reuses the same mutable holder
                //and thus they will step on each other's toes in terms of bootstrapping
                .contextWrite(Context.of(CONTEXT_BOOTSTRAP_KEY, new AtomicReference<>(7)));

        StepVerifier.create(using2)
                    .expectNext(8, 9, 10)
                    .verifyComplete();
    }
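
The two comments in the example above about contextWrite(Function) versus contextWrite(ContextView) come down to when the mutable holder is created. As a rough illustration only, here is a plain-JDK model with no Reactor types: the class name `HolderSharingSketch` and the `subscribe` helper are hypothetical stand-ins, and the `Supplier` merely plays the role of the per-subscription Function described in the comments.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class HolderSharingSketch {

    // Hypothetical stand-in for one subscription: it obtains a holder, resumes
    // after the seed stored in it, emits up to 10, and records each emitted
    // value as the new seed.
    static List<Integer> subscribe(Supplier<AtomicReference<Integer>> holderFactory) {
        AtomicReference<Integer> holder = holderFactory.get();
        List<Integer> seen = new ArrayList<>();
        for (int i = holder.get() + 1; i <= 10; i++) {
            seen.add(i);
            holder.set(i);
        }
        return seen;
    }

    public static void main(String[] args) {
        // Models the lazy Function variant: a new holder is built per subscription.
        Supplier<AtomicReference<Integer>> fresh = () -> new AtomicReference<>(7);
        System.out.println(subscribe(fresh));  // [8, 9, 10]
        System.out.println(subscribe(fresh));  // [8, 9, 10]

        // Models a prebuilt Context: every subscription sees the same holder.
        AtomicReference<Integer> single = new AtomicReference<>(7);
        Supplier<AtomicReference<Integer>> shared = () -> single;
        System.out.println(subscribe(shared)); // [8, 9, 10]
        System.out.println(subscribe(shared)); // [] because the seed is already 10
    }
}
```

In this model the second subscriber of the shared holder starts from the seed left behind by the first one, which is the "stepping on each other's toes" effect the comment warns about.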

simonbasle commented 3 years ago

@bsideup simplified the above example, dropping the defensive checks that the AtomicReference is there and contains a seed, to:

    Flux<Integer> bootstrap = Flux.deferContextual(ctx -> {
        final AtomicInteger errorCountdown = new AtomicInteger(4);

        AtomicReference<Integer> seedHolder = ctx.get("seed");
        int i = seedHolder.get();

        Flux<Integer> source = Flux.range(i + 1, 10 - i);

        return source
                .doOnNext(id -> {
                    //this is boilerplate around simulating transient errors
                    if (errorCountdown.decrementAndGet() < 1) {
                        System.err.println("Error triggered in " + id);
                        throw new IllegalStateException("Error triggered in " + id);
                    }
                    //this is actual logic: update the seed
                    seedHolder.set(id);
                })
                .hide();
    });

    Flux<String> using = bootstrap
            .retry(4)
            .contextWrite(ctx -> ctx.put("seed", new AtomicReference<>(0)))
            .map(i -> "value" + i);
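
To trace how the seed moves across retries without running Reactor, the same logic can be modeled in plain JDK code. This is only a sketch under assumptions: `ResumeFromSeedSketch`, `attempt` and `runWithRetry` are hypothetical names, the `for` loop stands in for `Flux.range`, and the try/catch loop stands in for `retry(n)`. It does not reproduce Reactor's actual scheduling or signal semantics.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

final class ResumeFromSeedSketch {

    // One "subscription": emits seed+1..10 into out, fails on the 4th element
    // of each attempt, and stores every successfully emitted value as the seed.
    static void attempt(AtomicReference<Integer> seedHolder, List<Integer> out) {
        int errorCountdown = 4;
        for (int id = seedHolder.get() + 1; id <= 10; id++) {
            if (--errorCountdown < 1) {
                throw new IllegalStateException("Error triggered in " + id);
            }
            out.add(id);
            seedHolder.set(id);
        }
    }

    // Stand-in for retry(maxRetries): re-runs the attempt with the same holder,
    // so each new attempt resumes after the last value that was emitted.
    static List<Integer> runWithRetry(int maxRetries) {
        AtomicReference<Integer> seedHolder = new AtomicReference<>(0);
        List<Integer> out = new ArrayList<>();
        for (int retries = 0; ; retries++) {
            try {
                attempt(seedHolder, out);
                return out;
            } catch (IllegalStateException e) {
                if (retries >= maxRetries) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        // Attempts fail at 4, 7 and 10; the fourth attempt emits 10 and completes.
        System.out.println(runWithRetry(4)); // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Because the element that triggers the error is never stored as the seed, each new attempt starts with that element again, so nothing is skipped or duplicated. In this model three retries are enough to reach 10, which is why a budget of 4 suffices, while a budget of 2 would surface the error.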