PacktPublishing / Hands-On-Reactive-Programming-in-Spring-5

Hands-On Reactive Programming in Spring 5, published by Packt
MIT License

need synchronized?? #22

Closed tongVincent closed 2 years ago

tongVincent commented 2 years ago

Hello, I am confused by some code in the book. Don't the variables `remaining` and `filtered` need to be synchronized? The code is in `org.rpis5.chapters.chapter_03.push_pull_model.TakeFilterOperator.TakeFilterInner#onNext`:

`public void onNext(T element) {
        if (done) {
            return;
        }

        long r = requested;
        Subscriber<T> a = actual;
        Subscription s = current;

        if (remaining > 0) {
            boolean isValid = predicate.test(element);
            boolean isEmpty = queue.isEmpty();

            if (isValid && r > 0 && isEmpty) {
                a.onNext(element);
                remaining--;

                REQUESTED.decrementAndGet(this);
                if (remaining == 0) {
                    s.cancel();
                    onComplete();
                }
            }
            else if (isValid && (r == 0 || !isEmpty)) {
                queue.offer(element);
                remaining--;

                if (remaining == 0) {
                    s.cancel();
                    onComplete();
                }
                drain(a, r);
            }
            else if (!isValid) {
                filtered++;
            }
        }
        else {
            s.cancel();
            onComplete();
        }

        if (filtered > 0 && remaining / filtered < 2) {
            s.request(take);
            filtered = 0;
        }
    }`
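
For context on the question above, here is a minimal sketch of the distinction it turns on. It assumes the upstream publisher follows the Reactive Streams rule that `onNext` signals are delivered serially (never concurrently), while `request` may be called from another thread. Under that assumption, counters touched only inside `onNext` (like `remaining` and `filtered` in the quoted method) can be plain fields, whereas `requested` needs atomic updates. The class `SerializedStateSketch` and its `run` helper are hypothetical names made up for this illustration; this is not the book's `TakeFilterInner`.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical illustration; not the book's TakeFilterInner.
final class SerializedStateSketch {

    // Read and written only inside onNext(), which is assumed to be
    // invoked serially, so a plain field is enough here.
    private long remaining;

    // Written by request() on any thread and by onNext() on the emitting
    // thread, so it is volatile and updated atomically.
    private volatile long requested;

    private static final AtomicLongFieldUpdater<SerializedStateSketch> REQUESTED =
            AtomicLongFieldUpdater.newUpdater(SerializedStateSketch.class, "requested");

    SerializedStateSketch(long take) {
        this.remaining = take;
    }

    // Downstream demand; may race with onNext().
    void request(long n) {
        REQUESTED.addAndGet(this, n);
    }

    // Upstream signal; consumes one unit of demand if any is outstanding.
    void onNext() {
        if (remaining > 0 && requested > 0) {
            remaining--;
            REQUESTED.decrementAndGet(this);
        }
    }

    // Runs one emitting thread against one requesting thread and returns
    // {remaining, requested} once both have finished.
    static long[] run(long take) {
        SerializedStateSketch inner = new SerializedStateSketch(take);

        Thread emitter = new Thread(() -> {
            while (inner.remaining > 0) {
                inner.onNext();
                Thread.yield();
            }
        });
        Thread requester = new Thread(() -> {
            for (long i = 0; i < take; i++) {
                inner.request(1);
            }
        });

        emitter.start();
        requester.start();
        try {
            emitter.join();
            requester.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new long[] {inner.remaining, inner.requested};
    }

    public static void main(String[] args) {
        long[] result = run(10_000);
        System.out.println("remaining=" + result[0] + " requested=" + result[1]);
    }
}
```

In this sketch only one thread ever calls `onNext()`, so `remaining` is never accessed concurrently, and both counters end at zero once all demand has been issued and consumed. If an upstream called `onNext` from several threads at once, the plain field would no longer be safe, so the conclusion depends entirely on the serial-delivery assumption.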