davidmoten / rxjava-extras

Utilities for use with rxjava
Apache License 2.0

Resource Manager #18

Open abersnaze opened 7 years ago

abersnaze commented 7 years ago

I have this bit of code that I've used a couple of times, and I figured it could use a good home. It's a class for currying the resource creation and cleanup arguments of the rx.Observable#using function. Would this be a good fit for this project?

import rx.Observable;
import rx.functions.Func1;

public class ResourceManager<T> {
    public static interface CheckedFunc0<R> {
        public R call() throws Exception;
    }

    public static interface CheckedAction1<T> {
        public void call(T t) throws Exception;
    }

    private final CheckedFunc0<T> resourceFactory;
    private final CheckedAction1<? super T> disposeAction;

    public ResourceManager(final CheckedFunc0<T> resourceFactory, final CheckedAction1<? super T> disposeAction) {
        this.resourceFactory = resourceFactory;
        this.disposeAction = disposeAction;
    }

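    /**
     * Creates the resource on subscription and makes it available to {@code func} until the
     * returned {@link Observable} terminates or is unsubscribed from, at which point the
     * dispose action runs.
     *
     * @param func maps the resource to the {@link Observable} that uses it
     * @return an {@link Observable} that emits the items produced by {@code func}
     */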
    public final <R> Observable<R> checkout(final Func1<? super T, ? extends Observable<? extends R>> func) {
        return Observable.using(() -> {
            try {
                return resourceFactory.call();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }, func, (rsrc) -> {
            try {
                disposeAction.call(rsrc);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}

davidmoten commented 7 years ago

Hi George, happy to add that. So this is an example usage:

Observable<byte[]> bytes = new ResourceManager<InputStream>(
        () -> new FileInputStream(file),
        is -> is.close())
    .checkout(is -> bytesFrom(is));

Just for comparison, I normally use:

Observable<byte[]> bytes =
    Observable.using(
        Checked.f0(() -> new FileInputStream(file)),
        is -> bytesFrom(is), 
        Checked.a1(is -> is.close()));

I can certainly imagine the currying aspect of your offering being useful.

Now that we are chatting about it, another overload of Observable.using in RxJava 1 and 2 might be useful for resources that implement Closeable (I frequently use using with InputStreams and Sockets). Do you think that might be worth it? By the same token, your ResourceManager could do with another overloaded constructor to make usage briefer.
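
To make the Closeable idea concrete, here is a rough sketch of what a single-argument constructor might buy. It is deliberately a synchronous, dependency-free analogue and not the RxJava API: `CloseableManager` and `CheckedFunc1` are hypothetical names invented for this illustration, and `checkout` returns a plain value rather than an Observable. The only point is that when the resource type is known to be Closeable, the dispose action no longer has to be passed in.

```java
import java.io.Closeable;
import java.io.InputStream;
import java.util.concurrent.Callable;

// Hypothetical helper interface, in the style of CheckedFunc0 above.
interface CheckedFunc1<T, R> {
    R call(T t) throws Exception;
}

// Hypothetical, synchronous stand-in for a ResourceManager limited to Closeable
// resources. The caller supplies only the factory; close() is the dispose action.
final class CloseableManager<T extends Closeable> {
    private final Callable<T> resourceFactory;

    CloseableManager(Callable<T> resourceFactory) {
        this.resourceFactory = resourceFactory;
    }

    // Runs func against a fresh resource and closes the resource afterwards,
    // whether func returns normally or throws.
    <R> R checkout(CheckedFunc1<? super T, ? extends R> func) {
        try (T resource = resourceFactory.call()) {
            return func.call(resource);
        } catch (RuntimeException e) {
            throw e; // do not double-wrap unchecked exceptions
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

With something like this, usage would shrink to `new CloseableManager<InputStream>(() -> new FileInputStream(file)).checkout(is -> is.read())`. An Rx version would presumably hand the same `close()` call to Observable.using as its dispose action.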

davidmoten commented 7 years ago

@abersnaze Do you want to make a PR or would you like me to pop it in?

abersnaze commented 7 years ago

In a direct comparison it's hard to see, but separating the resource management from its usage makes the user code simpler.

<R> Observable<R> doStuff(ResourceManager<File> tmpFileGenerator) {
    return tmpFileGenerator.checkout(tmpFile -> {
        // do stuff with a temporary file that gets automatically cleaned up when done.
        return ...;
    });
}

I think this API could grow a few more options, such as resource pooling and fallbacks when resources are not available.
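
As a rough illustration of the pooling idea, the sketch below keeps released resources in a queue and only falls back to creating a new one when the pool is empty. This is one possible reading of "pooling and fallbacks", not something that exists in the library: `PooledManager`, `fallbackFactory` and `idleCount` are hypothetical names, and `checkout` is synchronous so the example needs no RxJava dependency.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical pooled variant: the caller's code is the same checkout(...) call,
// but "dispose" now means "return to the pool" instead of "destroy".
final class PooledManager<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> fallbackFactory;

    PooledManager(Supplier<T> fallbackFactory) {
        this.fallbackFactory = fallbackFactory;
    }

    <R> R checkout(Function<? super T, ? extends R> func) {
        // Prefer an idle resource; fall back to creating one if none is available.
        T resource = idle.poll();
        if (resource == null) {
            resource = fallbackFactory.get();
        }
        try {
            return func.apply(resource);
        } finally {
            // Hand the resource back for the next caller.
            idle.offer(resource);
        }
    }

    // Number of resources currently waiting in the pool.
    int idleCount() {
        return idle.size();
    }
}
```

The user code does not change when pooling is swapped in, which is the benefit of keeping management separate from usage. A real version would also need a size limit and some way to discard broken resources.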

I can submit a PR too.

sirinath commented 7 years ago

I am looking for a way to close or dispose a set of observable resources, i.e., to call the close method on each item, but only once all of its users have finished with it. By "all users have finished" I mean the item can no longer be passed to any onNext downstream and is no longer held by a window, a group, or any other downstream subject or subscription; in other words, no observable or operator sequence still uses it.

davidmoten commented 7 years ago

Sounds like you need to look at

Observable.using(...).share();


sirinath commented 7 years ago

I am looking for reference counting of the contained objects. For example, I have a set of objects that allocate native resources.

This is not exactly what I am using, but it should give you a sense of it, though my concern is not a file as in this example:

Subject<File> s = ...

s.onNext(thefile1);
s.onNext(thefile2);
...
s.onNext(thefileN);

Once you are done with thefileX I want to close it.

The approach suggested above fits the case where you have one resource from which you get a stream of items; it does not necessarily deal with a stream of resources.
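
For what it's worth, the per-item reference counting described here could look roughly like the sketch below. It is plain Java rather than an RxJava operator, and `RefCounted`, `retain` and `release` are hypothetical names chosen for illustration. It assumes every consumer calls `retain()` when it takes hold of an item and `release()` when it is finished, which Rx will not do automatically.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical wrapper that closes its resource when the last holder releases it.
final class RefCounted<T extends AutoCloseable> {
    private final T resource;
    // Starts at 1: the creator holds the first reference.
    private final AtomicInteger refs = new AtomicInteger(1);

    RefCounted(T resource) {
        this.resource = resource;
    }

    T get() {
        if (refs.get() <= 0) {
            throw new IllegalStateException("already closed");
        }
        return resource;
    }

    // Registers one more user. The CAS loop never revives a count that hit zero.
    RefCounted<T> retain() {
        for (;;) {
            int n = refs.get();
            if (n <= 0) {
                throw new IllegalStateException("already closed");
            }
            if (refs.compareAndSet(n, n + 1)) {
                return this;
            }
        }
    }

    // Drops one reference and closes the resource when the count reaches zero.
    void release() {
        int remaining = refs.decrementAndGet();
        if (remaining == 0) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        } else if (remaining < 0) {
            throw new IllegalStateException("released more often than retained");
        }
    }

    boolean isClosed() {
        return refs.get() <= 0;
    }
}
```

The subject would then carry `RefCounted<File>` items instead of bare files. The producer releases its own reference after emitting, and each downstream user retains on receipt and releases when done, so the close happens when the last user lets go.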