haskell-streaming / streaming-bytestring

effectful sequences of bytes; an alternative no-lazy-io implementation of Data.ByteString.Lazy
BSD 3-Clause "New" or "Revised" License

Strict left folds of `Stream (ByteStream m) m r`? #41

Open vdukhovni opened 3 years ago

vdukhovni commented 3 years ago

While working on the expanded WIP README.md, I ended up thinking about what a general-purpose strict left fold would look like for a sub-divided bytestream (i.e. Stream (ByteStream m) m r, a stream of monadic bytestreams over the same monad).

The streaming library provides an excellent lazy right fold for this type, streamFold. It also provides several strict left folds, but only for Stream (Of a) m r and the like, because fitting into the composable parallel Fold paradigm from Control.Foldl requires pure stream values (the folds themselves can be monadic via FoldM, impurely, etc., but the stream elements need to be pure to match that interface).
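For reference, this is roughly what the existing strict left folds look like on a stream of pure values. It is only a minimal sketch: sumAndLength is a name made up for this illustration, and it assumes the streaming and foldl packages are available.

```haskell
import qualified Control.Foldl as L
import Streaming (Of (..), Stream)
import qualified Streaming.Prelude as S

-- Hypothetical helper (not part of any library): two Control.Foldl folds
-- combined applicatively and run over the stream in a single pass.
sumAndLength :: Monad m => Stream (Of Int) m r -> m (Of (Int, Int) r)
sumAndLength = L.purely S.fold ((,) <$> L.sum <*> L.length)
```

For example, sumAndLength (S.each [1 .. 10]) yields (55, 10) :> (). The point is that L.purely S.fold only accepts Stream (Of a) m r, which is why the nested case needs something else.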

So types like Stream (ByteStream m) m r don't appear to be supported for strict left folds, which are often useful.

I came up with the below, which perhaps belongs in Streaming rather than Streaming.ByteString (if it is generally useful to add to the ecosystem). It has the disadvantage that it does not match the Fold or FoldM signatures, so there is not yet a way to get parallel fold support. I suspect that too could be added, by extending Control.Foldl with a new mechanism for stream-of-stream folds, where the pure values are one level down, and each sub-stream returns the next sub-stream head along with the monadic accumulator.

Anyway, this is the proof-of-concept function:

nestedFoldM :: Monad m
            => (forall k. x -> f k -> m (Of x k))
            -> m x
            -> (x -> m b)
            -> Stream f m r
            -> m (Of b r)
nestedFoldM step begin done = (begin >>=) . flip loop
  where
    loop !x = \case
      Return r -> (:> r) <$> done x
      Effect m -> m >>= loop x
      Step f   -> step x f >>= \(x' :> rest) -> loop x' rest

It can fold Stream (ByteStream m) m r as follows. This rewrites the README.md example, which counts the lines of an input file that start with the letter i, as a fold rather than a series of stream transformations:

{-# LANGUAGE BangPatterns, RankNTypes, LambdaCase #-}
module Main where
import qualified Streaming.ByteString as Q
import qualified Streaming.ByteString.Char8 as Q8
import Control.Monad.Trans.Resource (runResourceT)
import Data.Maybe (fromMaybe, listToMaybe)
import Streaming (Of(..))
import Streaming.Internal (Stream(..))
import Data.Word (Word8)
import System.Environment (getArgs)

countStarts :: Monad m => Word8 -> Int -> Q.ByteStream m r -> m (Of Int r)
countStarts !w !acc = \ !mbs -> Q.nextByte mbs >>= \case
    Right (c, t) | c == w    -> (:>) (acc+1) <$> Q.effects t
                 | otherwise -> (:>) acc <$> Q.effects t
    Left       r             -> return $ acc :> r

-- insert nestedFoldM here --

main :: IO ()
main = do
    fname <- listToMaybe <$> getArgs
    (n :> _) <- runResourceT
        $ nestedFoldM (countStarts 0x69) (return 0) return
        $ Q8.lines
        $ fromMaybe Q.stdin (Q.readFile <$> fname)
    print n
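To try nestedFoldM without an input file, the same pattern can be exercised on an in-memory nested stream. The sketch below is only an illustration under assumptions: countHeads and evenHeads are made-up names, the nested stream comes from chunksOf in Streaming rather than from Q8.lines, and nestedFoldM is repeated (slightly unrolled) so the block stands alone.

```haskell
{-# LANGUAGE BangPatterns, LambdaCase, RankNTypes #-}
import Streaming (Of (..), chunksOf)
import Streaming.Internal (Stream (..))
import qualified Streaming.Prelude as S

-- The proof-of-concept fold from this issue, with the entry point unrolled.
nestedFoldM :: Monad m
            => (forall k. x -> f k -> m (Of x k))
            -> m x
            -> (x -> m b)
            -> Stream f m r
            -> m (Of b r)
nestedFoldM step begin done s = begin >>= \x0 -> loop x0 s
  where
    loop !x = \case
      Return r -> (:> r) <$> done x
      Effect m -> m >>= loop x
      Step f   -> step x f >>= \(x' :> rest) -> loop x' rest

-- Hypothetical step function, analogous to countStarts: look at the first
-- element of a sub-stream, bump the accumulator if it satisfies p, then
-- drain the remainder to reach the next sub-stream.
countHeads :: Monad m => (a -> Bool) -> Int -> Stream (Of a) m r -> m (Of Int r)
countHeads p !acc sub = S.next sub >>= \case
  Left r -> pure (acc :> r)
  Right (a, rest)
    | p a       -> ((acc + 1) :>) <$> S.effects rest
    | otherwise -> (acc :>) <$> S.effects rest

-- Count the chunks of three whose first element is even.
evenHeads :: Monad m => Stream (Of Int) m r -> m (Of Int r)
evenHeads = nestedFoldM (countHeads even) (pure 0) pure . chunksOf 3
```

With S.each [1 .. 10] the chunks start with 1, 4, 7 and 10, so evenHeads returns 2 :> ().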

So my questions (issues) are: Is something like nestedFoldM a sensible interface to add to either Streaming or Streaming.ByteString? Can it be improved, or is it about right? And, finally, would it make sense to pursue composable parallel folds for this type of fold?

cc: @chessai, @cartazio, @archaephyrryx, @bodigrim (feel free to remain silent, or say you don't care if this is of no interest...)

[ EDIT: It occurs to me that another approach might be to expose the isomorphism f (Stream f m r) -> Stream (Of a) m r and then use some of the existing left fold machinery on that, with helper functions to combine the results of the inner folds into the outer accumulator. This might then make it possible to use the parallel fold machinery without changes to Control.Foldl. I'll explore this a bit further. Suggestions welcome... ]
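One possible reading of that idea, sketched with functions that already exist in streaming: collapse each sub-stream to a single pure value with S.mapped, then run an ordinary Control.Foldl fold over the resulting Stream (Of a) m r. This is an assumption about what the EDIT has in mind, not a settled design; chunkStats is a made-up name and chunksOf stands in for Q8.lines.

```haskell
import qualified Control.Foldl as L
import Streaming (Of (..), Stream, chunksOf)
import qualified Streaming.Prelude as S

-- Hypothetical example: sum each chunk of n elements (the inner fold), then
-- fold the chunk sums with two composed Control.Foldl folds (the outer fold).
chunkStats :: Monad m => Int -> Stream (Of Int) m r -> m (Of (Int, Maybe Int) r)
chunkStats n =
  L.purely S.fold ((,) <$> L.length <*> L.maximum)
    . S.mapped S.sum
    . chunksOf n
```

For S.each [1 .. 10] with n = 3 the chunk sums are 6, 15, 24 and 10, so the result is (4, Just 24) :> (). The outer fold composes as usual; the inner fold stays a plain function on the sub-stream.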

vdukhovni commented 3 years ago

Following up on the initial notes, I think the below comes closer to the mark.

{-# LANGUAGE BangPatterns, BlockArguments, RankNTypes, LambdaCase #-}
module Main where
import qualified Data.ByteString as B
import qualified Data.ByteString.Unsafe as B
import qualified Streaming.ByteString as Q
import qualified Streaming.ByteString.Char8 as Q8
import Control.Monad.Trans.Resource (runResourceT)
import Data.Maybe (fromMaybe, listToMaybe)
import Streaming (Of(..))
import Streaming.Internal (Stream(..))
import Data.Word (Word8)
import System.Environment (getArgs)

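-- Per line: 1 if the first byte equals w, otherwise 0 (also 0 for an empty line).
countStarts :: Monad m => Word8 -> Q.ByteStream m r -> m (Of Int r)
countStarts w = Q.chunkFold step Nothing \case { Just 1 -> 1; _ -> 0 }
  where
    -- Nothing: no byte seen yet. Just n: verdict from the first byte,
    -- carried unchanged through the remaining chunks of the line.
    step Nothing bs | B.null bs            = Nothing
                    | B.unsafeHead bs == w = Just 1
                    | otherwise            = Just 0
    step x _ = x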

nestedFold :: Monad m
           => (forall k. f k -> m (Of a k))
           -> (b -> a -> b)
           -> b
           -> Stream f m r
           -> m (Of b r)
nestedFold inner merge = loop
  where
    loop !b = \case
        Return r -> return $ b :> r
        Effect m -> m >>= loop b
        Step f   -> inner f >>= \(a :> rest) -> loop (merge b a) rest

main :: IO ()
main = do
    fname <- listToMaybe <$> getArgs
    (n :> _) <- runResourceT
        $ nestedFold (countStarts 0x69) (+) 0
        $ Q8.lines
        $ fromMaybe Q.stdin (Q.readFile <$> fname)
    print n
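As a side note, nestedFold looks expressible with functions streaming already exports, at the cost of an extra Functor f constraint that the hand-written loop does not need. The following is a sketch under that assumption; nestedFold' is a name invented here, and whether it performs as well as the explicit loop has not been measured.

```haskell
{-# LANGUAGE RankNTypes #-}
import Streaming (Of (..), Stream, chunksOf)
import qualified Streaming.Prelude as S

-- Hypothetical variant of nestedFold: S.mapped runs the inner fold on each
-- layer, and S.fold merges the per-layer results with a left fold.
nestedFold' :: (Monad m, Functor f)
            => (forall k. f k -> m (Of a k))
            -> (b -> a -> b)
            -> b
            -> Stream f m r
            -> m (Of b r)
nestedFold' inner merge b = S.fold merge b id . S.mapped inner
```

For instance, nestedFold' S.sum max 0 (chunksOf 3 (S.each [1 .. 10 :: Int])) gives 24 :> (), the largest chunk sum. Because the outer stage is an ordinary S.fold, it could also be driven by L.purely from Control.Foldl.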