haskell-distributed / distributed-process-platform

DEPRECATED (Cloud Haskell Platform) in favor of distributed-process-extras, distributed-process-async, distributed-process-client-server, distributed-process-registry, distributed-process-supervisor, distributed-process-task and distributed-process-execution
http://haskell-distributed.github.com
BSD 3-Clause "New" or "Revised" License

Using call / callTimeout method between machines hangs forever #97

Closed: wdanilo closed this issue 9 years ago

wdanilo commented 9 years ago

Hello! I'm writing a simple Server-Worker program in Cloud Haskell. The problem is that when I try to call a ManagedProcess after the server discovery step, my example hangs forever, even when using callTimeout (which should give up after 100 ms). The code is very simple, but I cannot find anything wrong with it.

I've also posted the question on the mailing list, but as far as I know the SO community, I can get an answer a lot faster here. If I get an answer from the mailing list, I will post it here too.

Here is the minimal source code:

Client.hs:

{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric             #-}
{-# LANGUAGE TemplateHaskell           #-}

module Main where

import Network.Transport     (EndPointAddress(EndPointAddress))
import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer (sleep)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process.Node hiding (call)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary) 
import Data.Typeable (Typeable)
import Data.ByteString.Char8 (pack)
import System.Environment    (getArgs)

import qualified Server as Server

main = do
  [host, port, serverAddr] <- getArgs

  Right transport <- createTransport host port defaultTCPParameters
  node <- newLocalNode transport initRemoteTable

  let addr = EndPointAddress (pack serverAddr)
      srvID = NodeId addr

  _ <- forkProcess node $ do
    sid <- discoverServer srvID
    liftIO $ putStrLn "x"
    liftIO $ print sid
    r <- callTimeout sid (Server.Add 5 6) 100 :: Process (Maybe Double)
    liftIO $ putStrLn "x"
    liftIO $ threadDelay (10 * 1000 * 1000)

  threadDelay (10 * 1000 * 1000)
  return ()

discoverServer srvID = do
  whereisRemoteAsync srvID "serverPID"
  reply <- expectTimeout 100 :: Process (Maybe WhereIsReply) -- timeout is in microseconds; the loop retries on Nothing
  case reply of
    Just (WhereIsReply _ msid) -> case msid of
      Just sid -> return sid
      Nothing  -> discoverServer srvID
    Nothing                    -> discoverServer srvID
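The discoverServer loop above retries forever, and expectTimeout counts microseconds, so each attempt waits only 0.1 ms. A minimal sketch of a bounded variant, assuming only the distributed-process calls the report already relies on (whereisRemoteAsync, WhereIsReply) plus receiveTimeout/matchIf; the name discoverServerN and the one-second wait per attempt are hypothetical choices, not part of the original code:

```haskell
module Discover (discoverServerN) where

import Control.Distributed.Process

-- Hypothetical bounded variant of discoverServer: asks the remote node for
-- the pid registered under `name` and gives up after `attempts` tries.
discoverServerN :: Int -> NodeId -> String -> Process (Maybe ProcessId)
discoverServerN attempts nid name
  | attempts <= 0 = return Nothing
  | otherwise = do
      whereisRemoteAsync nid name
      -- receiveTimeout, like expectTimeout, counts microseconds:
      -- wait up to one second for a reply about this particular name
      reply <- receiveTimeout (1000 * 1000)
                 [ matchIf (\(WhereIsReply n _) -> n == name)
                           (\(WhereIsReply _ mpid) -> return mpid) ]
      case reply of
        Just (Just pid) -> return (Just pid)                       -- name is registered
        _               -> discoverServerN (attempts - 1) nid name -- timed out or not registered yet
```

In the client this would read `sid <- discoverServerN 10 srvID "serverPID"`, returning Nothing instead of looping when the name never resolves. It only bounds discovery; it does not by itself change what happens at the later callTimeout.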

Server.hs:

{-# LANGUAGE DeriveDataTypeable        #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE DeriveGeneric             #-}
{-# LANGUAGE TemplateHaskell           #-}

module Server where

import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform hiding (__remoteTable)
import Control.Distributed.Process.Platform.Async
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Platform.Timer (sleep)
import Control.Distributed.Process.Closure (mkClosure, remotable)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process.Node hiding (call)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary) 
import Data.Typeable (Typeable)

data Add = Add Double Double
  deriving (Typeable, Generic)
instance Binary Add

launchServer :: Process ProcessId
launchServer = spawnLocal $ serve () (statelessInit Infinity) server >> return () where
  server = statelessProcess { apiHandlers            = [ handleCall_ (\(Add x y) -> liftIO (putStrLn "!") >> return (x + y)) ]
                            , unhandledMessagePolicy = Drop
                            }

main = do
  Right transport <- createTransport "127.0.0.1" "8080" defaultTCPParameters
  node <- newLocalNode transport initRemoteTable
  _ <- forkProcess node $ do
    self <- getSelfPid
    register "serverPID" self -- registers this forked process, not the ManagedProcess started by launchServer below

    liftIO $ putStrLn "x"
    mid <- launchServer
    liftIO $ putStrLn "y"
    r <- call mid (Add 5 6) :: Process Double
    liftIO $ print r
    liftIO $ putStrLn "z"
    liftIO $ threadDelay (10 * 1000 * 1000)
    liftIO $ putStrLn "z2"

  threadDelay (10 * 1000 * 1000)
  return ()

We can run them as follows:

runhaskell Server.hs
runhaskell Client.hs 127.0.0.2 8080 127.0.0.1:8080:0

The results:

When we run the programs, we get the following results:

from Server:

x
y
!
11.0 -- this shows that within the server node we were able to use the "call" function
z
-- waiting: all the output above comes from tests inside the server; now it waits for external messages

from Worker:

x
pid://127.0.0.1:8080:0:10 -- this is the process id of the server, obtained with whereisRemoteAsync
-- waiting forever on the "callTimeout sid (Server.Add 5 6) 100" code!

As a side note, I've found that sending messages with send (from Control.Distributed.Process) and receiving them with expect works. But sending them with call (from Control.Distributed.Process.Platform.ManagedProcess) and trying to receive them with ManagedProcess API handlers hangs the call forever (even with callTimeout!).

wdanilo commented 9 years ago

OK, we can close this issue: it was a small mistake in the code (solved here: http://stackoverflow.com/questions/28366736/cloud-haskell-hanging-forever-when-sending-messages-to-managedprocess)
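The thread does not restate the fix. One likely candidate, judging only from the code above (an assumption, not a quote of the linked answer): Server.hs registers `self`, the process created by forkProcess, under "serverPID", rather than the ManagedProcess pid returned by launchServer. The worker would then send its call to a process that never replies, which also matches the observation that plain send/expect works. A sketch of Server.hs with the registration moved onto the managed process, assuming the same distributed-process-platform and network-transport-tcp versions as the report:

```haskell
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric      #-}

module Server where

import Control.Distributed.Process hiding (call)
import Control.Distributed.Process.Platform.ManagedProcess
import Control.Distributed.Process.Platform.Time
import Control.Distributed.Process.Node (newLocalNode, initRemoteTable, forkProcess)
import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Concurrent (threadDelay)
import GHC.Generics (Generic)
import Data.Binary (Binary)
import Data.Typeable (Typeable)

data Add = Add Double Double
  deriving (Typeable, Generic)
instance Binary Add

-- Same stateless server as in the report: answers an Add call with x + y.
launchServer :: Process ProcessId
launchServer = spawnLocal $ serve () (statelessInit Infinity) server >> return ()
  where
    server = statelessProcess
      { apiHandlers            = [ handleCall_ (\(Add x y) -> return (x + y)) ]
      , unhandledMessagePolicy = Drop
      }

main :: IO ()
main = do
  Right transport <- createTransport "127.0.0.1" "8080" defaultTCPParameters
  node <- newLocalNode transport initRemoteTable
  _ <- forkProcess node $ do
    mid <- launchServer
    -- Register the ManagedProcess itself (not `self`), so the worker's
    -- whereisRemoteAsync resolves to the process that handles `call`.
    register "serverPID" mid
    r <- call mid (Add 5 6) :: Process Double
    liftIO $ print r
  -- keep the node alive long enough for the worker to connect
  threadDelay (10 * 1000 * 1000)
```

With this change the worker code can stay as it is; the pid it prints should then be the ManagedProcess's pid rather than that of the forked process.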