diff --git a/Remote/Peer.hs b/Remote/Peer.hs index b1b7c87..6fb970d 100644 --- a/Remote/Peer.hs +++ b/Remote/Peer.hs @@ -10,8 +10,10 @@ import Prelude hiding (all, pi) import Network.Socket (defaultHints,sendTo,recv,sClose,Socket,getAddrInfo,AddrInfoFlag(..),setSocketOption,addrFlags,addrSocketType,addrFamily,SocketType(..),Family(..),addrProtocol,SocketOption(..),AddrInfo,bindSocket,addrAddress,SockAddr(..),socket) import Network.BSD (getProtocolNumber) +import qualified Control.Exception.Lifted as Lifted (try) +import qualified System.Timeout.Lifted as Lifted (timeout) import Control.Concurrent.MVar (takeMVar, newMVar, modifyMVar_) -import Remote.Process (PeerInfo,pingNode,makeNodeFromHost,spawnLocalAnd,setDaemonic,TransmitStatus(..),TransmitException(..),PayloadDisposition(..),ptimeout,getSelfNode,sendSimple,cfgRole,cfgKnownHosts,cfgPeerDiscoveryPort,match,receiveWait,getSelfPid,getConfig,NodeId,PortId,ProcessM,ptry,localRegistryQueryNodes) +import Remote.Process (PeerInfo,pingNode,makeNodeFromHost,spawnLocalAnd,setDaemonic,TransmitStatus(..),TransmitException(..),PayloadDisposition(..),getSelfNode,sendSimple,cfgRole,cfgKnownHosts,cfgPeerDiscoveryPort,match,receiveWait,getSelfPid,getConfig,NodeId,PortId,ProcessM,localRegistryQueryNodes) import Control.Monad.Trans (liftIO) import Data.Typeable (Typeable) import Data.Maybe (catMaybes) @@ -116,7 +118,7 @@ getPeersDynamic t = port -> do -- TODO should send broacast multiple times in case of packet loss _ <- liftIO $ try $ sendBroadcast port (show pid) :: ProcessM (Either IOError ()) responses <- liftIO $ newMVar [] - _ <- ptimeout t (receiveInfo responses) + _ <- Lifted.timeout t (receiveInfo responses) res <- liftIO $ takeMVar responses let all = map (\di -> (discRole di,[discNodeId di])) (nub res) return $ foldl (\a (k,v) -> Map.insertWith (++) k v a ) Map.empty all @@ -137,12 +139,12 @@ findRoles disc = Map.keys disc waitForDiscovery :: Int -> ProcessM Bool waitForDiscovery delay | delay <= 0 = doit - | otherwise = ptimeout delay doit >>= (return . maybe False id) + | otherwise = Lifted.timeout delay doit >>= (return . maybe False id) where doit = do cfg <- getConfig msg <- liftIO $ listenUdp (cfgPeerDiscoveryPort cfg) nodeid <- getSelfNode - res <- ptry $ sendSimple (read msg) (DiscoveryInfo {discNodeId=nodeid,discRole=cfgRole cfg}) PldUser + res <- Lifted.try $ sendSimple (read msg) (DiscoveryInfo {discNodeId=nodeid,discRole=cfgRole cfg}) PldUser :: ProcessM (Either ErrorCall TransmitStatus) case res of Right QteOK -> return True diff --git a/Remote/Process.hs b/Remote/Process.hs index d1ee5cb..6368dca 100644 --- a/Remote/Process.hs +++ b/Remote/Process.hs @@ -1,4 +1,4 @@ -{-# LANGUAGE DeriveDataTypeable #-} +{-# LANGUAGE DeriveDataTypeable, MultiParamTypeClasses, TypeFamilies #-} -- | This module is the core of Cloud Haskell. It provides -- processes, messages, monitoring, and configuration. @@ -23,7 +23,6 @@ module Remote.Process ( setLogConfig,getLogConfig,setNodeLogConfig,setRemoteNodeLogConfig,defaultLogConfig, -- * Exception handling - ptry,ptimeout,pbracket,pfinally, UnknownMessageException(..),ServiceException(..), TransmitException(..),TransmitStatus(..), @@ -67,11 +66,18 @@ module Remote.Process ( import qualified Prelude as Prelude import Prelude hiding (catch, id, init, last, lookup, pi) +import Control.Applicative (Applicative(..)) import Control.Concurrent (forkIO,ThreadId,threadDelay) import Control.Concurrent.MVar (MVar,newMVar, newEmptyMVar,takeMVar,putMVar,modifyMVar,modifyMVar_,readMVar) import Control.Exception (ErrorCall(..),throwTo,bracket,try,Exception,throw,evaluate,finally,SomeException,catch) -import Control.Monad (foldM,when,liftM,forever) +import Control.Monad (foldM,when,liftM,forever,ap,void) +import Control.Monad.Base (MonadBase(..)) import Control.Monad.Trans (MonadIO,liftIO) +import Control.Monad.Trans.Control (MonadBaseControl(..)) +import qualified Control.Exception.Lifted as Lifted (try, bracket, finally) +import qualified System.Timeout.Lifted as Lifted (timeout) +import qualified Control.Concurrent.Lifted as Lifted (fork) +import Data.Functor ((<$>)) import Data.Binary (Binary,put,get,putWord8,getWord8) import Data.Char (isSpace,isDigit) import Data.List (isSuffixOf,foldl', isPrefixOf) @@ -88,7 +94,6 @@ import qualified Data.Map as Map (Map,keys,fromList,unionWith,elems,singleton,me import Remote.Reg (getEntryByIdent,Lookup,empty) import Remote.Encoding (serialEncode,serialDecode,serialEncodePure,serialDecodePure,dynamicEncodePure,dynamicDecodePure,DynamicPayload,Payload,Serializable,hPutPayload,hGetPayload,getPayloadType,getDynamicPayloadType) import System.Environment (getArgs) -import qualified System.Timeout (timeout) import Data.Time (toModifiedJulianDay,Day(..),picosecondsToDiffTime,getCurrentTime,diffUTCTime,UTCTime(..),utcToLocalZonedTime) import Remote.Closure (Closure (..)) import Control.Concurrent.STM (STM,atomically,retry,orElse) @@ -282,8 +287,22 @@ instance Monad ProcessM where instance Functor ProcessM where fmap f v = ProcessM $ (\p -> (runProcessM v) p >>= (\x -> return $ fmap f x)) +instance Applicative ProcessM where + pure = return + (<*>) = ap + instance MonadIO ProcessM where - liftIO arg = ProcessM $ \pr -> (arg >>= (\x -> return (pr,x))) + liftIO = liftBase + +instance MonadBase IO ProcessM where + liftBase io = ProcessM $ \p -> liftM ((,) p) io + +instance MonadBaseControl IO ProcessM where + newtype StM ProcessM a = StMP {unStMP :: (Process, a)} + liftBaseWith f = ProcessM $ \p -> + liftM ((,) p) + (f $ \pm -> liftM StMP $ runProcessM pm p) + restoreM = ProcessM . const . return . unStMP getProcess :: ProcessM (Process) getProcess = ProcessM $ \x -> return (x,x) @@ -442,11 +461,10 @@ receiveWait m = do f <- receiveWaitImpl m receiveTimeout :: Int -> [MatchM q ()] -> ProcessM (Maybe q) receiveTimeout 0 m = receive m receiveTimeout to m | to > 0 = - do res <- ptimeout to $ receiveWaitImpl m + do res <- Lifted.timeout to $ receiveWaitImpl m case res of Nothing -> return Nothing - Just f -> do q <- f - return $ Just q + Just f -> Just <$> f messageHandlerGenerator :: TVar ProcessState -> [Message] -> [(Message, STM ())] messageHandlerGenerator prSt msgs = @@ -478,7 +496,7 @@ receiveWaitImpl m = convertErrorCall :: ProcessM a -> ProcessM a convertErrorCall f = - do a <- ptry ff + do a <- Lifted.try ff case a of Right c -> return c Left b -> throw $ TransmitException $ QteOther $ show (b::ErrorCall) @@ -676,7 +694,7 @@ spawnLocalAnd fun prefix = pid <- liftIO $ runLocalProcess (prNodeRef p) (myFun v) liftIO $ takeMVar v return pid - where myFun mv = (prefix `pfinally` liftIO (putMVar mv ())) >> fun + where myFun mv = (prefix `Lifted.finally` liftIO (putMVar mv ())) >> fun -- | A synonym for 'spawnLocal' forkProcess :: ProcessM () -> ProcessM ProcessId @@ -686,9 +704,7 @@ forkProcess = spawnLocal -- Not safe for export, as doing any message receive operation could -- result in a munged message queue. forkProcessWeak :: ProcessM () -> ProcessM () -forkProcessWeak f = do p <- getProcess - _res <- liftIO $ forkIO (runProcessM f p >> return ()) - return () +forkProcessWeak = void . Lifted.fork -- | Create a new process on the current node. Returns the new process's identifier. -- Unlike 'spawn', this function does not need a 'Closure' or a 'NodeId'. @@ -772,7 +788,7 @@ runLocalProcess node fun = ---------------------------------------------- -- TODO this needs withMonitor a safe variant --- To make this work with a ptimeout but still be able to return martial results, +-- To make this work with a Lifted.timeout but still be able to return martial results, -- they need to be stored in an MVar. Also, like roundtipQuery, this should have -- two variants: a flavor that establishes monitors, and a timeout-based flavor. -- This also needs an ASYNC variant, that will send data simultaneously, from multiple subprocesses @@ -812,7 +828,7 @@ generalPid (ProcessId n _p) = ProcessId n (-1) roundtripQuery :: (Serializable a, Serializable b) => PayloadDisposition -> ProcessId -> a -> ProcessM (Either TransmitStatus b) roundtripQuery pld pid dat = - do res <- ptry $ withMonitor apid $ roundtripQueryImpl 0 pld pid dat Prelude.id [] + do res <- Lifted.try $ withMonitor apid $ roundtripQueryImpl 0 pld pid dat Prelude.id [] case res of Left (ServiceException s) -> return $ Left $ QteOther s Right (Left a) -> return (Left a) @@ -933,14 +949,14 @@ sendTry pid msg msghdr pld = getProcess >>= (\_p -> -- of outgoing connections per node. //!! q <- case 0 of -- case cfgRoundtripTimeout cfg of 0 -> a - n -> do ff <- ptimeout n $ a + n -> do ff <- Lifted.timeout n $ a case ff of Nothing -> return QteConnectionTimeout Just r -> return r case q of QteThrottle n -> liftIO (threadDelay n) >> timeoutFilter a n -> return n - tryAction = ptry action >>= + tryAction = Lifted.try action >>= (\x -> case x of Left l -> return $ QteEncodingError $ show (l::ErrorCall) -- catch errors from encoding Right r -> return r) @@ -1314,50 +1330,11 @@ isPidLocal pid = do mine <- getSelfPid suppressTransmitException :: ProcessM a -> ProcessM (Maybe a) suppressTransmitException a = - do res <- ptry a + do res <- Lifted.try a case res of Left (TransmitException _) -> return Nothing Right r -> return $ Just r --- | A 'ProcessM'-flavoured variant of 'Control.Exception.try' -ptry :: (Exception e) => ProcessM a -> ProcessM (Either e a) -ptry f = do p <- getProcess - res <- liftIO $ try (runProcessM f p) - case res of - Left e -> return $ Left e - Right (newp,newanswer) -> ProcessM (\_ -> return (newp,Right newanswer)) - -{- UNUSED --- | A 'ProcessM'-flavoured variant of 'Control.Exception.catch' -pcatch :: Exception e => ProcessM a -> (e -> ProcessM a) -> ProcessM a -pcatch code handler = do p <- getProcess - liftIO $ catch (liftM snd $ runProcessM code p) (\e -> liftM snd $ runProcessM (handler e) p) - --} - --- | A 'ProcessM'-flavoured variant of 'System.Timeout.timeout' -ptimeout :: Int -> ProcessM a -> ProcessM (Maybe a) -ptimeout t f = do p <- getProcess - res <- liftIO $ System.Timeout.timeout t (runProcessM f p) - case res of - Nothing -> return Nothing - Just (newp,newanswer) -> ProcessM (\_ -> return (newp,Just newanswer)) - --- | A 'ProcessM'-flavoured variant of 'Control.Exception.bracket' -pbracket :: (ProcessM a) -> (a -> ProcessM b) -> (a -> ProcessM c) -> ProcessM c -pbracket before after fun = - do p <- getProcess - (newp2,newanswer2) <- liftIO $ bracket - (runProcessM before p) - (\(newp,newanswer) -> runProcessM (after newanswer) newp) - (\(newp,newanswer) -> runProcessM (fun newanswer) newp) - ProcessM (\_ -> return (newp2, newanswer2)) - --- | A 'ProcessM'-flavoured variant of 'Control.Exception.finally' -pfinally :: ProcessM a -> ProcessM b -> ProcessM a -pfinally fun after = pbracket (return ()) (\_ -> after) (const fun) - - ---------------------------------------------- -- * Configuration file ---------------------------------------------- @@ -1694,7 +1671,7 @@ startLoggingService = serviceThread ServiceLog logger do smsg <- liftIO $ showLogMessage txt case whereto of LtStdout -> liftIO $ putStrLn smsg - LtFile fp -> (ptry (liftIO (appendFile fp (smsg ++ "\n"))) :: ProcessM (Either IOError ()) ) >> return () -- ignore error - what can we do? + LtFile fp -> (Lifted.try (liftIO (appendFile fp (smsg ++ "\n"))) :: ProcessM (Either IOError ()) ) >> return () -- ignore error - what can we do? LtForward nid -> do self <- getSelfNode when (self /= nid) (sendSimple (adminGetPid nid ServiceLog) (forwardify txt) PldAdmin >> return ()) -- ignore error -- what can we do? @@ -1853,7 +1830,7 @@ startFinalizerService todo = spawnLocalAnd body prefix >> return () do p <- getProcess node <- liftIO $ readMVar (prNodeRef p) liftIO $ takeMVar (ndNodeFinalizer node) - todo `pfinally` (liftIO $ putMVar (ndNodeFinalized node) ()) + todo `Lifted.finally` (liftIO $ putMVar (ndNodeFinalized node) ()) performFinalization :: MVar Node -> IO () @@ -1868,7 +1845,7 @@ setDaemonic = do p <- getProcess (\node -> return $ node {ndProcessTable=Map.adjust (\pte -> pte {pteDaemonic=True}) (localFromPid pid) (ndProcessTable node)}) serviceThread :: ServiceId -> ProcessM () -> ProcessM () -serviceThread v f = spawnLocalAnd (pbracket (return ()) +serviceThread v f = spawnLocalAnd (Lifted.bracket (return ()) (\_ -> adminDeregister v >> logError) (\_ -> f)) (adminRegister v >> setDaemonic) >> (return()) @@ -2242,9 +2219,9 @@ withMonitoring :: ProcessId -> MonitorAction -> ProcessM a -> ProcessM a withMonitoring pid how f = do mypid <- getSelfPid monitorProcess mypid pid how -- TODO if this throws a ServiceException, translate that into a trigger - a <- f `pfinally` safety (unmonitorProcess mypid pid how) + a <- f `Lifted.finally` safety (unmonitorProcess mypid pid how) return a - where safety n = ptry n :: ProcessM (Either ServiceException ()) + where safety n = Lifted.try n :: ProcessM (Either ServiceException ()) -- | Establishes unidirectional processing of another process. The format is: @@ -2355,7 +2332,7 @@ startProcessMonitorService = serviceThread ServiceProcessMonitor (service emptyG gl {glLinks = gdAddMonitor (glLinks gl) monitee action (localFromPid monitor) } addLocalNode gl monitor monitee _action = gl {glLinks = gdAddNode (glLinks gl) monitee (nodeFromPid monitor)} - broadcast nids msg = mapM_ (\p -> forkProcessWeak $ ((ptimeout 5000000 $ sendSimple (adminGetPid p ServiceProcessMonitor) msg PldAdmin) >> return ())) nids + broadcast nids msg = mapM_ (\p -> forkProcessWeak $ ((Lifted.timeout 5000000 $ sendSimple (adminGetPid p ServiceProcessMonitor) msg PldAdmin) >> return ())) nids handleProcessDown :: GlLinks -> ProcessId -> SignalReason -> ProcessM GlLinks handleProcessDown global pid why = do islocal <- isPidLocal pid @@ -2573,7 +2550,7 @@ startSpawnerService = serviceThread ServiceSpawner spawner where spawner = receiveWait [matchSpawnRequest,matchCallRequest,matchUnknownThrow] >> spawner exceptFilt :: SomeException -> q -> q exceptFilt _ q = q - callWorker c responder = do a <- ptry $ invokeClosure c + callWorker c responder = do a <- Lifted.try $ invokeClosure c case a of Left q -> exceptFilt q (responder Nothing) Right Nothing -> responder Nothing diff --git a/Remote/Task.hs b/Remote/Task.hs index f9457b6..88214f3 100644 --- a/Remote/Task.hs +++ b/Remote/Task.hs @@ -29,7 +29,7 @@ module Remote.Task ( import Remote.Reg (putReg,getEntryByIdent,RemoteCallMetaData) import Remote.Encoding (serialEncodePure,hGetPayload,hPutPayload,Payload,getPayloadContent,Serializable,serialDecode,serialEncode) -import Remote.Process (roundtripQuery, ServiceException(..), TransmitStatus(..),diffTime,getConfig,Config(..),matchProcessDown,terminate,nullPid,monitorProcess,TransmitException(..),MonitorAction(..),ptry,LogConfig(..),getLogConfig,setNodeLogConfig,nodeFromPid,LogLevel(..),LogTarget(..),logS,getLookup,say,LogSphere,NodeId,ProcessM,ProcessId,PayloadDisposition(..),getSelfPid,getSelfNode,matchUnknownThrow,receiveWait,receiveTimeout,roundtripResponse,roundtripResponseAsync,roundtripQueryImpl,match,makePayloadClosure,spawn,spawnLocal,spawnLocalAnd,setDaemonic,send,makeClosure) +import Remote.Process (roundtripQuery, ServiceException(..), TransmitStatus(..),diffTime,getConfig,Config(..),matchProcessDown,terminate,nullPid,monitorProcess,TransmitException(..),MonitorAction(..),LogConfig(..),getLogConfig,setNodeLogConfig,nodeFromPid,LogLevel(..),LogTarget(..),logS,getLookup,say,LogSphere,NodeId,ProcessM,ProcessId,PayloadDisposition(..),getSelfPid,getSelfNode,matchUnknownThrow,receiveWait,receiveTimeout,roundtripResponse,roundtripResponseAsync,roundtripQueryImpl,match,makePayloadClosure,spawn,spawnLocal,spawnLocalAnd,setDaemonic,send,makeClosure) import Remote.Closure (Closure(..)) import Remote.Peer (getPeers) @@ -41,6 +41,7 @@ import Control.Exception (SomeException,Exception,throw) import Data.Typeable (Typeable) import Control.Monad (liftM,when) import Control.Monad.Trans (liftIO) +import qualified Control.Exception.Lifted as Lifted (try) import Control.Concurrent.MVar (MVar,modifyMVar,modifyMVar_,newMVar,newEmptyMVar,takeMVar,putMVar,readMVar,withMVar) import qualified Data.Map as Map (Map,insert,lookup,empty,insertWith',toList) import Data.List ((\\),union,nub,groupBy,sortBy,delete) @@ -284,7 +285,7 @@ serialDecodeA = liftTask . liftIO . serialDecode monitorTask :: ProcessId -> ProcessId -> ProcessM TransmitStatus monitorTask monitor monitee - = do res <- ptry $ monitorProcess monitor monitee MaMonitor + = do res <- Lifted.try $ monitorProcess monitor monitee MaMonitor case res of Right _ -> return QteOK Left (ServiceException e) -> return $ QteOther e @@ -367,7 +368,7 @@ undiskify fpIn mps = return (inmem,Just pl) PromiseInMemory payload _ _ -> return (val,Just payload) _ -> return (val,Nothing)) - where wrap a = do res <- ptry a + where wrap a = do res <- Lifted.try a case res of Left e -> do logS "TSK" LoCritical $ "Error reading promise from file "++fpIn++": "++show (e::IOError) return Nothing @@ -395,7 +396,7 @@ diskify fp mps reallywrite = when again (diskify fp mps reallywrite) tmp = fp ++ ".tmp" - wrap a = do res <- ptry a + wrap a = do res <- Lifted.try a case res of Left z -> do logS "TSK" LoImportant $ "Error writing promise to disk on file "++fp++": "++show (z::IOError) return False @@ -426,7 +427,7 @@ startNodeWorker masterpid nbs mps clo@(Closure cloname cloarg) = let cachefile = cfgPromisePrefix cfg++hashClosure clo liftTask $ diskify cachefile mps True Nothing -> taskError $ "Failed looking up "++cloname++" in closure table" - in do res <- ptry $ runTaskM tasker initialState :: ProcessM (Either SomeException (TaskState,())) + in do res <- Lifted.try $ runTaskM tasker initialState :: ProcessM (Either SomeException (TaskState,())) case res of Left ex -> liftIO (putMVar mps (PromiseException (show ex))) >> throw ex Right _ -> return () @@ -548,7 +549,7 @@ findPeers :: ProcessM [(String,NodeId)] findPeers = liftM (concat . (map (\(role,v) -> [ (role,x) | x <- v] )) . Map.toList) getPeers sendSilent :: (Serializable a) => ProcessId -> a -> ProcessM () -sendSilent pid a = do res <- ptry $ send pid a +sendSilent pid a = do res <- Lifted.try $ send pid a case res of Left (TransmitException _) -> return () Right _ -> return () diff --git a/remote.cabal b/remote.cabal index a67813f..5bd325f 100644 --- a/remote.cabal +++ b/remote.cabal @@ -6,8 +6,8 @@ synopsis: Cloud Haskell License: BSD3 License-file: LICENSE Extra-Source-Files: README.md -Author: Jeff Epstein -Maintainer: Jeff Epstein +Author: Jeff Epstein +Maintainer: Jeff Epstein Build-Type: Simple tested-with: GHC ==6.12.1 Category: Distributed Computing @@ -38,7 +38,7 @@ source-repository head Location: git://github.com/jepst/CloudHaskell.git library - Build-Depends: base >= 4, time, filepath, containers, network, syb, mtl, binary, bytestring, template-haskell, stm, pureMD5, utf8-string, directory + Build-Depends: base >= 4, time, filepath, containers, network, syb, mtl, binary, bytestring, template-haskell, stm, pureMD5, utf8-string, directory, transformers-base >= 0.4, monad-control >= 0.3, lifted-base >= 0.1 ghc-options: -Wall Extensions: TemplateHaskell, FlexibleInstances, UndecidableInstances, CPP, ExistentialQuantification, DeriveDataTypeable Exposed-Modules: Remote.Process, Remote.Encoding, Remote.Call, Remote.Reg, Remote.Peer, Remote.Init, Remote.Closure, Remote.Channel, Remote.Task, Remote