ReactiveX / RxJava

RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
Apache License 2.0

Explore Blocking vs Non-Blocking Solutions #1299

Closed: benjchristensen closed this issue 10 years ago

benjchristensen commented 10 years ago

This issue documents and explores blocking vs. non-blocking solutions for a couple of use cases, particularly SerializedObserver and CompositeSubscription, which are hot code paths in most uses of Rx.

So far the blocking implementations using synchronized have won and are the ones currently in use. Despite using synchronized, they hold the locks only while mutating internal data structures, never while emitting notifications.

A key consideration is that object allocation must be kept low. The atomic state machine pattern has been attempted for both of these and is elegant, but it was a massive performance problem for CompositeSubscription: some valid edge cases with merge add hundreds of subscriptions to the data structure via CompositeSubscription.add, and each add performs a state transition that allocates a new object.
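To make that allocation cost concrete, here is a minimal sketch of what an atomic state machine composite can look like, assuming an immutable state object swapped with compareAndSet. The names (`StateMachineComposite`, `Sub`) are hypothetical and this is not the code that was benchmarked; `Sub` is a stand-in for `rx.Subscription`.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of an "atomic state machine" composite.
// Every add() allocates a new array AND a new immutable State object.
final class StateMachineComposite {

    /** Stand-in for rx.Subscription (assumption, not the real interface). */
    interface Sub {
        void unsubscribe();
    }

    private static final class State {
        final boolean unsubscribed;
        final Sub[] subs;

        State(boolean unsubscribed, Sub[] subs) {
            this.unsubscribed = unsubscribed;
            this.subs = subs;
        }
    }

    private static final Sub[] EMPTY = new Sub[0];
    private final AtomicReference<State> state =
            new AtomicReference<>(new State(false, EMPTY));

    public void add(Sub s) {
        for (;;) {
            State current = state.get();
            if (current.unsubscribed) {
                // Already terminated: unsubscribe the newcomer immediately.
                s.unsubscribe();
                return;
            }
            // Copy-on-write: a fresh array and a fresh State on every call.
            Sub[] next = new Sub[current.subs.length + 1];
            System.arraycopy(current.subs, 0, next, 0, current.subs.length);
            next[current.subs.length] = s;
            if (state.compareAndSet(current, new State(false, next))) {
                return;
            }
            // Lost a race: retry, throwing away what was just allocated.
        }
    }

    public void unsubscribe() {
        for (;;) {
            State current = state.get();
            if (current.unsubscribed) {
                return;
            }
            if (state.compareAndSet(current, new State(true, EMPTY))) {
                // Only the thread that won the transition unsubscribes children.
                for (Sub s : current.subs) {
                    s.unsubscribe();
                }
                return;
            }
        }
    }

    public boolean isUnsubscribed() {
        return state.get().unsubscribed;
    }

    public int size() {
        return state.get().subs.length;
    }
}
```

Adding n subscriptions this way allocates n state objects and n arrays and copies n(n-1)/2 references in total, which is the kind of churn the merge edge cases run into.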

Below are details on the use cases, the current implementations, the alternatives that have been attempted, and performance results from JMH tests.

The intent is to record what has been attempted so far and to seek improvements from anyone who can provide better solutions.

SerializedObserver

Use Case:

Serialize onNext/onError/onCompleted calls arriving from multiple threads so they are delivered sequentially, without synchronizing and blocking threads.
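One way to get this with locks, consistent with the description above (locks held only while mutating state), is an emitter loop: a monitor guards only an `emitting` flag and a pending queue, and the thread that finds the flag clear emits outside the lock and drains whatever others queued meanwhile. This is a simplified, hypothetical sketch handling onNext only (`SerializedEmitter` and the `Consumer` downstream are stand-ins, not RxJava types); onError/onCompleted and exceptions thrown by the downstream are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical emitter-loop sketch: the monitor guards only `emitting` and
// `queue`; downstream.accept() is always called outside the lock and by at
// most one thread at a time. Assumes downstream never throws.
final class SerializedEmitter<T> {
    private final Consumer<T> downstream;
    private boolean emitting;   // guarded by this
    private List<T> queue;      // guarded by this, allocated only on contention

    SerializedEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        synchronized (this) {
            if (emitting) {
                // Someone else is emitting: hand the value over and return
                // instead of blocking until it has been delivered.
                if (queue == null) {
                    queue = new ArrayList<>(4);
                }
                queue.add(value);
                return;
            }
            emitting = true;
        }
        downstream.accept(value); // outside the lock
        drain();
    }

    private void drain() {
        for (;;) {
            List<T> batch;
            synchronized (this) {
                batch = queue;
                if (batch == null) {
                    emitting = false; // nothing left: release the emitter role
                    return;
                }
                queue = null;
            }
            for (T v : batch) {
                downstream.accept(v); // still outside the lock
            }
        }
    }
}
```

A thread arriving while another is emitting doesn't wait for delivery; it enqueues and returns, and the emitting thread delivers the backlog. The trade-off is that one thread can be kept busy delivering other threads' values.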

Current: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/observers/SerializedObserver.java

Discussions:

Alternate implementations:

../gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 2 -prof GC .*OperatorSerializePerf.*'
Current Code (locks)
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  9147601.034   161223.124    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    63030.725      932.700    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  6789538.763   309465.333    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    41906.794      612.458    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   117164.007     1082.505    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     5720.975       79.089    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    79689.194    10539.604    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5     9885.341      162.358    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    66193.041     2044.815    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.000        0.001    ops/s
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  8956346.449   247041.560    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    60983.831     2705.618    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  6483372.239   478251.023    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    40401.985     1894.673    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   113009.060     8700.709    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     5657.256      113.035    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    80699.653     1103.641    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5     9784.149      129.939    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    62955.830     5319.780    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.001        0.001    ops/s
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  8972393.281    92105.068    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    61767.456     4635.081    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  6762871.582   105388.190    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    41700.990      573.994    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   119134.905     1695.909    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     5578.557      118.692    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    79847.519     1111.056    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5     9633.304       67.996    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65096.602     8175.233    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.001        0.001    ops/s
MPSC Queue
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  8565745.644   136623.557    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    62273.804      932.786    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  5532175.536   268394.630    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    32014.042      389.324    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   116615.249     1125.473    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     5214.520       76.467    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    81013.059      503.497    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5    11146.029      190.870    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65785.641     3118.842    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.001        0.000    ops/s

It completely hung once at this point:

# Benchmark: rx.operators.OperatorSerializePerf.serializedTwoStreamsHighlyContended
# Parameters: (size = 1000)
# Warmup Iteration   1: 3894.833 ops/s
# Warmup Iteration   2: 4742.937 ops/s
# Warmup Iteration   3: 5213.280 ops/s
# Warmup Iteration   4: 5204.942 ops/s
# Warmup Iteration   5: 
> Building > :rxjava-core:benchmarks
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  9035236.321    49071.888    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    62164.804     1621.105    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  5510291.830   130335.497    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    31604.221      412.042    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   119747.703     6565.816    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     5284.436      141.163    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    78662.632     6427.553    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5    10301.191       76.694    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65663.355      650.831    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.000        0.001    ops/s
Queue and Counter
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  9014619.003   266968.607    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    62025.534     1849.124    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  5181915.426    74251.781    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    26907.478      838.085    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   115841.328     7085.982    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     3519.087       44.103    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    78310.523     2349.554    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5    15448.549      208.165    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65511.070     2865.242    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.001        0.002    ops/s
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  9038960.672   225036.084    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    61285.414     1517.393    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  5282333.173    33819.691    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    26874.371      676.982    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   116821.651      355.230    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     3014.738       32.780    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    77487.923     7870.801    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5    15935.005      183.188    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65613.851     1626.122    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.001        0.001    ops/s
State Machine
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  9024112.277   252796.553    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    62634.243      692.609    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  5072529.757   559158.810    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    28205.652     1331.276    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   117760.487     1335.254    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     2940.177      142.803    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    77375.785     1846.881    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5    14915.986       52.520    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65454.647     1180.502    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.001        0.001    ops/s
Benchmark                                                         (size)   Mode   Samples         Mean   Mean error    Units
r.o.OperatorSerializePerf.noSerializationSingleThreaded                1  thrpt         5  9181625.881   168356.673    ops/s
r.o.OperatorSerializePerf.noSerializationSingleThreaded             1000  thrpt         5    61764.749     2122.126    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                       1  thrpt         5  5198346.930    26264.589    ops/s
r.o.OperatorSerializePerf.serializedSingleStream                    1000  thrpt         5    27750.351      387.544    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended          1  thrpt         5   116131.546      385.778    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsHighlyContended       1000  thrpt         5     2889.275       19.016    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow           1  thrpt         5    80562.609      483.455    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsOneFastOneSlow        1000  thrpt         5    14967.866       38.010    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended        1  thrpt         5    65010.172     4866.039    ops/s
r.o.OperatorSerializePerf.serializedTwoStreamsSlightlyContended     1000  thrpt         5        1.000        0.001    ops/s
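The code for the MPSC Queue and queue-and-counter variants isn't shown in this issue. A common lock-free shape for the queue-and-counter idea is queue-drain with a work-in-progress counter, sketched below under the assumption that `java.util.concurrent.ConcurrentLinkedQueue` stands in for a dedicated MPSC queue. `QueueDrainEmitter` is a hypothetical name and the benchmarked code may differ.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical queue-drain sketch: every caller enqueues, and the caller that
// moves `wip` from 0 to 1 becomes the single drainer. No thread ever blocks.
// ConcurrentLinkedQueue rejects nulls, so null values are not supported.
final class QueueDrainEmitter<T> {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    private final Consumer<T> downstream;

    QueueDrainEmitter(Consumer<T> downstream) {
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value); // allocates a queue node even when uncontended
        if (wip.getAndIncrement() != 0) {
            return; // another thread is draining and will pick our value up
        }
        int missed = 1;
        for (;;) {
            T v;
            while ((v = queue.poll()) != null) {
                downstream.accept(v);
            }
            // Subtract the work we accounted for; a non-zero result means
            // other producers arrived while we were draining.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }
}
```

Nobody blocks, but every value, even on an uncontended single stream, pays for an offer and a poll, whereas a lock-based emitter can hand an uncontended value straight to the downstream. That is one plausible contributor to the lower serializedSingleStream numbers for the queue-based variants above.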

CompositeSubscription/SubscriptionList

Use Case:

A Subscription implementation that allows the following:

This means the list of subscriptions is written many times and read only once, when unsubscribe happens at the end. The isUnsubscribed boolean is read many times and modified once.
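Given that access pattern, a lock-based list can stay cheap: the flag is a volatile field read without locking, `add` appends to a lazily created `ArrayList` inside a short `synchronized` block, and `unsubscribe` detaches the list under the lock and unsubscribes the children outside it. A minimal sketch, assuming a stand-in `Sub` interface in place of `rx.Subscription`; `SimpleSubscriptionList` is hypothetical and not the linked ChainedSubscription.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: many add() calls, one read of the list at
// unsubscribe(), and a flag that is read often but written once.
final class SimpleSubscriptionList {

    /** Stand-in for rx.Subscription (assumption). */
    interface Sub {
        void unsubscribe();
    }

    private volatile boolean unsubscribed; // lock-free reads
    private List<Sub> subs;                // guarded by this, allocated lazily

    boolean isUnsubscribed() {
        return unsubscribed;
    }

    void add(Sub s) {
        if (!unsubscribed) {
            synchronized (this) {
                if (!unsubscribed) {
                    if (subs == null) {
                        subs = new ArrayList<>(4);
                    }
                    subs.add(s); // amortized growth, no per-add copy
                    return;
                }
            }
        }
        // Added after termination: unsubscribe right away, outside the lock.
        s.unsubscribe();
    }

    void unsubscribe() {
        List<Sub> toUnsubscribe;
        synchronized (this) {
            if (unsubscribed) {
                return;
            }
            unsubscribed = true;
            toUnsubscribe = subs;
            subs = null;
        }
        // Child callbacks run outside the lock.
        if (toUnsubscribe != null) {
            for (Sub s : toUnsubscribe) {
                s.unsubscribe();
            }
        }
    }
}
```

Unlike the state machine approach, `add` neither copies the list nor allocates a new state object per call.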

Current CompositeSubscription: https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java
Current ChainedSubscription (will be renamed): https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/subscriptions/ChainedSubscription.java

Discussions:

Alternate implementations:

../gradlew benchmarks '-Pjmh=-f 1 -tu s -bm thrpt -wi 5 -i 5 -r 2 -prof GC .*PerfTransforms.*'
Current Code (locks)
Benchmark                                       (size)   Mode   Samples         Mean   Mean error    Units
r.u.PerfTransforms.flatMapNestedMapFilterTake        1  thrpt         5  2377403.232   282360.735    ops/s
r.u.PerfTransforms.flatMapNestedMapFilterTake     1024  thrpt         5       18.114        1.069    ops/s
r.u.PerfTransforms.flatMapTransforms                 1  thrpt         5  3454607.756    66000.511    ops/s
r.u.PerfTransforms.flatMapTransforms              1024  thrpt         5     6886.044      296.528    ops/s
r.u.PerfTransforms.mapTransformation                 1  thrpt         5  9550369.422   531491.534    ops/s
r.u.PerfTransforms.mapTransformation              1024  thrpt         5    22381.038     1982.025    ops/s
Benchmark                                       (size)   Mode   Samples         Mean   Mean error    Units
r.u.PerfTransforms.flatMapNestedMapFilterTake        1  thrpt         5  2477980.676    38870.454    ops/s
r.u.PerfTransforms.flatMapNestedMapFilterTake     1024  thrpt         5       18.144        0.399    ops/s
r.u.PerfTransforms.flatMapTransforms                 1  thrpt         5  3409322.430   279971.071    ops/s
r.u.PerfTransforms.flatMapTransforms              1024  thrpt         5     7202.920       62.409    ops/s
r.u.PerfTransforms.mapTransformation                 1  thrpt         5  9812382.788   127551.622    ops/s
r.u.PerfTransforms.mapTransformation              1024  thrpt         5    20796.145      423.822    ops/s
State Machine
Benchmark                                       (size)   Mode   Samples         Mean   Mean error    Units
r.u.PerfTransforms.flatMapNestedMapFilterTake        1  thrpt         5  2650383.791    59582.156    ops/s
r.u.PerfTransforms.flatMapNestedMapFilterTake     1024  thrpt         5       18.488        1.374    ops/s
r.u.PerfTransforms.flatMapTransforms                 1  thrpt         5  3982900.628   180306.660    ops/s
r.u.PerfTransforms.flatMapTransforms              1024  thrpt         5     9971.926     1109.674    ops/s
r.u.PerfTransforms.mapTransformation                 1  thrpt         5  8822681.747  1122775.549    ops/s
r.u.PerfTransforms.mapTransformation              1024  thrpt         5    20515.869     1050.781    ops/s
Benchmark                                       (size)   Mode   Samples         Mean   Mean error    Units
r.u.PerfTransforms.flatMapNestedMapFilterTake        1  thrpt         5  2653860.015    63234.344    ops/s
r.u.PerfTransforms.flatMapNestedMapFilterTake     1024  thrpt         5       18.757        0.086    ops/s
r.u.PerfTransforms.flatMapTransforms                 1  thrpt         5  4202696.749    83003.981    ops/s
r.u.PerfTransforms.flatMapTransforms              1024  thrpt         5    10614.418      653.295    ops/s
r.u.PerfTransforms.mapTransformation                 1  thrpt         5  9085576.828   129279.777    ops/s
r.u.PerfTransforms.mapTransformation              1024  thrpt         5    21140.253      255.681    ops/s
MPSC Queue + AtomicInteger
Benchmark                                       (size)   Mode   Samples         Mean   Mean error    Units
r.u.PerfTransforms.flatMapNestedMapFilterTake        1  thrpt         5  2019274.611   145557.313    ops/s
r.u.PerfTransforms.flatMapNestedMapFilterTake     1024  thrpt         5       18.804        0.321    ops/s
r.u.PerfTransforms.flatMapTransforms                 1  thrpt         5  2940738.978   195628.878    ops/s
r.u.PerfTransforms.flatMapTransforms              1024  thrpt         5     6579.659      354.893    ops/s
r.u.PerfTransforms.mapTransformation                 1  thrpt         5  8251868.663   329307.181    ops/s
r.u.PerfTransforms.mapTransformation              1024  thrpt         5    19585.648     1757.210    ops/s
Benchmark                                       (size)   Mode   Samples         Mean   Mean error    Units
r.u.PerfTransforms.flatMapNestedMapFilterTake        1  thrpt         5  2076835.599    50885.475    ops/s
r.u.PerfTransforms.flatMapNestedMapFilterTake     1024  thrpt         5       18.664        0.224    ops/s
r.u.PerfTransforms.flatMapTransforms                 1  thrpt         5  2966236.565    68297.522    ops/s
r.u.PerfTransforms.flatMapTransforms              1024  thrpt         5     6062.189      407.268    ops/s
r.u.PerfTransforms.mapTransformation                 1  thrpt         5  8289577.407   535983.353    ops/s
r.u.PerfTransforms.mapTransformation              1024  thrpt         5    19542.094     1389.693    ops/s
benjchristensen commented 10 years ago

This issue is related to #1204, where CompositeSubscription was found to be a major source of object allocation and its implementation was changed.

benjchristensen commented 10 years ago

If anyone wants to offer alternative implementations, please ensure that they meet two requirements (beyond, obviously, passing all functional unit tests):

1) It performs better in the JMH performance tests than what's currently in use (see above for how to execute them).
2) It does not cause excessive object allocation. (This is harder to identify in benchmarking; the best tool I know of right now is Java Flight Recorder. An implementation can be fast in the JMH test yet fail the object allocation requirement, as the CompositeSubscription state machine implementation did.)

benjchristensen commented 10 years ago

Here is a resource for us to explore: https://github.com/JCTools/JCTools/tree/master/jctools-core/src/main/java/org/jctools

akarnokd commented 10 years ago

My hybrid composite is in #1145. As for the hang, my guess is that a lazySet may overwrite a terminal state indicator, but that implementation doesn't seem to perform well enough anyway.
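The suspected mechanism can be shown deterministically: if the draining thread releases its state with an unconditional `lazySet`, it can overwrite a terminal marker that another thread wrote in the meantime, whereas `compareAndSet(BUSY, IDLE)` fails and leaves the marker in place. This is a hypothetical illustration (`TerminalStateDemo` and its three states are made up) that replays the interleaving on one thread; it is not the code from #1145 or from the MPSC Queue variant.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration of a release write clobbering a terminal state.
final class TerminalStateDemo {
    static final int IDLE = 0;
    static final int BUSY = 1;
    static final int TERMINATED = 2;

    /** Unconditional release: overwrites whatever value is there. */
    static void releaseWithLazySet(AtomicInteger state) {
        state.lazySet(IDLE);
    }

    /** Conditional release: only moves BUSY -> IDLE, never out of TERMINATED. */
    static void releaseWithCas(AtomicInteger state) {
        state.compareAndSet(BUSY, IDLE);
    }

    /** Replays "A starts, B terminates, A releases" on a single thread. */
    static int replay(boolean useCas) {
        AtomicInteger state = new AtomicInteger(IDLE);
        state.compareAndSet(IDLE, BUSY); // thread A starts draining
        state.set(TERMINATED);           // thread B terminates meanwhile
        if (useCas) {
            releaseWithCas(state);       // thread A finishes draining
        } else {
            releaseWithLazySet(state);
        }
        return state.get();
    }
}
```

`replay(false)` returns 0 (IDLE: the terminal state was lost, so a waiter on termination would never see it), while `replay(true)` returns 2 (TERMINATED).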

benjchristensen commented 10 years ago

A lot has been done, and we now have rx.internal.util and rx.internal.util.unsafe for various optimized data structures.

Closing this out as this was a very generic issue and has served its purpose.