Gabriella439 / pipes

Compositional pipelines
BSD 3-Clause "New" or "Revised" License

Non request-response protocols, yield-to-upstream/await-from-downstream #162

Open infinity0 opened 8 years ago

infinity0 commented 8 years ago

Hi, thanks for writing this library! As soon as I saw Proxy, it reminded me of a framework I wrote not too long ago (though in an impure language) that let one compose protocol handlers. I'm trying to figure out if this can be achieved in Pipes, i.e. I want to write something like this:

type StreamSocket = Client ByteString ByteString IO () -- does IO with the network
unixStreamSocket :: FilePath -> StreamSocket
inetStreamSocket :: SockAddr -> StreamSocket

-- pretend that transparent unions work in haskell, just to make this example simpler
tlsProtocol :: Proxy ByteString (ByteString|TLSControlCommand) ByteString ByteString TLSState ()

ircProtocol :: Proxy IRCEvent IRCCommand ByteString ByteString IRCState ()
xmppProtocol :: Proxy XMPPEvent XMPPCommand ByteString ByteString XMPPState ()
ircsProtocol = ircProtocol >+> tlsProtocol
xmppsProtocol = xmppProtocol >+> tlsProtocol

-- OTR is an end-to-end encryption layer on top of plaintext chat transport protocols
otrProtocol :: Proxy ChatEvent ChatCommand (IRCEvent|XMPPEvent) (IRCCommand|XMPPCommand) OTRState ()

type ChatUI = Server ChatEvent ChatCommand IO () -- does IO with UI elements
cliChatUI :: ChatUI
guiChatUI :: ChatUI

main = do
  myChatUI <- userSelect [cliChatUI, guiChatUI]
  myTransportProtocol <- userSelect [ircProtocol, xmppProtocol]
  addr <- getAddrFromUser
  runEffect $ myChatUI >+> otrProtocol >+> myTransportProtocol >+> (inetStreamSocket addr)

Pipes seems to be based around a request-response pattern - e.g. yield is just respond with the reply type fixed to (), i.e. strictly no follow-up. However, the protocols mentioned above do not fit this pattern. That is, to implement the above structures, I need:

  1. some form of yield/await that both:
    i. does not block for a follow-up element on the opposite stream, and
    ii. does not prevent the possibility of receiving elements from that stream later (by contrast, the current yield achieves (1.i) by preventing (1.ii)); as well as
  2. a reverse form of this yield/await that works in the upstream direction
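For reference, the request/response shape described above is visible in how pipes represents a Proxy internally. The following is a base-only sketch that re-declares the same four constructors pipes uses in Pipes.Internal (a standalone copy for illustration, not an import of the pipes package); `fromList` and `collect` are hypothetical helpers, not pipes functions. It shows yield as respond with a () reply: the producer still suspends after every output until that reply arrives.

```haskell
import Data.Functor.Identity (Identity (..))
import Data.Void (Void, absurd)

-- Standalone re-declaration mirroring the constructors of pipes' Proxy.
-- Request sends an a' upstream and suspends until an a comes back;
-- Respond sends a b downstream and suspends until a b' reply comes back.
data Proxy a' a b' b m r
  = Request a' (a -> Proxy a' a b' b m r)
  | Respond b (b' -> Proxy a' a b' b m r)
  | M (m (Proxy a' a b' b m r))
  | Pure r

-- yield is respond with the reply type fixed to (): the proxy still
-- suspends after each output, but the only reply it can receive is ().
yield :: b -> Proxy a' a () b m ()
yield b = Respond b Pure

-- Hypothetical helper: a producer built directly from constructors.
-- The rest of the list is produced only after the () reply arrives.
fromList :: [b] -> Proxy Void () () b m ()
fromList []       = Pure ()
fromList (x : xs) = Respond x (\() -> fromList xs)

-- Hypothetical driver: run a producer by answering every Respond with ().
-- A producer can never Request, since that would need a value of type Void.
collect :: Proxy Void () () b Identity r -> [b]
collect p = case p of
  Request v _ -> absurd v
  Respond b k -> b : collect (k ())
  M m         -> collect (runIdentity m)
  Pure _      -> []
```

For example, `collect (fromList [1, 2, 3])` gives `[1, 2, 3]`; the `\() -> ...` continuation is the point where a one-way yield still waits for the downstream reply.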

I imagine the code would look something like this:

ircProtocol = do
    recv <- awaitFrom  -- receive from whichever side has input
    case recv of
      Downstream bytes -> do
        -- may need some `lift`s if processing runs in the base monad
        let (event, maybeResponse) = processReceived bytes
        yieldTo Upstream event
        forM_ maybeResponse (yieldTo Downstream)
      Upstream ircCommand -> do
        let (response, maybeEvent) = processCommand ircCommand
        yieldTo Downstream response
        forM_ maybeEvent (yieldTo Upstream)
    ircProtocol
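Without any new pipes primitives, the loop above can be modelled as a plain step function over tagged inputs. This is a minimal pure sketch, not a Proxy: `processReceived`, `processCommand` and the String type aliases are hypothetical stand-ins for the thread's IRC helpers, and `Left`/`Right` play the roles of `Upstream`/`Downstream`.

```haskell
-- Hypothetical stand-ins for the thread's IRC types.
type IRCCommand = String
type IRCEvent   = String
type Bytes      = String

-- Toy parser: answer server PINGs, report everything else as a message.
processReceived :: Bytes -> (IRCEvent, Maybe Bytes)
processReceived bytes
  | take 5 bytes == "PING " = ("ping", Just ("PONG " ++ drop 5 bytes))
  | otherwise               = ("message: " ++ bytes, Nothing)

-- Toy encoder: send the command and echo it back to the UI as an event.
processCommand :: IRCCommand -> (Bytes, Maybe IRCEvent)
processCommand cmd = (cmd ++ "\r\n", Just ("sent: " ++ cmd))

-- One iteration of the ircProtocol loop. Left = upstream (chat layer),
-- Right = downstream (socket). One input may produce outputs in both
-- directions, and nothing waits for a reply before the next input.
ircStep :: Either IRCCommand Bytes -> [Either IRCEvent Bytes]
ircStep (Right bytes) =
  let (event, maybeResponse) = processReceived bytes
  in Left event : maybe [] (\r -> [Right r]) maybeResponse
ircStep (Left cmd) =
  let (response, maybeEvent) = processCommand cmd
  in Right response : maybe [] (\e -> [Left e]) maybeEvent

-- The recursive loop, run over an already-merged stream of inputs.
runProtocol :: [Either IRCCommand Bytes] -> [Either IRCEvent Bytes]
runProtocol = concatMap ircStep
```

For example, `runProtocol [Right "PING abc", Left "JOIN #x"]` gives `[Left "ping", Right "PONG abc", Right "JOIN #x\r\n", Left "sent: JOIN #x"]`.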

I had a look at ListT, and Select initially raised my hopes, but I suspect there won't be an easy way to use it to implement the above structures. I need to combine two streams flowing in opposite directions (up and down), where the consumer of one stream is the producer of the other, but Select (and the other Pipes utilities) only really help when the streams flow in the same direction.

I also looked at pipes-{extra,async,concurrency,interleave}, etc., and couldn't find functionality that seemed relevant to this issue.

I also tried seeing what pipes-network-tls does - but it only provides separate Producer/Consumer interfaces tightly coupled to TCP. It doesn't make full use of the whole composable Proxy structure like I suggest in this post, and it doesn't give you fine grained control like what we need for chat protocols. (It could benefit from the structures I suggest above though, e.g. if I want to change cipherspec halfway through the connection.)

Am I missing something, or is this not yet possible? Please share any other thoughts you have.

Gabriella439 commented 8 years ago

I think the best place to begin for something like this is to study my other mvc library:

http://hackage.haskell.org/package/mvc

That gives a high-level idea of how to structure pipes applications with multiple concurrent input streams. In this case your two concurrent input streams are user input and events received from IRC.

You might not necessarily use the mvc package exactly as is, but it can show you how to merge two input streams into a single input stream and then process that within a single pipeline.
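As a rough illustration of merging two input streams into one, here is a base-only sketch using a `Chan`. This is not the mvc API: `mergeInputs` is a hypothetical name, and two threads stand in for the user-input and IRC sources.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (newChan, readChan, writeChan)
import Control.Monad (forM_, replicateM)

-- Merge two independent input sources into one stream of Either values.
-- Each source writes into a shared Chan from its own thread, so the
-- interleaving depends on scheduling, but each source keeps its own order.
mergeInputs :: [a] -> [b] -> IO [Either a b]
mergeInputs userInput ircEvents = do
  chan <- newChan
  _ <- forkIO $ forM_ userInput (writeChan chan . Left)
  _ <- forkIO $ forM_ ircEvents (writeChan chan . Right)
  -- Read exactly as many values as the two sources will write.
  replicateM (length userInput + length ircEvents) (readChan chan)
```

The interleaving of `Left` and `Right` values can vary between runs, but the values from each source stay in their original order, and the merged list can then be processed by a single pipeline.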

infinity0 commented 8 years ago

Hello. Yes, I had already considered what you are talking about, if I understand you correctly - i.e. structuring each component as a single Pipe (or Model) that combines two inner Pipes. However, it seems like it would force me to write a lot of redundant boilerplate.

For example, if I construct something like this:

-- making up my own notation for unions just to keep the example concise
-- I guess (Either a b) would suffice in practice
otrProtocol :: Pipe (UserChatCommand|IRCEvent) (UserChatEvent|IRCCommand) M ()
ircProtocol :: Pipe (IRCCommand|BytesReceived) (IRCEvent|BytesToSend) M ()

Then, to compose these components in the nice pretty intuitive way I described in my first post, I would need to write my own awkward composition function, something like this:

(>==>) :: Pipe (a|b') (a'|b) m r
       -> Pipe (b|c') (b'|c) m r
       -> Pipe (a|c') (a'|c) m r
otrOverIrcProtocol :: Pipe (UserChatCommand|BytesReceived) (UserChatEvent|BytesToSend) M ()
otrOverIrcProtocol = otrProtocol >==> ircProtocol

It just feels like unnecessary extra layers of abstraction, especially when Pipes already has composition functions with essentially the same type signature. And also Proxy already has two extra type params that go completely unused in the above examples, but would be perfect for representing the intuitive picture that is motivating my examples.
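The wiring that `(>==>)` would have to do by hand can be sketched in a simplified, stateless model. This is an assumption for illustration only: real protocol components carry state, and this is not pipes' Pipe type. A component maps one input (Left from upstream, Right from downstream) to a list of outputs (Left going upstream, Right going downstream); `Bi`, `composeBi` and `idBi` are hypothetical names.

```haskell
-- Stateless model of a component with input (a|b') and output (a'|b).
type Bi a b' a' b = Either a b' -> [Either a' b]

-- Compose two components. Values f sends downstream are fed into g, and
-- values g sends upstream are fed back into f, until only outputs that
-- leave the combined component remain. Evaluation is depth-first and will
-- not terminate if the two components keep answering each other forever.
composeBi :: Bi a b' a' b -> Bi b c' b' c -> Bi a c' a' c
composeBi f g input = case input of
    Left a   -> fromF (f (Left a))
    Right c' -> fromG (g (Right c'))
  where
    -- f's outputs: a' leaves upward, b is handed down to g
    fromF = concatMap $ \out -> case out of
      Left a' -> [Left a']
      Right b -> fromG (g (Left b))
    -- g's outputs: b' is handed back up to f, c leaves downward
    fromG = concatMap $ \out -> case out of
      Left b' -> fromF (f (Right b'))
      Right c -> [Right c]

-- Pass-through component: forwards upstream input downstream and
-- downstream input upstream.
idBi :: Bi a a' a' a
idBi (Left a)   = [Right a]
idBi (Right a') = [Left a']
```

In this model `composeBi otrStep ircStep` would play the role of `otrProtocol >==> ircProtocol`, and composing with `idBi` on either side leaves a component's outputs unchanged.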

Is it that much effort to make Pipes less request-response oriented? If you could roughly describe what would be necessary, I'd be happy to try writing a PR myself.

tonyday567 commented 8 years ago

It's a fairly well-established rule of thumb that bidirectional communication is hard and that, to manage complexity, problems should be transformed into unidirectional communication. But that's just a rule of thumb, not a law. I think the answer to your question might be found in category theory.

Start with a close reading of Pipes.Core. It describes five categories that satisfy the category laws (Kleisli, respond, request, push and pull). None of them does everything you want.

Just as an example, however: if you can find a way to combine push and pull into a new identity, and >~> and >+> into a new composition operator, such that the category laws hold, then you have a new category that lets you compose the way you want.

infinity0 commented 8 years ago

@tonyday567 thanks for the pointer! That was very useful, I'll look into it from there.

infinity0 commented 8 years ago

I'm making some nice progress on this. I can roughly understand now that this is probably not suitable for pipes-core, but any feedback would be welcome - e.g. if there are any impossibility- or "this is a really really bad idea"- theorems that I don't know about. :p I'm proceeding along these lines:

data BiProxy a' a b' b m r
  = Await (Either a b' -> <self>) -- or 'BiBlocked <types>', see below
  | Yield (Either a' b) (() -> <self>)
    -- probably the () is unnecessary due to Haskell laziness, but it helps me
    -- reason about this code; I'll get rid of it later if needed.
  | Impure (m <self>)
  | Pure r

type BiBlocked <types> = Either a b' -> BiProxy <types>
  -- pipes doesn't define the analogue of this explicitly, but it helps us be more concise

data BiContinuation <types>
  = Awaiting (BiBlocked <types>)
  | Yielded (() -> BiProxy <types>) -- means we have already consumed the output, so no need to store it here
  -- contract: *never* create an instance of Yielded if the output value has not been consumed!

await = Await Pure
yield x = Yield x Pure
bicat = Awaiting Pure -- identity over BiContinuation
(>::>) :: BiContinuation -> BiContinuation -> BiContinuation
  -- partial composition over BiContinuation
  -- partial, because we can't implement Yielded >::> Yielded - we would have
  -- to arbitrarily pick an execution order for the two unblocked proxies
  -- pipes-core doesn't do the equivalent of this either
(>:>) :: BiBlocked -> BiBlocked -> BiBlocked
  -- total composition over BiBlocked, defined in terms of >::> below
  -- Pure is the identity over BiBlocked

(>>::) :: BiProxy -> BiContinuation -> BiProxy  -- x >>:: g, similar to >~>
(::>>) :: BiContinuation -> BiProxy -> BiProxy  -- f ::>> y, similar to >+>

-- note that we don't have push/pull, instead we have this symmetrical operator:

(f >:> g) input = case input of
  Left  a  -> f (Left a) >>:: Awaiting g
  Right c' -> Awaiting f ::>> g (Right c')
-- or alternatively
-- f >:> g = Awaiting f >::> Awaiting g

feed :: Either a b' -> BiProxy -> BiProxy
  -- helps us implement >>:: and ::>>
feed input proxy = case (input, proxy) of
  -- in this function is where we detect deadlock:
  (Left _, Yield (Right _) _) -> error "deadlock!"
  (Right _, Yield (Left _) _) -> error "deadlock!"
  -- we could raise an exception and tell the user to instead connect their
  -- components up with a buffer in the middle and/or use pipes-concurrency.
  -- all other cases are definable, though

Hopefully I'll be able to post some runnable code soon. Still quite far off from actually proving any laws, though.
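The core of the BiProxy design above can be made concrete in plain Haskell. This is a minimal sketch, assuming the constructors from the post (Await, Yield, Impure, Pure) with the full type spelled out in place of `<self>`; the Monad instance, the `runBi` driver and the `forwardAndAck` example are hypothetical additions, and composition (`>:>`, `feed`) and the deadlock check are not covered.

```haskell
import Data.Functor.Identity (Identity (..))

-- Inputs arrive as Left a (from upstream) or Right b' (from downstream);
-- outputs go out as Left a' (to upstream) or Right b (to downstream).
data BiProxy a' a b' b m r
  = Await (Either a b' -> BiProxy a' a b' b m r)
  | Yield (Either a' b) (() -> BiProxy a' a b' b m r)
  | Impure (m (BiProxy a' a b' b m r))
  | Pure r

instance Functor m => Functor (BiProxy a' a b' b m) where
  fmap f (Await k)   = Await (fmap f . k)
  fmap f (Yield o k) = Yield o (fmap f . k)
  fmap f (Impure m)  = Impure (fmap (fmap f) m)
  fmap f (Pure r)    = Pure (f r)

instance Functor m => Applicative (BiProxy a' a b' b m) where
  pure = Pure
  pf <*> px = pf >>= \f -> fmap f px

instance Functor m => Monad (BiProxy a' a b' b m) where
  Await k   >>= f = Await (\i -> k i >>= f)
  Yield o k >>= f = Yield o (\u -> k u >>= f)
  Impure m  >>= f = Impure (fmap (>>= f) m)
  Pure r    >>= f = f r

-- As in the post: await = Await Pure, yield x = Yield x Pure.
await :: BiProxy a' a b' b m (Either a b')
await = Await Pure

yield :: Either a' b -> BiProxy a' a b' b m ()
yield x = Yield x Pure

-- Hypothetical driver: feed inputs in order and treat every Yield as
-- consumed at once, so a yield never waits on the other side.
runBi :: [Either a b'] -> BiProxy a' a b' b Identity r -> [Either a' b]
runBi inputs p = case p of
  Pure _    -> []
  Impure m  -> runBi inputs (runIdentity m)
  Yield o k -> o : runBi inputs (k ())
  Await k   -> case inputs of
    []       -> []  -- no more input: stop
    (i : is) -> runBi is (k i)

-- Hypothetical example in the style of ircProtocol: a message from
-- upstream is forwarded downstream and acknowledged upstream; a message
-- from downstream is forwarded upstream.
forwardAndAck :: Functor m => BiProxy String String String String m r
forwardAndAck = do
  input <- await
  case input of
    Left msg  -> yield (Right msg) >> yield (Left ("ack: " ++ msg))
    Right msg -> yield (Left msg)
  forwardAndAck
```

Here `runBi [Left "hi", Right "ok"] forwardAndAck` gives `[Right "hi", Left "ack: hi", Left "ok"]`: a single input can trigger yields in both directions without blocking on either side.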