koush / ion

Android Asynchronous Networking and Image Loading
Other
6.29k stars 1.03k forks source link

RxJava support for responses #545

Open jpiche opened 9 years ago

jpiche commented 9 years ago

Being able to return an Observable—from RxJava—for a response instead of dealing with callbacks could save a lot of boilerplate code. Instead of the existing setCallback method, like the following,

Ion.with(context)
  .load("POST", url)
  .setJsonObjectBody(json)
  .asJsonObject()
  .withResponse()
  .setCallback(callback);

I think something like observable() might work well. Like so:


Ion.with(context)
  .load("POST", url)
  .setJsonObjectBody(json)
  .asJsonObject()
  .withResponse()
  .observable()
  .subscribe(
    response -> { /* code */ },
    error -> { /* code */ }
  );
nschwermann commented 9 years ago

Duplicate request with better API suggestion. https://github.com/koush/ion/issues/395

writtmeyer commented 9 years ago

Yes, that indeed would be a nice addition. Nut sure, though, if this wouldn't actually belong to the AndroidAsync project (https://github.com/koush/AndroidAsync), since the Future class is in there.

koush commented 9 years ago

You could write your own Observable that implements FutureCallback, no?

Ion.with(...) .... .then(new ObservableThingThatImplementsFutureCallback()) .subscribe(...)

.then is the same as .setCallback, but it returns the generic "T" value that you passed in, for handy chaining.

nschwermann commented 9 years ago

Not quite following you, do you have a full example. Would the onSubscribe of the ObservableThingThatImplementsFutureCallback actually kick off the ion request or would it be a no-op?

koush commented 9 years ago

Oh, it would be a noop.

jpiche commented 9 years ago

Thanks for the idea @koush. I'll try doing something like that.

jpiche commented 9 years ago

I was probably a bit hasty in opening this issue. Since Ion returns a Future, Observable.from(future) works great.

nschwermann commented 9 years ago

@jpiche are you able to use that method and still remain async? I'm having trouble figuring out how to get the result from the future without blocking using the way you suggest. I'm sure its just some opperator in rx I'm unaware of or don't grok yet...

jpiche commented 9 years ago

@schwiz, try applying a scheduler to the observable. What I'm doing is something like

Observable.from(ionFuture)
  .map(response -> { /* some stuff */ })
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(...);
koush commented 9 years ago

I'd not recommend using Observable.from(future) since that will block a thread somewhere calling future.get.

If it's possible to implement Observable that waits for a callback, that would be ideal. I'm thinking about ways to implement this, as it's been a common request.

My main concern is implementing it, without requiring rxjava as a new dependency to all ion users.

nschwermann commented 9 years ago

So far the cleanest thing for me is to have the ion callback inside the onSubscibe of an Observable and then call subscriber.onNext when you get the callback.

Edit: finally happy with what I'm doing so I'll share. Would be a lot more pretty with lambdas but it flows well and its pretty easy to add in actions and map transformations etc this way. ARG no idea why markup isn't working looks good in preview..

    private Func0<Pair<Builders.Any.B, NetworkEvent>> getDefaultBuilder(final NetworkEvent networkEvent){
        return new Func0<Pair<Builders.Any.B, NetworkEvent>>() {
            @Override
            public Pair<Builders.Any.B, NetworkEvent> call() {
                Log.d(TAG, "getDefaultBuilder");
                final Builders.Any.B b = mIon.build(mContext).load(networkEvent.getWebUri().toString());
                addHeaders(b);
                b.setTimeout(TIMEOUT);
                b.group(RequestManager.this);
                return Pair.create(b, networkEvent);
            }
        };
    }

    private <P> Func0<Pair<Builders.Any.B, NetworkEvent>> getPostBuilder(final NetworkEvent networkEvent, final P post){
        return new Func0<Pair<Builders.Any.B, NetworkEvent>>() {
            @Override
            public Pair<Builders.Any.B, NetworkEvent> call() {
                final Pair<Builders.Any.B, NetworkEvent> pair = getDefaultBuilder(networkEvent).call();
                pair.first.setJsonPojoBody(post);
                return pair;
            }
        };
    }

    private <T> Func1<Pair<Builders.Any.B, NetworkEvent>, Observable<Pair<T, NetworkEvent>>> defaultRequestObservable(final TypeToken<T> returnType){
        return new Func1<Pair<Builders.Any.B, NetworkEvent>, Observable<Pair<T, NetworkEvent>>>() {
            @Override
            public Observable<Pair<T, NetworkEvent>> call(Pair<Builders.Any.B, NetworkEvent> pair) {
                final Builders.Any.B b = pair.first;
                final NetworkEvent networkEvent = pair.second;
                return Observable.create(new Observable.OnSubscribe<Pair<T, NetworkEvent>>() {
                    @Override
                    public void call(final Subscriber<? super Pair<T, NetworkEvent>> subscriber) {
                        Log.d(TAG, "onSubscribe");
                        final BaseCallback<T> callback = new BaseCallback<T>() {
                            @Override
                            public void onComplete(T result) {
                                if(subscriber.isUnsubscribed()) return;
                                subscriber.onNext(Pair.create(result, networkEvent));
                            }

                            @Override
                            public void onException(Exception e) {
                                subscriber.onError(e);
                            }
                        };
                        networkEvent.setLoading(true);
                        mNetworkBus.post(networkEvent);
                        callback.setBusStuff(mNetworkBus, networkEvent);
                        b.as(returnType).withResponse().setCallback(callback);
                    }
                });
            }
        };
    }

    final Action1 noOp = new Action1() {
        @Override
        public void call(Object o) {

        }
    };

    private <T> Observable<Pair<T, NetworkEvent>> getRequestObservable(final NetworkEvent networkEvent, final TypeToken<T> returnType){
        return Observable.using(getDefaultBuilder(networkEvent), defaultRequestObservable(returnType), noOp);
    }
nschwermann commented 9 years ago

You know you could at least keep rxjava from being a strict dependency with gradle builds pretty easily with a little refactoring you could even make gson optional the same way.

Nilzor commented 9 years ago

I'd not recommend using Observable.from(future) since that will block a thread somewhere calling future.get.

I'm not sure this is right, or I misunderstand you. Observable.from(future) does not block - you're just converting one async method to another. Observable.from(future).toBlocking() would on the other hand block.

The problem, as I've come to understand, is not with .from() but with the fact that Ion is hot while Retrofit is cold (hot vs cold). Well - it's only a problem if you care about interchangability. I'm actually inclined to change my issue #395 to address that matter instead - changin the title to something like "There should be an option to make all Ion requests execute as cold async operations".

I agree it's just silly to make Ion depend on RxAndroid when a conversion is just one line of code away. Then again.. @schwiz's suggestion isn't that bad either (but solving the hot-cold-issue is the hard part, not the return type).

nschwermann commented 9 years ago

I'm not sure of the internals but it would have to block before calling onNext with Observable.from(future)

On Fri, Jun 5, 2015 at 3:21 AM, Frode Nilsen notifications@github.com wrote:

I'd not recommend using Observable.from(future) since that will block a thread somewhere calling future.get.

I'm not sure this is right, or I misunderstand you. Observable.from(future) does not block - you're just converting one async method to another. Observable.from(future).toBlocking() would on the other hand block.

The problem, as I've come to understand, is not with .from() but with the fact that Ion is hot while Retrofit is cold ([hot vs cold])( http://www.introtorx.com/content/v1.0.10621.0/14_HotAndColdObservables.html)). Well - it's only a problem if you care about interchangability. I'm actually inclined to change my issue #395 https://github.com/koush/ion/issues/395 to address that matter instead

  • changin the title to something like "There should be an option to make all Ion requests execute as cold async operations".

I agree it's just silly to make Ion depend on RxAndroid when a conversion is just one line of code away. Then again.. @schwiz https://github.com/schwiz's suggestion isn't that bad either.

— Reply to this email directly or view it on GitHub https://github.com/koush/ion/issues/545#issuecomment-109200839.

Nathan Schwermann 785-312-0080 http://schwiz.net http://schwiz.net

kingsleyadio commented 9 years ago

But one could use:

Observable.defer(() -> 
    Observable.just(ionRequestThatReturnsFuture()))
  .map(response -> 
    { /* some stuff */ })
  .subscribeOn(Schedulers.io())
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(/*...*/);

?

nschwermann commented 9 years ago

Its best to use Observable.create and have the subscibe method you make implement the ion callback On Aug 28, 2015 6:31 PM, "ADIO Kingsley O" notifications@github.com wrote:

But one could use:

Observable.defer(() -> Observable.just(ionRequestThatReturnsFuture())) .map(response -> { /* some stuff / }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(/...*/);

?

— Reply to this email directly or view it on GitHub https://github.com/koush/ion/issues/545#issuecomment-135913029.

adiktz commented 8 years ago

Do we have a conclusion?

dave08 commented 7 years ago

This is the code I use in Kotlin, I think it avoids all these problems...?:

fun <T> ResponseFuture<T>.rxSingle(): Single<T> =
        Single.fromEmitter({ emitter ->
            setCallback { e, result ->
                if (result != null)
                    emitter.onSuccess(result)
                else
                    emitter.onError(e)
            }

            emitter.setCancellation { cancel() }
        })

Then I can just chain with the request:

        Ion.with(context)
                .load(UrlFormater.getQueryAllCategories(deviceAccountId.toString()))
                .asString()
                .rxSingle()
                .map { GsonProvider.getInstance().parse(it, Category::class.java) as List<Category> }