kunny / RxFirebase

RxJava binding APIs for Firebase.
Apache License 2.0

Why use ObservableOnSubscribe instead of extending Observable itself? #12

Closed wbinarytree closed 6 years ago

wbinarytree commented 7 years ago

ObservableOnSubscribe is used by the Observable.create() method, which performs the cancellation check for you internally. So using ObservableOnSubscribe is redundant, IMO. I propose extending Observable directly as the superclass, which reduces allocations compared to Observable.create(): ObservableCreate + ObservableOnSubscribe + Emitter -> CustomObservable. It is certainly a big change for this code base, though. Are you interested in this approach? Or do you prefer ObservableOnSubscribe for some particular reason?
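
To make the proposal concrete, here is a minimal sketch of the "extend the observable directly" shape. It deliberately avoids the RxJava dependency: MiniObservable, MiniObserver, MiniDisposable and FakeSource are hypothetical stand-ins (not RxJava or Firebase types), reduced to what the illustration needs. The point is only that a single per-subscription object can act as both the source's listener and the disposable, so no separate create wrapper, on-subscribe object, or emitter is allocated.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-ins for RxJava's Disposable / Observer / Observable.
interface MiniDisposable {
    void dispose();
    boolean isDisposed();
}

interface MiniObserver<T> {
    void onSubscribe(MiniDisposable d);
    void onNext(T value);
}

abstract class MiniObservable<T> {
    final void subscribe(MiniObserver<T> observer) {
        subscribeActual(observer);
    }

    protected abstract void subscribeActual(MiniObserver<T> observer);
}

// Hypothetical callback-style source, standing in for something like a Firebase Query.
final class FakeSource {
    interface Listener {
        void onValue(String value);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener l) { listeners.add(l); }
    void removeListener(Listener l) { listeners.remove(l); }

    void push(String value) {
        // Iterate over a copy so listeners may remove themselves while being notified.
        for (Listener l : new ArrayList<>(listeners)) {
            l.onValue(value);
        }
    }
}

// The custom observable: one object per source, one Bridge per subscription.
final class SourceObservable extends MiniObservable<String> {
    private final FakeSource source;

    SourceObservable(FakeSource source) { this.source = source; }

    @Override
    protected void subscribeActual(MiniObserver<String> observer) {
        Bridge bridge = new Bridge(source, observer);
        observer.onSubscribe(bridge);
        // Skip registration if the observer disposed during onSubscribe.
        if (!bridge.isDisposed()) {
            source.addListener(bridge);
        }
    }

    // Listener and disposable in one object; the AtomicBoolean holds the disposed flag.
    static final class Bridge extends AtomicBoolean
            implements FakeSource.Listener, MiniDisposable {
        private final FakeSource source;
        private final MiniObserver<String> downstream;

        Bridge(FakeSource source, MiniObserver<String> downstream) {
            this.source = source;
            this.downstream = downstream;
        }

        @Override
        public void onValue(String value) {
            if (!isDisposed()) {
                downstream.onNext(value);
            }
        }

        @Override
        public void dispose() {
            // Remove the listener exactly once.
            if (compareAndSet(false, true)) {
                source.removeListener(this);
            }
        }

        @Override
        public boolean isDisposed() { return get(); }
    }
}

final class DirectSubclassDemo {
    static List<String> run() {
        FakeSource source = new FakeSource();
        List<String> received = new ArrayList<>();
        MiniDisposable[] handle = new MiniDisposable[1];
        new SourceObservable(source).subscribe(new MiniObserver<String>() {
            @Override public void onSubscribe(MiniDisposable d) { handle[0] = d; }
            @Override public void onNext(String value) { received.add(value); }
        });
        source.push("a");
        source.push("b");
        handle[0].dispose();
        source.push("c"); // listener already removed, so this is not delivered
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

In real RxJava the same idea would mean extending Observable and overriding subscribeActual; the stand-in types above only mirror that structure and are not the library's API.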

kunny commented 7 years ago

Good point. The reason I used ObservableOnSubscribe is simply that I followed the same approach that (legacy) RxBinding used.

I strongly agree with you. I'll tag this issue as an enhancement; please feel free to send a PR if you're interested.

shakil807g commented 6 years ago

I think this is a better way in RxJava 2 to create a hot source such as Firebase's ChildEventListener:

@NonNull
public static Flowable<RxFirebaseChildEvent<DataSnapshot>> observeChildEvent(
        @NonNull final Query query) {
    return Flowable.create(emitter -> {
        // Forward each Firebase child callback downstream unless the subscriber has cancelled.
        final ChildEventListener childEventListener = new ChildEventListener() {
            @Override
            public void onChildAdded(DataSnapshot dataSnapshot, String previousChildName) {
                if (!emitter.isCancelled()) {
                    emitter.onNext(new RxFirebaseChildEvent<>(dataSnapshot.getKey(), dataSnapshot,
                            previousChildName, RxFirebaseChildEvent.EventType.ADDED));
                }
            }

            @Override
            public void onChildChanged(DataSnapshot dataSnapshot, String previousChildName) {
                if (!emitter.isCancelled()) {
                    emitter.onNext(new RxFirebaseChildEvent<>(dataSnapshot.getKey(), dataSnapshot,
                            previousChildName, RxFirebaseChildEvent.EventType.CHANGED));
                }
            }

            @Override
            public void onChildRemoved(DataSnapshot dataSnapshot) {
                if (!emitter.isCancelled()) {
                    emitter.onNext(new RxFirebaseChildEvent<>(dataSnapshot.getKey(), dataSnapshot,
                            RxFirebaseChildEvent.EventType.REMOVED));
                }
            }

            @Override
            public void onChildMoved(DataSnapshot dataSnapshot, String previousChildName) {
                if (!emitter.isCancelled()) {
                    emitter.onNext(new RxFirebaseChildEvent<>(dataSnapshot.getKey(), dataSnapshot,
                            previousChildName, RxFirebaseChildEvent.EventType.MOVED));
                }
            }

            @Override
            public void onCancelled(DatabaseError error) {
                if (!emitter.isCancelled()) {
                    emitter.onError(new RxFirebaseDataException(error));
                }
            }
        };

        // Detach the listener when the subscriber cancels.
        emitter.setCancellable(() -> query.removeEventListener(childEventListener));

        query.addChildEventListener(childEventListener);
    }, BackpressureStrategy.BUFFER);
}

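
The listener-plus-cancellable pattern in the snippet above can be tried without Firebase or RxJava on the classpath. The following is only a rough sketch under stated assumptions: CallbackSource and SketchEmitter are hypothetical stand-ins (not Firebase's Query or RxJava's FlowableEmitter), kept just large enough to show that registering a cleanup hook before adding the listener leaves nothing attached after cancellation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a callback API such as a query that accepts child event listeners.
final class CallbackSource {
    interface Listener {
        void onEvent(String event);
    }

    private final List<Listener> listeners = new ArrayList<>();

    void add(Listener l) { listeners.add(l); }
    void remove(Listener l) { listeners.remove(l); }
    int listenerCount() { return listeners.size(); }

    void fire(String event) {
        // Iterate over a copy so a listener can be removed during dispatch.
        for (Listener l : new ArrayList<>(listeners)) {
            l.onEvent(event);
        }
    }
}

// Hypothetical, much-reduced emitter: forwards values until cancelled,
// then runs the registered cleanup hook exactly once.
final class SketchEmitter<T> {
    private final Consumer<T> downstream;
    private Runnable onCancel = () -> {};
    private boolean cancelled;

    SketchEmitter(Consumer<T> downstream) { this.downstream = downstream; }

    void setCancellable(Runnable action) { onCancel = action; }
    boolean isCancelled() { return cancelled; }

    void onNext(T value) {
        if (!cancelled) {
            downstream.accept(value);
        }
    }

    void cancel() {
        if (!cancelled) {
            cancelled = true;
            onCancel.run();
        }
    }
}

final class CreateStyleDemo {
    // Mirrors the order used in the snippet: set the cleanup hook, then attach the listener.
    static SketchEmitter<String> bridge(CallbackSource source, Consumer<String> downstream) {
        SketchEmitter<String> emitter = new SketchEmitter<>(downstream);
        CallbackSource.Listener listener = emitter::onNext;
        emitter.setCancellable(() -> source.remove(listener));
        source.add(listener);
        return emitter;
    }

    static String run() {
        CallbackSource source = new CallbackSource();
        List<String> received = new ArrayList<>();
        SketchEmitter<String> emitter = bridge(source, received::add);
        source.fire("ADDED");
        source.fire("CHANGED");
        emitter.cancel();      // detaches the listener
        source.fire("REMOVED"); // nobody is listening any more
        return received + " listeners=" + source.listenerCount();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

This sketch is single-threaded; a real emitter also has to cope with callbacks racing against cancellation, which is one reason an explicit isCancelled() guard before onError can still be worth keeping.
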
kunny commented 6 years ago

Released on 11.8.0.0.