ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

Observable.create the Emitter in the subscribe callback function is null #5322

Closed Jorah1012 closed 7 years ago

Jorah1012 commented 7 years ago
private Observable<Boolean> saveObservable2(final List<Message> messagesList) {
    return Observable
            .create(new ObservableOnSubscribe<Boolean>() {
                private List<Message> reList = new ArrayList<Message>();
                @Override
                public void subscribe(ObservableEmitter<Boolean> e) throws Exception {
                    // e is null and why?
                }
            });
}

public void onNewPulse(List<Message> pulses){
    Observable<Boolean> save = saveObservable2(pulses);

    save.observeOn(Schedulers.computation())
            .subscribe(new Consumer<Boolean>() {
                @Override
                public void accept(Boolean messages) throws Exception {
                    if (null != vm) {
                        //vm.onLoad(messages, false);
                        Logger.d("xx");
                    }
                }
            });
}

private Disposable subscribeInner() {
    Logger.d("subscribeInner() service = " + connectionService);
    return connectionService.getPublisher()
            .toFlowable(BackpressureStrategy.MISSING)
            .filter(new Predicate<Message>() {
                @Override
                public boolean test(Message message) throws Exception {
                    return 1 == message.getChannelType();
                }
            })
            .buffer(1000, TimeUnit.MILLISECONDS)
            .filter(new Predicate<List<Message>>() {
                @Override
                public boolean test(List<Message> messages) throws Exception {
                    return null != messages && messages.size() > 0;
                }
            })
            .subscribe(new Consumer<List<Message>>() {
                @Override
                public void accept(List<Message> messages) throws Exception {
                    PulseModelImpl.getInstance().onNewPulse(messages);
                }
            }, new Consumer<Throwable>() {
                @Override
                public void accept(Throwable throwable) throws Exception {
                    Logger.d("net work err occurs ! message = " + throwable.getMessage() + "disposal = " + disposable);
                }
            });
}
akarnokd commented 7 years ago

That is highly unlikely. Perhaps something else is null while you call e.onNext:

Object o = null;
if (condition) {
   o = someobject;
}
e.onNext(o.toString()); // <-- NullPointerException here but because of o being null
Jorah1012 commented 7 years ago

@akarnokd but my code shows e is null

akarnokd commented 7 years ago

It is impossible for e to be null. How did you determine that it is null? Like this?

System.out.println(e);  // <-- prints null?!
System.out.println(e == null);  // <-- prints false?!

The class of e extends AtomicReference, so printing it outputs null because the reference it holds has not been set yet. The emitter object itself is not null.
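
The effect can be reproduced with the JDK alone. The following is a minimal sketch, not RxJava code: FakeEmitter is a hypothetical stand-in that only shares the "extends AtomicReference" property described above. AtomicReference.toString() returns the string form of the value it currently holds, so an empty one prints as null even though the object exists.

```java
import java.util.concurrent.atomic.AtomicReference;

public class EmptyReferenceDemo {

    // Hypothetical stand-in for an emitter (an assumption for illustration):
    // like the real one, it inherits toString() from AtomicReference.
    static final class FakeEmitter extends AtomicReference<Object> {
        private static final long serialVersionUID = 1L;
    }

    public static void main(String[] args) {
        FakeEmitter e = new FakeEmitter();

        // toString() reports the held value, which has not been set yet.
        System.out.println(e);               // prints null
        // The object itself exists.
        System.out.println(e == null);       // prints false
        // Only the value stored inside it is null.
        System.out.println(e.get() == null); // prints true
    }
}
```

A debugger's variables view or a log statement that relies on toString() shows the same misleading null.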

Jorah1012 commented 7 years ago

@akarnokd I was using debug mode. Today I will reinstall the app and use logging to show the value of e.

Jorah1012 commented 7 years ago

@akarnokd It is still null, so I am using just() to create the observable instead. I will read the source code when my current project is complete. Thanks a lot.

I60R commented 7 years ago

How to reproduce

public class RxBlockingGetTest {

    private void a() { Single.create(out::println).blockingGet(); }
    private void b() { Observable.create(out::println).blockingFirst(); }
    private void c() { Flowable.create(out::println, BackpressureStrategy.MISSING).blockingFirst(); }
    private void d() { Maybe.create(out::println).blockingGet(); }
    private void e() { Completable.create(out::println).blockingGet(); }

    @Test /* RxJava 2.0.8 */ (timeout = 2000)
    public void meta() throws InterruptedException {
        Observable
                .<Runnable>fromArray(this::a, this::b, this::c, this::d, this::e)
                .flatMapCompletable(r -> Completable.fromRunnable(r).subscribeOn(Schedulers.newThread()))
                .test()
                .await();
    }
}

output:

null
null
null
null
9223372036854775807

org.junit.runners.model.TestTimedOutException: test timed out after 2000 milliseconds
akarnokd commented 7 years ago

The emitter parameter is not null. Four of the five emitters extend AtomicReference, which starts out empty, so printing the emitter shows null. To verify this, compare the emitter reference itself with null and see that it is not null:

public class EIsNotNull {

    public static void main(String[] args) {

        Single.create(e -> System.out.println(e == null)).subscribe();

        Maybe.create(e -> System.out.println(e == null)).subscribe();

        Completable.create(e -> System.out.println(e == null)).subscribe();

        Flowable.create(e -> System.out.println(e == null), BackpressureStrategy.MISSING)
              .subscribe();

        Observable.create(e -> System.out.println(e == null)).subscribe();

    }
}

It prints false for all.
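
The one non-null line in the earlier output, 9223372036854775807, is equal to Long.MAX_VALUE. A plausible reading, which this thread does not confirm, is that the Flowable emitter extends AtomicLong and keeps the downstream request count there, and that blockingFirst() requests an unbounded amount. The sketch below shows only the JDK mechanism; FakeCountingEmitter and its request method are hypothetical names, not RxJava's implementation.

```java
import java.util.concurrent.atomic.AtomicLong;

public class RequestCounterDemo {

    // Hypothetical stand-in (an assumption for illustration): an object that
    // stores an outstanding request count in the AtomicLong it extends.
    static final class FakeCountingEmitter extends AtomicLong {
        private static final long serialVersionUID = 1L;

        void request(long n) {
            // Simplified: a real implementation would accumulate requests.
            set(n);
        }
    }

    public static void main(String[] args) {
        FakeCountingEmitter e = new FakeCountingEmitter();
        System.out.println(e);         // prints 0

        e.request(Long.MAX_VALUE);
        // toString() is inherited from AtomicLong and shows the counter.
        System.out.println(e);         // prints 9223372036854775807
        System.out.println(e == null); // prints false
    }
}
```

In both cases the printed text describes state held inside the emitter, not whether the emitter reference is null.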

I60R commented 7 years ago

Sorry, I came here from another issue that is not related to RxJava; the null output and the last comment here gave me the impression that it was. @mattjma, I have another example for you:

    Observable.create(new ObservableOnSubscribe<Object>() {

            @Override
            public void subscribe(@NonNull ObservableEmitter<Object> e) throws Exception {

                e.onNext(e instanceof AtomicReference);
                e.onNext(e.getClass());

                e.onNext(e);
                e.onNext(e.toString());
                e.onNext(new AtomicReference());

                e.onComplete();
            }
        }).subscribe(new Consumer<Object>() {

            @Override
            public void accept(@NonNull Object o) throws Exception {
                System.out.println(o);
            }
        });

output:

true
class io.reactivex.internal.operators.observable.ObservableCreate$CreateEmitter
null
null
null
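
Because toString() can be misleading for objects like these, a log statement can describe the reference without calling it. This is a minimal sketch under that assumption; the class EmitterLogging and the method describe are hypothetical helper names, not part of RxJava.

```java
import java.util.concurrent.atomic.AtomicReference;

public class EmitterLogging {

    // Describes a reference without calling its toString():
    // reports either a real null or the class name plus identity hash.
    static String describe(Object o) {
        if (o == null) {
            return "<null reference>";
        }
        return o.getClass().getName() + "@"
                + Integer.toHexString(System.identityHashCode(o));
    }

    public static void main(String[] args) {
        Object missing = null;
        Object emptyHolder = new AtomicReference<Object>();

        // A real null reference is reported as such.
        System.out.println(describe(missing));
        // An empty AtomicReference is reported by class and identity,
        // even though its own toString() would return "null".
        System.out.println(describe(emptyHolder));
    }
}
```

Calling a helper like this inside subscribe(...) in place of printing e directly would make it clear whether the reference or only its contents are null.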
akarnokd commented 7 years ago

I'm closing this issue due to inactivity. If you have further input on the issue, don't hesitate to reopen this issue or post a new one.