Gabriella439 / pipes

Compositional pipelines
BSD 3-Clause "New" or "Revised" License

Piping substreams with a different State Monad #210

Closed nhenin closed 4 years ago

nhenin commented 4 years ago

The idea is to do something like this:

(StateT e m)           |-----under StateT f m------|               
Event -----> case A -> pipe X -> pipe Y -> pipe Z -> Output
     |---> case B ---------(under StateT e m)----> Output 
     |---> case C ---------(under StateT e m)----> Output 

I would describe this as nesting a sub-stream, but I am having difficulty implementing it with Pipes.

Here is a simple example that shows what I'm trying to do:

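{-# LANGUAGE LambdaCase #-}

import Control.Monad.State.Strict (StateT, evalStateT, get, modify)
import Pipes
import qualified Pipes.Prelude as P
import Pipes.Lift (evalStateP)

main :: IO ()
main =
  -- The initial state values below are placeholders.
  flip evalStateT (MainStreamState "" (SubStreamState 0)) $
    runEffect $ each [A 10, B 2, C 3, A 40, A 40] >-> pipeline >-> P.print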

pipeline :: Pipe Input Integer (StateT MainStreamState IO) ()
pipeline = for cat $ \case
  A x -> do
    MainStreamState _ subStreamState <- get
    evalStateP subStreamState $ yield x >-> accumulate' -- this does not compile
    -- goal: pass the sub-stream state in, then write the updated value back into the main-stream state
  B x -> yield x
  C x -> yield x

accumulate :: Pipe Integer Integer IO ()
accumulate = evalStateP (SubStreamState 0) accumulate'

accumulate' :: Pipe Integer Integer (StateT SubStreamState IO) ()
accumulate' = go
  where
    go = do
        x <- await
        modify (\(SubStreamState r) -> SubStreamState (r+x))
        SubStreamState r <- lift get
        yield r
        go

data MainStreamState = MainStreamState String SubStreamState
data SubStreamState = SubStreamState Integer
data Input = A Integer | B Integer | C Integer

nhenin commented 4 years ago

{-# language RankNTypes, ScopedTypeVariables, QuantifiedConstraints, GADTs #-}
module Dolla.Adapter.Pipes.State (glom) where

import qualified Control.Lens as L
import qualified Control.Lens.Zoom as LZ
import Control.Monad.Morph

-- | Work smaller.
--
-- @
-- glom :: Monad m => L.Lens' s' s -> Pipe x y (StateT s m) a -> Pipe x y (StateT s' m) a
-- @
glom :: (MFunctor t, L.Zoom m m' s s', forall x. Functor (q x), q ~ LZ.Zoomed m) 
     => L.ALens' s' s 
     -> t m a 
     -> t m' a
glom al = hoist $ L.zoom (L.cloneLens al)
nhenin commented 4 years ago

This is how we are currently doing it.
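
For readers who only need the simpler type quoted in the doc comment of `glom`, here is a minimal sketch of that specialisation together with a small usage. The names `glomPipe`, `counting`, and `countInPair` are hypothetical and exist only for this illustration, and the sketch assumes the strict `StateT` from transformers.

```haskell
{-# LANGUAGE RankNTypes #-}

import Control.Lens (Lens', zoom, _2)
import Control.Monad.Trans.State.Strict (StateT, modify', runState)
import Pipes
import qualified Pipes.Prelude as P

-- Hypothetical specialisation of glom to Pipe and a plain Lens'.
glomPipe
    :: Monad m
    => Lens' s' s
    -> Pipe x y (StateT s m) a
    -> Pipe x y (StateT s' m) a
glomPipe l = hoist (zoom l)

-- A pipe that forwards every element and counts it in an Int state.
counting :: Monad m => Pipe a a (StateT Int m) r
counting = for cat $ \x -> do
    lift (modify' (+ 1))
    yield x

-- Run `counting` against the second component of a larger state.
countInPair :: [a] -> ([a], (String, Int))
countInPair xs =
    runState (P.toListM (each xs >-> glomPipe _2 counting)) ("label", 0)

main :: IO ()
main = print (countInPair "abc")
```

Here `counting` only ever sees the `Int`, while the surrounding stream carries the whole `(String, Int)` pair, so the first component passes through unchanged.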

Gabriella439 commented 4 years ago

@nhenin: I think you are on the right track by using zoom as in your latter example.

To simplify this a little bit, let's assume that I want to combine two Pipes with different underlying StateT types, like these two:

pipe0 :: Pipe a b (StateT s0 m) r

pipe1 :: Pipe b c (StateT s1 m) r

What I would do is to transform both of them to agree on the same composite state type so that the result has type:

result :: Pipe a c (StateT (s0, s1) m) r

Note that the composite state type does not need to be a tuple (it could be a record, for example), but I'll use a tuple for simplicity.

The way I would implement that is:

import Control.Monad.Trans.State (StateT)
import Pipes (Pipe, hoist, (>->))
import Control.Lens (zoom, _1, _2)

example
    :: Monad m
    => Pipe a b (StateT s0 m) r
    -> Pipe b c (StateT s1 m) r
    -> Pipe a c (StateT (s0, s1) m) r
example p0 p1 = hoist (zoom _1) p0 >-> hoist (zoom _2) p1
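
Applied to the original question, the same `hoist (zoom ...)` idea could look roughly like the sketch below. It is only an illustration under some assumptions: the lens `subStreamState` is hand-written for this example, `runDemo` is a hypothetical helper that runs the pipeline purely, the `"main"` label is a placeholder, and the strict `StateT` from transformers is used.

```haskell
{-# LANGUAGE LambdaCase #-}
{-# LANGUAGE RankNTypes #-}

import Control.Lens (Lens', lens, zoom)
import Control.Monad.Trans.State.Strict (StateT, evalState, get, modify')
import Pipes
import qualified Pipes.Prelude as P

data Input = A Integer | B Integer | C Integer
data SubStreamState = SubStreamState Integer
data MainStreamState = MainStreamState String SubStreamState

-- Hand-written lens onto the nested sub-stream state (assumption of this sketch).
subStreamState :: Lens' MainStreamState SubStreamState
subStreamState =
    lens (\(MainStreamState _ s) -> s)
         (\(MainStreamState label _) s -> MainStreamState label s)

-- Running sum kept in the sub-stream state; yields the total after each input.
accumulate' :: Monad m => Pipe Integer Integer (StateT SubStreamState m) ()
accumulate' = do
    x <- await
    lift (modify' (\(SubStreamState r) -> SubStreamState (r + x)))
    SubStreamState r <- lift get
    yield r
    accumulate'

-- Case A is routed through the sub-pipe, zoomed into the main state.
-- Cases B and C are passed through unchanged.
pipeline :: Monad m => Pipe Input Integer (StateT MainStreamState m) ()
pipeline = for cat $ \case
    A x -> yield x >-> hoist (zoom subStreamState) accumulate'
    B x -> yield x
    C x -> yield x

-- Hypothetical helper: run the pipeline purely and collect the outputs.
runDemo :: [Input] -> [Integer]
runDemo inputs =
    evalState (P.toListM (each inputs >-> pipeline))
              (MainStreamState "main" (SubStreamState 0))

main :: IO ()
main = print (runDemo [A 10, B 2, C 3, A 40, A 40])
```

With the inputs from the question this prints `[10,2,3,50,90]`. The running sum survives between `A` events because it lives in the shared `StateT MainStreamState` layer rather than in a state that is created and discarded for each event.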