Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions Remote/Peer.hs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import Prelude hiding (all, pi)

import Network.Socket (defaultHints,sendTo,recv,sClose,Socket,getAddrInfo,AddrInfoFlag(..),setSocketOption,addrFlags,addrSocketType,addrFamily,SocketType(..),Family(..),addrProtocol,SocketOption(..),AddrInfo,bindSocket,addrAddress,SockAddr(..),socket)
import Network.BSD (getProtocolNumber)
import qualified Control.Exception.Lifted as Lifted (try)
import qualified System.Timeout.Lifted as Lifted (timeout)
import Control.Concurrent.MVar (takeMVar, newMVar, modifyMVar_)
import Remote.Process (PeerInfo,pingNode,makeNodeFromHost,spawnLocalAnd,setDaemonic,TransmitStatus(..),TransmitException(..),PayloadDisposition(..),ptimeout,getSelfNode,sendSimple,cfgRole,cfgKnownHosts,cfgPeerDiscoveryPort,match,receiveWait,getSelfPid,getConfig,NodeId,PortId,ProcessM,ptry,localRegistryQueryNodes)
import Remote.Process (PeerInfo,pingNode,makeNodeFromHost,spawnLocalAnd,setDaemonic,TransmitStatus(..),TransmitException(..),PayloadDisposition(..),getSelfNode,sendSimple,cfgRole,cfgKnownHosts,cfgPeerDiscoveryPort,match,receiveWait,getSelfPid,getConfig,NodeId,PortId,ProcessM,localRegistryQueryNodes)
import Control.Monad.Trans (liftIO)
import Data.Typeable (Typeable)
import Data.Maybe (catMaybes)
Expand Down Expand Up @@ -116,7 +118,7 @@ getPeersDynamic t =
port -> do -- TODO should send broacast multiple times in case of packet loss
_ <- liftIO $ try $ sendBroadcast port (show pid) :: ProcessM (Either IOError ())
responses <- liftIO $ newMVar []
_ <- ptimeout t (receiveInfo responses)
_ <- Lifted.timeout t (receiveInfo responses)
res <- liftIO $ takeMVar responses
let all = map (\di -> (discRole di,[discNodeId di])) (nub res)
return $ foldl (\a (k,v) -> Map.insertWith (++) k v a ) Map.empty all
Expand All @@ -137,12 +139,12 @@ findRoles disc = Map.keys disc
waitForDiscovery :: Int -> ProcessM Bool
waitForDiscovery delay
| delay <= 0 = doit
| otherwise = ptimeout delay doit >>= (return . maybe False id)
| otherwise = Lifted.timeout delay doit >>= (return . maybe False id)
where doit =
do cfg <- getConfig
msg <- liftIO $ listenUdp (cfgPeerDiscoveryPort cfg)
nodeid <- getSelfNode
res <- ptry $ sendSimple (read msg) (DiscoveryInfo {discNodeId=nodeid,discRole=cfgRole cfg}) PldUser
res <- Lifted.try $ sendSimple (read msg) (DiscoveryInfo {discNodeId=nodeid,discRole=cfgRole cfg}) PldUser
:: ProcessM (Either ErrorCall TransmitStatus)
case res of
Right QteOK -> return True
Expand Down
105 changes: 41 additions & 64 deletions Remote/Process.hs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveDataTypeable, MultiParamTypeClasses, TypeFamilies #-}

-- | This module is the core of Cloud Haskell. It provides
-- processes, messages, monitoring, and configuration.
Expand All @@ -23,7 +23,6 @@ module Remote.Process (
setLogConfig,getLogConfig,setNodeLogConfig,setRemoteNodeLogConfig,defaultLogConfig,

-- * Exception handling
ptry,ptimeout,pbracket,pfinally,
UnknownMessageException(..),ServiceException(..),
TransmitException(..),TransmitStatus(..),

Expand Down Expand Up @@ -67,11 +66,18 @@ module Remote.Process (
import qualified Prelude as Prelude
import Prelude hiding (catch, id, init, last, lookup, pi)

import Control.Applicative (Applicative(..))
import Control.Concurrent (forkIO,ThreadId,threadDelay)
import Control.Concurrent.MVar (MVar,newMVar, newEmptyMVar,takeMVar,putMVar,modifyMVar,modifyMVar_,readMVar)
import Control.Exception (ErrorCall(..),throwTo,bracket,try,Exception,throw,evaluate,finally,SomeException,catch)
import Control.Monad (foldM,when,liftM,forever)
import Control.Monad (foldM,when,liftM,forever,ap,void)
import Control.Monad.Base (MonadBase(..))
import Control.Monad.Trans (MonadIO,liftIO)
import Control.Monad.Trans.Control (MonadBaseControl(..))
import qualified Control.Exception.Lifted as Lifted (try, bracket, finally)
import qualified System.Timeout.Lifted as Lifted (timeout)
import qualified Control.Concurrent.Lifted as Lifted (fork)
import Data.Functor ((<$>))
import Data.Binary (Binary,put,get,putWord8,getWord8)
import Data.Char (isSpace,isDigit)
import Data.List (isSuffixOf,foldl', isPrefixOf)
Expand All @@ -88,7 +94,6 @@ import qualified Data.Map as Map (Map,keys,fromList,unionWith,elems,singleton,me
import Remote.Reg (getEntryByIdent,Lookup,empty)
import Remote.Encoding (serialEncode,serialDecode,serialEncodePure,serialDecodePure,dynamicEncodePure,dynamicDecodePure,DynamicPayload,Payload,Serializable,hPutPayload,hGetPayload,getPayloadType,getDynamicPayloadType)
import System.Environment (getArgs)
import qualified System.Timeout (timeout)
import Data.Time (toModifiedJulianDay,Day(..),picosecondsToDiffTime,getCurrentTime,diffUTCTime,UTCTime(..),utcToLocalZonedTime)
import Remote.Closure (Closure (..))
import Control.Concurrent.STM (STM,atomically,retry,orElse)
Expand Down Expand Up @@ -282,8 +287,22 @@ instance Monad ProcessM where
instance Functor ProcessM where
fmap f v = ProcessM $ (\p -> (runProcessM v) p >>= (\x -> return $ fmap f x))

instance Applicative ProcessM where
pure = return
(<*>) = ap

instance MonadIO ProcessM where
liftIO arg = ProcessM $ \pr -> (arg >>= (\x -> return (pr,x)))
liftIO = liftBase

instance MonadBase IO ProcessM where
liftBase io = ProcessM $ \p -> liftM ((,) p) io

instance MonadBaseControl IO ProcessM where
newtype StM ProcessM a = StMP {unStMP :: (Process, a)}
liftBaseWith f = ProcessM $ \p ->
liftM ((,) p)
(f $ \pm -> liftM StMP $ runProcessM pm p)
restoreM = ProcessM . const . return . unStMP

getProcess :: ProcessM (Process)
getProcess = ProcessM $ \x -> return (x,x)
Expand Down Expand Up @@ -442,11 +461,10 @@ receiveWait m = do f <- receiveWaitImpl m
receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q)
receiveTimeout 0 m = receive m
receiveTimeout to m | to > 0 =
do res <- ptimeout to $ receiveWaitImpl m
do res <- Lifted.timeout to $ receiveWaitImpl m
case res of
Nothing -> return Nothing
Just f -> do q <- f
return $ Just q
Just f -> Just <$> f

messageHandlerGenerator :: TVar ProcessState -> [Message] -> [(Message, STM ())]
messageHandlerGenerator prSt msgs =
Expand Down Expand Up @@ -478,7 +496,7 @@ receiveWaitImpl m =

convertErrorCall :: ProcessM a -> ProcessM a
convertErrorCall f =
do a <- ptry ff
do a <- Lifted.try ff
case a of
Right c -> return c
Left b -> throw $ TransmitException $ QteOther $ show (b::ErrorCall)
Expand Down Expand Up @@ -676,7 +694,7 @@ spawnLocalAnd fun prefix =
pid <- liftIO $ runLocalProcess (prNodeRef p) (myFun v)
liftIO $ takeMVar v
return pid
where myFun mv = (prefix `pfinally` liftIO (putMVar mv ())) >> fun
where myFun mv = (prefix `Lifted.finally` liftIO (putMVar mv ())) >> fun

-- | A synonym for 'spawnLocal'
forkProcess :: ProcessM () -> ProcessM ProcessId
Expand All @@ -686,9 +704,7 @@ forkProcess = spawnLocal
-- Not safe for export, as doing any message receive operation could
-- result in a munged message queue.
forkProcessWeak :: ProcessM () -> ProcessM ()
forkProcessWeak f = do p <- getProcess
_res <- liftIO $ forkIO (runProcessM f p >> return ())
return ()
forkProcessWeak = void . Lifted.fork

-- | Create a new process on the current node. Returns the new process's identifier.
-- Unlike 'spawn', this function does not need a 'Closure' or a 'NodeId'.
Expand Down Expand Up @@ -772,7 +788,7 @@ runLocalProcess node fun =
----------------------------------------------

-- TODO this needs withMonitor a safe variant
-- To make this work with a ptimeout but still be able to return martial results,
-- To make this work with a Lifted.timeout but still be able to return martial results,
-- they need to be stored in an MVar. Also, like roundtipQuery, this should have
-- two variants: a flavor that establishes monitors, and a timeout-based flavor.
-- This also needs an ASYNC variant, that will send data simultaneously, from multiple subprocesses
Expand Down Expand Up @@ -812,7 +828,7 @@ generalPid (ProcessId n _p) = ProcessId n (-1)

roundtripQuery :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b)
roundtripQuery pld pid dat =
do res <- ptry $ withMonitor apid $ roundtripQueryImpl 0 pld pid dat Prelude.id []
do res <- Lifted.try $ withMonitor apid $ roundtripQueryImpl 0 pld pid dat Prelude.id []
case res of
Left (ServiceException s) -> return $ Left $ QteOther s
Right (Left a) -> return (Left a)
Expand Down Expand Up @@ -933,14 +949,14 @@ sendTry pid msg msghdr pld = getProcess >>= (\_p ->
-- of outgoing connections per node. //!!
q <- case 0 of -- case cfgRoundtripTimeout cfg of
0 -> a
n -> do ff <- ptimeout n $ a
n -> do ff <- Lifted.timeout n $ a
case ff of
Nothing -> return QteConnectionTimeout
Just r -> return r
case q of
QteThrottle n -> liftIO (threadDelay n) >> timeoutFilter a
n -> return n
tryAction = ptry action >>=
tryAction = Lifted.try action >>=
(\x -> case x of
Left l -> return $ QteEncodingError $ show (l::ErrorCall) -- catch errors from encoding
Right r -> return r)
Expand Down Expand Up @@ -1314,50 +1330,11 @@ isPidLocal pid = do mine <- getSelfPid

suppressTransmitException :: ProcessM a -> ProcessM (Maybe a)
suppressTransmitException a =
do res <- ptry a
do res <- Lifted.try a
case res of
Left (TransmitException _) -> return Nothing
Right r -> return $ Just r

-- | A 'ProcessM'-flavoured variant of 'Control.Exception.try'
ptry :: (Exception e) => ProcessM a -> ProcessM (Either e a)
ptry f = do p <- getProcess
res <- liftIO $ try (runProcessM f p)
case res of
Left e -> return $ Left e
Right (newp,newanswer) -> ProcessM (\_ -> return (newp,Right newanswer))

{- UNUSED
-- | A 'ProcessM'-flavoured variant of 'Control.Exception.catch'
pcatch :: Exception e => ProcessM a -> (e -> ProcessM a) -> ProcessM a
pcatch code handler = do p <- getProcess
liftIO $ catch (liftM snd $ runProcessM code p) (\e -> liftM snd $ runProcessM (handler e) p)

-}

-- | A 'ProcessM'-flavoured variant of 'System.Timeout.timeout'
ptimeout :: Int -> ProcessM a -> ProcessM (Maybe a)
ptimeout t f = do p <- getProcess
res <- liftIO $ System.Timeout.timeout t (runProcessM f p)
case res of
Nothing -> return Nothing
Just (newp,newanswer) -> ProcessM (\_ -> return (newp,Just newanswer))

-- | A 'ProcessM'-flavoured variant of 'Control.Exception.bracket'
pbracket :: (ProcessM a) -> (a -> ProcessM b) -> (a -> ProcessM c) -> ProcessM c
pbracket before after fun =
do p <- getProcess
(newp2,newanswer2) <- liftIO $ bracket
(runProcessM before p)
(\(newp,newanswer) -> runProcessM (after newanswer) newp)
(\(newp,newanswer) -> runProcessM (fun newanswer) newp)
ProcessM (\_ -> return (newp2, newanswer2))

-- | A 'ProcessM'-flavoured variant of 'Control.Exception.finally'
pfinally :: ProcessM a -> ProcessM b -> ProcessM a
pfinally fun after = pbracket (return ()) (\_ -> after) (const fun)


----------------------------------------------
-- * Configuration file
----------------------------------------------
Expand Down Expand Up @@ -1694,7 +1671,7 @@ startLoggingService = serviceThread ServiceLog logger
do smsg <- liftIO $ showLogMessage txt
case whereto of
LtStdout -> liftIO $ putStrLn smsg
LtFile fp -> (ptry (liftIO (appendFile fp (smsg ++ "\n"))) :: ProcessM (Either IOError ()) ) >> return () -- ignore error - what can we do?
LtFile fp -> (Lifted.try (liftIO (appendFile fp (smsg ++ "\n"))) :: ProcessM (Either IOError ()) ) >> return () -- ignore error - what can we do?
LtForward nid -> do self <- getSelfNode
when (self /= nid)
(sendSimple (adminGetPid nid ServiceLog) (forwardify txt) PldAdmin >> return ()) -- ignore error -- what can we do?
Expand Down Expand Up @@ -1853,7 +1830,7 @@ startFinalizerService todo = spawnLocalAnd body prefix >> return ()
do p <- getProcess
node <- liftIO $ readMVar (prNodeRef p)
liftIO $ takeMVar (ndNodeFinalizer node)
todo `pfinally` (liftIO $ putMVar (ndNodeFinalized node) ())
todo `Lifted.finally` (liftIO $ putMVar (ndNodeFinalized node) ())


performFinalization :: MVar Node -> IO ()
Expand All @@ -1868,7 +1845,7 @@ setDaemonic = do p <- getProcess
(\node -> return $ node {ndProcessTable=Map.adjust (\pte -> pte {pteDaemonic=True}) (localFromPid pid) (ndProcessTable node)})

serviceThread :: ServiceId -> ProcessM () -> ProcessM ()
serviceThread v f = spawnLocalAnd (pbracket (return ())
serviceThread v f = spawnLocalAnd (Lifted.bracket (return ())
(\_ -> adminDeregister v >> logError)
(\_ -> f)) (adminRegister v >> setDaemonic)
>> (return())
Expand Down Expand Up @@ -2242,9 +2219,9 @@ withMonitoring :: ProcessId -> MonitorAction -> ProcessM a -> ProcessM a
withMonitoring pid how f =
do mypid <- getSelfPid
monitorProcess mypid pid how -- TODO if this throws a ServiceException, translate that into a trigger
a <- f `pfinally` safety (unmonitorProcess mypid pid how)
a <- f `Lifted.finally` safety (unmonitorProcess mypid pid how)
return a
where safety n = ptry n :: ProcessM (Either ServiceException ())
where safety n = Lifted.try n :: ProcessM (Either ServiceException ())


-- | Establishes unidirectional processing of another process. The format is:
Expand Down Expand Up @@ -2355,7 +2332,7 @@ startProcessMonitorService = serviceThread ServiceProcessMonitor (service emptyG
gl {glLinks = gdAddMonitor (glLinks gl) monitee action (localFromPid monitor) }
addLocalNode gl monitor monitee _action =
gl {glLinks = gdAddNode (glLinks gl) monitee (nodeFromPid monitor)}
broadcast nids msg = mapM_ (\p -> forkProcessWeak $ ((ptimeout 5000000 $ sendSimple (adminGetPid p ServiceProcessMonitor) msg PldAdmin) >> return ())) nids
broadcast nids msg = mapM_ (\p -> forkProcessWeak $ ((Lifted.timeout 5000000 $ sendSimple (adminGetPid p ServiceProcessMonitor) msg PldAdmin) >> return ())) nids
handleProcessDown :: GlLinks -> ProcessId -> SignalReason -> ProcessM GlLinks
handleProcessDown global pid why =
do islocal <- isPidLocal pid
Expand Down Expand Up @@ -2573,7 +2550,7 @@ startSpawnerService = serviceThread ServiceSpawner spawner
where spawner = receiveWait [matchSpawnRequest,matchCallRequest,matchUnknownThrow] >> spawner
exceptFilt :: SomeException -> q -> q
exceptFilt _ q = q
callWorker c responder = do a <- ptry $ invokeClosure c
callWorker c responder = do a <- Lifted.try $ invokeClosure c
case a of
Left q -> exceptFilt q (responder Nothing)
Right Nothing -> responder Nothing
Expand Down
Loading