input-output-hk / typed-protocols

Session types framework with support of protocol pipelining.
15 stars 4 forks source link

Experiment with a bearer which allows to avoid extra threads in a Driver #7

Open coot opened 2 years ago

coot commented 2 years ago

We need a new Channel type:

data Channel m = Channel {

    send :: LBS.ByteString -> m (),

    recv :: STM m (Maybe LBS.ByteString)
  }

The Driver type can stay as is. However codec type is not general enough:

data Codec ps failure m bytes = Codec {
       encode :: forall (st :: ps) (st' :: ps).
                 SingI st
              => ActiveState st
              => Message ps st st'
              -> bytes,

       decode :: forall (st :: ps).
                 ActiveState st
              => Sing st
              -> m (DecodeStep bytes failure m (SomeMessage st))
     }

We will need a codec which works in both m and STM m. cborg requires access to the ST operations, e.g. mkCodecCborStrictST but STM monad has no MonadST instance.

In coot/typed-protocols-rewrite branch we have:

runDecoderWithChannel :: MonadSTM m
                      => Channel m bytes
                      -> Maybe bytes
                      -> DecodeStep bytes failure m a
                      -> m (Either failure (a, Maybe bytes))

tryRunDecoderWithChannel :: Monad m
                         => Channel m bytes
                         -> Maybe bytes
                         -> DecodeStep bytes failure m (SomeMessage st)
                         -> m (Either failure
                                (Either (DriverState ps pr st bytes failure (Maybe bytes) m)
                                        (SomeMessage st, Maybe bytes)))

because of the above constraint we cannot change its signature to STM m, but we can guarantee that all recvs are non blocking (e.g. atomically $ Just <$> recv `orElse` pure Nothing).

The tryRunDecoderWithChannel one is used to implement the tryRecvMessage record field of Driver. And it is plausible to implement it with recv :: STM m (Maybe ByteString))

data Driver ps (pr :: PeerRole) bytes failure dstate m =
        Driver {
          ...
          tryRecvMessage :: forall (st :: ps).
                            SingI st
                         => ActiveState st
                         => ReflRelativeAgency (StateAgency st)
                                                TheyHaveAgency
                                               (Relative pr (StateAgency st))
                         -> DriverState ps pr st bytes failure dstate m
                         -> m (Either (DriverState ps pr st bytes failure dstate m)
                                      ( SomeMessage st
                                      , dstate
                                      ))
        , -- | Construct a non-blocking stm action which awaits for the
          -- message.
          --
          recvMessageSTM :: forall (st :: ps).
                            SingI st
                         => ActiveState st
                         => ReflRelativeAgency (StateAgency st)
                                                TheyHaveAgency
                                               (Relative pr (StateAgency st))
                         -> DriverState ps pr st bytes failure dstate m
                         -> m (STM m (SomeMessage st, dstate))

        , startDState    :: dstate
        }

The question is how we can implement recvMessageSTM. For that it seems that being able to run a decoder in the STM monad (without forking a thread) is indispensable.

GHC exposes unsafeIOToSTM which could be used to lift ST to STM (via IO), but this is rather dodgy way, so a different solution is needed. On the other hand, a rudimentary inspection of cborg library shows that ST is deeply grained, e.g.