haskell-streaming / streaming

An optimized general monad transformer for streaming applications, with a simple prelude of functions

Memory explodes trying to consume chunked pure stream #115

Open · qrpnxz opened this issue 2 years ago

qrpnxz commented 2 years ago

I was trying to use mapsM and chunksOf to go from Stream (Of Word8) to Stream (Of ByteString), and found that consuming even just 200 MB was allocating dozens of gigabytes of memory. I discovered that the problem was eagerness: my infinite source of Of Word8 values was a series of Steps, which are strict in the functor (why?). If I instead construct the Stream with repeats, the code behaves sanely again, albeit taking twice as long as using splitsAt by hand, because repeats adds laziness by interleaving pure Effects into the stream (which S.each does not do).

Things to consider:

  1. Removing the bang from Step (unless there's a good reason for it to be there).
  2. Adding a non-strict Step constructor.
  3. Adding a lazy version of S.each (by interleaving Effects the way repeats does, or better, by using another constructor); see the sketch after this list.
  4. Adding a note about this problem, and how to work around it.
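
For concreteness, here is a minimal sketch of item 3, assuming the constructors exported by Streaming.Internal. The name lazyEach is hypothetical, not part of the library:

import Streaming.Internal (Stream (..))
import Streaming.Prelude (Of (..))

-- Hypothetical lazy variant of S.each: each element sits behind a pure
-- Effect, so the strict Step (and its strict Of payload) is only built
-- when the consumer runs that effect, mirroring what repeats does.
lazyEach :: (Monad m, Foldable t) => t a -> Stream (Of a) m ()
lazyEach = foldr (\a rest -> Effect (pure (Step (a :> rest)))) (Return ())

This pays the same constant-factor cost as repeats, since every element now carries an extra Effect wrapper.
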
chessai commented 1 year ago

Can you include a minimal reproduction?

chessai commented 1 year ago

"Steps, which are strict on the Functor (why?)"

Stream type:

data Stream f m r
  = Step !(f (Stream f m r))   -- note the bang: the functor layer is forced
  | Effect (m (Stream f m r))  -- an effect yielding the rest of the stream
  | Return r                   -- end of stream

Of type:

data Of a b = !a :> b

So, Of is a left-strict pair: the strict field means that constructing an Of a b evaluates a to WHNF, so the first component can never be a thunk. A single Step in a Stream (Of a) m r will look like:

Step (a :> rest)

When we reach a Step, we have reached the next element of the stream, and a is the value it carries. For typical streaming work, where we grab the next element and immediately do something with it, this strictness makes a lot of sense.
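
A quick GHCi check makes the left-strictness concrete (output abbreviated; the lazy 2-tuple is shown for contrast):

ghci> import Streaming.Prelude (Of (..))
ghci> (undefined :> ()) `seq` ()   -- strict left field: forced
*** Exception: Prelude.undefined
ghci> (undefined, ()) `seq` ()     -- lazy pair: left component untouched
()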

Note that none of the functions you listed actually forces f ~ (Of a), so if the strict pair is indeed the problem, you could just use (,) a, the lazy 2-tuple.
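
A minimal sketch of that switch; toLazyPairs is a hypothetical name, while maps (from Streaming) and lazily (from Streaming.Prelude) are existing library functions:

import Streaming (Stream, maps)
import Streaming.Prelude (Of, lazily)

-- Hypothetical adapter: swap each left-strict (Of a) layer for the lazy
-- 2-tuple, so constructing a pair no longer forces the element.
toLazyPairs :: Monad m => Stream (Of a) m r -> Stream ((,) a) m r
toLazyPairs = maps lazily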

chessai commented 1 year ago

Here is my attempt at replicating your issue:

❯ cat StreamingChunks.hs
{-# language ImportQualifiedPost #-}
{-# language NumericUnderscores #-} -- for the 10_000_000 literal (needed on GHC < 9.2)

module Main (main) where

import Data.ByteString (ByteString)
import Data.ByteString qualified as B
import Data.ByteString.Char8 qualified as BC8
import Data.List qualified as List
import Data.Word (Word8)
import Streaming (Stream)
import Streaming qualified as S
import Streaming.Prelude (Of)
import Streaming.Prelude qualified as S

main :: IO ()
main = do
  S.mapM_ BC8.putStr
    $ packBytes
    $ S.each (List.replicate streamSize asciiA)

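-- Group the Word8 stream into fixed-size chunks, realize each chunk as a
-- list, and pack that list into a strict ByteString.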
packBytes :: (Monad m) => Stream (Of Word8) m r -> Stream (Of ByteString) m r
packBytes s = id
  $ S.mapsM (fmap (S.mapOf B.pack) . S.toList)
  $ S.chunksOf chunkSize
  $ s

asciiA :: Word8
asciiA = 65

streamSize :: Int
streamSize = 10_000_000

chunkSize :: Int
chunkSize = 32

The runtime stats:

❯ ghc StreamingChunks.hs -o main -O2 -rtsopts -fforce-recomp && ./main +RTS -s -RTS 1>/dev/null
[1 of 2] Compiling Main             ( StreamingChunks.hs, StreamingChunks.o )
[2 of 2] Linking main [Objects changed]
   3,667,802,560 bytes allocated in the heap
       1,936,712 bytes copied during GC
          36,024 bytes maximum residency (2 sample(s))
          25,416 bytes maximum slop
               6 MiB total memory in use (0 MB lost due to fragmentation)

                                     Tot time (elapsed)  Avg pause  Max pause
  Gen  0       876 colls,     0 par    0.003s   0.004s     0.0000s    0.0007s
  Gen  1         2 colls,     0 par    0.000s   0.000s     0.0001s    0.0001s

  INIT    time    0.000s  (  0.000s elapsed)
  MUT     time    0.353s  (  0.353s elapsed)
  GC      time    0.004s  (  0.004s elapsed)
  EXIT    time    0.000s  (  0.004s elapsed)
  Total   time    0.357s  (  0.360s elapsed)

  %GC     time       0.0%  (0.0% elapsed)

  Alloc rate    10,382,332,804 bytes per MUT second

  Productivity  98.9% of total user, 97.9% of total elapsed

~3.7 GB allocated in total, but only ~36 KB maximum residency. Total runtime ~0.36 s.

So, my guess is that whatever you were doing must be more complicated than this.