davidmoten / rxjava-extras

Utilities for use with rxjava
Apache License 2.0

onBackpressureBufferToFile review please #9

Open davidmoten opened 8 years ago

davidmoten commented 8 years ago

The ability to buffer streams to disk has been something that I've wondered about for a while.

Can I get people's comments/review of this new operator please?

Transformers.onBackpressureBufferToFile

I'd love to get review of this new operator in terms of

The code is in the master branch and runtime jar is on Maven Central as described in rxjava-extras README.

A quick way of contributing is to run a long running test (~30 mins) on your machine:

git clone https://github.com/davidmoten/rxjava-extras.git
cd rxjava-extras
./test-long.sh 

On a non-*nix platform, just run this command instead of test-long.sh:

mvn clean install -Dmax.small=100000000 -Dmax.medium=300000 -Dmax.seconds=600 -Dloops=10000

thomasnield commented 8 years ago

I noticed this feature a few weeks ago and although I haven't played with it yet, it looks really fascinating.

Typically I use your RxJava-JDBC and structure my streams into controlled queried batches in some form. My company uses RDBMS for most data, but sometimes we do deal with really large text files. It's one thing to stream each line out of a text file, but if the data needs to be grouped up or requires some aggregated context to perform calculations, that can quickly become problematic since you cannot query a text file. Sometimes I have to turn these text files into SQLite databases just so I can query them and not have to scan and import all the data.

I haven't looked at this API much yet, but I'd be curious to see if it can scale to those kinds of challenges.

davidmoten commented 8 years ago

Thanks for that @thomasnield, I'll be curious to hear if it's useful to you.

davidmoten commented 8 years ago

@thomasnield considering your use case I think you need more than serialized streams. You might profit from using serialized data structures that support fast access etc. You might want to look at MapDB. I actually went there first and tried using its file-based queue in the implementation here. It worked but was slowish because it has stronger guarantees than are needed in this new operator (here we can leverage the single producer-single consumer scenario, for instance).

thomasnield commented 8 years ago

Hmm, interesting I'll check that out too then. Great work on everything though! Always thrilled to see what you come up with next...

akarnokd commented 8 years ago

I've glanced through the operator and the SPSC queues and this is what I think:

davidmoten commented 8 years ago

Thanks very much @akarnokd, great to have your expertise looking at it.

no isUnsubscribed check in the request-fulfilling loop

Thanks, corrected.

a null poll also calls isEmpty in finished()

True (if done). finished() is reused when requests == 0 further down in the loop but I could customize the finished() path for those two cases to get more efficiency.

no need to change both requested and emitted

Thanks, corrected.

see observeOn for example

Yeah, this is sort of a combination of onBackpressureBuffer and observeOn without the batching. I did look at these operators while developing this one but I also masochistically enjoyed knocking it up myself. I should return to both of them to improve the drain.
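For anyone following the points above, here is a minimal sketch of an observeOn-style drain loop, written against plain java.util.concurrent rather than RxJava. It is only one reading of the suggestions: the cancelled flag is checked inside the request-fulfilling loop, and only a local emitted counter changes per item while requested just accumulates. The class name DrainLoop and its methods are hypothetical and are not the operator's actual code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

/** Hypothetical drain loop; not the code in rxjava-extras. */
final class DrainLoop<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final Consumer<? super T> downstream;
    private final AtomicLong requested = new AtomicLong();  // only ever grows
    private final AtomicInteger wip = new AtomicInteger();  // work-in-progress guard
    private volatile boolean cancelled;
    private long emitted; // touched only by the thread that owns the drain loop

    DrainLoop(Consumer<? super T> downstream) {
        this.downstream = downstream;
    }

    void offer(T value) {
        queue.offer(value);
        drain();
    }

    void request(long n) {
        if (n <= 0) {
            throw new IllegalArgumentException("n > 0 required");
        }
        // Add with a cap at Long.MAX_VALUE so the counter cannot overflow.
        requested.accumulateAndGet(n, (cur, add) -> {
            long sum = cur + add;
            return sum < 0 ? Long.MAX_VALUE : sum;
        });
        drain();
    }

    void cancel() {
        cancelled = true;
    }

    private void drain() {
        // Only the caller that moves wip from 0 to 1 runs the loop.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        int missed = 1;
        long e = emitted;
        for (;;) {
            long r = requested.get();
            while (e != r) {
                if (cancelled) {
                    return; // stop emitting as soon as downstream has gone away
                }
                T v = queue.poll();
                if (v == null) {
                    break; // queue empty: a null poll is enough, no separate isEmpty call
                }
                downstream.accept(v);
                e++; // only the local emitted counter changes per item
            }
            emitted = e;
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}
```

With five queued items, request(2) delivers two of them, request(1) delivers one more, and nothing further is delivered after cancel().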

seems to go only unbounded-in with Long.MAX_VALUE. Depending on your design choices, you could go bounded and use the files as an off-memory temporary storage

Yeah, I'll ponder this a bit further. My biggest concern with not using Long.MAX_VALUE is that some operator upstream starts queueing in memory and the memory-saving benefit of this operator is lost.

what happens if both reader and writer are in the same file?

Do you mean in terms of correctness or performance? It works fine, and I have long-running tests that use just one file, but there is more contention in FileBasedSPSCQueue. Interestingly, I suspect that pausing the reader at the start for long enough for the writer to roll over to a new file could reduce CPU usage for high-throughput, long input stream scenarios, because there is less contention at synchronized blocks (and adaptive locking can kick in). The contention control in FileBasedSPSCQueue favours write over read (so it's hard for read to catch up to write unless write slows down a bit).
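To make the rollover idea concrete, here is a rough sketch of a file-backed FIFO that starts a new segment file every few records, so that the reader normally works on an older file than the writer. It is not FileBasedSPSCQueue: the class RollingFileQueue, its segment naming and the choice to make the reader force a rollover when it catches up are all assumptions made for illustration. The sketch also uses a single coarse lock for simplicity, so it shows the file layout rather than any reduction in contention.

```java
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayDeque;

/** Hypothetical rolling file queue of strings; not the rxjava-extras implementation. */
final class RollingFileQueue implements Closeable {
    private final Path dir;
    private final int maxPerSegment;
    private final ArrayDeque<Path> closedSegments = new ArrayDeque<>();
    private Path activePath;            // segment currently being written
    private DataOutputStream active;
    private int activeCount;
    private Path readingPath;           // segment currently being read
    private DataInputStream reading;
    private int nextId;

    RollingFileQueue(Path dir, int maxPerSegment) {
        this.dir = dir;
        this.maxPerSegment = maxPerSegment;
    }

    synchronized void offer(String value) throws IOException {
        if (active == null) {
            activePath = dir.resolve("segment-" + (nextId++) + ".bin");
            active = new DataOutputStream(
                    new BufferedOutputStream(Files.newOutputStream(activePath)));
            activeCount = 0;
        }
        active.writeUTF(value);
        if (++activeCount == maxPerSegment) {
            rollover();
        }
    }

    // Closes the active segment and hands it over to the reader side.
    private void rollover() throws IOException {
        active.close();
        closedSegments.add(activePath);
        active = null;
    }

    /** Returns the next value in FIFO order, or null if the queue is empty. */
    synchronized String poll() throws IOException {
        for (;;) {
            if (reading == null) {
                if (closedSegments.isEmpty()) {
                    if (active == null) {
                        return null;
                    }
                    rollover(); // reader caught up with the writer: force a rollover
                }
                readingPath = closedSegments.poll();
                reading = new DataInputStream(
                        new BufferedInputStream(Files.newInputStream(readingPath)));
            }
            try {
                return reading.readUTF();
            } catch (EOFException endOfSegment) {
                // Segment exhausted: release it and move on to the next one.
                reading.close();
                Files.delete(readingPath);
                reading = null;
            }
        }
    }

    @Override
    public synchronized void close() throws IOException {
        if (active != null) {
            active.close();
        }
        if (reading != null) {
            reading.close();
        }
    }
}
```

In a finer-grained design the reader and writer would only need to coordinate when they share a segment, which is roughly the situation the single-file question is about.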

have you looked at Aeron's architecture?

I've been following the project a little bit but I haven't had a look at the internals for a while. I'd better go back there!

they are using memory-mapped files; the SPSC version doesn't need locking; release happens by setting the size field (but requires Unsafe to issue an ordered write); acquire happens by reading the size field and not seeing 0 (Unsafe volatile read required)

Fantastic, I'll have a look!
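As a rough illustration of the size-field handshake described above, here is a small sketch of a lock-free single-producer, single-consumer log over a memory-mapped file. It is not Aeron's code and not part of rxjava-extras: the class MappedSpscLog and its record layout (4-byte length followed by a padded payload) are assumptions, and it swaps Unsafe for a VarHandle (Java 9+), using setRelease for the ordered write and getAcquire for the read. It does not wrap around or reclaim space.

```java
import java.io.IOException;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

/** Hypothetical append-only SPSC log; one producer thread, one consumer thread. */
final class MappedSpscLog {
    // View of the buffer as ints, giving release/acquire access to the size field.
    private static final VarHandle INT =
            MethodHandles.byteBufferViewVarHandle(int[].class, ByteOrder.nativeOrder());

    private final ByteBuffer buffer; // memory-mapped, zero-filled for a new file
    private int writePos;            // touched only by the producer
    private int readPos;             // touched only by the consumer

    MappedSpscLog(Path file, int capacity) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            buffer = ch.map(FileChannel.MapMode.READ_WRITE, 0, capacity);
        }
    }

    // Pads record lengths so every size field stays 4-byte aligned.
    private static int align(int n) {
        return (n + 3) & ~3;
    }

    /** Producer side: returns false when the record does not fit. */
    boolean offer(byte[] payload) {
        if (payload.length == 0) {
            throw new IllegalArgumentException("size 0 is reserved for 'not yet written'");
        }
        int next = writePos + 4 + align(payload.length);
        if (next > buffer.capacity()) {
            return false;
        }
        for (int i = 0; i < payload.length; i++) {
            buffer.put(writePos + 4 + i, payload[i]);
        }
        // Release: publish the size only after the payload bytes are written.
        INT.setRelease(buffer, writePos, payload.length);
        writePos = next;
        return true;
    }

    /** Consumer side: returns null when no complete record is available. */
    byte[] poll() {
        if (readPos + 4 > buffer.capacity()) {
            return null;
        }
        // Acquire: a non-zero size means the payload behind it is visible.
        int size = (int) INT.getAcquire(buffer, readPos);
        if (size == 0) {
            return null;
        }
        byte[] out = new byte[size];
        for (int i = 0; i < size; i++) {
            out[i] = buffer.get(readPos + 4 + i);
        }
        readPos += 4 + align(size);
        return out;
    }
}
```

The point of the layout is that the consumer never takes a lock: it either sees a zero size and backs off, or sees a non-zero size and can safely read that many bytes.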

davidmoten commented 8 years ago

Aeron architecture description video:

http://www.infoq.com/presentations/aeron, 17:00 to 30:00, clearly explained

Looks good. I'll give it a try later.