aws / aws-sdk-java-v2

The official AWS SDK for Java - Version 2
Apache License 2.0

Data loss when using flatMapIterable on QueryPublisher from DynamoDB Async, with Flux from projectreactor #2690

Closed izian closed 2 years ago

izian commented 3 years ago

Version 2.17.28. When using the flatMapIterable method on QueryPublisher (the return value of the queryPaginator method on the async DDB client), there is a bug in the underlying FlatteningSubscriber in the AWS SDK.

When we take the publisher produced from this and feed it into Reactor, we see, because of how we happened to set up our Flux, that an unrelated flatMap / publish causes Reactor to make an initial request call for X and then immediately another request call for Y.

What this seems to do is enter the request method on the inline Subscriber implementation in FlatteningSubscriber twice, back to back. The first call makes a request "up" to the mapper, and the second immediately hits the else branch and goes to fulfillDemand(). That call sees that currentBatch is empty (of course, because we haven't even had a page back yet in this double tap), so it also makes a request "up" for another page!

Now our demand is at X+Y and we sit and wait. When onNext is called by the mapper because a QueryResponse came back with a page of results, we are destined for failure if the backpressure builds. The upstream demand is now 2 pages, which I think the code makes no account for, because it wouldn't normally happen unless you hit the request method multiple times before the first page, or an empty page happened to end at the exact moment demand was met.

In my example X is 8 and Y is 256, making 264 demand. onNext is called for the first page, as I said, and also called again with the subsequent page, but that second call is locked out. Page 1 in my case had 79 results; when the batch was empty and demand was still high, fulfillDemand requested another (3rd) page at its end, then unlocked, and the queued-up onNext for page 2 came in and entered fulfillDemand. Page 2 in my case had 21 results, so at that point 100 items have been sent onward and the batch is empty, so it, too, requests another page, the 4th now. It then exits and unlocks, letting the queued-up 3rd page come into onNext and enter fulfillDemand again; this page has 225 results. The 164 remaining demand is satisfied and demand is left at 0, because no more requests have come in while the downstream is saturated with I/O backpressure.

So we have 61 items from this 3rd page left in the currentBatch list. As soon as demand hit 0, fulfillDemand completed, and so did the onNext for page 3 that called it. But a 4th page was already requested when the 2nd page completed, and when page 4 arrives, onNext destroys currentBatch by replacing it with the 4th page, without checking whether it was empty and appending to it. The code presumes the batch must be empty for a request to have been made, which is wrong when more than one request call happens before the 1st page of results arrives.
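The arithmetic above can be traced in isolation (the request sizes and page sizes are the ones from my run, specific to my data set):

```java
public class DemandTrace {
    // Returns how many items are stranded in currentBatch given the two
    // request sizes and the page sizes from the walkthrough above.
    static long stranded(long x, long y, long[] pages) {
        long demand = x + y; // both request() calls land before page 1
        long leftoverOnLastPage = 0;
        for (long page : pages) {
            long delivered = Math.min(page, demand); // can't exceed demand
            demand -= delivered;
            leftoverOnLastPage = page - delivered;   // items held in the batch
        }
        return leftoverOnLastPage;
    }

    public static void main(String[] args) {
        // X = 8, Y = 256 -> 264 demand; pages of 79, 21 and 225 items.
        // The stranded items are the ones the 4th page then overwrites.
        System.out.println(stranded(8, 256, new long[]{79, 21, 225})); // prints 61
    }
}
```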

So, those 61 items? Gone. Repeatable, every time, but I can't give you a query you can run on any arbitrary database to see this; I just read the code and figured this would happen when I went looking for my missing 61 orders! My solution was not to use the AWS SDK v2 flatMapIterable on the QueryPublisher, and instead to hand that publisher over to Project Reactor right away and let Flux.flatMapIterable take over the work, because it appears to manage its own queue rather than replacing a batch.

I could try to make some code, but if you read and understand the above, this relies on multiple request calls arriving before the first page.

So: the cause is that a double (or more) tap on request before the first page hits onNext sets you up for failure by requesting more pages. And when a page comes in, no check is made for a non-empty currentBatch, so its contents are lost if it is not empty.

Resolution: the simplest fix would be to concatenate currentBatch with the incoming iterable if it happens not to be empty as expected. Alternatively, disallow further upstream requests until a page has arrived: record that a request has been made but not yet satisfied, and add the delta demand, but don't push more demand up. That seems harder but more logical; if a request has been made up for 1 page/iterable, then until one comes down, don't request more, because the code cannot handle more than one page hitting onNext once the demand gets met.
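A minimal sketch of the first suggestion (the class and method names here are hypothetical, not the SDK's internals): append the incoming page to any leftover items instead of replacing the batch wholesale.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical sketch of the "concatenate, don't replace" fix suggested
// above; this is not the SDK's actual FlatteningSubscriber code.
class AppendingBatch<T> {
    private final Queue<T> currentBatch = new ArrayDeque<>();

    // Buggy shape: currentBatch = newPage, which discards leftovers.
    // Fixed shape: append, so leftovers survive an early double request.
    void onNextPage(Iterable<T> page) {
        for (T item : page) {
            currentBatch.add(item);
        }
    }

    T poll() { return currentBatch.poll(); }
    int size() { return currentBatch.size(); }
}
```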

izian commented 3 years ago

A Java class to show, in complete isolation, the failure scenario for the SDK class in question, which discards data from DynamoDB results:

package com.mycompany.hello;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.utils.async.FlatteningSubscriber;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FlatteningSubscriberTest {

    static List<String> ITEMS_ON_PAGE = Arrays.asList("1","2","3","4","5","6","7","8","9","10"); //10 per page
    static List<List<String>> PAGES = Arrays.asList(ITEMS_ON_PAGE, ITEMS_ON_PAGE, ITEMS_ON_PAGE, ITEMS_ON_PAGE, ITEMS_ON_PAGE); //5 pages

    public static void main(String... args) throws Exception {
        MySubscriber mySubscriber = new MySubscriber();
        FlatteningSubscriber<String> flatteningSubscriber = new FlatteningSubscriber<>(mySubscriber);

        Iterator<List<String>> pagesIterator = PAGES.iterator();

        SubPub subPub = new SubPub(pagesIterator);
        subPub.subscribe(flatteningSubscriber);
        flatteningSubscriber.onSubscribe(subPub);

        mySubscriber.doRequest(4L); //Initial request from a Flux flatMap
        mySubscriber.doRequest(10L); //double tap from a Flux publishOn or flatMap etc

        while (mySubscriber.getCollected().size() < 14L){ //wait for the demand to be met
            Thread.sleep(100L); 
        }
        //HERE we have 14 collected and 6 left on the currentBatch

        System.out.println("Seen demand met");
        Thread.sleep(500L); //wait before sending more demand (something writing results to S3)
        System.out.println("Making massive demand to finish");

        // BUT by now, another page has pushed those 6 remaining to the GC pile, a new page of 10 loaded

        mySubscriber.doRequest(1000L); //just to say we want all that's left please

        while (pagesIterator.hasNext()){
            Thread.sleep(50L);
        }

        Thread.sleep(500L);

        System.out.println("Got " + mySubscriber.getCollected().size() + " of " + PAGES.stream().mapToLong(List::size).sum());
        System.out.println("Got " + mySubscriber.getCollected());

        //We end up with 44 collected results, 6 are DISCARDED!

        subPub.getExecutor().shutdown();
    }

    private static class MySubscriber implements Subscriber<String> {
        public Subscription subscription;
        List<String> collected = new ArrayList<>();

        public List<String> getCollected() {
            return collected;
        }

        @Override
        public void onSubscribe(Subscription subscription) {
            this.subscription = subscription;
        }

        public void doRequest(long amount) {
            subscription.request(amount);
        }

        @Override
        public void onNext(String string) {
            collected.add(string);
        }

        @Override
        public void onError(Throwable throwable) {

        }

        @Override
        public void onComplete() {

        }
    }

    private static class SubPub implements Subscription, Publisher<Iterable<String>> {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        Subscriber<? super Iterable<String>> subscriber;

        Iterator<List<String>> pagesIterator;

        public ExecutorService getExecutor() {
            return executor;
        }

        public SubPub(Iterator<List<String>> pagesIterator) {
            this.pagesIterator = pagesIterator;
        }

        @Override
        public void subscribe(Subscriber<? super Iterable<String>> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long l) {
            System.out.println("Page Request");
            executor.execute(() -> {
                try {
                    Thread.sleep(50L);
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(pagesIterator.hasNext()){
                    System.out.println("Supplying Request");
                    subscriber.onNext(pagesIterator.next());
                }
            });
        }

        @Override
        public void cancel() {

        }
    }
}
millems commented 3 years ago

FlatteningSubscriber is being rewritten with https://github.com/aws/aws-sdk-java-v2/pull/2695. We'd need to test to see if the rewrite is still affected, but hopefully it is not.

millems commented 3 years ago

This specific test seems to work under the rewrite:

Page Request
Supplying Request
Page Request
Supplying Request
Seen demand met
Making massive demand to finish
Page Request
Supplying Request
Page Request
Supplying Request
Page Request
Supplying Request
Page Request
Got 50 of 50
Got [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
izian commented 3 years ago

Thanks; I did some more digging today, and we also saw that on the version of the code we have, the onNext call from this class downstream can result in the downstream subscriber calling back into request on the same thread, in effect becoming recursive, since the lock is re-entrant. Thus, when the current batch is empty it requests a page, pops back up the stack, and requests another page: double-requesting a page.

My code sample doesn’t emulate this. But if you keep state to see that a request was made, and don’t request any more until it is satisfied, or, when fed data in onNext, just fill more of a buffer or add to a linked list (because I guess these classes should be able to handle more data even if they had no demand, shouldn’t they?), then those changes would also fix the recursive double request at the end of a page.
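The "record that a request is outstanding" idea could look roughly like this (a sketch under my own naming, not the SDK's actual code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical guard for the recursive double request described above:
// forward at most one upstream page request until a page actually arrives.
class PageRequestGuard {
    private final AtomicBoolean pageOutstanding = new AtomicBoolean(false);

    // Returns true only for the first caller; re-entrant or concurrent
    // callers see an outstanding request and must not re-request upstream.
    boolean tryRequestPage() {
        return pageOutstanding.compareAndSet(false, true);
    }

    // Called from onNext when a page arrives, allowing the next request.
    void pageArrived() {
        pageOutstanding.set(false);
    }
}
```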

I’ve been too busy to check the latest code, I’m afraid. My workaround was to either use .items on the publisher, or just hand over to Flux and have it flatMapIterable itself with a tiny prefetch of pages.

Thanks again for a speedy response

izian commented 3 years ago

@millems I’ve seen the latest code now, and it looks like both suggestions are there, directly or indirectly. The most important was collecting data that was forced upon the subscriber even when there was no outstanding request.

But I would raise a dubious point about the decrement of the demand counter in onNext: I believe, perhaps not with most setups, that it could end up negative. If it ends up negative, could that lead you to over-request new pages when downstream demand grows? Sorry if this isn’t the place for this; I’m a bit green to the scene here.

EDIT: nope; I see you’ve accounted for that and you set the count to 1.

I believe that’s a normal situation if there’s a publisher which is unable to respond to backpressure. Perhaps. But glad to see it works
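For what it’s worth, the scenario I was worried about is easy to illustrate with made-up numbers (the counter name and the clamp here are illustrative; the actual reset-to-1 behaviour lives in the rewrite, not in this snippet):

```java
public class NegativeDemand {
    public static void main(String[] args) {
        long outstanding = 2; // items the downstream has requested
        long received = 5;    // a publisher ignoring backpressure pushes 5

        // An unchecked decrement goes negative and could later cause
        // over-requesting of pages once downstream demand grows again.
        long naive = outstanding - received;                 // -3
        long guarded = Math.max(0, outstanding - received);  // clamped at 0

        System.out.println(naive + " " + guarded); // prints "-3 0"
    }
}
```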

github-actions[bot] commented 2 years ago

⚠️COMMENT VISIBILITY WARNING⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.