sonyxperiadev / lumber-mill

Where logs are cut into lumber
Apache License 2.0
12 stars 6 forks source link

A way of "hooking" into the onCompleted() and/or onError() in the subscriber #27

Open jrask opened 7 years ago

jrask commented 7 years ago

Today, there is no way of hooking into the subscriber and to have i.e cleanup code (like removing tmp files etc) and it is not allowed to create a custom subscriber.

We need to figure out a way to "register" hooks that can be executed in the subscriber.

@somcsel - You can comment on this if you need to clarify it.

somcsel commented 7 years ago

That's a good description of the problem.

Our particular use-case is that we s3.download a file, process it and write the processed entities to the network in a pipeline of composed Observables. The file is large and reading it all up-front leads to high memory consumption. So instead we interleave the reading, processing and writing to the network. In this case we need to remove the local file only at the end of the pipeline, but due to the thread switching caused by observeOn and Observable.interval the s3.download Observable is completed and unsubscribed from part way down the pipeline.

jrask commented 7 years ago

@jankronquist - Do you have any superb ideas on this?

I can come up with some ideas to make it work but none feels like a good solution.

My question here sort of explains the problem. https://groups.google.com/forum/#!topic/rxjava/QG2GiTAH044

If there is an "internal" observable, all "finalized" methods are invoked when the new threads starts to execute. In steves scenario, this causes the original local file to be removed to early. Instead, we want this to happen in subscriber onError() or onCompleted().

somcsel commented 7 years ago

We've had a re-think of our design for this use-case and now split the large file downloaded from S3 into a number of smaller temp files - the large file contains a series of reports, so it has a natural partitioning already - which we process lower down in the pipeline. This means that we avoid the problem described here. (Maybe this is also a more reactive approach, since we avoid the global state of the large file?)

somcsel commented 7 years ago

Still what I wrote above doesn't make this feature less valuable to implement, I just want to say that it isn't blocking us anymore.

jrask commented 7 years ago

But is still sucks ;-)