From a61eab8b999597ffaaa212876121ea5f9b4c4999 Mon Sep 17 00:00:00 2001 From: SkorikGG Date: Mon, 6 Jan 2020 20:50:07 +0500 Subject: [PATCH 1/2] Stack of forks to queue --- .../Control/Monad/Par/Scheds/TraceInternal.hs | 20 +++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs b/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs index 163ddb7..55a1374 100644 --- a/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs +++ b/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs @@ -104,7 +104,7 @@ sched _doSync queue t = loop t let Sched { workpool } = queue -- TODO: Perhaps consider Data.Seq here. -- This would also be a chance to steal and work from opposite ends of the queue. - atomicModifyIORef workpool $ \ts -> (ts++[parent], ()) + atomicModifyIORef workpool $ \(ts,wts) -> ((ts,parent:wts), ()) reschedule queue LiftIO io c -> do r <- io @@ -119,8 +119,10 @@ reschedule :: Sched -> IO () reschedule queue@Sched{ workpool } = do e <- atomicModifyIORef workpool $ \ts -> case ts of - [] -> ([], Nothing) - (t:ts') -> (ts', Just t) + ([],wts) -> case reverse wts of + [] -> (([],[]), Nothing) + t:ts' -> ((ts',[]), Just t) + (t:ts',wts) -> ((ts',wts), Just t) case e of Nothing -> steal queue Just t -> sched True queue t @@ -156,8 +158,10 @@ steal q@Sched{ idle, scheds, no=my_no } = do | otherwise = do r <- atomicModifyIORef (workpool x) $ \ ts -> case ts of - [] -> ([], Nothing) - (x:xs) -> (xs, Just x) + ([],wxs) -> case reverse wxs of + [] -> (([],[]), Nothing) + x:xs -> ((xs,[]), Just x) + (x:xs,wxs) -> ((xs,wxs), Just x) case r of Just t -> do -- printf "cpu %d got work from cpu %d\n" my_no (no x) @@ -167,7 +171,7 @@ steal q@Sched{ idle, scheds, no=my_no } = do -- | If any worker is idle, wake one up and give it work to do. pushWork :: Sched -> Trace -> IO () pushWork Sched { workpool, idle } t = do - atomicModifyIORef workpool $ \ts -> (t:ts, ()) + atomicModifyIORef workpool $ \(ts,wts) -> ((ts,t:wts), ()) idles <- readIORef idle when (not (null idles)) $ do r <- atomicModifyIORef idle (\is -> case is of @@ -177,7 +181,7 @@ pushWork Sched { workpool, idle } t = do data Sched = Sched { no :: {-# UNPACK #-} !Int, - workpool :: IORef [Trace], + workpool :: IORef ([Trace],[Trace]), idle :: IORef [MVar Bool], scheds :: [Sched] -- Global list of all per-thread workers. } @@ -249,7 +253,7 @@ data IVarContents a = Full a | Empty | Blocked [a -> Trace] {-# INLINE runPar_internal #-} runPar_internal :: Bool -> Par a -> IO a runPar_internal _doSync x = do - workpools <- replicateM numCapabilities $ newIORef [] + workpools <- replicateM numCapabilities $ newIORef ([],[]) idle <- newIORef [] let states = [ Sched { no=x, workpool=wp, idle, scheds=states } | (x,wp) <- zip [0..] workpools ] From e8edb05e14798688d6145841c615e25135cda758 Mon Sep 17 00:00:00 2001 From: SkorikGG Date: Tue, 7 Jan 2020 14:32:40 +0500 Subject: [PATCH 2/2] More intuitively expected scheduler behavior. --- monad-par/Control/Monad/Par/Scheds/TraceInternal.hs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs b/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs index 55a1374..dcfad17 100644 --- a/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs +++ b/monad-par/Control/Monad/Par/Scheds/TraceInternal.hs @@ -83,10 +83,10 @@ sched _doSync queue t = loop t Empty -> (Full a, []) Full _ -> error "multiple put" Blocked cs -> (Full a, cs) - mapM_ (pushWork queue. ($a)) cs + mapM_ (pushWork True queue. ($a)) cs loop t Fork child parent -> do - pushWork queue child + pushWork False queue child loop parent Done -> if _doSync @@ -169,9 +169,10 @@ steal q@Sched{ idle, scheds, no=my_no } = do Nothing -> go xs -- | If any worker is idle, wake one up and give it work to do. -pushWork :: Sched -> Trace -> IO () -pushWork Sched { workpool, idle } t = do - atomicModifyIORef workpool $ \(ts,wts) -> ((ts,t:wts), ()) +pushWork :: Bool -> Sched -> Trace -> IO () +pushWork toBeginning Sched { workpool, idle } t = do + if toBeginning then atomicModifyIORef workpool $ \(ts,wts) -> ((t:ts,wts), ()) + else atomicModifyIORef workpool $ \(ts,wts) -> ((ts,t:wts), ()) idles <- readIORef idle when (not (null idles)) $ do r <- atomicModifyIORef idle (\is -> case is of