composewell / streamly

High performance, concurrent functional programming abstractions
https://streamly.composewell.com

Add mergeWith and mergeAsyncWith combinators #100

Open harendra-kumar opened 6 years ago

harendra-kumar commented 6 years ago

Just as zipWith and zipAsyncWith zip two streams using a zip function, mergeWith and mergeAsyncWith would merge two streams using a merge function. The merge function only orders the elements of the two streams, i.e. it decides which element goes first in the resultant stream.

They should look like:

```haskell
mergeWith      :: (IsStream t, Monad m)      => (a -> a -> Ordering) -> t m a -> t m a -> t m a
mergeAsyncWith :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> t m a -> t m a -> t m a
```

mergeAsyncWith is just like mergeWith except that it generates the two streams concurrently. These operations have been implemented in this example, but they may not be very efficient as they use uncons. Moreover, it makes sense to have these in Streamly.Prelude as general functions, just like zip.
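
For reference, a list-based sketch of the intended semantics (illustration only; mergeListBy is a hypothetical name, not part of the library):

```haskell
-- The comparison decides which head goes first; on EQ the left
-- stream wins, which keeps the merge left-biased (stable).
mergeListBy :: (a -> a -> Ordering) -> [a] -> [a] -> [a]
mergeListBy _ [] ys = ys
mergeListBy _ xs [] = xs
mergeListBy cmp xxs@(x:xs) yys@(y:ys) =
    case cmp x y of
        GT -> y : mergeListBy cmp xxs ys
        _  -> x : mergeListBy cmp xs yys
```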

harendra-kumar commented 5 years ago

To be consistent with the convention in Data.List we should name these mergeBy and mergeAsyncBy. The mergeWith convention is used when the supplied function projects an element, e.g. in transform comprehensions in GHC.Exts.

naushadh commented 5 years ago

👍 for the 2-way merge offered by mergeWith and mergeAsyncWith.

I was wondering if there could be additional combinators for N-way merge?

```haskell
mergeNBy      :: (IsStream t, Monad m)      => (a -> a -> Ordering) -> [t m a] -> t m a
mergeNAsyncBy :: (IsStream t, MonadAsync m) => (a -> a -> Ordering) -> [t m a] -> t m a
```

harendra-kumar commented 5 years ago

How about:

```haskell
mergeNBy cmp = Prelude.foldr (mergeWith cmp) nil
```
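
A quick sanity check of that definition (hypothetical usage, assuming the proposed mergeWith plus fromList/toList from Streamly.Prelude imported qualified as S):

```haskell
-- hypothetical: relies on the proposed mergeWith combinator
example :: IO [Int]
example = S.toList $ mergeNBy compare
    [S.fromList [1,4], S.fromList [2,5], S.fromList [3,6]]
-- => [1,2,3,4,5,6]
```
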
naushadh commented 5 years ago

Based on this and some of my own experimentation, 2-way merges turn out to be a lot slower than, say, a 16-way merge.

2-way vs 16-way merge (diff of timings and RTS statistics):

```diff
--- .scratch/out/x-2018-12-06T18:14:36-default.parallel:4-1M.txt 2018-12-06 18:15:01.000000000 -0500
+++ .scratch/out/x-2018-12-06T18:14:17-default.parallel:4-1M.txt 2018-12-06 18:14:28.000000000 -0500
@@ -1,56 +1,49 @@
-2.179617s
-1.857034s
-2.331545s
-2.775779s
-2.18057s
-2.520403s
-2.17421s
-2.412253s
-2.171273s
- 57,385,772,832 bytes allocated in the heap
- 11,928,198,640 bytes copied during GC
- 53,956,944 bytes maximum residency (1418 sample(s))
- 1,695,408 bytes maximum slop
- 51 MB total memory in use (0 MB lost due to fragmentation)
+2.799922s
+2.881001s
+ 20,237,266,376 bytes allocated in the heap
+ 4,831,391,408 bytes copied during GC
+ 40,677,984 bytes maximum residency (248 sample(s))
+ 1,455,392 bytes maximum slop
+ 38 MB total memory in use (0 MB lost due to fragmentation)

                                      Tot time (elapsed)  Avg pause  Max pause
- Gen  0     37216 colls, 37216 par   26.829s   9.809s     0.0003s    0.0185s
- Gen  1      1418 colls,  1417 par    9.641s   3.583s     0.0025s    0.0282s
+ Gen  0     12553 colls, 12553 par    9.313s   4.236s     0.0003s    0.0131s
+ Gen  1       248 colls,   247 par    3.046s   1.139s     0.0046s    0.0191s

- Parallel GC work balance: 27.73% (serial 0%, perfect 100%)
+ Parallel GC work balance: 45.81% (serial 0%, perfect 100%)

- TASKS: 18 (1 bound, 17 peak workers (17 total), using -N4)
+ TASKS: 17 (1 bound, 16 peak workers (16 total), using -N4)

 SPARKS: 0(0 converted, 0 overflowed, 0 dud, 0 GC'd, 0 fizzled)

- INIT    time    0.001s  (  0.002s elapsed)
- MUT     time   16.174s  ( 10.854s elapsed)
- GC      time   36.470s  ( 13.392s elapsed)
- EXIT    time    0.000s  (  0.005s elapsed)
- Total   time   52.645s  ( 24.253s elapsed)
-
- Alloc rate    3,548,086,345 bytes per MUT second
-
- Productivity  30.7% of total user, 44.8% of total elapsed
-
- Command being timed: "filesort --keys [0] --in /tmp/in-1M.csv --output /tmp/out-x-1M.csv --parallel=4 --batch-size=2 +RTS -s"
- User time (seconds): 52.64
- System time (seconds): 8.37
- Percent of CPU this job got: 251%
- Elapsed (wall clock) time (h:mm:ss or m:ss): 0:24.27
+ INIT    time    0.001s  (  0.003s elapsed)
+ MUT     time    6.791s  (  4.320s elapsed)
+ GC      time   12.359s  (  5.374s elapsed)
+ EXIT    time    0.000s  (  0.003s elapsed)
+ Total   time   19.151s  (  9.701s elapsed)
+
+ Alloc rate    2,980,059,234 bytes per MUT second
+
+ Productivity  35.5% of total user, 44.5% of total elapsed
+
+ Command being timed: "filesort --keys [0] --in /tmp/in-1M.csv --output /tmp/out-x-1M.csv --parallel=4 --batch-size=16 +RTS -s"
+ User time (seconds): 19.15
+ System time (seconds): 3.03
+ Percent of CPU this job got: 228%
+ Elapsed (wall clock) time (h:mm:ss or m:ss): 0:09.71
 Average shared text size (kbytes): 0
 Average unshared data size (kbytes): 0
 Average stack size (kbytes): 0
 Average total size (kbytes): 0
-Maximum resident set size (kbytes): 147392
+Maximum resident set size (kbytes): 107436
 Average resident set size (kbytes): 0
 Major (requiring I/O) page faults: 0
-Minor (reclaiming a frame) page faults: 36929
-Voluntary context switches: 4
-Involuntary context switches: 2567115
+Minor (reclaiming a frame) page faults: 26926
+Voluntary context switches: 5
+Involuntary context switches: 1270834
 Swaps: 0
 File system inputs: 0
-File system outputs: 273
+File system outputs: 114
 Socket messages sent: 0
 Socket messages received: 0
 Signals delivered: 0
```

naushadh commented 5 years ago

For reference: here is my N-way merge function.

harendra-kumar commented 5 years ago

I know this is not optimal; we can merge pairs and then merge the results together. Something like this:

```haskell
import Streamly
import qualified Streamly.Prelude as S

-- Bottom-up pairwise merging: split the list of streams in half,
-- merge the halves pairwise, and recurse on the merged results.
mergeN :: (IsStream t, Monad m, Ord a) => [t m a] -> t m a
mergeN xs =
    case xs of
        []  -> S.nil
        [x] -> x
        _ ->
            -- with an odd number of streams, zipWith ignores the last
            -- stream in s2, so it is carried over to the next round
            let len = length xs
                (s1, s2) = splitAt (len `div` 2) xs
            in mergeN $ zipWith (S.mergeBy compare) s1 s2
                        ++ if odd len then [last s2] else []
```
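
A quick check of the pairwise strategy (hypothetical usage, once S.mergeBy from PR #138 is available):

```haskell
-- reuses mergeN from above; S is Streamly.Prelude
demo :: IO [Int]
demo = S.toList $ mergeN
    [S.fromList [1,4,7], S.fromList [2,5,8], S.fromList [3,6,9]]
-- => [1,2,3,4,5,6,7,8,9]
```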

I am not sure if merging N of them directly would be significantly beneficial compared to the code above.

It makes sense to have an N-way merge routine in the library, since an optimal solution is not straightforward. You can try the code above once PR #138 gets merged; I am adding a mergeAsyncBy as well. However, to run the multiple pairwise merges in parallel we may have to make some changes to zipWith.

pranaysashank commented 4 years ago

N-way merge can be achieved using foldWith and mergeBy.
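
For example, a one-liner along those lines (a sketch, assuming foldWith as exported by the Streamly module at the time):

```haskell
import Streamly (IsStream, foldWith)
import qualified Streamly.Prelude as S

-- Left-associated N-way merge: fold mergeBy over the list of streams.
mergeNBy :: (IsStream t, Monad m) => (a -> a -> Ordering) -> [t m a] -> t m a
mergeNBy cmp = foldWith (S.mergeBy cmp)
```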

Closing this issue, as the original request has been implemented now. Open a new issue if other operators are needed.

harendra-kumar commented 4 years ago

Using foldWith and mergeBy would be suboptimal and can be terribly slow, because it merges the first two streams and then merges the result with the next stream, and so on. The elements of the earlier streams are re-traversed in every subsequent merge, so N streams of k elements each cost roughly O(N²·k) comparisons, versus O(N·k·log N) for a balanced merge. For a more optimal merge we have two options:

1) Bottom-up pairwise merging, as in the mergeN example above. This is the easiest to implement but not the most optimal. It is also not true streaming: the intermediate results need to be buffered, so it requires more memory. On the other hand, it would allow us to use depth-wise vectorization of the merging.

2) A true N-way merge would be the most optimal: e.g. build an array from the head items of all the streams, take the smallest item (per the ordering) from the array, and replenish the array with the next item from the corresponding stream. This would also allow us to use breadth-wise vectorization to compare multiple elements in one go. This approach can be true streaming and would require little memory. A list-based sketch follows.
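
For illustration, a list-based sketch of option (2); mergeAllBy and extractMin are hypothetical names, ascending output is assumed, and a real implementation would keep the heads in a mutable array or heap rather than a list:

```haskell
import Data.List (foldl')

-- Keep the current head of every input paired with its tail; emit the
-- smallest head, then replenish it from the list it came from.
mergeAllBy :: (a -> a -> Ordering) -> [[a]] -> [a]
mergeAllBy cmp = go . foldr pick []
  where
    -- add a non-empty list's (head, tail) pair to the working set
    pick []     hs = hs
    pick (x:xs) hs = (x, xs) : hs

    go [] = []
    go hs =
        let ((x, xs), rest) = extractMin hs
        in x : go (pick xs rest)

    -- split off the pair with the smallest head (ties broken arbitrarily)
    extractMin []     = error "unreachable: go handles the empty case"
    extractMin (h:hs) = foldl' step (h, []) hs
      where
        step (m@(x, _), acc) c@(y, _)
            | cmp y x == LT = (c, m : acc)
            | otherwise     = (m, c : acc)

-- mergeAllBy compare [[1,4],[2,5],[3,6]] == [1,2,3,4,5,6]
```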