Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# Revision history for lvar

## 0.2.0.0 -- 2025-04-20

- Simplify implementation (#8)
- Remove `addListener` and `removeListener` (they are now redundant)
- Remove dependency on `relude` and `containers`

## 0.1.0.0 -- 2021-04-26

* First version. Released on an unsuspecting world.
8 changes: 1 addition & 7 deletions lvar.cabal
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
cabal-version: 2.4
name: lvar
version: 0.1.0.0
version: 0.2.0.0
license: BSD-3-Clause
copyright: 2021 Sridhar Ratnakumar
maintainer: srid@srid.ca
Expand All @@ -25,13 +25,7 @@ library
-- other-extensions:
build-depends:
, base >=4.13.0.0 && <=5
, containers
, stm
, relude
mixins:
base hiding (Prelude),
relude (Relude as Prelude),
relude
ghc-options:
-Wall -Wincomplete-record-updates -Wincomplete-uni-patterns
default-extensions:
Expand Down
146 changes: 45 additions & 101 deletions src/Data/LVar.hs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
{-# LANGUAGE DeriveAnyClass #-}

-- | @LVar@ is like @Control.Concurrent.STM.TMVar@ but with a capability for
-- listening to its changes.
module Data.LVar
( -- * Types
LVar,
ListenerId,

-- * Creating a LVar
new,
Expand All @@ -17,126 +14,73 @@ module Data.LVar
modify,

-- * Listening to a LVar
addListener,
listenNext,
removeListener,
)
where

import Control.Exception (throw)
import qualified Data.Map.Strict as Map
import Prelude hiding (empty, get, modify)
import Control.Concurrent.STM (STM, TMVar, TVar)
import qualified Control.Concurrent.STM as STM
import Control.Monad.IO.Class (MonadIO (..))
import Data.Functor (void)

-- A mutable variable (like @TMVar@), changes to which can be listened to from
-- multiple threads.
data LVar a = LVar
{ -- | A value that changes over time
lvarCurrent :: TMVar a,
-- | Subscribers listening on changes to the value
lvarListeners :: TMVar (Map ListenerId (TMVar ()))
}

type ListenerId = Int
data LVar a
= LVar
(TMVar a)
-- ^ A value that changes over time.
(TVar (TMVar ()))
-- ^ Gets filled when above gets written to.
-- Invariant: value is always empty.

-- | Create a new @LVar@ with the given initial value
new :: forall a m. MonadIO m => a -> m (LVar a)
new val = do
LVar <$> newTMVarIO val <*> newTMVarIO mempty
new val = liftIO $ do
var <- STM.newTMVarIO val
hole <- STM.newEmptyTMVarIO
write <- STM.newTVarIO hole
return $ LVar var write

-- | Like @new@, but there is no initial value. A @get@ will block until an
-- initial value is set using @set@ or @modify@
empty :: MonadIO m => m (LVar a)
empty =
LVar <$> newEmptyTMVarIO <*> newTMVarIO mempty
empty = liftIO $ do
var <- STM.newEmptyTMVarIO
hole <- STM.newEmptyTMVarIO
write <- STM.newTVarIO hole
return $ LVar var write

-- | Get the value of the @LVar@
get :: MonadIO m => LVar a -> m a
get v =
atomically $ readTMVar $ lvarCurrent v
get (LVar var _) = liftIO $ STM.atomically $ STM.readTMVar var

-- | Set the @LVar@ value; active listeners are automatically notifed.
set :: MonadIO m => LVar a -> a -> m ()
set v val = do
atomically $ do
let var = lvarCurrent v
isEmptyTMVar var >>= \case
True -> putTMVar var val
False -> void $ swapTMVar var val
notifyListeners v
set (LVar var write) val = liftIO $ STM.atomically $ do
STM.isEmptyTMVar var >>= \case
True -> STM.putTMVar var val
False -> void $ STM.swapTMVar var val
notifyListeners write

-- | Modify the @LVar@ value; active listeners are automatically notified.
modify :: MonadIO m => LVar a -> (a -> a) -> m ()
modify v f = do
atomically $ do
curr <- readTMVar (lvarCurrent v)
void $ swapTMVar (lvarCurrent v) (f curr)
notifyListeners v

notifyListeners :: LVar a -> STM ()
notifyListeners v' = do
subs <- readTMVar $ lvarListeners v'
forM_ (Map.elems subs) $ \subVar -> do
tryPutTMVar subVar ()

data ListenerDead = ListenerDead
deriving (Exception, Show)

-- | Create a listener for changes to the @LVar@, as they are set by @set@ or
-- @modify@ from this time onwards.
--
-- You must call @listenNext@ to get the next updated value (or current value if
-- there is one).
--
-- Returns a @ListenerId@ that can be used to stop listening later (via
-- @removeListener@)
addListener ::
MonadIO m =>
LVar a ->
m ListenerId
addListener v = do
atomically $ do
subs <- readTMVar $ lvarListeners v
let nextIdx = maybe 1 (succ . fst) $ Map.lookupMax subs
notify <-
tryReadTMVar (lvarCurrent v) >>= \case
Nothing -> newEmptyTMVar
-- As a value is already available, send that as first notification.
--
-- NOTE: Creating a TMVar that is "full" ensures that we send a current
-- (which is not empty) value on @listenNext@).
Just _ -> newTMVar ()
void $ swapTMVar (lvarListeners v) $ Map.insert nextIdx notify subs
pure nextIdx
modify (LVar var write) f = liftIO $ STM.atomically $ do
curr <- STM.readTMVar var
void $ STM.swapTMVar var (f curr)
notifyListeners write

notifyListeners :: TVar (TMVar ()) -> STM ()
notifyListeners write = do
new_hole <- STM.newEmptyTMVar
old_hole <- STM.readTVar write
STM.putTMVar old_hole ()
STM.writeTVar write new_hole

-- | Listen for the next value update (since the last @listenNext@ or
-- @addListener@). Unless the @LVar@ was empty when @addListener@ was invoked,
-- the first invocation of @listenNext@ will return the current value even if
-- there wasn't an update. Therefore, the *first* call to @listenNext@ will
-- *always* return immediately, unless the @LVar@ is empty.
--
-- Call this in a loop to listen on a series of updates.
--
-- Throws @ListenerDead@ if called with a @ListenerId@ that got already removed
-- by @removeListener@.
listenNext :: MonadIO m => LVar a -> ListenerId -> m a
listenNext v idx = do
atomically $ do
lookupListener v idx >>= \case
Nothing ->
-- FIXME: can we avoid this by design?
throw ListenerDead
Just listenVar -> do
takeTMVar listenVar
readTMVar (lvarCurrent v)
where
lookupListener :: LVar a -> ListenerId -> STM (Maybe (TMVar ()))
lookupListener v' lId = do
Map.lookup lId <$> readTMVar (lvarListeners v')

-- | Stop listening to the @LVar@
removeListener :: MonadIO m => LVar a -> ListenerId -> m ()
removeListener v lId = do
atomically $ do
subs <- readTMVar $ lvarListeners v
whenJust (Map.lookup lId subs) $ \_sub -> do
void $ swapTMVar (lvarListeners v) $ Map.delete lId subs
-- @addListener@).
listenNext :: MonadIO m => LVar a -> m a
listenNext (LVar var write) = liftIO $ do
hole <- STM.readTVarIO write
STM.atomically $ do
() <- STM.readTMVar hole
STM.readTMVar var