Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions monad-par/Control/Monad/Par/Scheds/TraceInternal.hs
Original file line number Diff line number Diff line change
Expand Up @@ -83,10 +83,10 @@ sched _doSync queue t = loop t
Empty -> (Full a, [])
Full _ -> error "multiple put"
Blocked cs -> (Full a, cs)
mapM_ (pushWork queue. ($a)) cs
mapM_ (pushWork True queue. ($a)) cs
loop t
Fork child parent -> do
pushWork queue child
pushWork False queue child
loop parent
Done ->
if _doSync
Expand All @@ -104,7 +104,7 @@ sched _doSync queue t = loop t
let Sched { workpool } = queue
-- TODO: Perhaps consider Data.Seq here.
-- This would also be a chance to steal and work from opposite ends of the queue.
atomicModifyIORef workpool $ \ts -> (ts++[parent], ())
atomicModifyIORef workpool $ \(ts,wts) -> ((ts,parent:wts), ())
reschedule queue
LiftIO io c -> do
r <- io
Expand All @@ -119,8 +119,10 @@ reschedule :: Sched -> IO ()
reschedule queue@Sched{ workpool } = do
e <- atomicModifyIORef workpool $ \ts ->
case ts of
[] -> ([], Nothing)
(t:ts') -> (ts', Just t)
([],wts) -> case reverse wts of
[] -> (([],[]), Nothing)
t:ts' -> ((ts',[]), Just t)
(t:ts',wts) -> ((ts',wts), Just t)
case e of
Nothing -> steal queue
Just t -> sched True queue t
Expand Down Expand Up @@ -156,18 +158,21 @@ steal q@Sched{ idle, scheds, no=my_no } = do
| otherwise = do
r <- atomicModifyIORef (workpool x) $ \ ts ->
case ts of
[] -> ([], Nothing)
(x:xs) -> (xs, Just x)
([],wxs) -> case reverse wxs of
[] -> (([],[]), Nothing)
x:xs -> ((xs,[]), Just x)
(x:xs,wxs) -> ((xs,wxs), Just x)
case r of
Just t -> do
-- printf "cpu %d got work from cpu %d\n" my_no (no x)
sched True q t
Nothing -> go xs

-- | If any worker is idle, wake one up and give it work to do.
pushWork :: Sched -> Trace -> IO ()
pushWork Sched { workpool, idle } t = do
atomicModifyIORef workpool $ \ts -> (t:ts, ())
pushWork :: Bool -> Sched -> Trace -> IO ()
pushWork toBeginning Sched { workpool, idle } t = do
if toBeginning then atomicModifyIORef workpool $ \(ts,wts) -> ((t:ts,wts), ())
else atomicModifyIORef workpool $ \(ts,wts) -> ((ts,t:wts), ())
idles <- readIORef idle
when (not (null idles)) $ do
r <- atomicModifyIORef idle (\is -> case is of
Expand All @@ -177,7 +182,7 @@ pushWork Sched { workpool, idle } t = do

data Sched = Sched
{ no :: {-# UNPACK #-} !Int,
workpool :: IORef [Trace],
workpool :: IORef ([Trace],[Trace]),
idle :: IORef [MVar Bool],
scheds :: [Sched] -- Global list of all per-thread workers.
}
Expand Down Expand Up @@ -249,7 +254,7 @@ data IVarContents a = Full a | Empty | Blocked [a -> Trace]
{-# INLINE runPar_internal #-}
runPar_internal :: Bool -> Par a -> IO a
runPar_internal _doSync x = do
workpools <- replicateM numCapabilities $ newIORef []
workpools <- replicateM numCapabilities $ newIORef ([],[])
idle <- newIORef []
let states = [ Sched { no=x, workpool=wp, idle, scheds=states }
| (x,wp) <- zip [0..] workpools ]
Expand Down