import Control.Parallel (pseq)
import Control.Parallel.Strategies
import qualified Data.ByteString.Lazy.Char8 as B
import Data.Char (isAlpha, toLower)
import Data.Function (on)
import Data.List (sortBy)
import Data.Map (Map, empty, fromListWith, insert, insertWith, keys, toList, unionWith, unionsWith)
@@ -49,7 +49,8 @@ main = do
4949 case args of
5050 [filename, " par" ] -> do
5151 content <- B. readFile filename
52- print " "
52+ print $ length $ pipeline 10000 content
53+
5354
5455 -- print $ length $ withStrategy (parBuffer 100 rdeepseq) (map wcmap (chunk 10000 (map removeNonLetters $ B.words content)))
5556 -- print $ take 10 $ sort $ wcpar content
@@ -68,11 +69,34 @@ main = do
-- | Map step of the word count: pair every word in the stream with an
-- initial count of 1.
wcmap :: Stream B.ByteString -> Par (Stream (B.ByteString, Int))
wcmap = streamMap (\word -> (word, 1))
7071
71- wcreduce :: Stream (Stream (B. ByteString , Int )) -> Par (Stream (B. ByteString , Int ))
72- wcreduce = streamMap (streamFold insertTuple empty)
72+ -- wcreduce :: Stream (Stream (B.ByteString, Int)) -> Par (Stream (Map B.ByteString Int))
73+ -- wcreduce = streamMap ((runPar . streamFold (insertTuple) empty))
74+
-- | Reduce step for a single chunk: fold a stream of @(word, count)@ pairs
-- into a 'Map', starting from the empty map and combining each pair with
-- 'insertTuple'.
wcreduce :: Stream (B.ByteString, Int) -> Par (Map B.ByteString Int)
wcreduce = streamFold insertTuple empty
77+
78+
-- | Final reduce step: merge a stream of per-chunk count maps into one map,
-- adding the counts of any word that occurs in more than one map.
finalreduce :: Stream (Map B.ByteString Int) -> Par (Map B.ByteString Int)
finalreduce = streamFold (unionWith (+)) empty
81+
-- | Add a @(word, count)@ pair to the accumulator map.
--
-- Uses 'insertWith' so that a word already present has the new count added
-- to its existing one; a plain 'insert' would overwrite the running total
-- and lose every earlier occurrence of the word.
--
-- The accumulator comes first so the function can be passed directly as the
-- step function of a left fold.
insertTuple :: Map B.ByteString Int -> (B.ByteString, Int) -> Map B.ByteString Int
insertTuple acc (word, count) = insertWith (+) word count acc
84+
-- | Split a list into consecutive pieces of at most @n@ elements; only the
-- last piece may be shorter. An empty list yields no pieces.
--
-- A non-positive @n@ returns the whole (non-empty) input as a single piece:
-- 'splitAt' would otherwise never consume any input and the result would be
-- an endless list of empty chunks.
chunk :: Int -> [a] -> [[a]]
chunk _ [] = []
chunk n xs
  | n <= 0    = [xs]
  | otherwise = piece : chunk n rest
  where
    (piece, rest) = splitAt n xs
88+
-- | Normalise a token: lower-case every character, then drop every character
-- for which 'isAlpha' is false. The result may be empty when the token
-- contains no letters at all.
removeNonLetters :: B.ByteString -> B.ByteString
removeNonLetters = B.filter isAlpha . B.map toLower
7391
74- insertTuple :: (B. ByteString , Int ) -> Map B. ByteString Int -> Map B. ByteString Int
75- insertTuple (k,v) = insert k v
-- | Word-count pipeline over the contents of a file.
--
-- The input is split into whitespace-separated tokens, each token is
-- normalised with 'removeNonLetters', and the tokens are grouped into chunks
-- of at most @n@. Every chunk is reduced to its own count map, and the
-- per-chunk maps are merged by 'finalreduce'. The result is the merged map
-- as an association list in ascending key order.
pipeline :: Int -> B.ByteString -> [(B.ByteString, Int)]
pipeline n bs = runPar $ do
  chunks   <- streamFromList (chunk n cleanedWords)  -- stream of word lists
  perChunk <- streamMap countChunk chunks            -- stream of count maps
  fmap toList (finalreduce perChunk)
  where
    -- Tokens made up only of digits or punctuation normalise to the empty
    -- string; they are not words, so they are dropped before counting.
    cleanedWords = filter (not . B.null) (map removeNonLetters (B.words bs))

    -- Run the map and reduce steps for one chunk inside a single 'runPar'
    -- and hand back only the finished 'Map'. No stream created by the inner
    -- computation is returned from it, so nothing created in one 'runPar'
    -- is consumed in another.
    countChunk ws = runPar (streamFromList ws >>= wcmap >>= wcreduce)
76100
77101
78102
0 commit comments