From 7dfe7b4354047dcd498bf9d75a6ee842b064e23b Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Mon, 3 Feb 2025 00:46:26 +0530 Subject: [PATCH 01/10] Add byte indexing logic --- examples/BytePairEncoder.hs | 53 +++++++++++++++++++++++++++++++++++++ 1 file changed, 53 insertions(+) create mode 100644 examples/BytePairEncoder.hs diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs new file mode 100644 index 0000000..ff36f03 --- /dev/null +++ b/examples/BytePairEncoder.hs @@ -0,0 +1,53 @@ +-- To run this program: +-- +-- cabal run --flag fusion-plugin BytePairEncoder test-data.txt +-- +module BytePairEncoder (main) where + +import Data.Function ((&)) +import qualified Data.Map as M +import GHC.Word (Word8) +import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.FileSystem.File as File +import System.Environment (getArgs) + +------------------------------------------------------------------------------- +-- Byte indexing and text representation +------------------------------------------------------------------------------- + +-- Stores byte-to-index mapping and index-to-text mapping +data ByteMappings = ByteMappings + { byteToIndex :: !(M.Map Word8 Int), -- Maps bytes to unique indices + indexToText :: !(M.Map Int String) -- Maps indices to text representation + } + deriving (Show) + +{-# INLINE assignIndex #-} +assignIndex :: ByteMappings -> Word8 -> ByteMappings +assignIndex (ByteMappings b2i i2t) byte = + case M.lookup byte b2i of + Just _ -> ByteMappings b2i i2t -- byte already indexed + Nothing -> + let nextIndex = M.size b2i -- next available index + byteText = [toEnum (fromIntegral byte) :: Char] -- convert byte to ASCII char + in ByteMappings + (M.insert byte nextIndex b2i) + (M.insert nextIndex byteText i2t) + +indexBytes :: String -> IO ByteMappings +indexBytes file = + File.read file -- Stream IO Word8 + & Stream.fold (Fold.foldl' assignIndex initialMappings) -- IO ByteMappings + where + initialMappings = ByteMappings M.empty M.empty + +------------------------------------------------------------------------------- +-- Main +------------------------------------------------------------------------------- + +main :: IO () +main = do + name <- fmap head getArgs + mappings <- indexBytes name + print mappings -- Print both mappings From 4aad2ed104848a0580f7e72602e60c5f3dc492e9 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Fri, 14 Feb 2025 17:38:13 +0530 Subject: [PATCH 02/10] Byte mapping maps sequences of bytes to index Map stream of bytes to index values --- examples/BytePairEncoder.hs | 67 ++++++++++++++++++++++++------------- 1 file changed, 43 insertions(+), 24 deletions(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index ff36f03..9ee4f46 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -6,8 +6,10 @@ module BytePairEncoder (main) where import Data.Function ((&)) import qualified Data.Map as M +import qualified Data.Vector as V import GHC.Word (Word8) import qualified Streamly.Data.Fold as Fold +import Streamly.Data.Stream (Stream) import qualified Streamly.Data.Stream as Stream import qualified Streamly.FileSystem.File as File import System.Environment (getArgs) @@ -16,31 +18,45 @@ import System.Environment (getArgs) -- Byte indexing and text representation ------------------------------------------------------------------------------- --- Stores byte-to-index mapping and index-to-text mapping +-- Stores byte-sequence-to-index mapping and index-to-text mapping data ByteMappings = ByteMappings { byteToIndex :: !(M.Map Word8 Int), -- Maps bytes to unique indices - indexToText :: !(M.Map Int String) -- Maps indices to text representation + seqToIndex :: !(M.Map (V.Vector Word8) Int), -- Maps sequences of bytes to unique indices + indexToText :: !(M.Map Int String), -- Maps indices to text representation + nextIndex :: !Int -- Next available index } - deriving (Show) - -{-# INLINE assignIndex #-} -assignIndex :: ByteMappings -> Word8 -> ByteMappings -assignIndex (ByteMappings b2i i2t) byte = - case M.lookup byte b2i of - Just _ -> ByteMappings b2i i2t -- byte already indexed - Nothing -> - let nextIndex = M.size b2i -- next available index - byteText = [toEnum (fromIntegral byte) :: Char] -- convert byte to ASCII char - in ByteMappings - (M.insert byte nextIndex b2i) - (M.insert nextIndex byteText i2t) - -indexBytes :: String -> IO ByteMappings -indexBytes file = - File.read file -- Stream IO Word8 - & Stream.fold (Fold.foldl' assignIndex initialMappings) -- IO ByteMappings - where - initialMappings = ByteMappings M.empty M.empty + +instance Show ByteMappings where + show (ByteMappings b2i _ i2t nidx) = + "ByteMappings:\n" + ++ "byteToIndex = " + ++ show b2i + ++ "\n" + ++ "indexToText = " + ++ show i2t + ++ "\n" + ++ "nextIndex = " + ++ show nidx + +{-# INLINE initializeSingleBytes #-} +initializeSingleBytes :: (Monad m) => Stream m Word8 -> m ByteMappings +initializeSingleBytes stream = do + -- Collect unique bytes and create initial mappings + uniqueBytes <- + stream + & Stream.fold (Fold.foldl' (\m b -> M.insert b () m) M.empty) + + let bytes = V.fromList $ M.keys uniqueBytes + indices = [0 .. (V.length bytes - 1)] + b2i = M.fromList $ zip (V.toList bytes) indices + s2i = M.fromList $ zip (map V.singleton (V.toList bytes)) indices + i2t = M.fromList $ zip indices (map ((: []) . toEnum . fromIntegral) (V.toList bytes)) + + return $ ByteMappings b2i s2i i2t (length indices) + +{-# INLINE mapToIndexStream #-} +mapToIndexStream :: (Monad m) => ByteMappings -> Stream m Word8 -> Stream m Int +mapToIndexStream mapping = fmap (\k -> M.findWithDefault (-1) k (byteToIndex mapping)) ------------------------------------------------------------------------------- -- Main @@ -49,5 +65,8 @@ indexBytes file = main :: IO () main = do name <- fmap head getArgs - mappings <- indexBytes name - print mappings -- Print both mappings + let stream = File.read name + mapping <- initializeSingleBytes stream + print mapping + indexStream <- Stream.toList . mapToIndexStream mapping $ stream + print indexStream From 05006e80983d4933173d037bd2be45349bf9ad8d Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Fri, 14 Feb 2025 18:39:09 +0530 Subject: [PATCH 03/10] Count and merge frequent pairs --- examples/BytePairEncoder.hs | 49 ++++++++++++++++++++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index 9ee4f46..bb1e72b 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -4,10 +4,14 @@ -- module BytePairEncoder (main) where +import Control.Monad.IO.Class (MonadIO) import Data.Function ((&)) +import Data.List (maximumBy) import qualified Data.Map as M +import Data.Ord (comparing) import qualified Data.Vector as V import GHC.Word (Word8) +import qualified Streamly.Data.Array as Array import qualified Streamly.Data.Fold as Fold import Streamly.Data.Stream (Stream) import qualified Streamly.Data.Stream as Stream @@ -58,6 +62,45 @@ initializeSingleBytes stream = do mapToIndexStream :: (Monad m) => ByteMappings -> Stream m Word8 -> Stream m Int mapToIndexStream mapping = fmap (\k -> M.findWithDefault (-1) k (byteToIndex mapping)) +------------------------------------------------------------------------------- +-- Count and merge most frequent pairs +------------------------------------------------------------------------------- + +charToWord8 :: Char -> Word8 +charToWord8 = toEnum . fromEnum + +-- Stores pair frequencies for merging +type PairFrequencies = M.Map (Int, Int) Int + +{-# INLINE countPairs #-} +countPairs :: (MonadIO m) => Stream m Int -> m PairFrequencies +countPairs stream = + stream + & Stream.chunksOf 2 + & Stream.fold (Fold.foldl' addPair M.empty) + where + addPair acc chunk = + case Array.toList chunk of + [b1, b2] -> M.insertWith (+) (b1, b2) 1 acc + _ -> acc + +{-# INLINE mergeMostFrequentPair #-} +mergeMostFrequentPair :: ByteMappings -> PairFrequencies -> ByteMappings +mergeMostFrequentPair mappings@(ByteMappings b2i s2i i2t nidx) freqs = + if M.null freqs + then mappings + else + let ((b1, b2), _) = maximumBy (comparing snd) (M.toList $ freqs) + text1 = M.findWithDefault "?" b1 i2t + text2 = M.findWithDefault "?" b2 i2t + newToken = text1 ++ text2 + bytes = V.fromList $ map charToWord8 newToken + in ByteMappings + b2i + (M.insert bytes nidx s2i) + (M.insert nidx newToken i2t) + (nidx + 1) + ------------------------------------------------------------------------------- -- Main ------------------------------------------------------------------------------- @@ -68,5 +111,9 @@ main = do let stream = File.read name mapping <- initializeSingleBytes stream print mapping - indexStream <- Stream.toList . mapToIndexStream mapping $ stream + let byteIndexStream = mapToIndexStream mapping stream + indexStream <- Stream.toList byteIndexStream print indexStream + freqs <- countPairs byteIndexStream + let mergedMappings = mergeMostFrequentPair mapping freqs + print mergedMappings From 82ca052733c1294ddd33521a7d475f6c6cb50e3e Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Sat, 15 Feb 2025 08:45:47 +0530 Subject: [PATCH 04/10] Replace most frequent pair with new index in stream --- examples/BytePairEncoder.hs | 65 +++++++++++++++++++++++++------------ 1 file changed, 44 insertions(+), 21 deletions(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index bb1e72b..2df0d2e 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -16,6 +16,8 @@ import qualified Streamly.Data.Fold as Fold import Streamly.Data.Stream (Stream) import qualified Streamly.Data.Stream as Stream import qualified Streamly.FileSystem.File as File +import Streamly.Internal.Data.Pipe (Pipe (..), Step (..)) +import Streamly.Internal.Data.Stream (pipe) import System.Environment (getArgs) ------------------------------------------------------------------------------- @@ -84,22 +86,44 @@ countPairs stream = [b1, b2] -> M.insertWith (+) (b1, b2) 1 acc _ -> acc -{-# INLINE mergeMostFrequentPair #-} -mergeMostFrequentPair :: ByteMappings -> PairFrequencies -> ByteMappings -mergeMostFrequentPair mappings@(ByteMappings b2i s2i i2t nidx) freqs = - if M.null freqs - then mappings - else - let ((b1, b2), _) = maximumBy (comparing snd) (M.toList $ freqs) - text1 = M.findWithDefault "?" b1 i2t - text2 = M.findWithDefault "?" b2 i2t - newToken = text1 ++ text2 - bytes = V.fromList $ map charToWord8 newToken - in ByteMappings - b2i - (M.insert bytes nidx s2i) - (M.insert nidx newToken i2t) - (nidx + 1) +mostFrequentPair :: PairFrequencies -> ((Int, Int), Int) +mostFrequentPair = maximumBy (comparing snd) . M.toList + +updateMappings :: ByteMappings -> (Int, Int) -> ByteMappings +updateMappings (ByteMappings b2i s2i i2t nidx) (i1, i2) = + let text1 = M.findWithDefault "?" i1 i2t + text2 = M.findWithDefault "?" i2 i2t + newToken = text1 ++ text2 + bytes = V.fromList $ map charToWord8 newToken + in ByteMappings + b2i + (M.insert bytes nidx s2i) + (M.insert nidx newToken i2t) + (nidx + 1) + +{-# INLINE replaceMostFrequentPair #-} +replaceMostFrequentPair :: (Monad m) => (Int, Int) -> Int -> Pipe m Int Int +replaceMostFrequentPair (i1, i2) nidx = Pipe consume produce False + where + consume False i | i == i1 = return $ SkipC True -- found first index + consume False i = return $ YieldP Nothing i -- first index not found + consume True i | i == i2 = return $ YieldP Nothing nidx -- found second index + consume True i | i == i1 = return $ YieldC True i1 -- encountered first index again + consume True i = return $ YieldP (Just i) i1 -- fallback + produce Nothing = return $ SkipC False + produce (Just i) = return $ YieldC False i + +------------------------------------------------------------------------------- +-- Build BPE mapping +------------------------------------------------------------------------------- + +buildBpeMapping :: (MonadIO m) => ByteMappings -> Stream m Int -> m (Pipe m Int Int) +buildBpeMapping mapping stream = do + freqs <- countPairs stream + let (i1, i2) = fst . mostFrequentPair $ freqs + updatedMapping = updateMappings mapping (i1, i2) + replacePipe = replaceMostFrequentPair (i1, i2) (nextIndex updatedMapping - 1) + return replacePipe ------------------------------------------------------------------------------- -- Main @@ -112,8 +136,7 @@ main = do mapping <- initializeSingleBytes stream print mapping let byteIndexStream = mapToIndexStream mapping stream - indexStream <- Stream.toList byteIndexStream - print indexStream - freqs <- countPairs byteIndexStream - let mergedMappings = mergeMostFrequentPair mapping freqs - print mergedMappings + replacePipe <- buildBpeMapping mapping byteIndexStream + let newStream = pipe replacePipe byteIndexStream + merged <- Stream.toList newStream + print merged From 0efe1c6f0373e2456d908b2e95c781bb54d5ee9e Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Sat, 15 Feb 2025 13:26:37 +0530 Subject: [PATCH 05/10] Stream updated mappings --- examples/BytePairEncoder.hs | 32 ++++++++++++++++++++++++-------- 1 file changed, 24 insertions(+), 8 deletions(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index 2df0d2e..e5a688a 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -117,13 +117,30 @@ replaceMostFrequentPair (i1, i2) nidx = Pipe consume produce False -- Build BPE mapping ------------------------------------------------------------------------------- -buildBpeMapping :: (MonadIO m) => ByteMappings -> Stream m Int -> m (Pipe m Int Int) -buildBpeMapping mapping stream = do +mergeUntil :: (MonadIO m) => Int -> ByteMappings -> Stream m Int -> m ByteMappings +mergeUntil threshold mapping stream = do freqs <- countPairs stream let (i1, i2) = fst . mostFrequentPair $ freqs updatedMapping = updateMappings mapping (i1, i2) replacePipe = replaceMostFrequentPair (i1, i2) (nextIndex updatedMapping - 1) - return replacePipe + newStream = pipe replacePipe stream + if nextIndex updatedMapping >= threshold + then return updatedMapping + else mergeUntil threshold updatedMapping newStream + +-- | Produce an (infinite) stream of updated ByteMappings. +mergedMappingsStream :: (MonadIO m) => ByteMappings -> Stream.Stream m Int -> Stream.Stream m ByteMappings +mergedMappingsStream initMapping initStream = + Stream.unfoldrM step (initMapping, initStream) + where + step (mapping, stream) = do + freqs <- countPairs stream + let (i1, i2) = fst $ mostFrequentPair freqs + updatedMapping = updateMappings mapping (i1, i2) + newIdx = nextIndex updatedMapping - 1 + replacePipe = replaceMostFrequentPair (i1, i2) newIdx + newStream = pipe replacePipe stream + return $ Just (updatedMapping, (updatedMapping, newStream)) ------------------------------------------------------------------------------- -- Main @@ -135,8 +152,7 @@ main = do let stream = File.read name mapping <- initializeSingleBytes stream print mapping - let byteIndexStream = mapToIndexStream mapping stream - replacePipe <- buildBpeMapping mapping byteIndexStream - let newStream = pipe replacePipe byteIndexStream - merged <- Stream.toList newStream - print merged + let indexStream = mapToIndexStream mapping stream + mappingStream = mergedMappingsStream mapping indexStream + printMappingStream = Stream.trace print mappingStream + Stream.fold (Fold.take 20 Fold.drain) printMappingStream From 90fb23f304a6c5571f3e448219880dbf08baaf39 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Sat, 15 Feb 2025 13:38:39 +0530 Subject: [PATCH 06/10] Tokenize string with byte mapping --- examples/BytePairEncoder.hs | 57 ++++++++++++++++++++++++++++++++++++- 1 file changed, 56 insertions(+), 1 deletion(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index e5a688a..b78dc7a 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -19,6 +19,7 @@ import qualified Streamly.FileSystem.File as File import Streamly.Internal.Data.Pipe (Pipe (..), Step (..)) import Streamly.Internal.Data.Stream (pipe) import System.Environment (getArgs) +import Data.Maybe (fromJust) ------------------------------------------------------------------------------- -- Byte indexing and text representation @@ -142,6 +143,56 @@ mergedMappingsStream initMapping initStream = newStream = pipe replacePipe stream return $ Just (updatedMapping, (updatedMapping, newStream)) +------------------------------------------------------------------------------- +-- Tokenize text +------------------------------------------------------------------------------- + +word8ToChar :: Word8 -> Char +word8ToChar = toEnum . fromIntegral + +-- | 'tokenize' consumes a stream of bytes and produces tokens. +-- +-- The tokens are determined by the ByteMappings. The pipe's state is a tuple +-- consisting of the current byte buffer, the last valid candidate token, and its byte count. +-- +-- On each new byte: +-- • Extend the buffer. +-- • If the extended buffer is in the mapping (i.e. is a valid token) update the candidate +-- • Otherwise, emit the candidate token (the longest match so far), +-- reset the state (starting with the current byte), and continue. +{-# INLINE tokenize #-} +tokenize :: (Monad m) => ByteMappings -> Pipe m Word8 String +tokenize mapping = Pipe consume produce (V.empty, "", 0) + where + -- State: (current buffer, candidate token, candidate byte count) + consume :: + (Monad m) => + (V.Vector Word8, String, Int) -> -- current state + Word8 -> -- new byte + m (Step (V.Vector Word8, String, Int) () String) + consume (buf, cand, candCount) byte = + let newBuf = V.snoc buf byte + in case M.lookup newBuf (seqToIndex mapping) of + -- If newBuf is valid, update the candidate token and extend state. + Just idx -> + let newCand = M.findWithDefault (error "Missing token text") idx (indexToText mapping) + in return $ SkipC (newBuf, newCand, length newCand) + -- Extended buffer not valid: the current candidate (from buf) is maximal. + -- Yield it and reinitialize state to allow the new byte to start a new token. + Nothing -> + if V.length newBuf >= 20 + then return $ SkipC (newBuf, cand, candCount) + else + let rest = V.drop candCount newBuf + (nextState, nextCand, nextCandCount) = + if V.null rest + then (V.empty, "", 0) + else (rest, [word8ToChar (V.head rest)], 1) + in return $ YieldC (nextState, nextCand, nextCandCount) cand + + -- When input is exhausted, if the buffer is non-empty emit the candidate. + produce = undefined + ------------------------------------------------------------------------------- -- Main ------------------------------------------------------------------------------- @@ -155,4 +206,8 @@ main = do let indexStream = mapToIndexStream mapping stream mappingStream = mergedMappingsStream mapping indexStream printMappingStream = Stream.trace print mappingStream - Stream.fold (Fold.take 20 Fold.drain) printMappingStream + maybeMapping <- Stream.fold (Fold.index 40) printMappingStream + let mapping = fromJust maybeMapping + print mapping + let tokenStream = pipe (tokenize mapping) stream + Stream.fold (Fold.drainMapM (\s -> putStr (s ++ ","))) tokenStream From 372d6766856755867ffa5fe293c5f48ac354ca42 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Mon, 17 Feb 2025 16:55:44 +0530 Subject: [PATCH 07/10] Fix tokenizer logic --- examples/BytePairEncoder.hs | 50 +++++++++++++++++++++++-------------- 1 file changed, 31 insertions(+), 19 deletions(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index b78dc7a..64702c9 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -8,6 +8,7 @@ import Control.Monad.IO.Class (MonadIO) import Data.Function ((&)) import Data.List (maximumBy) import qualified Data.Map as M +import Data.Maybe (fromJust, fromMaybe) import Data.Ord (comparing) import qualified Data.Vector as V import GHC.Word (Word8) @@ -19,7 +20,6 @@ import qualified Streamly.FileSystem.File as File import Streamly.Internal.Data.Pipe (Pipe (..), Step (..)) import Streamly.Internal.Data.Stream (pipe) import System.Environment (getArgs) -import Data.Maybe (fromJust) ------------------------------------------------------------------------------- -- Byte indexing and text representation @@ -150,6 +150,19 @@ mergedMappingsStream initMapping initStream = word8ToChar :: Word8 -> Char word8ToChar = toEnum . fromIntegral +-- | Find token from looking up bytes from mapping +findTokenFromBytes :: V.Vector Word8 -> ByteMappings -> Maybe String +findTokenFromBytes bytes mapping = do + idx <- M.lookup bytes (seqToIndex mapping) + return $ M.findWithDefault (error "Missing token text") idx (indexToText mapping) + +-- | Find longest token from looking up bytes from mapping +findLongestTokenFromBytes :: (Monad m) => V.Vector Word8 -> ByteMappings -> m (Maybe String) +findLongestTokenFromBytes bytes mapping = + let candidates = Stream.takeWhile (not . V.null) $ Stream.iterate V.init bytes + tokens = Stream.mapMaybe (`findTokenFromBytes` mapping) candidates + in Stream.fold Fold.one tokens + -- | 'tokenize' consumes a stream of bytes and produces tokens. -- -- The tokens are determined by the ByteMappings. The pipe's state is a tuple @@ -172,23 +185,22 @@ tokenize mapping = Pipe consume produce (V.empty, "", 0) m (Step (V.Vector Word8, String, Int) () String) consume (buf, cand, candCount) byte = let newBuf = V.snoc buf byte - in case M.lookup newBuf (seqToIndex mapping) of - -- If newBuf is valid, update the candidate token and extend state. - Just idx -> - let newCand = M.findWithDefault (error "Missing token text") idx (indexToText mapping) - in return $ SkipC (newBuf, newCand, length newCand) - -- Extended buffer not valid: the current candidate (from buf) is maximal. - -- Yield it and reinitialize state to allow the new byte to start a new token. + in case findTokenFromBytes newBuf mapping of + -- Update with new candidate token and continue consuming + Just newCand -> return $ SkipC (newBuf, newCand, length newCand) + -- Extended buffer not valid is not a valid key + -- Continue with existing candidate + -- if the buffer length has exceeded the threshold + -- Yield the candidate and reset state Nothing -> - if V.length newBuf >= 20 + if V.length newBuf <= 20 then return $ SkipC (newBuf, cand, candCount) - else + else do let rest = V.drop candCount newBuf - (nextState, nextCand, nextCandCount) = - if V.null rest - then (V.empty, "", 0) - else (rest, [word8ToChar (V.head rest)], 1) - in return $ YieldC (nextState, nextCand, nextCandCount) cand + longestToken <- findLongestTokenFromBytes rest mapping + let nextCand = fromMaybe "" longestToken + let nextCandCount = length nextCand + return $ YieldC (rest, nextCand, nextCandCount) cand -- When input is exhausted, if the buffer is non-empty emit the candidate. produce = undefined @@ -206,8 +218,8 @@ main = do let indexStream = mapToIndexStream mapping stream mappingStream = mergedMappingsStream mapping indexStream printMappingStream = Stream.trace print mappingStream - maybeMapping <- Stream.fold (Fold.index 40) printMappingStream - let mapping = fromJust maybeMapping - print mapping - let tokenStream = pipe (tokenize mapping) stream + maybeMapping <- Stream.fold (Fold.index 100) printMappingStream + let m2 = fromJust maybeMapping + print m2 + let tokenStream = pipe (tokenize m2) stream Stream.fold (Fold.drainMapM (\s -> putStr (s ++ ","))) tokenStream From 9dc288a184178d34c3ad488ebe7d58f8f93ab74c Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Mon, 17 Feb 2025 16:56:29 +0530 Subject: [PATCH 08/10] Cleanup --- examples/BytePairEncoder.hs | 30 ++++++++---------------------- 1 file changed, 8 insertions(+), 22 deletions(-) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index 64702c9..f23d2de 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -118,17 +118,6 @@ replaceMostFrequentPair (i1, i2) nidx = Pipe consume produce False -- Build BPE mapping ------------------------------------------------------------------------------- -mergeUntil :: (MonadIO m) => Int -> ByteMappings -> Stream m Int -> m ByteMappings -mergeUntil threshold mapping stream = do - freqs <- countPairs stream - let (i1, i2) = fst . mostFrequentPair $ freqs - updatedMapping = updateMappings mapping (i1, i2) - replacePipe = replaceMostFrequentPair (i1, i2) (nextIndex updatedMapping - 1) - newStream = pipe replacePipe stream - if nextIndex updatedMapping >= threshold - then return updatedMapping - else mergeUntil threshold updatedMapping newStream - -- | Produce an (infinite) stream of updated ByteMappings. mergedMappingsStream :: (MonadIO m) => ByteMappings -> Stream.Stream m Int -> Stream.Stream m ByteMappings mergedMappingsStream initMapping initStream = @@ -147,9 +136,6 @@ mergedMappingsStream initMapping initStream = -- Tokenize text ------------------------------------------------------------------------------- -word8ToChar :: Word8 -> Char -word8ToChar = toEnum . fromIntegral - -- | Find token from looking up bytes from mapping findTokenFromBytes :: V.Vector Word8 -> ByteMappings -> Maybe String findTokenFromBytes bytes mapping = do @@ -163,19 +149,19 @@ findLongestTokenFromBytes bytes mapping = tokens = Stream.mapMaybe (`findTokenFromBytes` mapping) candidates in Stream.fold Fold.one tokens --- | 'tokenize' consumes a stream of bytes and produces tokens. +-- | 'greedyTokenizer' consumes a stream of bytes and produces tokens. -- -- The tokens are determined by the ByteMappings. The pipe's state is a tuple -- consisting of the current byte buffer, the last valid candidate token, and its byte count. -- -- On each new byte: --- • Extend the buffer. --- • If the extended buffer is in the mapping (i.e. is a valid token) update the candidate --- • Otherwise, emit the candidate token (the longest match so far), +-- * Extend the buffer. +-- * If the extended buffer is in the mapping (i.e. is a valid token) update the candidate +-- * Otherwise, emit the candidate token (the longest match so far), -- reset the state (starting with the current byte), and continue. -{-# INLINE tokenize #-} -tokenize :: (Monad m) => ByteMappings -> Pipe m Word8 String -tokenize mapping = Pipe consume produce (V.empty, "", 0) +{-# INLINE greedyTokenizer #-} +greedyTokenizer :: (Monad m) => ByteMappings -> Pipe m Word8 String +greedyTokenizer mapping = Pipe consume produce (V.empty, "", 0) where -- State: (current buffer, candidate token, candidate byte count) consume :: @@ -221,5 +207,5 @@ main = do maybeMapping <- Stream.fold (Fold.index 100) printMappingStream let m2 = fromJust maybeMapping print m2 - let tokenStream = pipe (tokenize m2) stream + let tokenStream = pipe (greedyTokenizer m2) stream Stream.fold (Fold.drainMapM (\s -> putStr (s ++ ","))) tokenStream From 21655cf0a6591326d165b803473f04bd05f9dcb9 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Mon, 17 Feb 2025 16:57:52 +0530 Subject: [PATCH 09/10] Add example executable to cabal and config --- hie.yaml | 2 ++ streamly-examples.cabal | 9 +++++++++ 2 files changed, 11 insertions(+) diff --git a/hie.yaml b/hie.yaml index a26dfd8..bbee2ff 100644 --- a/hie.yaml +++ b/hie.yaml @@ -62,6 +62,8 @@ cradle: component: "exe:WordServer" - path: "./examples/WordFrequency.hs" component: "exe:WordFrequency" + - path: "./examples/BytePairEncoder.hs" + component: "exe:BytePairEncoder" dependencies: - streamly-examples.cabal diff --git a/streamly-examples.cabal b/streamly-examples.cabal index 9cd0f3b..1791115 100644 --- a/streamly-examples.cabal +++ b/streamly-examples.cabal @@ -354,3 +354,12 @@ executable LogParser build-depends: tasty-bench >= 0.3 && < 0.4 else buildable: False + +executable BytePairEncoder + import: exe-options + main-is: BytePairEncoder.hs + ghc-options: -main-is BytePairEncoder + if !impl(ghcjs) + buildable: True + else + buildable: False From 52f5a3e30fbd4fe0d5cb1bc63f44af8dab6ea9f2 Mon Sep 17 00:00:00 2001 From: Ishan Bhanuka Date: Mon, 17 Feb 2025 17:00:17 +0530 Subject: [PATCH 10/10] Add run instructions --- examples/BytePairEncoder.hs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/examples/BytePairEncoder.hs b/examples/BytePairEncoder.hs index f23d2de..170786a 100644 --- a/examples/BytePairEncoder.hs +++ b/examples/BytePairEncoder.hs @@ -195,6 +195,14 @@ greedyTokenizer mapping = Pipe consume produce (V.empty, "", 0) -- Main ------------------------------------------------------------------------------- +-- | Main entry point +-- +-- Usage: +-- +-- cabal run --flag fusion-plugin BytePairEncoder +-- +-- This will read the test data from test-data.txt, build the ByteMappings, +-- from frequent byte pairs in the file, and then tokenize the data to stdout. main :: IO () main = do name <- fmap head getArgs