k0001 / pipes-network

Use network sockets together with the Haskell pipes library.
http://hackage.haskell.org/package/pipes-network

adding "socketServer" #7

Closed accelas closed 11 years ago

accelas commented 11 years ago

Hi, I was wondering whether something like this makes sense at all. I find it somewhat awkward to separate the read and send ends in client-side networking. The naming scheme doesn't help either: to send a request to the remote server, you call a function named "respond" to push a bytestring downstream to socketWrite, and you call "request" to get the result from the upstream socketRead.

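import qualified Data.ByteString    as B
import qualified Data.Foldable      as F (forM_)
import qualified Network.Simple.TCP as S  -- send/recv; recv yields Nothing on EOF
import qualified Network.Socket     as NS
import           Pipes              (lift)
import           Pipes.Core         (Server, respond)

-- Each request optionally carries bytes to send; each response is the
-- chunk received from the socket afterwards.
socketServer
  :: Int
  -> NS.Socket
  -> Maybe B.ByteString -> Server (Maybe B.ByteString) B.ByteString IO ()
socketServer nbytes sock = loop where
    loop req = do
      F.forM_ req $ lift . S.send sock    -- send the payload, if any
      mbs <- lift (S.recv sock nbytes)    -- wait for up to nbytes of reply
      case mbs of
        Just bs -> respond bs >>= loop    -- 'respond' returns the next request
        Nothing -> return ()              -- the remote end closed the connection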

edit: use Foldable's forM_

k0001 commented 11 years ago

Thanks for bringing this up, Kai. It certainly makes sense; in fact, up to pipes-network 0.4 something like this was included in the package. See socketSyncServer here:

http://hackage.haskell.org/packages/archive/pipes-network/0.4.0.2/doc/html/Control-Proxy-TCP-Sync.html#v:socketSyncServer

I ended up removing that module because whatever could be achieved with that approach could also be achieved more efficiently using two concurrent pipelines, and an API like that one is probably not very useful outside simple synchronous RPC scenarios. I share your concerns about the names given to these functions. Still, I think maintaining a single API is OK; perhaps what's needed is better documentation.
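A minimal sketch of the "two concurrent pipelines" shape, assuming pipes 4 and the async package: one pipeline drives the outbound direction of a connection and another drives the inbound direction, each in its own thread. To keep it self-contained, the connection is abstracted as a receive action and a send action. `fromRecv`, `duplex` and the in-memory `loopback` are hypothetical helpers, not part of pipes-network; with a real socket, pipes-network's `fromSocket` and `toSocket` would cover the two ends.

```haskell
import Control.Concurrent.Async (concurrently_)
import Control.Concurrent.Chan  (newChan, readChan, writeChan)
import Pipes
import qualified Pipes.Prelude as P

-- Turn a blocking receive action into a Producer; 'Nothing' means end of stream.
fromRecv :: IO (Maybe a) -> Producer a IO ()
fromRecv recv = do
  ma <- lift recv
  case ma of
    Nothing -> return ()
    Just a  -> yield a >> fromRecv recv

-- Drive the two directions of one connection as two independent pipelines
-- running concurrently; 'duplex' returns once both have finished.
duplex :: IO (Maybe i) -> (o -> IO ())
       -> Producer o IO () -> Consumer i IO () -> IO ()
duplex recv send outbound inbound =
  concurrently_
    (runEffect $ outbound >-> P.mapM_ send)
    (runEffect $ fromRecv recv >-> inbound)

-- Hypothetical in-memory loopback "connection": whatever is sent comes back.
loopback :: IO (IO (Maybe a), a -> IO ())
loopback = do
  ch <- newChan
  return (readChan ch, writeChan ch . Just)
```

Each direction runs at its own pace: nothing forces a send to be paired with a receive, which a single synchronous server does by construction.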

Could you check that module and let me know what you think? If you have any suggestions regarding what to do with it, let me know.

accelas commented 11 years ago

Hi Renzo, thanks for the reply. There are several reasons why I thought about putting the socket write operation upstream. However, I'm really new to Haskell and pipes, which is probably why I have these questions.

First, if I move the socket write operation upstream, I can use the downstream spot for something else, say, saving data to a file or a database. I know I could use 'tee', but this seems more convenient.

Second, consider this situation: I'm a client program with one socket to read from and write to. The socket delivers all kinds of broadcast messages, and the program needs to filter out every message except the ones it explicitly requested. To achieve this, when sending a request we put it in a map (or something better); when processing a message, we ignore it unless we can find its request in the map. Without socketSyncServer, the sending and receiving parts are separate:

(socketRead 4096 s >-> receiving >-> pipe >-> sending >-> socketWrite s)

I need some way to synchronize the map that holds all the requests, possibly with an IORef. With socketSyncServer, however, the map stays in one place:

(socketSyncServer s >-> sendingAndReceiving >-> pipe >-> discard)
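For the split pipeline, the IORef idea can be sketched as follows, assuming a hypothetical message format in which every request and reply is tagged with an Int id. A Set of pending ids stands in for the map; `registerRequests` sits on the sending side and `onlyRepliesTo` on the receiving side (both names are made up for illustration). Both use `atomicModifyIORef'` because the two pipelines may run in different threads.

```haskell
import           Control.Monad (when)
import           Data.IORef    (IORef, atomicModifyIORef', newIORef, readIORef)
import qualified Data.Set      as Set
import           Pipes
import qualified Pipes.Prelude as P

-- Hypothetical message format: every request and reply is tagged with an id.
type ReqId = Int
type Msg   = (ReqId, String)

-- Sending side: record the id of each outgoing request, then pass it on.
registerRequests :: IORef (Set.Set ReqId) -> Pipe Msg Msg IO r
registerRequests pending =
  P.chain (\(rid, _) -> atomicModifyIORef' pending (\s -> (Set.insert rid s, ())))

-- Receiving side: drop broadcasts and duplicates; forward only replies to
-- pending requests, removing each id (atomically) once its reply arrives.
onlyRepliesTo :: IORef (Set.Set ReqId) -> Pipe Msg Msg IO r
onlyRepliesTo pending = for cat $ \msg@(rid, _) -> do
  wasPending <- lift (atomicModifyIORef' pending (\s -> (Set.delete rid s, Set.member rid s)))
  when wasPending (yield msg)
```

In the thread's pipeline, `registerRequests` would go just before `sending` and `onlyRepliesTo` just after `receiving`, both given the same IORef.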

Lastly, when you say two concurrent pipelines, do you mean something like this?

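import Control.Concurrent.Async (async, waitEither_)

-- Relay s1 -> s2 through pipe1 and s2 -> s1 through pipe2 in two threads;
-- waitEither_ returns as soon as either session finishes.
myPipe s1 s2 = do
    session1 <- async $ runEffect $ socketRead 4096 s1 >-> pipe1 >-> socketWrite s2
    session2 <- async $ runEffect $ socketRead 4096 s2 >-> pipe2 >-> socketWrite s1
    waitEither_ session1 session2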

In this case, what's the idiomatic way to write to s1 inside pipe1? My solution involves a nested runEffect, as below, though I'm not sure it's the right thing to do. With socketSyncServer, we could simply call 'request'.

pipe1 = do
    ...
    runEffect $ fromLazy (pack "critical error!!!!\n") >-> socketWrite s1
    -- fromLazy :: new function from pipes-bytestring
    ...
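One alternative to the nested runEffect, sketched here as an assumption rather than an established idiom: writing to s1 is just an IO action, so pipe1 can take that action as a parameter and call it with `lift`. In the thread's setting the parameter could be network-simple's `send s1`; the "ERROR" marker that triggers the notice is made up for illustration.

```haskell
import qualified Data.ByteString.Char8 as B
import           Data.IORef            (modifyIORef, newIORef, readIORef)
import           Pipes

-- 'sendBack' writes to the other socket (e.g. network-simple's send s1, an
-- assumption). Chunks are forwarded unchanged, except that a chunk containing
-- the hypothetical marker "ERROR" is not forwarded; instead a notice is sent
-- straight back, with no nested runEffect.
pipe1 :: (B.ByteString -> IO ()) -> Pipe B.ByteString B.ByteString IO r
pipe1 sendBack = for cat $ \chunk ->
  if B.pack "ERROR" `B.isInfixOf` chunk
    then lift (sendBack (B.pack "critical error!!!!\n"))
    else yield chunk
```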
k0001 commented 11 years ago

The solution will usually involve two pipelines running concurrently, an MVar for synchronizing between them, and possibly the features provided by pipes-concurrency.

Here's an example program that interacts with a remote TCP server and only receives up to 256 bytes of new data when the user inputs "" (sorry about not writing a real parser). Input lines are read with readLn, so each one has to be typed as a quoted Haskell string. I think it is a bad example and doesn't convey the benefits of this approach; I'll try to come up with something better:

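{-# LANGUAGE OverloadedStrings #-}

import           Control.Monad            (forever)
import qualified Data.ByteString.Char8    as B
import           Data.Monoid              ((<>))
import           Pipes
import qualified Pipes.Prelude            as P
import qualified Pipes.Network.TCP        as Pt
import qualified Control.Concurrent.Async as Async
import qualified Control.Concurrent.MVar  as MVar

main :: IO ()
main =
  Pt.connect "www.google.com" "80" $ \(sock1, _) -> do
      -- 'm1' is a signal: each putMVar lets exactly one inbound chunk through.
      m1 <- MVar.newEmptyMVar

      -- Inbound pipeline: read up to 256 bytes at a time from the socket,
      -- blocking on 'm1' before printing each chunk. ('fromSocket' and
      -- 'toSocket' were called 'socketRead' and 'socketWrite' earlier here.)
      async1 <- Async.async $ runEffect $
            Pt.fromSocket sock1 256
        >-> P.mapM (\a -> MVar.takeMVar m1 >> return a)
        >-> P.mapM_ (putStrLn . ("In :" <>) . show)

      -- Here in 'p1' we can write our logic to decide when to request more data:
      -- an empty string releases one inbound chunk, anything else is sent.
      let p1 = forever $ do
            a <- await
            if null a
               then lift $ MVar.putMVar m1 ()
               else yield (B.pack a)

      -- Outbound pipeline: send what the user types and echo it locally.
      runEffect $ P.readLn
              >-> p1
              >-> P.tee (Pt.toSocket sock1)
              >-> P.mapM_ (putStrLn . ("Out: " <>) . show)

      Async.wait async1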

Here's an example interactive session using this program:

 ~/q/pipes-network.git 0 % ./fooo
"GET /"
Out: "GET /"
" HTT"
Out: " HTT"
"P/1.0\r\n\r\n"
Out: "P/1.0\r\n\r\n"
""
In :"HTTP/1.0 302 Fou..."  -- this string is up to 256 bytes long; I just snipped it.
""
In :"; domain=...."
""
In :"\"This is not..."
""
In :"<HTML><HEA.."
""

In this short example it's almost impossible to see how this approach can be better than socketSyncServer, but it is more efficient, and you don't have to request or respond using values of a particular protocol (such as the Send | Receive and Sent | Received values in the previous version of pipes-network I linked). I agree that for simple cases socketSyncServer seems more convenient, but perhaps we can find a way to achieve that without it.

Let me think about this a bit more. I removed it because I didn't want to encourage the use of a suboptimal and insufficiently general API, and it is easier to maintain a single one.

Please let me know if something in the example above doesn't make sense and I'll explain.

accelas commented 11 years ago

With the new pipes 4 API, I don't think this is relevant anymore.

k0001 commented 11 years ago

You are right, it probably isn't relevant anymore since now you are actually encouraged to use send and recv directly, so you can roll your own “synchronous pipe” more easily.
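As an illustration of rolling your own synchronous pipe on top of send and recv, here is a sketch using pipes 4's Pipes.Core. `syncServer` takes the send and receive actions as parameters; for a TCP socket these could be network-simple's `send sock` and `recv sock 4096`, which is an assumption about how one would wire it up. Unlike the socketServer proposal at the top, it returns `Nothing` when the connection closes, so it composes with `+>>` against a client that returns its own result. `askAll`, `fakeSend`, `fakeRecv`, `runFake` and `sentBy` are hypothetical, the fakes being an in-memory stand-in for a socket.

```haskell
import Control.Monad.Trans.State.Strict (State, evalState, execState, get, modify, put)
import Pipes      (lift)
import Pipes.Core (Client, Server, request, respond, runEffect, (+>>))

-- A synchronous request/response server: each request optionally carries a
-- payload to send; each response is the next value received.
-- Returns Nothing if the connection closes before the client is done.
syncServer :: Monad m => (o -> m ()) -> m (Maybe i) -> Maybe o -> Server (Maybe o) i m (Maybe r)
syncServer send recv = loop
  where
    loop req = do
      mapM_ (lift . send) req         -- send the payload, if there is one
      mi <- lift recv                 -- then block until the reply arrives
      case mi of
        Just i  -> respond i >>= loop -- hand it to the client, get the next request
        Nothing -> return Nothing     -- connection closed

-- Hypothetical client: send each query in turn and collect the replies.
askAll :: Monad m => [o] -> Client (Maybe o) i m (Maybe [i])
askAll qs = Just <$> mapM (request . Just) qs

-- Hypothetical in-memory stand-in for a socket: (sent so far, replies still to come).
type Fake = ([String], [String])

fakeSend :: String -> State Fake ()
fakeSend o = modify (\(sent, replies) -> (sent ++ [o], replies))

fakeRecv :: State Fake (Maybe String)
fakeRecv = do
  (sent, replies) <- get
  case replies of
    []       -> return Nothing
    (r : rs) -> put (sent, rs) >> return (Just r)

-- Run a client against the fake connection, returning the client's result.
runFake :: [String] -> [String] -> Maybe [String]
runFake qs replies =
  evalState (runEffect (syncServer fakeSend fakeRecv +>> askAll qs)) ([], replies)

-- Run a client against the fake connection, returning what was sent.
sentBy :: [String] -> [String] -> [String]
sentBy qs replies =
  fst (execState (runEffect (syncServer fakeSend fakeRecv +>> askAll qs)) ([], replies))
```

Here `request` on the client side really does mean "send this and wait for the answer", which is the naming the original question found missing in the socketRead/socketWrite split.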

By the way, the API in the pipes-4.0 branch is starting to settle now; I think I just need to clean up the documentation and figure out what to export by default and what not. If you have the time and want to take a look, I'd be delighted to hear any feedback.

Thanks!